CUDA by Example: Appendex
再次考虑 dot 计算
在第 5 章的 dot 计算中,我们在每个块上做完 reduction 之后就将数据拷贝回到 CPU 了,然后让 CPU 做最后的加法。
为什么在 compute capability 2.0 之前,atomicAdd
只支持整数?因为原子加法是不能指定计算的发生顺序的,因而每个计算都必须遵守结合律,也就是 $(A+B)+C$ 必须等于 $A+(B+C)$。但是浮点数因为中间结果的舍入问题,并不能保证这一点!!
本节接下来是讲解用原子操作实现一个忙等待的 mutex,用于同步多个 CUDA 线程(因为在写书的时候浮点数的原子加法还没有受到设备的广泛支持)。atomicCAS
就是 CUDA 上的比较并交换。
// mutex 的类型是 int *,而且是分配在 GPU 上的
__device__ void lock( void ) {
while( atomicCAS( mutex, 0, 1 ) != 0 );
}
__device__ void unlock( void ) {
atomicExch( mutex, 1 );
}
Note
这里的 unlock
方法并不是直接对 *mutex
赋值为 1,因为 CUDA 中原子操作和普通的内存访问经过的路径不同,所以应该在 unlock
的时候也统一使用原子操作。
最后的 dot 核函数:
__global__ void dot( Lock lock, float *a,
float *b, float *c ) {
__shared__ float cache[threadsPerBlock];
int tid = threadIdx.x + blockIdx.x * blockDim.x;
int cacheIndex = threadIdx.x;
float temp = 0;
while (tid < N) {
temp += a[tid] * b[tid];
tid += blockDim.x * gridDim.x;
}
// set the cache values
cache[cacheIndex] = temp;
// synchronize threads in this block
__syncthreads();
// for reductions, threadsPerBlock must be a power of 2
// because of the following code
int i = blockDim.x/2;
while (i != 0) {
if (cacheIndex < i)
cache[cacheIndex] += cache[cacheIndex + i];
__syncthreads();
i /= 2;
}
if (cacheIndex == 0) {
// wait until we get the lock
lock.lock();
// we have the lock at this point, update and release
*c += cache[0];
lock.unlock();
}
}
首先是多了一个 Lock
参数,这个参数是将原子操作模拟 mutex 的过程封装之后的对象。中间是在第 5 章就已经介绍过的写法。然后最后加到 *c
上的操作是用这个 mutex 来保护的。现在已经可以用浮点数原子加法了,因而不必再这样写代码。
注意:由于无法预知多个线程竞争下的浮点数的结合顺序,最终的结果很可能和 CPU 不一样。
实现 hash 表
书中先给了一个 CPU 版本的实现,用的是拉链法,有内存池(新分配的 Entry
都从连续的内存池里拿出)、添加元素,但是没有删除元素,也没有动态扩容。
然后给了 GPU 版本的实现,只是将添加新元素的过程批量化了。其他部分和 CPU 差不多(甚至还要拷贝回到 CPU 上才能做)。在 GPU 版本的实现中,除了一组 key、一组 value 外,还要一组 lock,用来控制对 hash 表不同位置上元素的访问。这个 lock 就是在上一节中用 atomicCAS
实现的。
__global__ void add_to_table( unsigned int *keys, void **values,
Table table, Lock *lock ) {
int tid = threadIdx.x + blockIdx.x * blockDim.x;
int stride = blockDim.x * gridDim.x;
while (tid < ELEMENTS) {
unsigned int key = keys[tid];
size_t hashValue = hash( key, table.count );
for (int i=0; i<32; i++) {
if ((tid % 32) == i) {
Entry *location = &(table.pool[tid]);
location->key = key;
location->value = values[tid];
lock[hashValue].lock();
location->next = table.entries[hashValue];
table.entries[hashValue] = location;
lock[hashValue].unlock();
}
}
tid += stride;
}
}
首先是计算 hash,然后上锁、加元素、释放锁。每个线程使用的内存区域都是对应于自己位置的,所以 table.pool
中要先分配好足够本次 kv 对数量的空间。
for (int i=0; i<32; i++)
和 if ((tid % 32) == i)
有点难看懂。这么写是因为一个 warp 的大小是 32。如果直接去掉 for
和 if
,实际上是要整个 warp 中的 32 个线程都获得锁才能继续向下执行。这很可能导致死锁!这里的代码相当于遍历了 warp,让 warp 中的线程一个一个地获取锁,这样总有 warp 能正常向下计算,不会造成死锁。
Note
对比:半 warp 是常量内存请求广播的粒度,而整个 warp 是 warp 中线程在 lockstep 中执行的粒度。