3. Sharing data between threads

三种避免竞争情况的办法

  1. 上锁。
  2. 无锁编程。通常通过修改数据结构和 invariants(数据结构要保持的约束)来完成。
  3. 事务(software transactional memory, STM)。

使用互斥量保护临界区

std::lock_guard(可以用 CTAD)和 std::mutex API。C++11 除了 std::mutex 之外还有 std::timed_mutexstd::recursive_mutexstd::recursive_timed_mutex

不要把受保护数据的引用传出锁的保护范围外,尤其是要注意不要把它的引用传入到来历不明的其他函数中。

案例:实现线程安全的栈(top()pop() 的原子化)

接口设计上的问题:栈的 API 中 empty() 和其他方法各自原子是不够的,因为 empty() 的结果在下一个方法运行时不一定还能成立。

我们也可能想把 pop()top() 两个函数结合,将它们合并成一个原子化的功能,但又会遇到异常安全的问题(元素的构造函数抛出异常时,被 pop 的数据会丢失)。

解决方案 1:用输出参数返回结果

pop() 不返回值,而是将值存在引用中。这要求用户提前分配好空间,而且在成功将值赋值出去之前,栈可以不用舍弃数据。这保证了失败时栈中的数据不丢失。

std::vector<int> result;
some_stack.pop(result);

缺点:用户要提前分配空间、元素类型要支持赋值操作符(很多类可能没有实现)、元素类型要支持默认构造或者能提前知道用来构造元素的参数。

解决方案 2:返回指针

返回指针,因为指针的复制是不会导致异常的。Java 就能采用这种方案。

智能指针在行为上和指针非常相似,其中的共享指针在拷贝时也不会抛出异常,所以书上给了一个共享指针的例子。(用 std::unique_ptr<> 不也可以吗?)

std::shared_ptr<T> pop() {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack();    
    std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // A
    data.pop();                                          
    return res;
}

A 处可能会抛出异常,因为要复制构造栈顶元素,但是就算抛出异常也是安全的,因为数据还在栈上没有丢失。最后返回 res 时又因为共享指针是我们已知的复制构造不会失败的类型,所以保证了安全。

缺点:要求元素能够复制构造。

Note

元素能复制构造是否是一个必要条件?

把元素复制而不是移动,意味着在确保操作能成功完成之前,数据不会丢失。如果移动构造函数抛出了异常,而且没有正确恢复被移动方数据,那么数据就丢失了。尽管我们希望移动构造函数在抛出异常时能复原之前的数据(如果不抛出异常就更好了),但这并不总是可能的。

Note

我认为有 NRVO 时,A 处应该也可以换成直接返回元素本身,而不必装到智能指针中。不过 NRVO 虽然被广泛实现,却并没有由语言保证。作者写这本书的时候,是不是 NRVO 没有普及呢?

解决方案 3:限制容器元素类型

禁止不能满足 std::is_nothrow_copy_constructible_v<T> 的类型 T 被使用。但是这样会大大减少容器的适用范围,因此这个方案不太好。

避免死锁

核心是按照顺序锁定

自己按顺序锁定有什么风险:如果要交换的对象中包含锁,那么我们不知道谁会被作为 swap 的第一个参数,也不知道如何能保证上锁顺序。(可以考虑按照互斥量的地址来锁定?因为互斥量必须具有地址稳定性才能工作。)

一次性获取多个锁(std::lockstd::scoped_lock

可以用 std::lock 同时锁定多个互斥量,然后用 std::lock_guardstd::adopt_lock 选项来继承锁。

friend void swap(X &lhs, X &rhs) {
    if (&lhs == &rhs) {
        return;
    }
    std::lock(lhs.m, rhs.m);
    std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
    std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
    swap(lhs.some_detail, rhs.some_detail);
}

C++17 的 std::scoped_lock 可以直接在构造函数中一步完成这个操作。

void swap(X &lhs, X &rhs) {
    if (&lhs == &rhs) {
        return;
    }
    std::scoped_lock guard(lhs.m, rhs.m); // CTAD
    swap(lhs.some_detail, rhs.some_detail);
}

分步骤获取锁

只能设定一些获取锁的规则,然后每个获取锁的线程相互协调。

其他形式的死锁

死锁不仅限于互斥量。线程之间相互等待都会导致死锁,比如线程之间用 std::threadjoin() 方法相互等待对方结束

避免嵌套锁

尽量避免在已经获取锁的情况下去获取别的锁。尤其是调用未知的其他函数时,被调用函数可能还会尝试上锁。

案例:实现层次锁

层次锁(hierarchical mutex)的定义:每个锁在定义时带有一个等级,规定在持有锁时,只能再次锁定等级更低的锁,不能锁定等级相同或更高的锁。因此,等级只能通过释放锁而提高。这样的约束使得多个线程之间可以按顺序上锁多个互斥量,从而避免死锁。

为了实现层次锁,每个线程需要保存局部的信息,这可以用 thread_local 变量实现。由于线程本地变量只由一个线程读取,我们只需要使用一个简单的整数值,而不必使用原子变量。书上对这个变量的定义如下。一开始就将其设置为 unsigned long 类型的最大值,就好像已经获取了一个具有该等级的锁一样。

thread_local unsigned long 
  hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

接下来,为了能使用标准库提供的通用方法,我们还需要为层次锁实现 lock()unlock()try_lock() 等方法。

std::unique_lock

std::unique_lock 能比 std::lock_guard 提供更多灵活性。

三种模式:继承锁、立即锁、延迟锁(新)

std::lock_guard 要么加 std::adopt_lock 来接受已锁定的锁,要么立即锁定。而 std::unique_lock 除了这两种模式之外,还能加 std::defer_lock 来延迟锁定。由于 std::unique_lock 提供了类似 std::mutex 的接口,所以可以将互斥量包装在其中,然后使用通用的锁定方法。

friend void swap(X &lhs, X &rhs) {
    if (&lhs == &rhs) {
        return;
    }
    std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
    std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
    std::lock(lock_a, lock_b);
    swap(lhs.some_detail, rhs.some_detail);
}

在上面这个例子里面,std::unique_lock 只是为了 RAII 存在的,锁定则是由 std::lock() 完成。作为对比,std::scoped_lock 就没有实现类似 std::mutex 的接口。

std::unique_lockowns_lock() 方法可以用来检查当前是否已经获取了锁,mutex() 方法可以获取一个指向管理的 mutex 的指针。

API 设计

  • std::unique_lock 包装 std::mutex 取代直接操作 std::mutex
  • std::scoped_lock 取代 std::lock
    • 之前 std::lock 并不是 RAII 的,所以还需要用 std::lock_guard 来配合使用,比较麻烦。
    • std::scoped_lock 行为上更像 std::lock_guard 一点。但是从命名上我觉得设计者是想要用它来取代 std::lock
  • 这里说的“取代”都是指的在某些场景下取代,并不是说前继者就完全不被需要了。

可以类比一下 std::unique_ptr

graph LR
    raw_ptr[T *] --> u_ptr["std::unique_ptr&ltT&gt (RAII)"]
    A[std::mutex] --> B["std::unique_lock (RAII)"]
    C[std::lock(函数)] --> D["std::scoped_lock (RAII)"]
    A1[std::shared_mutex] --> B1["std::shared_lock (RAII,后面讲)"]

此外,互斥量的正确使用要求地址稳定,std::unique_lock 虽然可以实现和 std::unique_ptr 类似的资源释放功能,却不可能脱离互斥量单独使用,相反,std::unique_ptr 是可以完全代替裸指针单独使用的。

在可移动性上,std::unique_lock 可移动、不可复制。std::lock_guardstd::scoped_lock 都不可移动、不可复制,因为它们都有用户提供的析构函数(移动构造 / 赋值会被删除),还显式禁用了拷贝构造 / 赋值。std::unique_lock 的构造函数是将其和互斥量关联起来的唯一方式,如果已经默认构造了 std::unique_lock,只能将新的 std::unique_lock 移动赋值给它

由于可以移动,std::unique_lock 可以实现锁的转让

std::unique_lock<std::mutex> get_lock() {
    extern std::mutex some_mutex;
    std::unique_lock<std::mutex> lk(some_mutex);
    prepare_data();
    return lk;
}

锁的转让的意义在于:先释放锁,再在其他函数中获取锁是不安全的。转让锁保证了锁一直被当前线程持有。

Note

如果有几个函数需要在持有锁的情况下进行操作,而这几个函数又可能发生相互调用,是否可以把 std::unique_lock 既作为参数,又作为返回值,在多个函数调用中不断流转?

using Lock = std::unique_lock<std::mutex>;
auto borrow_lock(Lock) -> Lock /* prvalue */;
Lock lk;
/* initialize lk */
lk = borrow_lock(std::move(lk));

感觉这样的设计挺糟糕?不如写个包装函数,获取锁之后再进行这些递归操作。

开销

static_assert(sizeof(std::lock_guard<std::mutex>) == sizeof(intptr_t));
static_assert(sizeof(std::unique_lock<std::mutex>) == sizeof(intptr_t) * 2);

在我这边看 std::lock_guard<std::mutex> 占用 8 字节用来放指针。std::unique_lock<std::mutex> 占用 16 字节用来放一个指针和一个 bool 标记,其中包含 7 个字节的 padding。

在初始化时保护数据

之前提到的都是在使用过程中保护数据,但是也有一些操作仅需要在初始化时保护数据(这时使用互斥量可能有点大材小用)。一个例子:懒初始化。

双验锁(❌)

先说一种常见 pattern:double-checked locking(双验锁)。先检查指针是否为空,如果为空就代表要初始化。先上锁,再检查指针是否还是空,如果还是空则应该由本线程初始化。

一段错误代码:

void undefined_behaviour_with_double_checked_locking() {
    if (!resource_ptr) {
        std::lock_guard<std::mutex> lk(resource_mutex);
        if (!resource_ptr) {
            resource_ptr.reset(new some_resource);
        }
    }
    resource_ptr->do_something();
}

在上锁之前就检查指针是否为空,这安全吗?

  1. 在 Java 的内存模型中,这样的读取应该是可以做到安全的(对象可以 GC,指针用 volatile 标识,Java 的内存模型中 volatile 有一定的同步作用),但是在 C++ 的内存模型中,由于缺乏同步,不能保证其他线程对指针的设置被当前线程的读可见。
  2. 另一方面,指针的读写是否是原子的并没有在语言标准中规定,而是取决于平台(Arm 上是弱内存序;x86 是强内存序,32 位读写是原子的、64 位读写在对齐到 8 字节整倍数地址上时是原子的,参考 https://qr.ae/p2545l )。这里的指针以智能指针的形式出现,智能指针不是简单的对象,就更不能保证操作的原子性了。
  3. 就算使用 std::atomic 解决了 1 的问题,std::atomic<std::shared_ptr<>> 也不是无锁的,解决不了问题 2。

std::call_once

要点:

  1. 也在 <mutex> 头文件下。
  2. std::once_flag 不能被移动。
  3. 如果 std::call_once 调用的函数对象抛出了异常,而不是正常返回,那么 std::once_flag 不会被设置,接下来的 std::call_once 还会继续尝试调用函数。
  4. 如果还没有任何一个 std::call_oncestd::once_flag 设置,那么后续的 std::call_once 都会 阻塞 至正在执行的 std::call_once 有一个结果(成功或失败)。

为此,std::once_flag 需要区分:调用完成、正在被其他线程调用中、可以调用三种状态。

同步规则:

All active calls on the same flag form a single total order consisting of zero or more exceptional calls, followed by one returning call. The end of each active call synchronizes-with the next active call in that order.

The return from the returning call synchronizes-with the returns from all passive calls on the same flag.

#include <chrono>
#include <mutex>
#include <thread>
#include <cstdio>

auto once = std::once_flag{};
auto foo = 0;

int main() {
    auto th = std::thread([] {
        std::this_thread::sleep_for(100 * std::chrono::milliseconds());
        // t=100ms
        std::call_once(once, [] {
            foo = 66;
        });
        printf("th: foo = %d\n", foo);
    });
    // Probably before th.
    // t=0
    std::call_once(once, [] {
        std::this_thread::sleep_for(200 * std::chrono::milliseconds());
        // t=200ms
        printf("foo = %d\n", foo);
        foo = 54;
    });
    th.join();
}
// Output:
// foo = 0
// th: foo = 54

上面的例子中,尽管 th 中的 std::call_once 在 t=100ms 时已经开始了,但因为主线程中的 std::call_once 还没有返回,所以就一直等着。最后 th 能看到主线程设置的 foo = 54 的值。

接下来尝试在主线程的 std::call_once 中引发异常:

#include <chrono>
#include <cstdio>
#include <mutex>
#include <thread>


auto once = std::once_flag{};
auto foo = 0;

int main() {
    auto th = std::thread([] {
        std::this_thread::sleep_for(100 * std::chrono::milliseconds());
        // t=100ms
        std::call_once(once, [] {
            foo = 66;
            printf("th: call_once() returns\n");
        });
        printf("th: foo = %d\n", foo);
    });
    // Probably before th.
    // t=0
    try {
        std::call_once(once, [] {
            std::this_thread::sleep_for(200 * std::chrono::milliseconds());
            // t=200ms
            printf("main: call_once() will throw\n");
            struct SomeError {};
            throw SomeError{};
        });
    } catch (...) {
    }
    th.join();
}
// main: call_once() will throw
// th: call_once() returns
// th: foo = 66

从结果来看,th 中的 std::call_once 等到主线程调用的函数对象抛出了异常后才开始执行对应的函数对象。(MSYS2 的 gcc 似乎有 bug,上面代码会死锁,而且 cppreference 中 call_once 的示例代码 也会死锁。在 Linux 上就没事。)

利用函数中静态成员的初始化

The initialization of such a variable (a local variable declared with static) is defined to occur the first time control passes through its declaration.

局部静态数据在第一次运行到这里时才会初始化。在 C++11 之前可能会被多个线程初始化,但是 C++11 规定这样的数据只能被一个线程初始化

将要初始化的对象包装成需要从一个函数获取引用(或指针)的形式,而这个函数又返回其中静态数据的引用(或指针),这样也能达到懒初始化效果。这种方法通常被用于要懒加载的全局单例。

Cppreference 上面说:

Initialization of function-local statics is guaranteed to occur only once even when called from multiple threads, and may be more efficient than the equivalent code using std::call_once.

这意味着在能使用函数中局部静态成员的情况下,应该优先使用它。

共享锁

实际上是读写锁。Cppreference 上的 shared_mutex 页面 中指出了 <shared_mutex> 头文件在 C++14 中增加,包含:

  • std::shared_timed_mutex(C++14)
  • std::shared_lock(C++14)
  • std::shared_mutex(C++17)

可以用给 std::mutex 上锁的方式来上锁 std::shared_mutex,这样上的锁就是互斥锁(exclusive lock)。如果想要上共享锁,就需要用 std::shared_lock,这个 RAII 类模板实现的功能和 std::unique_lock 相似。std::shared_lock 满足 Lockable;如果互斥量类型满足 SharedTimedLockable,std::shared_lock 还满足 TimedLockable。(不满足的话超时相关的方法也不会被删除,但都是 UB!)

Note

看起来如果想要用或互斥或共享的方式一次锁定多个互斥量,就需要把要共享的互斥量先以不获得锁的方式包在 std::shared_lock 中,然后用 std::scoped_lock 锁定。

递归锁(std::recursive_lock

Most of the time, if you think you want a recursive mutex, you probably need to change your design instead.