CPU作业之openMp实现快速排序

博客原文
欢迎关注博客:GaoMing’s blog

项目介绍

  • 利用OpenMp设计多线程进行快速排序

openMP相关知识

什么是openMP

OpenMP是由OpenMP Architecture Review Board牵头提出的,并已被广泛接受的、用于共享内存并行系统的多线程程序设计的一套编译指令 (Compiler Directive)

OpenMP支持、CC++Fortran;而支持OpenMP的编译器包括Sun CompilerGNU CompilerIntel Compiler等。

OpenMP提供了对并行算法的高层的抽象描述,程序员通过在源代码中加入专用的pragma来指明自己的意图,由此编译器可以自动将程序进行并行化,并在必要之处加入同步互斥以及通信。

当选择忽略这些pragma,或者编译器不支持OpenMP时,程序又可退化为通常的程序(一般为串行),代码仍然可以正常运作,只是不能利用多线程来加速程序执行。

openMP的Fork-Join并行执行模型

CPU作业之openMp实现快速排序
  • 起始时只有一个主线程,当遇到Fork操作时,创建或唤醒多个子线程进入并行任务执行
  • 并行执行结束后,实现隐式的同步,汇合到主线程中,即Join
  • 相邻的ForkJoin操作之间称为一个并行域并行域可以嵌套

openMP版本

  • 调用$gcc -v查看服务器openMP版本
  • 官网查看下载openMP版本

openMP编译制导

所有的编译制导指令必须以#pragma omp开头

  1. parallel指令

    • 用于构造一个并行块,如果它所构造的并行块有多于1条的语句,需要用一对大括号括起来。
    • 并行区域必须是一个完整的代码块,不能使用goto或其他语句跳出或转入。
    #include <stdio.h>
    int main(){
        #pragma omp parallel
        printf("The parallel region is run by thread %d.\n",omp_get_thread_num());
        return 0;
    }
    
  2. for指令

    • n用于对循环工作进行并行化,它通常需要与parallel合并使用。

    • for循环必须具备一定的规范格式:

      //没有与parallel同时使用,不能并行
      #include <stdio.h>
      int main(){
          int i;
          #pragma omp for
          for (i = 0; i < 4; i++)
              printf("i = %d, threadId = %d.\n", i, omp_get_thread_num());
          return 0;
      }
      
      
      //与parallel同时使用,能并行
      #include <stdio.h>
      int main(){
          int i;
          #pragma omp parallel for
          for (i = 0; i < 4; i++)
              printf("i = %d, threadId = %d.\n", i, omp_get_thread_num());
          return 0;
      }
      
  3. sections和section指令

    • 用于使各个线程执行不同的工作。

    • 每个section必须是一个结构化的代码块,不能有分支转入或跳出。

    • sections中可以定义多个section,每个section仅被一个一线程执行一次。

    • 当线程多于section数量时,每个线程最多执行一个section

    • 当线程少于section数量时,有线程会执行多于一个的section

      格式:

      #pragma omp sections [clause[[,] clause]…]
      {
           [#pragma omp section]
               structured  block
           [#pragma omp section]
               structured block
            ……
      }
      
      #include <stdio.h>
      int main(){
          #pragma omp parallel sections
          {
              #pragma omp section
              printf("Hello from %d.\n", omp_get_thread_num());
              #pragma omp section
              printf("Hi from %d.\n", omp_get_thread_num());
              #pragma omp section
              printf("Bye from %d.\n", omp_get_thread_num());
          }
          return 0;
      }
      
      
      输出:
      Bye from 17.
      Hi from 28.
      Hello from 36.
      
  4. single指令

    • 用于让紧随其后的语句串行执行

      例子:

      #include <stdio.h>
      #include <omp.h>
      int main(){
          #pragma omp parallel    {
              printf("Run in parallel, thread id = %d.\n", omp_get_thread_num());
              #pragma omp single        {
                  printf("Run in sequence, thread id = %d.\n", omp_get_thread_num());
              }
              printf("Run in parallel, thread id = %d.\n", omp_get_thread_num());
          }
          return 0;
      }
      

      输出:

      Run in parallel, thread id = 11.
      Run in sequence, thread id = 11.
      Run in parallel, thread id = 19.
      Run in parallel, thread id = 30.
      
  5. private数据属性

    • 将一个或多个变量声明为线程的私有变量。

    • 每个线程都有它自己的变量私有副本,其他线程无法访问。

    • 即使在并行区域外有同名的共享变量,共享变量在并行区域内不起任何作用,并且并行区域内不会操作到外面的共享变量。

    • 并行区域内的private变量和并行区域外同名的变量没有存储关联。

    • 如果需要继承原有共享变量的值,则应使用firstprivate子句。

    • 如果需要在退出并行区域时将私有变量最后的值赋值给对应的共享变量,则可使用lastprivate子句。

      例子1:

      #include <stdio.h>
      int main(){
          int k, i;
          k = 100;
          #pragma omp parallel for firstprivate(k),lastprivate(k)
          for (i = 0; i < 8; i++)
          {
              k += i;
              printf("k = %d in thread %d.\n", k, omp_get_thread_num());
          }
          printf("Finally, k = %d.\n", k);
          return 0;
      }
      

      输出:

      输出:
      k = 101 in thread 1.
      k = 107 in thread 7.
      k = 105 in thread 5.
      k = 100 in thread 0.
      k = 103 in thread 3.
      k = 104 in thread 4.
      k = 102 in thread 2.
      k = 106 in thread 6.
      Finally, k = 107.
      

      例子2:

      //默认是shared
      #include <stdio.h>
      int main(){
         int i, j;
         #pragma omp parallel for
         for (i = 0; i < 2; i++){
           for (j = 0; j < 5; j++) //j共享,线程1只执行了1次
               printf("i=%d, j=%d from id=%d.\n", i, j, omp_get_thread_num());
         }
         return 0;
      }
      

      输出:

      i=0, j=0 from id=0.
      i=0, j=1 from id=0.
      i=0, j=2 from id=0.
      i=0, j=3 from id=0.
      i=0, j=4 from id=0.
      i=1, j=0 from id=1.
      
  6. shared数据属性

    例子:

    #include <iostream>  
    #include <omp.h>   
    using namespace std;  
    int main() {  
        int sum = 0;    
        cout << "Before: " << sum << endl;    
    #pragma omp parallel for shared(sum)   
        for (int i = 0; i < 10; ++i) {  
            sum += i;  //存在数据竞争
    }  
        cout << "After: " << sum << endl;  
        return 0;  
    } 
    
  7. 设置线程数量

    • 编译制导的num_threads()子句。如:#pragma omp parallel num_threads(8)
    • 在并行区域外,使用运行库例程omp_set_num_threads()设定并行区域中使用。如:omp_set_num_threads(10)
    • 环境变量OMP_NUM_THREADS。如:export OMP_NUM_THREADS = 3
  8. 运行库例程与环境变量

    • int omp_get_num_threads(void): 返回当前并行区域中的线程数量
    • int omp_get_thread_num(void): 返回值当前并行区域中,当前线程在线程组中的编号。这个编号从0开始
    • int omp_get_num_procs(void): 返回值为当前程序可以使用的CPU核数
    • double omp_get_wtime(void): **返回值是以秒为单位的墙上时间。**在并行区域开始前和结束后分别调用该函数,并求取两次返回值的差,便可计算出并行执行的时间。

openMP快排

设计思路

利用OpenMp设计多线程进行快速排序,递归实现多线程同时进行子块的快速排序,需要线程块log2(n)个

自定义的库(quickSort.h)

  • 为了方便(MPI,openMP和MPI混合)对于相同函数的调用,编写通用函数库quickSort.h

    原码:

    #include "omp.h"
    //随机创建数组
    void rands(int *data,int sum);
    //交换函数
    void sw(int *a, int *b);
    //求2的n次幂
    int exp2(int wht_num);
    //求log2(n)
    int log2(int wht_num);
    //合并两个有序的数组
    void mergeList(int *c,int *a, int sta1, int end1, int *b, int sta2, int end2);
    //串行快速排序
    int partition(int *a, int sta, int end);
    void quickSort(int *a, int sta, int end);
    //openMP(8)并行快速排序
    void quickSort_parallel(int* array, int lenArray, int numThreads);
    void quickSort_parallel_internal(int* array, int left, int right, int cutoff);
    
    void rands(int *data, int sum){
        int i;
        for (i = 0; i < sum; i++)
        {
            data[i] = rand() % 100000000;
        }
    }
    
    void sw(int *a, int *b){
        int t = *a;
        *a = *b;
        *b = t;
    }
    int exp2(int wht_num){
        int wht_i;
        wht_i=1;
        while(wht_num>0)
        {
            wht_num--;
            wht_i=wht_i*2;
        }
        return wht_i;
    }
    int log2(int wht_num){
        int wht_i, wht_j;
        wht_i=1;
        wht_j=2;
        while(wht_j<wht_num)
        {
            wht_j=wht_j*2;
            wht_i++;
        }
        if(wht_j>wht_num)
            wht_i--;
        return wht_i;
    }
    int partition(int *a, int sta, int end){
        int i = sta, j = end + 1;
        int x = a[sta];
        while (1)
        {
            while (a[++i] < x && i < end);
            while (a[--j] > x);
            if (i >= j)
                break;
            sw(&a[i], &a[j]);
        }
        a[sta] = a[j];
        a[j] = x;
        return j;
    }
    //并行openMP排序
    void quickSort_parallel(int *a, int lenArray, int numThreads){
        int cutoff = 1000;
        #pragma omp parallel num_threads(numThreads) //指定线程数的数量
        {
            #pragma omp single //串行执行
            {
                quickSort_parallel_internal(a, 0, lenArray - 1, cutoff);
            }
        }
    }
    void quickSort_parallel_internal(int *a, int left, int right, int cutoff){
        int i = left, j = right;
        int tmp;
        int pivot = a[(left + right) / 2];
        //进行数组分割,分成两部分(符合左小右大)
        while (i <= j)
        {
            while (a[i] < pivot)
                i++;
            while (a[j] > pivot)
                j--;
            if (i <= j){
                tmp = a[i];
                a[i] = a[j];
                a[j] = tmp;
                i++;
                j--;
            }
        }
        //int j = partition(a,left,right);
        if (((right - left) < cutoff)){
            if (left < j){
                quickSort_parallel_internal(a, left, j, cutoff);
            }
            if (i < right){
                quickSort_parallel_internal(a, i, right, cutoff);
            }
        }
        else{
            #pragma omp task
            /*  task是“动态”定义任务的,在运行过程中,
                只需要使用task就会定义一个任务,
                任务就会在一个线程上去执行,那么其它的任务就可以并行的执行。
                可能某一个任务执行了一半的时候,或者甚至要执行完的时候,
                程序可以去创建第二个任务,任务在一个线程上去执行,一个动态的过程
            */
    	//对两部分再进行并行的线程排序
            {
                quickSort_parallel_internal(a, left, j, cutoff);
            }
            #pragma omp task
            {
                quickSort_parallel_internal(a, i, right, cutoff);
            }
        }
    }
    //合并两个已排序的数组
    void mergeList(int *c,int *a, int sta1, int end1, int *b, int sta2, int end2){
        int a_index = sta1; // 遍历数组a的下标
        int b_index = sta2; // 遍历数组b的下标
        int i = 0;          // 记录当前存储位置
        //int *c;
        //c = (int *)malloc(sizeof(int) * (end1 - sta1 + 1 + end2 - sta2 + 1));
        while (a_index < end1 && b_index < end2){
            if (a[a_index] <= b[b_index]){
                c[i] = a[a_index];
                a_index++;
            }
            else{
                c[i] = b[b_index];
                b_index++;
            }
            i++;
        }
        while (a_index < end1){
            c[i] = a[a_index];
            i++;
            a_index++;
        }
        while (b_index < end2){
            c[i] = b[b_index];
            i++;
            b_index++;
        }
    }
    //串行快速排序
    void quickSort(int *a, int sta, int end){
        if (sta < end){
            //printf("3\n");
            int mid = partition(a, sta, end);
            quickSort(a, sta, mid - 1);
            quickSort(a, mid + 1, end);
        }
    }
    
  • openMP快排实现代码

    #include <stdio.h>
    #include <omp.h>
    #include <stdlib.h>
    #include <time.h>
    #include "quickSort.h"
    const int n = 10000000;
    int main()
    {
        omp_set_num_threads(8);
        double omp_time_sta, omp_time_end;
        double time_sta, time_end;
        int *data1, *data2;
        int num = 8;
        data1 = (int *)malloc(sizeof(int) * n);
        data2 = (int *)malloc(sizeof(int) * n);
        rands(data1, n);
    	rands(data2, n);
        
        //并行快速排序
        omp_time_sta = omp_get_wtime();
        quickSort_parallel(data1, n, num);
    	omp_time_end = omp_get_wtime();
        
        //串行快速排序
        time_sta = omp_get_wtime();
        quickSort(data2, 0, n - 1);
        time_end = omp_get_wtime();
        
        //输出运行时间
        printf("-------------------\n");
        printf("并行处理时间 : %lf s\n", omp_time_end - omp_time_sta);
        printf("串行处理时间 : %lf s\n", time_end - time_sta);
        printf("-------------------\n");
        printf("\n\n");
        //输出排序后的数组
        // printf("-------------------\n");
        // printf("The final data1 : ");
        //print(data1, n);
        // printf("The final data2 : ");
        // print(data2,n);
        // printf("-------------------\n");
        // printf("\n\n");
        return 0;
    }
    
  • openMP快排算法Github上也有许多优秀的方法,大家可以自己去爬爬

分析

  • 运用openMP和快排算法的相通处,我们可以对串行快排进行并行化
  • 我给出的具体做法是:首先调用single使初始化数组分成两部分
  • 随后运用task,对两部分并行回溯(运用task而不是sections,是因为task是动态分配线程,比较灵活,用sections也不是不可以)
  • 核心内容均在quickSort.h中,自行查阅

性能分析

  • 没有想象中那么快,数据规模过大之后,设计到不断分配线程时间和cache不断替换等问题
  • 自行测试一下就看出来了
上一篇:mORMot学习笔记3 数据集转Json


下一篇:Java JDK8新特性之Lambada表达式