6. 设计基于锁的并发数据结构

引入部分

设计并发数据结构需要考虑两类问题:

  1. 如何保护数据?
  2. 如何实现真正的并发?

本章实现线程安全的(1 个全局互斥量)、队列(2 个全局互斥量,头尾各用一个)、哈希表(每个 slot 一个互斥量)、链表(每个结点一个互斥量)。

6.2.1-6.2.2 用互斥锁保护 std::stackstd::queue

首先书上给了实现线程安全的栈和队列的例子。它们都是用 std::mutex 来保护对应的数据结构的。

在实现队列的时候,为了方便生产者 - 消费者问题的处理,添加了对条件变量的使用。条件变量的使用给异常安全带来了一个新的问题:

void threadsafe_queue::push(T new_value) {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(std::move(new_value));
    data_cond.notify_one();
}

std::shared_ptr<T> threadsafe_queue::wait_and_pop() {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk, [this] { return !data_queue.empty(); });
    std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    return res;
}

在有数据被生产出来时,生产线程调用 push() 方法,进而使用 notify_one() 通知了一个消费者线程。如果这个消费者线程在拷贝数据到 std::shared_ptr<T> 时发生了异常,那么其他线程不会收到通知来处理这个新产生的数据。

Note

  1. 不用担心 std::make_shared 本身的异常安全,标准库已经处理好了。
  2. 这里返回共享指针是因为共享指针传给返回值是不会引发异常的,而 T 类型可能会在拷贝 / 移动时发生异常并让刚从队列中取出的数据丢失。详见 3. Sharing data between threads 中实现线程安全栈的案例。

书上提出的解决方案有:

  1. 不用 notify_one() 而是用 notify_all(),但是这样显得有点浪费资源。
  2. 捕获异常,重新调用 notify_one(),然后重新抛出异常。
  3. 队列中不要存放 T,而是存放 std::shared_ptr<T>

书上给的例子也是第 3 种方案。具体来说,之前的数据结构是 std::queue<T>,现在的是 std::queue<std::shared_ptr<T>>,这样在条件变量放行之后就不会再出现数据拷贝失败导致的异常。同时,由于数据早就放到 std::shared_ptr 了,节约了持有锁时数据操作的时间(尤其是避免了 std::shared_ptr 的动态内存分配),有利于提高性能。

这种方案的缺点是:虽然 wait_and_pop(T &) 这种接口的 pop 方法用不到共享指针,但也要承担共享指针的开销(因为 队列的数据结构变化了)。

6.2.3 用更多的锁来提高并发性(e.g. 单链表队列)

显然在整个数据结构上只放一个锁会导致并发性下降,要是在操作数据结构的一部分时,只需要部分的锁就好了。书上给了一个单链表实现线程安全队列的例子(头一把锁,尾一把锁,这里暂时还没有体现到代码里面)。这个例子中暂时只有 try_pop()push() 两个操作。

template<typename T>
class queue {
private:
    struct node {
        T data;
        std::unique_ptr<node> next;
        node(T data_) : data(std::move(data_)) {}
    };
    std::unique_ptr<node> head;
    node* tail; // 注意这里的 tail 因为不带有所有权,所以是原始指针

public:
    queue() : tail(nullptr) {}
    queue(const queue& other) = delete;
    queue& operator=(const queue& other) = delete;
    // 省略 try_pop() 和 push() 操作,
    // 大体功能是 try_pop() 从头部取下元素,push() 从尾部增加元素
};

Tip

书上在这个位置还没有提到(以后的章节可能会)的一点是:最好重写析构函数,否则 queue 析构的时候 node 是递归析构的,有可能会炸栈。

用哑(dummy)结点免除 push()head 上锁的需要

push() 操作肯定要修改 tail,但是在这样的实现下,如果一开始 head 为空,那么 push() 操作也需要修改 head。这导致 push() 操作可能需要上两把锁(head 和 tail)!为了解决这个问题,可以添加一个哑结点:

template<typename T>
class queue {
private:
    struct node {
        std::shared_ptr<T> data; // 从 T 变成了 std::shared_ptr<T>
        std::unique_ptr<node> next;
    };
    std::unique_ptr<node> head;
    node* tail;

public:
    queue() : head(new node), tail(head.get()) {} // 注意这个构造函数
    queue(const queue& other) = delete;
    queue& operator=(const queue& other) = delete;
    // 省略 try_pop() 和 push() 操作
};

这样 push() 操作就只需要修改 tail 即可。注意 node 结构的 data 字段也从 T 类型改成了 shared_ptr<T> 类型,这是因为链表头部有一个哑结点(dummy node),但是我们不确定任意类型 T 是否是可以默认构造的,用默认构造的共享指针(空指针)就能规避这个问题。

这个构造函数也需要注意,成员初始化顺序是按照 headtailqueue 结构中的声明顺序来的,并不是按照在初始化列表中的排列来的。这里把 tail 放在 head 后面是为了和初始化顺序对应。

现在 try_pop() 操作仍然需要访问 headtail,但也只是在比较他们的值(以确定链表是否为空)时需要 tail。这样对 tail 锁的持有时间是很短的。如果只需要这两个简单操作,整个链表只需要 headtail 上各一把锁。

用两把锁实现线程安全链表

如果把锁放在结点的内部,那么一是每个结点都需要锁(即便不用),二是 try_pop() 在队列为空的时候对 headtail 上锁会锁到同一个 std::mutex 上,导致未定义行为。所以两把锁应该直接放在 queue 数据结构中,头用一个,尾用一个

push() 的逻辑很简单,try_pop() 的逻辑是:

{
    锁 head; // 这一步可以防止其他线程的 pop 操作进来
    {
        锁 tail;
        获得 tail 值;
        解锁 tail;
    }
    比较 tail 和 head 的值,确定队列是否有剩余元素,再进行之后的操作;
    解锁 head; // 实际上应该用 lock_guard 之类的 RAII 手段来保证解锁
}

这里一定要先锁 head 才能去访问 tail,否则有可能另外一个线程在中间也调用了 try_pop(),导致访问得到的 tail 其实已经从链表上移除了。尽管我们只是去比较地址,不会因为访问 tail 而导致内存非法访问,但是这种情况下地址比较必定会失败,导致程序认为队列中仍然有元素。假设这个时候队列中元素被删光了会怎么样?

  1. 首先我们会得到哑结点中的 data,这个 data 是空指针,符合原本的含义。
  2. 接下来 head 结点会被修改为 next,是空指针!这会导致接下来的程序运行出现问题。

审视并发性

现在是 push()try_pop() 分离,在一定程度上提高了并发性。因为现在是创建好 std::shared_ptr<T> 之后才上锁并 push,所以也避免了在持有锁的时候进行高开销的内存分配操作。可惜的是,try_pop() 还是需要(短暂地)锁住两把锁。

实现 wait_and_pop()

要实现阻塞的 pop() 操作就要像之前一样使用条件变量了。push() 操作也要相应地作更改:在返回之前通知条件变量,以告知消费者有新的数据可用。

wait_and_pop() 的操作则是:先用 std::unique_lock<std::mutex> 锁住 head 的锁,然后在条件变量上等待 head.get() = get_tail() 条件变为真(get_tail 函数会短暂锁住 tail 锁以实现线程安全访问)。接着执行取下头结点的操作。因为现在已经是持有锁了,所以如果取下头结点是放在函数里面进行的(就像书里那样),这个函数就不能再次获取锁。

这样一个线程安全的无界(unbounded)队列就实现完成了。

6.3.1 线程安全的查找表(实际选用哈希表)

结合查找表读多写少的使用场景,书上考虑了三种设计:二叉树、有序数组、哈希表。有序数组的并发性是很差的。二叉树的并发性同样很差,因为所有的操作都要从锁住根节点开始,即便在向下搜索的操作中可以释放之前获得的锁,但实际上会在锁根节点上阻塞很久。

最终选择的哈希表是拉链法设计,而且书里没有实现扩容,即在构造的时候就对哈希表固定容量。用了哈希表之后,不同的元素被分在不同的 slots 中,提高了并发性。为了进一步提高并发性,还使用了 std::shared_mutex 将读写分离。

6.3.2 线程安全的单链表

虽然说前面 6.2.3 也用链表和互斥量实现了单线程安全的队列,但是这个和实现链表还是不一样的——因为链表的 API 设计和队列不同。比如, 提供了 for_eachfind_first_if 两个遍历的方法。不再像队列那样不可非破坏地遍历,所以每个结点上都有 mutex。在进行遍历的时候从头节点开始锁,锁住之后就向下遍历,并释放已经访问过的结点(最多的时候同时持有两把锁)。

template<typename Function>
void for_each(Function f) {
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);

    while (node* const next = current->next.get()) {
        std::unique_lock<std::mutex> next_lk(next->m);
        lk.unlock(); // 这就是为什么要选择 unique_lock:lock_guard 做不到这一点!
        f(*next->data);
        current = next;
        lk = std::move(next_lk);
    }
}

删除单链表上的元素则需要小心,尤其是要先获得待删结点的锁才能安全进行删除操作。

template<typename Predicate>
void remove_if(Predicate p) {
    node* current = &head;
    std::unique_lock<std::mutex> lk(head.m);

    while (node* const next = current->next.get()) {
        std::unique_lock<std::mutex> next_lk(next->m);

        if (p(*next->data)) {
            std::unique_ptr<node> old_next = std::move(current->next);
            current->next = std::move(next->next);
            next_lk.unlock();
        } else {
            lk.unlock();
            current = next;
            lk = std::move(next_lk);
        }
    }
}

另外,这个小节中的链表支持的插入操作是 push_front(),需要锁住 head.m,也不再需要维护 tail 结点。而 6.2.2-6.2.3 节实现的队列支持的插入操作是 push(),实际上是 push_back(),需要锁住 tail_mutex

Note

双链表的场景下怎么设计呢?假设我需要从链表中删除一个结点,需要先锁住结点自己,然后按顺序先锁 prev 再锁 next(换顺序可以),同时还需要双链表不成环才能规避死锁。

总的来说,这一小节线程安全单链表的实现和上一小节中作者放弃的二叉树实现查找表还是比较类似的,都是要先获取大范围的锁,然后一步步将锁的范围缩小来实现线程安全访问。