2. Managing threads
2.2 向线程传递参数
向线程传递参数时最好是都复制一份,而且转换成线程启动函数期望的类型。举例:
void foo(std::string) {
// ...
}
std::thread launch_thread() {
char buf[1024];
return std::thread{ foo, buf };
}
这里 foo
的参数是 std::string
类型,而 std::thread
在构造时复制的是 char *
类型的参数。等线程创建好,真正开始执行的时候,复制过来的 buf
就可能已经是悬挂引用了。
其他:想要引用可以用 std::ref
,想要使用一个对象的成员函数还需要额外传递对象的地址(this 指针)。
案例:并行 accumulate 的实现
下面 parallel_accumulate
这个方法使用的是书上的朴素实现,其他测试都是调用标准库。用 Release 模式编译测试,测试在我的笔记本上进行,结果仅供参考。
#include <benchmark/benchmark.h>
#include <algorithm>
#include <chrono>
#include <execution>
#include <iterator>
#include <numeric>
#include <print>
#include <random>
#include <ranges>
#include <thread>
#include <vector>
template <typename Iterator, typename T>
struct accumulate_block {
void operator()(Iterator first, Iterator last, T &result) {
result = std::accumulate(first, last, result);
}
};
template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length) {
return init;
}
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(
accumulate_block<Iterator, T>(), block_start, block_end, std::ref(results[i]));
block_start = block_end;
}
accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
for (auto &entry : threads) {
entry.join();
}
return std::accumulate(results.begin(), results.end(), init);
}
static auto v = [] {
std::mt19937 gen{1234};
std::vector<float> v; // 替换 v 的元素类型,结果不同
for (auto _ : std::ranges::views::iota(0, static_cast<int64_t>(1e8))) {
v.push_back(gen());
}
return v;
}();
static void BM_accumulate(benchmark::State &state) {
for (auto _ : state) {
auto x = std::accumulate(v.begin(), v.end(), 0);
benchmark::DoNotOptimize(x);
}
}
static void BM_reduce(benchmark::State &state) {
for (auto _ : state) {
auto x = std::reduce(v.begin(), v.end(), 0);
benchmark::DoNotOptimize(x);
}
}
static void BM_reduce_par(benchmark::State &state) {
for (auto _ : state) {
auto x = std::reduce(std::execution::par, v.begin(), v.end(), 0);
benchmark::DoNotOptimize(x);
}
}
static void BM_reduce_par_unseq(benchmark::State &state) {
for (auto _ : state) {
auto x = std::reduce(std::execution::par_unseq, v.begin(), v.end(), 0);
benchmark::DoNotOptimize(x);
}
}
static void BM_parallel_accumulate(benchmark::State &state) {
for (auto _ : state) {
auto x = parallel_accumulate(v.begin(), v.end(), 0);
benchmark::DoNotOptimize(x);
}
}
BENCHMARK(BM_accumulate);
BENCHMARK(BM_parallel_accumulate);
BENCHMARK(BM_reduce);
BENCHMARK(BM_reduce_par);
BENCHMARK(BM_reduce_par_unseq);
BENCHMARK_MAIN();
总的来说:
- 书上朴素实现的虽然有些优化没有做到,但是(在对整数规约时)基本上是能和标准库一样快的,因为机器的计算能力已经达到上限了。
- 规约本身是一个很快的操作,因此元素数量较少时多线程处理没有速度提升,甚至可能会更慢。
- 在对浮点数规约时,标准库算法编译后可以使用向量化等手段来加速,比书上实现的还要快几倍。
一些细节:
- MSYS2 上如果缺少 tbb 库(需要额外安装),
std::reduce(std::execution::par, ...)
就还是串行的。MSYS2 上如果安装了 tbb 库,则编译时就会自动使用。如果不给出-ltbb12
链接选项,可执行文件就会链接失败。 - 看并行算法在 google benchmark 结果的时候主要关注 Time 而不是 CPU,因为 CPU 时间统计的是函数调用线程的,不包括那些被创建的线程!
- 在元素数量为 1e8 时,书上朴素实现的
int
、float
、double
类型的并行规约加速比为 3.2 倍(大概是 I/O-bound)、10.5 倍、10.5 倍。 - 在元素数量为 1e9 时,书上朴素实现的
int
、float
、double
类型的并行规约加速比为 3.2、14、15 倍。标准库的std::reduce
并行有点黑魔法,在浮点数规约时应该是向量化了,比书上写的朴素实现还要快大约 4.73(float
)或 2.46(double
)。在整数规约时,标准库并行虽然更快一点,但是速度也差不多。内存有限,元素再多就没办法测试了。