2#include "../common/cuda_check.cuh"
4#include <cooperative_groups/memcpy_async.h>
8bool is_hopper_architecture() {
11 CUDA_CHECK(cudaGetDeviceProperties(&prop, device));
12 return prop.major >= 9;
15namespace cg = cooperative_groups;
18__global__ void cluster_reduce_kernel(const T* __restrict__ input,
19 T* __restrict__ output,
21 extern __shared__ float smem[];
23 cg::cluster_group cluster = cg::this_cluster();
24 int cluster_rank = cluster.rank();
25 int cluster_size = cluster.size();
27 int tid = threadIdx.x;
28 int idx = blockIdx.x * blockDim.x + threadIdx.x;
30 float val = (idx < n) ? static_cast<float>(input[idx]) : 0.0f;
35 if (cluster.use_cluster()) {
36 for (int s = cluster_size / 2; s > 0; s >>= 1) {
37 int peer_rank = (cluster_rank ^ s);
38 if (cluster_rank < s) {
39 smem[tid] = smem[tid] + smem[tid + s * blockDim.x];
44 if (cluster_rank == 0) {
45 float block_sum = 0.0f;
46 for (int i = 0; i < cluster_size; ++i) {
47 block_sum += smem[i * blockDim.x];
49 atomicAdd(output, static_cast<T>(block_sum));
52 for (int s = blockDim.x / 2; s > 0; s >>= 1) {
54 smem[tid] += smem[tid + s];
60 atomicAdd(output, static_cast<T>(smem[0]));
66__global__ void cluster_reduce_fallback_kernel(const T* __restrict__ input,
67 T* __restrict__ output,
69 extern __shared__ float smem[];
71 int tid = threadIdx.x;
72 int idx = blockIdx.x * blockDim.x + threadIdx.x;
74 smem[tid] = (idx < n) ? static_cast<float>(input[idx]) : 0.0f;
77 for (int s = blockDim.x / 2; s > 0; s >>= 1) {
79 smem[tid] += smem[tid + s];
85 atomicAdd(output, static_cast<T>(smem[0]));
90void cluster_reduce<float>(const float* input, float* output, size_t n,
91 const ClusterConfig& config, cudaStream_t stream) {
92 if (input == nullptr || output == nullptr) {
93 throw std::invalid_argument("cluster_reduce expects non-null input and output pointers");
96 throw std::invalid_argument("cluster_reduce expects n > 0");
98 if (config.block_dims.x == 0) {
99 throw std::invalid_argument("cluster_reduce expects config.block_dims.x > 0");
102 int block_size = config.block_dims.x;
103 int grid_size = (n + block_size - 1) / block_size;
104 size_t smem_size = block_size * sizeof(float);
106 CUDA_CHECK(cudaMemsetAsync(output, 0, sizeof(float), stream));
108 if (config.use_cluster && is_hopper_architecture()) {
109 cluster_reduce_kernel<float><<<grid_size, block_size, smem_size, stream>>>(
112 cluster_reduce_fallback_kernel<float><<<grid_size, block_size, smem_size, stream>>>(
119void cluster_reduce_fallback<float>(const float* input, float* output, size_t n,
120 const ClusterConfig& config, cudaStream_t stream) {
121 if (input == nullptr || output == nullptr) {
122 throw std::invalid_argument("cluster_reduce expects non-null input and output pointers");
125 throw std::invalid_argument("cluster_reduce expects n > 0");
127 if (config.block_dims.x == 0) {
128 throw std::invalid_argument("cluster_reduce expects config.block_dims.x > 0");
131 int block_size = config.block_dims.x;
132 int grid_size = (n + block_size - 1) / block_size;
133 size_t smem_size = block_size * sizeof(float);
135 cluster_reduce_fallback_kernel<float><<<grid_size, block_size, smem_size, stream>>>(
140} // namespace hpc::cuda13