0%

Wait 和 notify 接口介绍

std::atomic<T>::wait 用来等待原子变量值的改变,如果原子变量值和给定的参数 old 相同则阻塞,直到被 notify_all() 或者 notify_one() 通知,或者自发地解除阻塞。因此用 wait 要在循环中使用。

// std::atomic<T>::wait
void wait( T old, std::memory_order order = std::memory_order_seq_cst ) const noexcept;
void wait( T old, std::memory_order order = std::memory_order_seq_cst ) const volatile noexcept;

Cppreference 上说这个接口通常比轮询和自旋锁更高效。看了一下 libstdc++ 的实现,实际上内部也是自旋锁,只是比我们在外面用自旋锁要高效一些。实现在 libstdc++-v3/include/bits/atomic_wait.h 文件中

wait 的实现

调用链路(同一缩进下函数的调用不区分顺序,不表示前者在后者之前调用):

__atomic_wait_address_v
    __detail::__waiter<std::true_type>::_M_do_wait_v
        __detail::__waiter_pool::_M_do_spin_v 🔍
            // do-while loop of:
            __detail::__waiter_pool::_M_do_wait 🔍
            // and
            __detail::_S_do_spin_v
                __detail::__atomic_spin 🔍

执行策略(C++17)

std::execution::seq
std::execution::par
std::execution::par_unseq
std::execution::unseq (C++20)

它们分别属于以下类型,但是使用的时候不要自己创建类型,应该直接使用标准库中提供的执行策略对象。

std::execution::sequenced_policy
std::execution::parallel_policy
std::execution::parallel_unsequenced_policy
std::execution::unsequenced_policy (C++20)

其中的并行策略仅仅是允许算法这样做,但不能强制算法按要求做。

Note that this is permission, not a requirement—the library may still execute the code on a single thread if it wishes.

Unwanted blocking

  • Deadlock
  • Livelock,和死锁的区别是在循环中积极检查条件,比如自旋锁,线程一直消耗 CPU 但始终无法前进
  • Blocking on I/O or other external input,线程在等待一个不定期的、可能永远不会到来的操作

Race conditions

  • Data races
  • Broken invariants
  • Lifetime issues

Code review

  1. 在并发访问下,哪些数据需要保护
  2. 如何确保数据受到保护?
  3. 此时其他线程可能位于代码的哪个部分?
  4. 当前线程持有哪些互斥锁?
  5. 其他线程可能持有哪些互斥锁?
  6. 在当前线程中执行的操作与在其他线程中执行的操作之间是否存在顺序要求?如何确保这些要求得到满足?
  7. 当前线程加载的数据是否仍然有效?是否可能已被其他线程修改?(比如 CAS、双重校验锁)
  8. 如果假设其他线程可能正在修改数据,这意味着什么?如何确保这种情况永远不会发生?

Testing

书 P345 给出了一些对线程安全队列进行测试的测试点。

线程池

实现可以 submit 任务并获取 future 的线程池

有了 std::future 就能对提交的任务做等待。

线程池初始化时就创建指定数量的工作线程,每个线程的任务就是在循环中从线程安全队列上获取任务并运行。每个任务的类型是 std::packaged_task<result_type()> task,每次有工作要提交都会包装到 std::packaged_task,工作的提交者因而可以获取 std::future

由于 C++23 才引入 std::move_only_function<>,书上实现了一个简单的替代。

修复 quicksort 工作线程的死锁

书上给了一个 quicksort 的例子,指出划分完成时,本线程先递归做完 new_higher 段的工作,再去等待 new_lower 段的工作的 future。这个实现中线程池中工作线程数量是固定的,如果所有工作线程都需要等待 future,那么就没有剩余的工作线程去真正推进任务了!有限线程数 + 线程等待尚未排队的任务 = 死锁。

解决方案是让线程在等待期间不要休眠,而是主动处理其他任务:

False sharing

  • Cache ping-pong

std::hardware_destructive_interference_size

还有 std::hardware_constructive_interference_size。在大多数情况下这两个值相等,而且都等于 cache line 的大小。

https://stackoverflow.com/questions/39680206/understanding-stdhardware-destructive-interference-size-and-stdhardware-cons 第一个回答的评论里面指出如果目标平台有多个,那么为了兼容性,这两个常量可能是不同的。在特定架构上这两个常量应该是相同的。

例子:矩阵计算

  1. 每个线程计算一组列。列可以先加载到当前线程,每次都读一整行,对 cache 友好。
  2. 每个线程计算一组行。相比于方案 1,读的 false sharing 更多,但是写操作的 false sharing 会更少。
  3. 分块矩阵。访问输入矩阵元素的数量减少了,同时保持计算出来的元素数相同,因而效率更高。

引言

这篇笔记是承接 CppCon 2023 Lock-free Atomic Shared Pointers Without a Split Reference Count内存模型基础、标准原子类型、自旋锁 来写的。

C++20 有 std::atomic<std::shared_ptr>std::atomic<std::weak_ptr> 的偏特化,之前连这两个偏特化都没有,因而会编译错误(std::atomic requires a trivially copyable type),只能使用对共享指针提供的原子操作自由函数std::atomic_*)。但是这样的类型并不是无锁的,可以通过 is_lock_free() 的返回值看出来,见 https://godbolt.org/z/b5P84jM9f 。根据 Daniel1 的幻灯片,MSVC 和 libstdc++ 中这两个类型都是有锁;根据我的查证,libc++ 截至 2025 年 1 月 5 日还没有实现这两个偏特化。

既然知道有锁了,libstdc++ 是怎么实现共享指针原子变量的线程安全访问的呢?

原子操作自由函数

C++20 之前原子操作自由函数对 std::shared_ptr<T> 的支持可见 atomic_load_explicit_Sp_locker它对指针值做 hash,得到互斥量池里的一个下标,从而获取一个 __gnu_cxx::__mutex 的引用。可以参考 以下代码

// shared_ptr.cc
#include "mutex_pool.h"

namespace __gnu_internal _GLIBCXX_VISIBILITY(hidden)
{
  /* Returns different instances of __mutex depending on the passed index
   * in order to limit contention.
   */
  __gnu_cxx::__mutex&
  get_mutex(unsigned char i)
  {
#ifdef _GLIBCXX_CAN_ALIGNAS_DESTRUCTIVE_SIZE
    // Increase alignment to put each lock on a separate cache line.
    struct alignas(__GCC_DESTRUCTIVE_SIZE) M : __gnu_cxx::__mutex { };
#else
    using M = __gnu_cxx::__mutex;
#endif
    // Use a static buffer, so that the mutexes are not destructed
    // before potential users (or at all)
    static __attribute__ ((aligned(__alignof__(M))))
      char buffer[(sizeof (M)) * (mask + 1)];
    static M *m = new (buffer) M[mask + 1];
    return m[i];
  }
}

非阻塞数据结构的分类

书上给出了一个用 std::atomic_flag 实现自旋锁的代码片段:

class spinlock_mutex {
    std::atomic_flag flag;

public:
    spinlock_mutex() : flag(ATOMIC_FLAG_INIT) {}

    void lock() {
        while (flag.test_and_set(std::memory_order_acquire));
    }

    void unlock() {
        flag.clear(std::memory_order_release);
    }
};

自旋锁是一个 nonblocking 的例子(书上认为它没有任何阻塞调用,因此是非阻塞的),但是却不是 lock-free 的。

书上给出了下面几类数据结构:

  1. Obstruction-free:如果其他线程都被暂停了,那么任何一个选定的线程都能在一定步数内完成操作。
  2. Lock-free:多个线程同时在进行操作,有一个线程能在一定步数内完成操作。常见的范式是在循环中进行 CAS。可能有饥饿发生
  3. Wait-free:多个线程同时在进行操作,每个线程都能在一定步数内完成操作。

引入部分

设计并发数据结构需要考虑两类问题:

  1. 如何保护数据?
  2. 如何实现真正的并发?

本章实现线程安全的(1 个全局互斥量)、队列(2 个全局互斥量,头尾各用一个)、哈希表(每个 slot 一个互斥量)、链表(每个结点一个互斥量)。

6.2.1-6.2.2 用互斥锁保护 std::stackstd::queue

首先书上给了实现线程安全的栈和队列的例子。它们都是用 std::mutex 来保护对应的数据结构的。

在实现队列的时候,为了方便生产者 - 消费者问题的处理,添加了对条件变量的使用。条件变量的使用给异常安全带来了一个新的问题:

C++ 内存模型概念

书上着重介绍了 synchronizes-with 和 happens-with 两个关系(relationship)。

Synchronizes-with

The synchronizes-with relationship is something that you can get only between operations on atomic types.

书上还说:一些操作的内部实现会使用原子操作(比如对 mutex 上锁),因此也能达到同步关系,但是同步关系从根本上是由原子操作提供的。

Synchronizes-with 关系提供一种跨线程的 happens-before 关系。

Happens-before

Sequenced-before(单线程)

单个线程中排在前面的操作发生在排在后面的操作之前(比如前一个语句和后一个语句),而且是 strongly happens before

GCC7 不再认为 x86 上的 16 字节原子变量无锁

原因:GCC7 开始在 std::atomic<T>::is_always_lock_free 不满足时会调用 libatomic 的 __atomic_is_lock_free() 函数,但是 libatomic 对无锁的内涵理解不同。

https://godbolt.org/z/nc34E716Y 这里表明 gcc7 处理 is_lock_free 的行为发生了变化,即便有了 -mcx16 编译选项,也不认为 16 字节原子变量是无锁的,gcc6 则认为 16 字节原子变量在对齐满足时是无锁的(无论 -mcx16 标志是否存在)。Clang 则是在有无 -mcx16 选项时呈现出不同的结果。GitHub 上有个相关的 讨论 。简单来说是 gcc7 之后不会直接通过当前编译架构来直接决定一个类型是否无锁(clang 会),而是将这个逻辑转移到对 libatomic 的函数的调用上,而 libatomic 认为 16 字节原子变量在 x86 上不算无锁。

GCC 的 libatomic 对是否无锁的判断可见 glfree.c 。函数签名为 bool libat_is_lock_free (size_t n, void *ptr)。对于 2 的幂(case 0/1/2/4/8/16先判断是否其大小是否可以做到无锁(稍后介绍),如果不行则放到一个更大的范围内去判断,在更大的范围内判断时要考虑其所占空间是否能刚好被更大的无锁原子类型覆盖。比如:

假设 8 字节的对齐访问是无锁的,那么以下 5 个字节的访问是无锁的:
| . . . . . . . . |
        * * * * *     ✔
这也是无锁的:
| . . . . . . . . |
      * * * * *       ✔
这是有锁的:
| . . . . . . . . | .
          * * * *   * 🔒

而 libatomic 判断一个大小是否能做到无锁,直接看代码注释就可以(C2 宏将两个 token 拼接起来,得到一个之前定义的宏):