5.0.2 阅读 libstdc++ 中原子变量 wait 和 notify 接口

Wait 和 notify 接口介绍

std::atomic<T>::wait 用来等待原子变量值的改变,如果原子变量值和给定的参数 old 相同则阻塞,直到被 notify_all() 或者 notify_one() 通知,或者自发地解除阻塞。因此用 wait 要在循环中使用。

// std::atomic<T>::wait
void wait( T old, std::memory_order order = std::memory_order_seq_cst ) const noexcept;
void wait( T old, std::memory_order order = std::memory_order_seq_cst ) const volatile noexcept;

Cppreference 上说这个接口通常比轮询和自旋锁更高效。看了一下 libstdc++ 的实现,实际上内部也是自旋锁,只是比我们在外面用自旋锁要高效一些。实现在 libstdc++-v3/include/bits/atomic_wait.h 文件中

wait 的实现

调用链路(同一缩进下函数的调用不区分顺序,不表示前者在后者之前调用):

__atomic_wait_address_v
    __detail::__waiter<std::true_type>::_M_do_wait_v
        __detail::__waiter_pool::_M_do_spin_v 🔍
            // do-while loop of:
            __detail::__waiter_pool::_M_do_wait 🔍
            // and
            __detail::_S_do_spin_v
                __detail::__atomic_spin 🔍

注意到 __waiter<std::true_type> 在代码中有个别名为 __enters_wait,它的模板参数实际上只影响 __waiter 在构造和析构时是否分别调用 _M_enter_wait()_M_leave_wait(),不影响其他函数的行为。

__detail::__waiter_pool::_M_do_spin_v 的实现:

template<typename _Tp, typename _ValFn>
void _M_do_wait_v(_Tp __old, _ValFn __vfn) {
    do {
        __platform_wait_t __val;
        if (__base_type::_M_do_spin_v(__old, __vfn, __val)) return;
        // __base_type::_M_w 类型为 __waiter_pool
        __base_type::_M_w._M_do_wait(__base_type::_M_addr, __val);
    } while (__detail::__atomic_compare(__old, __vfn()));
}

__detail::__waiter_pool::_M_do_wait 的实现如下。如果有平台的 wait 支持(_GLIBCXX_HAVE_PLATFORM_WAIT),就调用 __platform_wait(),否则使用互斥量和条件变量。

  • 如果 _GLIBCXX_HAVE_LINUX_FUTEX 功能测试宏为真(只有 Linux 提供 futex 特性),则定义 __platform_wait() 为发起 SYS_futex 系统调用。
  • 否则,在 atomic_wait.h 头文件中不对 __platform_wait() 定义。如果系统中有平台的等待支持,那么此函数会在其他地方定义,比如 Darwin 内核中有 __ulock_wait() / __ulock_wake()
struct __waiter_pool : __waiter_pool_base
{
    void _M_do_wait(const __platform_wait_t* __addr, __platform_wait_t __old) noexcept
    {
#ifdef _GLIBCXX_HAVE_PLATFORM_WAIT
        __platform_wait(__addr, __old);
#else
        __platform_wait_t __val;
        __atomic_load(__addr, &__val, __ATOMIC_SEQ_CST);
        if (__val == __old)
        {
            lock_guard<mutex> __l(_M_mtx);
            __atomic_load(__addr, &__val, __ATOMIC_RELAXED);
            if (__val == __old)
                _M_cv.wait(_M_mtx);
        }
#endif // __GLIBCXX_HAVE_PLATFORM_WAIT
    }
};

__detail::__atomic_spin 的实现:

inline constexpr auto __atomic_spin_count_relax = 12;
inline constexpr auto __atomic_spin_count = 16;

struct __default_spin_policy
{
  bool operator()() const noexcept
  { return false; }
};

template<typename _Pred, typename _Spin = __default_spin_policy>
bool __atomic_spin(_Pred& __pred, _Spin __spin = _Spin{}) noexcept {
    for (auto __i = 0; __i < __atomic_spin_count; ++__i) {
        if (__pred()) return true;
        if (__i < __atomic_spin_count_relax) __detail::__thread_relax();
        else __detail::__thread_yield();
    }
    while (__spin()) {
        if (__pred()) return true;
    }
    return false;
}

__atomic_spin 中 relax 和 yield 的实现(在 x86 等机器上 relax 是 pause):

    inline void
    __thread_yield() noexcept // 如果不支持 gthread(大多数情况支持)那么 CPU 会空转
    {
#if defined _GLIBCXX_HAS_GTHREADS && defined _GLIBCXX_USE_SCHED_YIELD
     __gthread_yield();       // 让出剩余时间片
#endif
    }

    inline void
    __thread_relax() noexcept
    {
#if defined __i386__ || defined __x86_64__
      __builtin_ia32_pause(); // 运行 pause 指令,让 CPU 空闲一段时间
#else
      __thread_yield();
#endif
    }

notify 的实现

notify_onenotify_all 的区别只在于 _M_notify 的一个参数 bool __all

__atomic_notify_address
    __detail::__waiter<std::false_type>::_M_notify
        #ifdef _GLIBCXX_HAVE_PLATFORM_WAIT
        1. __atomic_fetch_add
        2. __platform_notify
        #else
        1. { lock_guard<mutex>
        2. __atomic_fetch_add }
        3. __condvar::notify_all
        #endif

__atomic_fetch_add 是为了改变 __platform_wait_t* 指向内容的值,这是使用在用户空间实现等待的系统调用(如 futex)所需要的。__platform_wait_t* 不是原子变量本身,所以原子变量存储的值不会因此变化。

在创建 __waiter 的时候,其基类 __waiter_base 的构造函数会通过 _S_for 函数找到对应 __waiter_pool_base 的地址。这又是 libstdc++ 经典的哈希 + 池化策略,和 libstdc++ 原子操作自由函数 的实现类似,这样做可能会产生更多的假性唤醒。

static __waiter_pool_base&
_S_for(const void* __addr) noexcept
{
    constexpr uintptr_t __ct = 16;
    static __waiter_pool_base __w[__ct];
    auto __key = (uintptr_t(__addr) >> 2) % __ct;
    return __w[__key];
}

总结

在用户空间等待(比如 Linux 上的 futex 或者 Darwin 上的 ulock)是原子变量的 wait 操作高效的主要原因。此外,在自旋时适当 relax(翻译成 i386/x86 的 pause 指令)和 yield(让出时间片)也使得操作高效。如果平台没有实现等待功能,那么原子等待还是会使用 mutex 和条件变量。

wait 操作使得线程阻塞等待条件改变,因此我认为它不属于 lock-free 范畴。它将不同操作系统提供的类似 futex 的概念以统一的抽象暴露给了用户。