7. 设计无锁并发数据结构
非阻塞数据结构的分类
书上给出了一个用 std::atomic_flag
实现自旋锁的代码片段:
class spinlock_mutex {
std::atomic_flag flag;
public:
spinlock_mutex() : flag(ATOMIC_FLAG_INIT) {}
void lock() {
while (flag.test_and_set(std::memory_order_acquire));
}
void unlock() {
flag.clear(std::memory_order_release);
}
};
自旋锁是一个 nonblocking 的例子(书上认为它没有任何阻塞调用,因此是非阻塞的),但是却不是 lock-free 的。
书上给出了下面几类数据结构:
- Obstruction-free:如果其他线程都被暂停了,那么任何一个选定的线程都能在一定步数内完成操作。
- Lock-free:多个线程同时在进行操作,有一个线程能在一定步数内完成操作。常见的范式是在循环中进行 CAS。可能有饥饿发生。
- Wait-free:多个线程同时在进行操作,每个线程都能在一定步数内完成操作。
https://en.wikipedia.org/wiki/Non-blocking_algorithm 给出了更精炼的描述:
A non-blocking algorithm is lock-free if there is guaranteed system-wide progress, and wait-free if there is also guaranteed per-thread progress.
根据以上信息,用户空间下的自旋锁也不是 obstruction-free 的。2025 年 2 月 7 日:如果没有上下文切换,受自旋锁保护的操作又能在有上界的时间内完成,那么自旋锁当然是 lock-free 的(比如在内核中)。
由于 obstruction-free 的应用场景有限,而 wait-free 的算法相当难写,所以 lock-free 的算法比较常见。Lock-free 的算法不仅有并发优势,而且更鲁棒,不会出现持有锁的线程异常终止,导致其他线程无法继续前进的情况。Lock-free 算法可能会出现活锁(live lock),但是这个过程是相对来说比较短暂的。根据百度百科,活锁可以认为是一种特殊的饥饿。
无锁的线程安全栈
push()
和 pop()
操作
template<typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data;
node* next;
node(T const& data_):
data(std::make_shared<T>(data_))
{}
};
std::atomic<node*> head;
public:
void push(T const& data)
{
node* const new_node = new node(data);
new_node->next = head.load();
while (!head.compare_exchange_weak(new_node->next, new_node));
}
std::shared_ptr<T> pop()
{
node* old_head = head.load();
while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
// 这里 old_head->data 其实可以加上 std::move
return old_head ? old_head->data : std::shared_ptr<T>();
}
};
data
选择用 std::shared_ptr<T>
来存储而不是 T
类型,是因为如果 pop()
的接口为 pop(T &)
,那么在取下结点,最后一步赋值的时候如果抛出异常,那么数据结构无法恢复(因为结点已经取下来了)。正因此 pop()
的接口被设计为无参但返回 std::shared_ptr<T>
。
push()
操作:
- Create a new node.
- Set its
next
pointer to the currenthead
node. - Set the
head
node to point to it.
第 2 步和第 3 步刚好可以用一个 CAS 完成。如果失败的话 CAS 会将最新的 head
值加载到给定的 expected
参数中,所以 while
循环的循环体不需要清理代码。
pop()
操作:
- Read the current value of head.
- Read
head->next
. - Set
head
tohead->next
. - Return the data from the retrieved node.
- Delete the retrieved node.
第 5 步的问题比较大,假设线程 A 和 B 都在执行 pop()
操作,线程 A 刚删除完旧 head
,但线程 B 恰好进行到了第 2 步,已经读到了旧 head
,正要去读这个被删除的 head
结点的 next
字段,这就会导致内存非法访问。上面给出的代码是用内存泄漏来规避内存非法访问。push()
操作不会影响 pop()
的安全性,因为 push()
操作只会读 head
的地址,不会读 head
里面的内容。
如果是垃圾回收语言,这里就已经把代码写完了。
自行实现垃圾回收
回收链表 + pop()
访问计数
上一小节中我们可以看到 push()
操作没有问题,pop()
操作有内存泄漏的问题。pop()
操作内存泄漏是因为我们无法安全删除当前的结点。我们可以把当前的结点暂时加入到另外一个(回收)链表中以防止内存泄漏,这个回收链表和工作链表的结构相同,都是 std::atomic<node *>
,因为用的是 push()
操作,所以也没有问题。
事实上,push()
操作对于一段链表(不仅仅是单个结点)也是安全的,书上实现了这样的函数:
// first -> ... -> last -> nullptr
void chain_pending_nodes(node* first, node* last) {
last->next = to_be_deleted;
while (!to_be_deleted.compare_exchange_weak(last->next, first));
}
// 工具函数,找到链表的尾结点并调用前一个重载
void chain_pending_nodes(node* nodes) {
node* last = nodes;
while (node* const next = last->next) {
last = next;
}
chain_pending_nodes(nodes, last);
}
在有了回收链表中的逻辑后,pop()
变成:
std::shared_ptr<T> pop()
{
node* old_head = head.load();
while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
auto ret = old_head ? std::move(old_head->data) : std::shared_ptr<T>();
try_reclaim(old_head); // 尝试回收
return ret;
}
其中 try_reclaim()
尝试对 old_head
进行清理,如果不能安全清理,则将其暂时放到回收链表中。
什么时候才能对回收链表进行清理呢?为了确认什么时候我们能安全清理回收链表中的结点,我们需要增加一个计数变量 threads_in_pop
,类型是 std::atomic<unsigned>
,用来追踪在 pop()
函数中的线程数。该变量在 pop()
进入时计数 +1,退出时计数 -1,计数减少的逻辑应该放在 try_reclaim
中做。现在 pop()
变成:
std::shared_ptr<T> pop()
{
++threads_in_pop;
node* old_head = head.load();
while (old_head && !head.compare_exchange_weak(old_head, old_head->next));
auto ret = old_head ? std::move(old_head->data) : std::shared_ptr<T>();
try_reclaim(old_head); // 尝试回收,--threads_in_pop 在这里面做
return ret;
}
怎么实现 try_reclaim()
呢?先检查计数的当前值,如果计数为 1,说明当前只有本线程在 pop()
中,刚取下来的结点可以直接删除,并且可以考虑看看回收链表中的其他结点是不是可以安全删除的。否则,当前结点不能安全删除,将其放到回收链表中去。
void try_reclaim(node* old_head) {
if (threads_in_pop == 1) {
node* nodes_to_delete = to_be_deleted.exchange(nullptr);
if (!--threads_in_pop) {
// 这里可以安全清理结点:
// 计数归零要么是其他线程没来过,要么来了又走了。
// 无论如何,只要当前在 pop() 中的线程只有本线程,则说明可以安全回收。
// 这里的情形和双重校验锁有点相似,需要检查两次!
// delete_nodes 实现很简单,就是对(可能为空的)链表一直删到底。
delete_nodes(nodes_to_delete);
} else if (nodes_to_delete) {
// 这里清理结点不安全,需要将结点放回去:
// threads_in_pop 减少后仍不为 0,这说明有新的线程进 pop() 了。
// 其他线程可能在 if 判断结束后、exchange 之前把新结点加入到了回收链表中,
// 这些结点可能还会被其他线程访问,因此删除不安全。
chain_pending_nodes(nodes_to_delete);
}
// threads_in_pop == 1 成立,表明当前结点可以安全删除。
// delete 语句放到 if 的下一行会更好理解,但 delete 比较耗时,我们希望能抓住
// threads_in_pop == 1 这个条件还成立的机会,所以把 delete 放在了最后。
delete old_head;
} else {
chain_pending_node(old_head); // 不能安全删除,放到回收链表中。
--threads_in_pop;
}
}
这样还是有点问题:如果总是有两个及以上的线程执行 pop()
操作,那么不会有结点被回收!
Hazard pointers
思路
只用一个 pop()
访问计数的问题是信息太少,每个线程都没有足够的信息来确定是否可以对回收链表安全回收。Hazard pointers 的思路是:每个线程都有一个原子变量来表示当前正在访问的指针,而且这个指针是对全局可见的,一种实现方案是有一个指针原子变量的全局数组,而每个线程的原子变量是其中的一个元素。如果某个线程在使用完一个结点,尝试删除之前,发现其他线程都没有正在访问要删除的这个指针,则说明这个指针是可以删除的,否则将其 push()
到回收链表。除了检查当前指针之外,在 pop()
退出之前还要检查回收链表的每个结点是否可以删除,逻辑和检查当前结点是一样的。因此,pop()
在清理阶段的时间复杂度为 $O(最大工作线程数 \times 回收链表长度)$。由于 $回收链表长度 \ge 最大工作线程数$ 时一定有结点可以回收,所以这个时间复杂度可以改写为 $O(最大工作线程数^2)$。
为了让每次搜索全局链表都能找到可以回收的结点,可以仅在 $回收链表长度 \ge 最大工作线程数 \times 2$ 时进行回收,这样至少可以回收一半的结点,这是一种空间换时间的思路。在这种优化下,每次内存回收的复杂度依然不会改变。为了回收和最大工作线程数等量的结点,需要扫描的复杂度为 $O(最大工作线程数^2)$,那么每次 pop()
的均摊复杂度为 $O(最大工作线程数)$。
全局回收链表上的访问是原子的,因而开销比较大。可以对每个工作线程准备一个 thread_local
回收链表。工作线程只负责回收本地链表中的结点。如果线程即将退出,那么本地链表应该合并到全局链表中去,这一点可以由 RAII 保证。对应地,如果一个线程发现全局链表非空,也要将全局链表并入本地链表。
实现
pop()
函数:
std::shared_ptr<T> pop()
{
std::atomic<void*>& hp = get_hazard_pointer_for_current_thread();
node* old_head = head.load();
do {
node* temp;
do {
temp = old_head;
hp.store(old_head);
old_head = head.load();
} while (old_head != temp);
// 1. 内外双循环的必要性:
// 外循环的 while 语句要访问 old_head->next,因此 old_head 必须是有效指针。
// 内循环的 while 判断 old_head != temp 是为了保证存储 hp 的过程中 head 没有变化。
// 一旦 hp 成功存储,这个指针就被保护了,old_head 也就不会被别的线程删除。
// 2. 在循环中一般会用 compare_exchange_weak,但是这里用 compare_exchange_strong
// 是不想让假性失败导致高开销的重试(在循环中有几个原子操作)。
} while (old_head && !head.compare_exchange_strong(old_head, old_head->next));
// 上面是在有保护的情况下取走 head
// -------------------------------
// 下面是尝试回收当前指针和回收链表中的其他指针
hp.store(nullptr);
std::shared_ptr<T> res;
if (old_head) {
res.swap(old_head->data);
// outstanding_hazard_pointers_for 遍历全局记录,对每个记录简单调用 load() 来检查
// 是否 old_head 还在使用中。如果扫过去一遍没有找到,则 old_head 不可能被使用。这是因为
// 本线程已经把 old_head 取下来了,之前没有正在使用它的线程不可能再访问到它了。
// 时间复杂度:O(最大工作线程数)
if (outstanding_hazard_pointers_for(old_head)) {
// 将该结点加入待删除列表。
// 书上是将该结点再次封装到一个新的类型中,该类型在析构函数中会将结点指针转换回原来的
// T* 类型,然后调用 delete 操作符。这个逻辑也可以手动来写。
reclaim_later(old_head);
} else {
delete old_head;
}
// 将回收链表头和 nullptr 交换,然后遍历每个结点,对每个结点调用
// outstanding_hazard_pointers_for,能回收就回收,不能回收就将结点重新放到回收链表中。
// 时间复杂度:O(最大工作线程数 * 回收队列长度) = O(最大工作线程数^2)
delete_nodes_with_no_hazards();
}
return res;
}
Tip
书上这句话好像是在说只要 CAS 循环里面有实质工作就选 strong 版本,如果只是失败后不断尝试但是循环内部是空的就选 weak 版本:
You’re using
compare_exchange_strong()
here because you’re doing work inside the while loop: a spurious failureon compare_exchange_weak()
would result in resetting the hazard pointer unnecessarily.
为了让每个线程的风险指针对其他线程可见,可以使用一个全局风险指针记录数组,每个线程的风险指针则是其中的元素。具体而言,全局风险指针记录数组的每个元素不是简单的指针,而是是一个带有线程 id 原子变量和指针原子变量的结构,线程 id 默认初始化表示这个位置没有分配给任何线程。全局风险指针记录数组的大小也是固定分配的。get_hazard_pointer_for_current_thread()
直接返回一个 thread_local
变量中的指针部分,该变量在初始化时从全局风险指针记录数组上搜索一个空的位置分配给当前线程。由于获取风险指针的函数返回了 thread_local
变量,因此尽管分配指针的操作很耗时,但每个线程只需要分配一次。在工作线程退出时,相关析构函数会将对应全局记录清理。
unsigned const max_hazard_pointers=100;
struct hazard_pointer
{
std::atomic<std::thread::id> id;
std::atomic<void*> pointer; // 只关心指针地址,这里的 pointer 隐去了类型
};
hazard_pointer hazard_pointers[max_hazard_pointers];
class hp_owner
{
hazard_pointer* hp;
public:
hp_owner(hp_owner const&) = delete;
hp_owner operator=(hp_owner const&) = delete;
hp_owner() : hp(nullptr)
{
for (unsigned i = 0; i < max_hazard_pointers; ++i) {
std::thread::id old_id;
if (hazard_pointers[i].id.compare_exchange_strong(
old_id, std::this_thread::get_id()))
{
hp = &hazard_pointers[i];
break;
}
}
if (!hp) {
throw std::runtime_error("No hazard pointers available");
}
}
std::atomic<void*>& get_pointer()
{
return hp->pointer;
}
~hp_owner()
{
hp->pointer.store(nullptr);
hp->id.store(std::thread::id());
}
};
std::atomic<void*>& get_hazard_pointer_for_current_thread()
{
thread_local static hp_owner hazard;
return hazard.get_pointer();
}
delete_nodes_with_no_hazards()
我觉得可以继续优化,每次都把不可回收的结点直接放回回收链表,会有大量的原子操作,有点慢。可以将不能回收的结点组织成链,再对整个链表只做一次 while (xx.compare_exchange_weak(...))
并入回收链表中。
引用计数
std::shared_ptr<T>
上的原子操作
基于访问计数的回收算法只能在没有其他线程访问 pop()
时回收结点,导致结点堆积,在重负荷下仍然有内存泄漏。Hazard pointers 解决了这个问题。还有没有别的思路呢?可以想到原子引用计数的实现逻辑更简单。
成书时间是在 C++20 之前,没有 std::atomic<shared_ptr<T>>
,因此书上是在 std::shared_ptr<T>
上做 std::atomic_load()
操作。书 P227 给了一个可供参考的实现。
我个人认为上图代码再获取 old_head
后立即将 old_head->next
置空只是一个取下结点的好习惯,实际上从 pop()
中返回时,old_head
会析构,old_head->next
也会自动被析构。
然后书介绍起了 <experimental/atomic>
头文件,它提供一个新的模板类 std::experimental::atomic_shared_ptr<T>
。C++20 已经将 std::atomic<std::shared_ptr<T>>
偏特化加入标准了。
不过 std::atomic<std::shared_ptr<T>>
没有无锁保证!即便是机器支持 CX16,共享指针也不是一个平凡类型,它在拷贝、赋值、析构的时候要修改控制块的引用计数,导致其对应原子类型很难实现无锁。在 libstdc++ 的实现中,无论是 std::atomic_load()
对共享指针的特化,还是 std::atomic<std::shared_ptr<T>>
,均是有锁的。
分裂引用计数
这个应该是对应 CppCon 2023 Lock-free Atomic Shared Pointers Without a Split Reference Count 的 split reference-counting。
- 和共享指针上的原子操作相比,分裂引用计数的计数值是实现者维护的,而不是在拷贝、赋值、析构时自动维护的。因此如果硬件支持相应大小的 CAS 指令,就可以写成无锁代码。
- 和手动维护的单个内部引用计数相比,分裂引用计数在开始结点访问时会先把计数值加到外部计数上,防止直接操作内部计数时遭遇结点被删除的未定义行为。
分裂引用计数结构的外围包含一个外部计数和一个指针,这个指针指向的结构包含数据本身和一个内部计数。一个指针加上一个外部计数也超过了机器字长,所以无锁操作同样也不能在所有平台上面保证。如果硬件支持 CX16,那么分裂引用计数就是无锁的。否则,如果是在 x86 上我们还能利用这样的硬件特性:虽然指针是 64 位,但实际有用的虚拟地址位数可能只有 48 位,剩下的 16 位可以用来存储引用计数,缺点是计数范围受限。其他硬件上也有类似的特性可以挖掘。
Tip
书上开始介绍分裂引用计数这个方法就是因为共享指针原子变量的无锁性质不能得到保证,但是分裂引用计数本身也不一定是无锁的。不过,因为分裂引用计数结构是平凡类型,只要硬件支持对该大小的变量进行原子操作就可以无锁。相当于是放宽了要求。
Tip
前面说到了 x86-64 处理器上虚拟地址最大是 48 位。但是这个说法并不完全准确。根据 wiki.osdev.org/Paging,Intel 提供了扩展到 57 位虚拟地址、5 级页表的扩展支持。4 级页表的结构是 9(顶级 / 一级)-9(二级)-9(三级)-9(四级)-12(4K 页面页内地址),5 级页表很容易想到是在顶层再次扩展了 9 位。如果使用了依赖 48 位虚拟地址的压缩指针,那么这样的代码是在 5 级页表下不可移植的。
扩展页表支持只能在非 64 位下模式设置,也就意味着开机后不能修改。在 Windows Server 上面的修改可以参考 https://lenovopress.lenovo.com/lp1911.pdf 。
X86 不同硬件配置下的页表级数参考 https://unix.stackexchange.com/a/379238/ 。
Tip
物理地址的长度和虚拟地址不一样,不同硬件的物理地址长度可能也不一样。我的 Intel 笔记本上物理地址占 46 位,而实验室服务器上 CPU 为 AMD,物理地址占 43 位。 看了其他几台 Intel 的服务器,物理地址也是 46 位。
实现思路是这样:
template<typename T>
class lock_free_stack
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node
{
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
};
std::atomic<counted_node_ptr> head; // 不一定无锁
public:
~lock_free_stack()
{
while(pop());
}
void push(T const& data)
{
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
new_node.ptr->next = head.load();
while(!head.compare_exchange_weak(new_node.ptr->next, new_node));
}
};
在 pop()
的时候要考虑保护读出来的 head
,将其外部引用计数加 1,增加外部引用计数的过程要用 CAS 重试直到保证“读到 head,且增加它的引用计数”这个操作是一个不可打断的整体。
template<typename T>
class lock_free_stack
{
private:
// other parts as in listing 7.11
void increase_head_count(counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter = old_counter;
++new_counter.external_count;
}
// 在将增加了外部计数的 counter 放回 head 之前要先检查 head 是否改变。
// 如果失败了那么 head 的当前值写回到 old_counter 中,再次重试直到成功。
while (!head.compare_exchange_strong(old_counter, new_counter));
old_counter.external_count = new_counter.external_count;
}
public:
std::shared_ptr<T> pop()
{
counted_node_ptr old_head = head.load();
for (;;)
{
increase_head_count(old_head);
node* const ptr = old_head.ptr;
if (!ptr)
{
return std::shared_ptr<T>();
}
// 如果失败了,head 的当前值会更新到 old_head 当中,
// 这就是为什么 for 循环内部不需要加载 head 值。
if (head.compare_exchange_strong(old_head, ptr->next))
{
std::shared_ptr<T> res;
res.swap(ptr->data);
// 修改计数,检查是否需要回收结点。
// 内部计数和外部计数其实是一个整体,它们的总和表示真实引用,这里的
// internal_count + external_count + offset 应该等于 0。为什么
// 这个 offset 要取 -2?因为外部计数初始值是 1 表明链表对结点的引用,
// 另外此线程在进入循环时就将外部计数加 1 了,表明 pop() 过程对结点的
// 引用,现在要把这两部分都减掉。
int const count_increase = old_head.external_count - 2;
if (ptr->internal_count.fetch_add(count_increase) == -count_increase)
{
delete ptr;
}
return res;
}
// 如果失败,说明这个 head 被其他结点取下了,开始新的循环重新取一个结点。
// 不过因为本节点和其他节点读到了同样的 head,可能导致这个 head 没有被
// 正常回收,所以这里减少内部计数看看结点是否需要回收。
else if (ptr->internal_count.fetch_sub(1) == 1)
{
delete ptr;
}
}
}
};
关于分裂引用计数如何等效于单个引用计数,可以参考 https://stackoverflow.com/a/68963769/ :
We can think about this as two phases: in the first phase our counter is split (into external_count and internal_count) and in the second phase our counter is merged into a single variable - internal_count. In the first phase we need to add external_count to internal_count to get real counter value. In the second phase we have to use internal_count only, because external_count has some old value that does not make any sense now (as the book says, now “external counter is discarded”).
以分裂引用计数为例,考虑内存序要求
之前的代码都使用了默认的、也是最严格的 std::memory_order_seq_cst
内存序。
考虑分裂引用计数的 push()
实现:
void push(T const& data) {
counted_node_ptr new_node;
new_node.ptr = new node(data);
new_node.external_count = 1;
new_node.ptr->next = head.load(std::memory_order_relaxed); // A
while (!head.compare_exchange_weak(new_node.ptr->next, new_node,
std::memory_order_release, // B
std::memory_order_relaxed));// C
}
- A 是在加载
head
地址,我们只需要知道这个值是什么,并不通过这个值去做计算或者访问成员,也不需要其他同步保证,所以用 relaxed 语义即可。 - B 是将
new_node
写入head
,我们希望这个值能被其他线程可见(达成同步),所以用 release 语义。 - C 是在 CAS 失败的时候将最新的节点值加载到
new_node.ptr->next
中以便下次尝试。这个值只有本线程能访问,不需要同步,所以用 relaxed 语义。 - 注意这里 CAS 循环体是空的,所以用
compare_exchange_weak
可能比 strong 版本的更高效。
pop()
中的 increase_head_count()
函数:
void increase_head_count(counted_node_ptr & old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count;
} while (!head.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire, // A
std::memory_order_relaxed)); // B
old_counter.external_count = new_counter.external_count;
}
- A 是一个 Read-Modify-Write 操作,不能简单地认为它是 store 操作。这里想和
push()
操作同步(因为后面要读指针里面的内容,即通过将head
换成head.ptr->next
来实现取下结点),所以用 acquire 语义。 - B 用 relaxed 的原因同
push()
。
下面是 pop()
的其他部分:
std::shared_ptr<T> pop()
{
counted_node_ptr old_head = head.load(std::memory_order_relaxed);
for(;;)
{
increase_head_count(old_head);
node* const ptr = old_head.ptr;
if (!ptr)
{
return std::shared_ptr<T>();
}
if (head.compare_exchange_strong(old_head, ptr->next,
std::memory_order_relaxed)) // A
{
std::shared_ptr<T> res;
res.swap(ptr->data);
int const count_increase = old_head.external_count - 2;
if(ptr->internal_count.fetch_add(count_increase,
std::memory_order_release) == -count_increase) // B
{
delete ptr;
}
return res;
}
else if (ptr->internal_count.fetch_add(-1,
std::memory_order_relaxed) == 1) // C
{
ptr->internal_count.load(std::memory_order_acquire); // D
delete ptr;
}
}
}
- A 在
compare_exchange_strong
成功后代码会访问ptr->internal_count
,这个是push()
操作放进去的。但是我们在increase_head_count()
操作中已经做过和push()
的同步了,所以这里用 relaxed 就行。 - 如果 A 这一步的 CAS 失败了,除去
push()
之外,只可能是其他线程的pop()
成功了。成功时我们还需要进行swap
操作,要读取ptr->data
,希望这个操作排在其他线程删除这个指针前面(如果不是本线程删除这个指针)。因此 B 在修改内部引用计数的时候做 release 操作来做同步。 - 承接 2,如果 CAS 失败,我们在删除指针之前要做好同步。可以在 C 上面选择 release 语义。
- 在 C 上面选择 release 内存序是 overkill。因为只有最多一个线程能成功进入 C 处的
if
真正调用delete
,对每个线程都用 release 内存序是不必要的。因此 C 可以改成 relaxed 内存序,并且在删除指针之前额外增加一个 acquire 内存序的读取操作来达到同步效果。
这里 2~4 的同步过程是为了保证指令的执行效果看上去是有序的。设想没有做同步,可能会出现这种情况($t_0 < t_1 < t_2$):
// 线程 1
if (head.compare_exchange_strong(old_head, ptr->next,
std::memory_order_relaxed))
{
int const count_increase = old_head.external_count - 2;
if(ptr->internal_count.fetch_add(count_increase,
std::memory_order_release) == -count_increase) // t0
{
delete ptr; // 还有其他线程在使用,因此没进来
}
std::shared_ptr<T> res;
res.swap(ptr->data); // t2: ptr->data 和上面的指令没有相关性,因此可以重排
return res;
}
// 线程 2
else if (ptr->internal_count.fetch_add(-1,
std::memory_order_relaxed) == 1)
{
// ptr->internal_count.load(std::memory_order_acquire); // 假设没有做同步
// 同步是需要两个线程配合的,这里注释掉一个就能破坏同步
delete ptr; // t1
}
比较这几种处理内存安全问题的方式
pop()
简单访问计数:有结点堆积的可能性,导致内存永远无法回收。- Hazard pointers:总是能无锁。
- 共享指针上的原子操作:实现简单,但是受限于标准库实现,在大多数实现上做不到无锁。
- 分裂引用计数:有 CX16 则能无锁,设计成压缩指针也能无锁。
最后,书上还提到:
As we move on to look at writing a lock-free queue, you’ll see a similar pattern: lots of the complexity in lock-free code comes from managing memory.
无锁的线程安全队列
这一节在分裂引用计数的无锁栈基础上来考虑怎么实现无锁队列,很多地方可以做对比。
新增 tail
结点
队列更复杂的地方是既有 head
又有 tail
(栈只有 head
)。继续沿用分裂引用计数的思路,并且把 head
和 tail
都使用原子变量来存储。
重新考虑结点计数值的初始化:
- 无锁栈外部计数器被初始化为 1(被
head
引用),无锁队列外部计数器被初始化为 2(被head
和tail
引用)。被head
引用表示还在链表上,被tail
引用表示是尾结点,每个结点刚被加入时都是尾结点。 - 无锁栈的内部计数器被初始化为 0,这一点在无锁队列上保持不变。
外部计数器的计数器
在当前的实现中,某个结点可能同时被 head
和 tail
引用,因此代码引入了一个外部计数器的计数器,称为 external_counters
。这个计数器和内部计数放在一起,用位域共享一个 int
类型字段的空间。
为什么需要外部计数器的计数器?
在无锁栈的实现中,每次 pop()
操作失败后都会对内部计数器减 1。当内部计数器减到 0 时(也就是减 1 后的值为 0),意味着可以安全地删除该结点。这要求删除结点的操作必须观察到内部计数为 1,而这只能通过回收唯一的外部计数器并将其值累加到内部计数器来实现,因为其他操作只会减小内部计数器。但是,在无锁队列中,由于 head
和 tail
都可能对同一个结点保留外部引用,因此内部计数器为 1 时只能说明至少有一个外部计数器被回收,但无法确认是否所有外部计数器都已回收。考虑到结点的总计数是由外部计数和内部计数的总和定的,如果还有外部计数器没有加回来,就不能得到真正的计数值。
pop()
pop()
操作的循环重试中,先要增加 head
的外部计数,如果发现队列为空(头结点和尾结点指向同一个结点)或者 CAS 修改 head
失败(有其他线程抢先 pop()
掉了头结点),就要补偿之前增加的外部计数。补偿的方式不是直接撤销外部计数,而是将内部计数减 1。将内部计数减 1 之后,要检查内部计数是否为 0,同时没有其他外部计数器了,如果条件满足则 delete
这个结点。如果队列非空且 CAS 成功,那么结点就从链表上成功取下了,需要将外部计数器数量减 1,并将它的值叠加到结点的内部计数器上,最后取出结点上的数据并返回。
考虑到 pop()
修改 head
为 head->next
之后要能直接取得数据,所以在以下 A 和 B 两种模型中应该选择 A。这可以保持 pop()
简单,尽管会让 push()
的更新变得复杂一点:不能将结点创建好后一次性 CAS 到 next
字段上,而是要分别修改 data
和 next
字段。
P.S. x 表示结点的 data 非空
A) ✔
head head
+---+ +---+ +---+
| | ==> | x |-->| |
+---+ +---+ +---+
tail tail
B) ❌
head head
+---+ +---+ +---+
| | ==> | |-->| x |
+---+ +---+ +---+
tail tail
push()
无锁队列的 push()
操作涉及到将数据放到 tail->data
中:
- 一方面要实现操作的原子化,保证多个线程可以安全访问。之前的无锁栈的例子使用的
data
字段是std::shared_ptr
类型,如果改成std::atomic<std::shared_ptr>
就不能保证无锁。因此需要将其改成std::atomic<T*>
,并在处理过程中和函数的返回值处用std::unique_ptr
来确保内存安全。 - 另一方面要安全访问
tail->data
就要确保tail
不被删掉。因此 push() 在开始访问tail
时要先增加外部计数、结束访问时要减少旧tail
的外部计数。之前无锁栈的push()
操作是不需要访问结点内部内容的,因此不操作计数器。
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr=new node;
new_next.external_count=1;
counted_node_ptr old_tail=tail.load();
for(;;)
{
increase_external_count(tail,old_tail);
T* old_data=nullptr;
if(old_tail.ptr->data.compare_exchange_strong(
old_data,new_data.get())) // A 先尝试把 data 放上去
{
old_tail.ptr->next=new_next; // 放上去之后才修改 next
old_tail=tail.exchange(new_next); // B
free_external_counter(old_tail);
new_data.release();
break;
}
old_tail.ptr->release_ref();
}
}
书上首先给的是一个有忙等待的实现,这会导致队列有锁,如果一个线程 A 操作成功,但是在 B 操作之前时间片用完了,那么其他线程就只能因为 data
字段已经被设置而在 CAS 上不断失败。这个问题会在之后的小节修正。
避免 push()
中的忙等待
上一小节中只有成功存储数据的线程可以操作 tail
,导致其他线程在忙等待。如果其他线程在碰到数据字段已经被写入时,可以自行开辟新的 dummy 结点并将其挂在 next
字段上,然后重试写入 data
,就可以实现无锁。这意味着 push()
还要对 tail->next
指针原子操作,所以 counted_node_ptr
类型需要改成 std::atomic<counted_node_ptr>
类型。(书 P245)
和无锁栈相比,无锁队列又有两个字段被原子化了,结果是:每个字段都是原子变量。
// 无锁栈(只展示了数据成员)
struct node
{
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
};
// 无锁队列(只展示了数据成员)
struct node
{
std::atomic<T*> data; // 原子化
std::atomic<node_counter> count; // int 变成 node_counter,包含新的外部计数器计数
std::atomic<counted_node_ptr> next; // 原子化
};
新的 push()
实现如下:
template<typename T>
class lock_free_queue
{
private:
void set_new_tail(counted_node_ptr &old_tail,
counted_node_ptr const &new_tail)
{
node* const current_tail_ptr = old_tail.ptr;
while (!tail.compare_exchange_weak(old_tail, new_tail) &&
old_tail.ptr == current_tail_ptr);
if (old_tail.ptr == current_tail_ptr)
free_external_counter(old_tail);
else
current_tail_ptr->release_ref();
}
public:
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;)
{
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get()))
{
counted_node_ptr old_next = {0};
if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
{
delete new_next.ptr;
new_next = old_next;
}
// set_new_tail 可能会失败,如果失败说明其他线程成功了。从结果来说 tail
// 一定会被更新,因此 old_tail 不再被 tail 引用。区别是如果是本线程成功
// 本线程就要负责清理外部计数,否则只减小内部计数。
set_new_tail(old_tail, new_next);
new_data.release();
break;
}
else
{
counted_node_ptr old_next = {0};
// 尽管放 data 失败了,还是可以尝试把 next 放上的。
// 是谁放的 next 无所谓,因为稳定后的 tail 结点是 dummy 的,里面没放数据。
if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
{
old_next = new_next;
// 如果已经成功放上了,就申请一个新的结点,保证 new_next.ptr 可以用于
// 下一轮的重试。
new_next.ptr = new node;
}
set_new_tail(old_tail, old_next);
}
}
}
};
这个过程有比较多的 new
和 delete
操作,因此最好是有个专门的内存分配器。
ABA 问题
如果使用了结点回收等机制,就容易出现 ABA 问题。ABA 问题在基于 compare/exchange 的算法中常见:两个指针的值相等,可能实际上是旧结点已经被回收过一次再分配出去了,它们应该是不同的结点却被 CAS 认为值相等。要避免这种问题出现,首先会想到的是在指针仍被使用时不对其进行回收。
除了指针之外,非指针的其他类型也可能会出现这种情况。某个线程在两个时间点看到的值相等,可能是值变化之后又变回去了,并不是一直没有变过。中间有别的线程的干预时,按理来说本线程应该重试,但是在这种情况下本线程没办法知道变量是否被其他线程修改过。通过将值和版本号存储在一起做 CAS,每次操作后都增加版本号,就可以解决 ABA 问题。