nccl 04 nvidia 官方小程序

1,代码重新编辑

为了地毯式地检查结果的正确性,这里修改了代码

主要步骤为

step1:  data_p指向的空间中,分别生成随机数;

step2:  分别拷贝到gpu的sendbuff的显存中;

step3:  通过nccl_all_reduce sum;

step4:  取回 recvbuff中的数据;

step5:  在主机端对 data_p 中各设备对应的数据做 allreduce sum,结果累加到 data_p[0] 中;

step6:  逐元素对比每个设备取回的 recvbuff 数据与 data_p[0] 中主机端求和结果的一致性;


#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <omp.h>


// Include guard: the closing #endif at the bottom of this section had no
// matching #ifndef, which is a preprocessor error.
#ifndef THREAD_MT19937_64_H
#define THREAD_MT19937_64_H

// Parameters of the 64-bit Mersenne Twister used below.
// NN is the number of 64-bit words in the state vector; it is also reused
// elsewhere in this file as the tile length for parallel generation.
#define NN 312
#define MM 156
#define MATRIX_A 0xB5026F5AA96619E9ULL
#define UM 0xFFFFFFFF80000000ULL /* Most significant 33 bits */
#define LM 0x7FFFFFFFULL /* Least significant 31 bits */

// One independent 64-bit Mersenne Twister generator.
// Each instance owns its own state vector, so separate instances can be used
// from separate threads without sharing state.
class thread_mt199937{

public:
/* (re)seeds the generator; the next draw regenerates the whole state block */
void srand(unsigned long long seed)
{
    init_genrand64(seed);
}

/* generates a random number on [0, 2^63-1]-interval */
long long genrand64_int63(void)
{
    return (long long)(genrand64_int64() >> 1);
}

/* generates a random number on [0,1]-real-interval */
/* (top 53 bits divided by 2^53 - 1) */
double genrand64_real1(void)
{
    return (genrand64_int64() >> 11) * (1.0/9007199254740991.0);
}

/* generates a random number on [0,1)-real-interval */
/* (top 53 bits divided by 2^53) */
double genrand64_real2(void)
{
    return (genrand64_int64() >> 11) * (1.0/9007199254740992.0);
}

/* generates a random number on (0,1)-real-interval */
/* (top 52 bits plus 0.5, divided by 2^52) */
double genrand64_real3(void)
{
    return ((genrand64_int64() >> 12) + 0.5) * (1.0/4503599627370496.0);
}


private:
/* The array for the state vector */
unsigned long long mt[NN];
/* mti==NN+1 means mt[NN] is not initialized */
int mti=NN+1;

/* initializes mt[NN] with a seed */
/* the loop leaves mti == NN, so the first draw triggers a state refresh */
void init_genrand64(unsigned long long seed)
{
    mt[0] = seed;
    for (mti=1; mti<NN; mti++)
        mt[mti] =  (6364136223846793005ULL * (mt[mti-1] ^ (mt[mti-1] >> 62)) + mti);
}

/* initialize by an array with array-length */
/* init_key is the array for initializing keys */
/* key_length is its length */
/* NOTE(review): private and not called anywhere in the visible code */
void init_by_array64(unsigned long long init_key[],
		     unsigned long long key_length)
{
    unsigned long long i, j, k;
    init_genrand64(19650218ULL);
    i=1; j=0;
    k = (NN>key_length ? NN : key_length);
    for (; k; k--) {
        mt[i] = (mt[i] ^ ((mt[i-1] ^ (mt[i-1] >> 62)) * 3935559000370003845ULL))
          + init_key[j] + j; /* non linear */
        i++; j++;
        if (i>=NN) { mt[0] = mt[NN-1]; i=1; }
        if (j>=key_length) j=0;
    }
    for (k=NN-1; k; k--) {
        mt[i] = (mt[i] ^ ((mt[i-1] ^ (mt[i-1] >> 62)) * 2862933555777941757ULL))
          - i; /* non linear */
        i++;
        if (i>=NN) { mt[0] = mt[NN-1]; i=1; }
    }

    mt[0] = 1ULL << 63; /* MSB is 1; assuring non-zero initial array */
}

/* generates a random number on [0, 2^64-1]-interval */
unsigned long long genrand64_int64(void)
{
    int i;
    unsigned long long x;
    /* read-only lookup table: mag01[b] is 0 or MATRIX_A depending on the low bit */
    static const unsigned long long mag01[2]={0ULL, MATRIX_A};

    if (mti >= NN) { /* generate NN words at one time */

        /* if init_genrand64() has not been called, */
        /* a default initial seed is used     */
        if (mti == NN+1)
            init_genrand64(5489ULL);

        for (i=0;i<NN-MM;i++) {
            x = (mt[i]&UM)|(mt[i+1]&LM);
            mt[i] = mt[i+MM] ^ (x>>1) ^ mag01[(int)(x&1ULL)];
        }
        for (;i<NN-1;i++) {
            x = (mt[i]&UM)|(mt[i+1]&LM);
            mt[i] = mt[i+(MM-NN)] ^ (x>>1) ^ mag01[(int)(x&1ULL)];
        }
        x = (mt[NN-1]&UM)|(mt[0]&LM);
        mt[NN-1] = mt[MM-1] ^ (x>>1) ^ mag01[(int)(x&1ULL)];

        mti = 0;
    }

    x = mt[mti++];

    /* tempering */
    x ^= (x >> 29) & 0x5555555555555555ULL;
    x ^= (x << 17) & 0x71D67FFFEDA60000ULL;
    x ^= (x << 37) & 0xFFF7EEE000000000ULL;
    x ^= (x >> 43);

    return x;
}

};


#endif

//#define NN 312

// multi thread accelerating, to be singleton
// Front end for tile-wise random float generation: it only stores a base
// seed; the generation itself happens in rand_float(), defined out of line.
class parallel_mt19937 {
private:
    // Base seed recorded by srand(); 0 until srand() is called.
    unsigned long long _base_seed = 0;

public:
    // Remembers `seed` as the base seed. No numbers are generated here.
    void srand(unsigned long long seed) { this->_base_seed = seed; }

    // Fills A[0..len) with random floats (see the out-of-line definition).
    void rand_float(float* A, unsigned long len);
};

//int pmt19937::_base_seed = 0;

void parallel_mt19937::rand_float(float* A, unsigned long len)
{
    #pragma omp parallel
    {
        unsigned long block_dim = omp_get_num_threads();
        unsigned long thid = omp_get_thread_num();

        if(thid == block_dim -1)
            std::cout << "Here are " << thid << " threads generating random number ..."<<std::endl;
        unsigned long tile_count = (len+NN-1)/NN;

        thread_mt199937 *t_mt_p = new thread_mt199937();// to be singleton

        for(unsigned long tile_id=thid; tile_id<tile_count; tile_id+=block_dim){
            //each tile has a specific seed: (_base_seed + tile_id) to smt19937, to keep consistence
            unsigned long tile_seed = _base_seed + tile_id;
            t_mt_p->srand(tile_seed);

            //if(thid == 35)                std::cout << "Hello from thread " << thid << std::endl;
            unsigned long tile_idx_start = tile_id*NN;
            unsigned long tile_idx_end = (((tile_id+1)*NN) <= len)? (tile_id+1)*NN: len;

            for(unsigned long idx = tile_idx_start; idx<tile_idx_end; idx++)
                A[idx] = float(t_mt_p->genrand64_real2());
        }
        delete t_mt_p;
    }
}



// Allocates nDev host buffers of `size` floats each and fills them with
// random values.
//
// data_p : array of nDev pointer slots; each slot receives a malloc'ed buffer
//          that the caller releases (see free_buff).
// nDev   : number of buffers to create.
// size   : number of floats per buffer.
// seed   : buffer idx is generated with base seed (seed + idx).
//
// Exits the process with EXIT_FAILURE if an allocation fails; the original
// passed the unchecked pointer straight to the generator.
void init_data_float(float** data_p, int nDev, int size,  unsigned long seed)
{
  const size_t bytes = (size_t)size * sizeof(float);

  for(int idx=0; idx<nDev; idx++){
    float* buf = (float*)malloc(bytes);
    // malloc(0) may legitimately return NULL, so only a non-empty request
    // that yields NULL counts as a failure.
    if(buf == nullptr && bytes != 0){
      fprintf(stderr, "Failed: host malloc of %zu bytes for buffer %d\n", bytes, idx);
      exit(EXIT_FAILURE);
    }
    data_p[idx] = buf;
  }

  parallel_mt19937 gen_rand;

  for(int idx =0; idx<nDev; idx++){
    gen_rand.srand(seed+idx);
    gen_rand.rand_float(data_p[idx], size);
  }

}

// Host-side reference for a sum all-reduce: after the call,
// sendbuff[0][i] == sendbuff[0][i] + sendbuff[1][i] + ... + sendbuff[nDev-1][i]
// for every i in [0, size). Buffers 1..nDev-1 are only read.
//
// sendbuff : nDev host buffers of `size` floats each.
// nDev     : number of buffers.
// size     : number of floats per buffer.
//
// The original pragma was misspelled ("paralell"), so the compiler ignored it
// and the loop ran serially. Here the element loop is the parallel one and the
// per-element additions keep the original order (buffer 1, 2, ...), so the
// floating-point result is unchanged.
void host_all_reduce_origin(float** sendbuff, int nDev, int size)
{
  if(nDev < 2 || size <= 0)
    return;  // nothing to accumulate

  float* acc = sendbuff[0];

#pragma omp parallel for
  for(int i=0; i<size; i++){
    float sum = acc[i];
    for(int idx=1; idx<nDev; idx++)
      sum += sendbuff[idx][i];
    acc[i] = sum;
  }
}

// Compares every element of every received buffer against the host reference
// and reports the first mismatch.
//
// result_recv_data_buff : nDev host buffers of `size` floats each.
// sendbuff_0            : host reference of `size` floats.
// nDev, size            : number of buffers / floats per buffer.
//
// Prints one "ERROR: ..." line for the first differing element and returns;
// prints nothing when all elements match.
//
// In the original the `return;` was outside the brace-less `if`, so the
// function returned after looking at element 0 of buffer 0 only. The loop is
// kept serial on purpose: an early return may not branch out of an OpenMP
// worksharing loop.
//
// NOTE(review): the comparison is bit-exact. That presumably holds only while
// the device reduction adds in the same order as the host reference (e.g. two
// buffers); confirm before using more devices, or switch to a tolerance.
void check_equal(float** result_recv_data_buff, float* sendbuff_0, int nDev, int size)
{

  for(int idx=0; idx<nDev; idx++){
    const float* recv = result_recv_data_buff[idx];
    for(int i=0; i<size; i++){
      if(recv[i] != sendbuff_0[i]){
        printf("ERROR: %7.4f != %7.4f  idx=%d, i=%d\n", recv[i], sendbuff_0[i], idx, i);
        return;
      }
    }
  }
}

// Releases the first n buffers referenced by buffArray with free().
// Null slots are harmless (free(NULL) is a no-op). The slots themselves are
// not modified and the pointer array is not released.
void free_buff(float** buffArray, int n)
{
  int remaining = n;
  float** slot = buffArray;
  while(remaining > 0){
    free(*slot);
    ++slot;
    --remaining;
  }
}

//

#include <stdlib.h>
#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"





// Evaluates a CUDA runtime call once; if it does not return cudaSuccess,
// prints the file, line and cudaGetErrorString() text and exits the process
// with EXIT_FAILURE. Wrapped in do { } while(0) so it behaves as one statement.
#define CUDACHECK(cmd) do {                         \
  cudaError_t err = cmd;                            \
  if (err != cudaSuccess) {                         \
    printf("Failed: Cuda error %s:%d '%s'\n",       \
        __FILE__,__LINE__,cudaGetErrorString(err)); \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)


// Evaluates an NCCL call once; if it does not return ncclSuccess, prints the
// file, line and ncclGetErrorString() text and exits the process with
// EXIT_FAILURE. Wrapped in do { } while(0) so it behaves as one statement.
#define NCCLCHECK(cmd) do {                         \
  ncclResult_t res = cmd;                           \
  if (res != ncclSuccess) {                         \
    printf("Failed, NCCL error %s:%d '%s'\n",       \
        __FILE__,__LINE__,ncclGetErrorString(res)); \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)


int main(int argc, char* argv[])
{
  ncclComm_t comms[4];


  //managing 4 devices
  //int nDev = 4;
  int nDev = 2;
  int size = 32*1024*1024*16;
  int devs[4] = { 0, 1, 2, 3 };

  float** data_p = (float**)malloc(nDev*sizeof(float*));
  init_data_float(data_p, nDev, size, 2024);



  //allocating and initializing device buffers
  float** sendbuff = (float**)malloc(nDev * sizeof(float*));
  float** recvbuff = (float**)malloc(nDev * sizeof(float*));
  cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);


  for (int i = 0; i < nDev; ++i) {
    CUDACHECK(cudaSetDevice(i));
    CUDACHECK(cudaMalloc((void**)sendbuff + i, size * sizeof(float)));
    CUDACHECK(cudaMalloc((void**)recvbuff + i, size * sizeof(float)));

    //CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
    CUDACHECK(cudaMemcpy(sendbuff[i], data_p[i], size*sizeof(float), cudaMemcpyHostToDevice));
    //CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
    CUDACHECK(cudaStreamCreate(s+i));
  }

printf("LL:: 04\n");

  //initializing NCCL
  NCCLCHECK(ncclCommInitAll(comms, nDev, devs));
printf("LL:: 05\n");

   //calling NCCL communication API. Group API is required when using
   //multiple devices per thread
  NCCLCHECK(ncclGroupStart());
  for (int i = 0; i < nDev; ++i)
    NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
        comms[i], s[i]));
  NCCLCHECK(ncclGroupEnd());
printf("LL:: 06\n");
/*LL::
In a sum allreduce operation between k ranks,
each rank will provide an array in of N values,
and receive identical results in array out of N values,
where out[i] = in0[i]+in1[i]+…+in(k-1)[i]
*/
  //synchronizing on CUDA streams to wait for completion of NCCL operation
  for (int i = 0; i < nDev; ++i) {
    CUDACHECK(cudaSetDevice(i));
    CUDACHECK(cudaStreamSynchronize(s[i]));
  }
printf("LL:: 07\n");
/*check:
each recvbuff[0] = sendbuff_0[0]+ sendbuff_1[0];
0x01010101 + 0x01010101 = 0x02020202;
*/
  float** result_recv_data_buff;
  result_recv_data_buff = (float**)malloc(nDev*sizeof(float*));
  init_data_float(result_recv_data_buff, nDev, size, 2024+nDev);

printf("LL:: 08\n");
  for(int idx=0; idx<nDev; idx++){
    cudaMemcpy(result_recv_data_buff[idx], recvbuff[idx], size*sizeof(float), cudaMemcpyDeviceToHost);
  }

printf("LL:: 09\n");
  host_all_reduce_origin(data_p, nDev, size);

printf("LL:: 10\n");
// ditanshi check
  check_equal(result_recv_data_buff, data_p[0], nDev, size);

printf("LL:: 11\n");

  free_buff(data_p, nDev);
  free_buff(result_recv_data_buff, nDev);
  //free device buffers
  for (int i = 0; i < nDev; ++i) {
    CUDACHECK(cudaSetDevice(i));
    CUDACHECK(cudaFree(sendbuff[i]));
    CUDACHECK(cudaFree(recvbuff[i]));
  }


  //finalizing NCCL
  for(int i = 0; i < nDev; ++i)
      ncclCommDestroy(comms[i]);


  printf("Success \n");
  return 0;
}




Makefile

# Builds the single-process / single-thread / multi-device NCCL example.
# The executable is built from a source file of the same name with a .cpp
# extension via the pattern rule below.
EXE := ex_1_1_SingleProcessSingleThreadMultipleDevices

all: $(EXE)

# Header and library locations for CUDA and the locally installed NCCL.
INC := -I /usr/local/cuda/include -I /home/hipper/ex_nccl_20240701/local/include/
# -fopenmp is needed at compile time too; it takes effect because compiling
# and linking happen in the single g++ invocation below.
LD_FLAGS := -L /usr/local/cuda/lib64 -L /home/hipper/ex_nccl_20240701/local/lib -lcudart -lnccl -fopenmp



%: %.cpp
	g++ -g $< -o $@ $(INC) $(LD_FLAGS)



# all and clean are commands, not files: declare both phony so that a file
# named "all" or "clean" cannot shadow them.
.PHONY: all clean
clean:
	-rm -rf $(EXE)

2,编译运行

3,问题

运行时两张 GPU 的显存分别占用了约 7GB 和 8GB;而按数据量计算,每张卡上 sendbuff 为 2GB、recvbuff 为 2GB,合计应只有 4GB,多出的显存占用来源尚待确认。

上一篇:STM32第十二课:ADC检测烟雾浓度(MQ2)-需求实现


下一篇:EtherCAT通讯介绍