1 //=================================================================
2 // Name : 基于分段的平行排序
3 // Author : 袁冬(2107020046)
4 // Copyright : 中国海洋大学
5 // LastUpdate : 2007.10.03
6 // Develop Evrionment : Windows Server 2003 SP2
7 // + Eclipse 3.3.1 + CDT 4.0.1
8 // + MinGW 3.81 + GDB 6.6
9 // + mpich2-1.0.6-win32-ia32
10 //=================================================================
11
12 /*
13 * 算法描述: 分段 -> 段内排序 -> 归约结果。
14 * 1,根进程读取输入,将元素个数广播给各个进程。
15 * 2,然后各进程计算段长度和段偏移。
16 * 3,然后根进程选择第一个段,标记站位符。
17 * 4,跟进程将剩余元素发送给下一进程,下一进程选择段的同时,根进程排序。
18 * 5,下一进程继续此过程,直到最后一个进程,所有元素都进行排序。
19 * 6,进程将排序好的元素,按照段偏移归约给根进程。
20 * 7,根进程输入结果。
21 *
22 * 时间复杂度计算:
23 * 设共有进程p,共有元素n
24 * 则段数为p,每段长度为m = n / p
25 * 最长时间为从根进程分段开始,至末进程规约为止,又注意到,分段时串行进行的,故
26 *
27 * t = 分段时间 * 段数 + 最后一段的排序时间
28 *
29 * 用大欧米伽表示法表示
30 *
31 * O(分段时间) = m * n = n * n / p
32 * O(最后一段的排序时间) = m * m
33 *
34 * 所以
35 *
36 * O(t) = n * n / p * p + m * m
37 * = n * n + m * m;
38 * = n * n
39 *
40 * 所以,此算法排序时间复杂度为n的平方,和起泡排序的时间复杂度相同。
41 *
42 * 分析与优化:
43 * 从时间复杂度的分析可以知道,算法的瓶颈在于分段算法,因为该算法从本质上讲,是串行进行的。
44 * 因个人水平有限,分段没有找到并行算法,导致整个算法为伪并行。
45 * 但是有一个办法可以将分算法的时间减半,
46 * 即从最大和最小两边开始,分别进行分组,可以使时间复杂度减低一半,但总体时间复杂度的O认为n*n。
47 *
48 * 另外,在“取得当前段”的算法中,如果每次循环i时,段索引没有改变,再下一轮时,可以不再遍历。
49 * 相当于加入缓存,估计此缓存命中几率比较高,可以较大幅度的改善时间复杂度。
50 *
51 */
52
53 //#define DEBUG 1 //调试符号
54
55 #define TRUE 1
56 #define FALSE 0
57 #define Bool int
58
59 #define MAX_LENGTH 100 //允许的最大元素个数
60 #define MAX_LENGTH_WITH_MARK 200 //允许的最大元素数及其对应的标记的长度,此值应为MAX_LENGTH的两倍
61
62 #include "mpi.h"
63 #include <stdio.h>
64 #include <stdlib.h>
65
66 int pID, pSize; //pID:当前进程ID,pSize:总的进程数
67 int input[MAX_LENGTH][2], length, packageSize; //input:输入,length:输入的元素个数,packageSize:段大小
68 int result[MAX_LENGTH], finalResult[MAX_LENGTH], packageIndex[MAX_LENGTH]; //result:当前结果,finalResult:最终结果,packageIndex:段值索引
69 int resultOffset; //resultOffset:短偏移
70
71 //读取输入
72 void read() {
73 int i;
74 //读取元素个数
75 printf("Input the length of the array:");
76 fflush(stdout);
77 scanf("%d", &length);
78
79 //读取元素
80 printf("Input the element of the array total %d:", length);
81 fflush(stdout);
82 for (i = 0; i < length; i++) {
83 //读取元素,经对应站位标记标记为0
84 scanf("%d", &input[i][0]);
85 input[i][1] = 0;
86 }
87 }
88 //输出
89 void output() {
90 int i;
91 for (i = 0; i < length; i++)
92 printf("%d ", finalResult[i]);
93 fflush(stdout);
94 }
95
96 //准备:计算进程常量
97 int prepare() {
98 packageSize = length / pSize; //段大小
99 resultOffset = packageSize * pID; //结果起始偏移值,表示该段位于整体的哪个部分
100
101 //对于最后一个进程,需要修正段大小,包含所有余下的元素
102 if (pID == pSize - 1)
103 packageSize += length % pSize;
104
105 #ifdef DEBUG
106 //调试信息:段大小
107 printf("resultOffset: %d, From %d.\n", resultOffset, pID);
108 #endif
109 }
110
111 //分段,取得当前进程负责的段
112 void findCurrentRange() {
113 int i, j = 0, maxIndex, beginIndex = 0;
114
115 //填充默认的值
116 for (i = 0; i < packageSize; i++) {
117 while (input[beginIndex][1])
118 beginIndex++;
119 packageIndex[i] = beginIndex;
120 beginIndex++;
121 }
122
123 #ifdef DEBUG
124 //调试信息:默认值
125 for (i = 0; i < packageSize; i++)
126 printf("%d", packageIndex[i]);
127 printf(" From %d\n", pID);
128 #endif
129
130 //查找所有元素,找到最小的packageSize个元素,取得其索引值
131 for (i = beginIndex; i < length; i++) {
132 //忽略被其他进程占用的元素
133 if (input[i][1])
134 continue;
135
136 //查找比当前元素索引中最大的元素的索引
137 maxIndex = 0;
138 for (j = 1; j < packageSize; j++)
139 if (input[packageIndex[j]][0] > input[packageIndex[maxIndex]][0])
140 maxIndex = j;
141
142 //如果元素索引中的最大的小于当前元素,则替换
143 if (input[packageIndex[maxIndex]][0] > input[i][0])
144 packageIndex[maxIndex] = i;
145 }
146 #ifdef DEBUG
147 //调试信息:当前段索引,用于判断是否取得了正确的段索引
148 for (i = 0; i < packageSize; i++)
149 printf("%d", packageIndex[i]);
150 printf(" From %d\n", pID);
151 #endif
152
153 //将索引转化为值,存放在result中,并标记输入信息,表明已占用
154 for (j = 0; j < packageSize; j++) {
155 result[resultOffset + j] = input[packageIndex[j]][0];
156 input[packageIndex[j]][1] = 1;
157 }
158
159 #ifdef DEBUG
160 //调试信息:排序前的当前段,用于判断是否取得了正确的段
161 for (i = 0; i < length; i++)
162 printf("%d", result[i]);
163 printf(" From %d\n", pID);
164 #endif
165 }
166 //排序一个段
167 void sort() {
168 //段内起泡
169 int i, j, temp;
170 for (i = 0; i < packageSize; i++)
171 for (j = 0; j < packageSize; j++) {
172 if (result[resultOffset + i] < result[resultOffset + j]) {
173 temp = result[resultOffset + i];
174 result[resultOffset + i] = result[resultOffset + j];
175 result[resultOffset + j] = temp;
176 }
177 }
178 #ifdef DEBUG
179 //调试信息:排序后的当前段
180 for (i = 0; i < length; i++)
181 printf("%d", result[i]);
182 printf(" From %d\n", pID);
183 #endif
184 }
185 //主处理进程
186 void process() {
187 //取得该进程负责的段
188 findCurrentRange();
189 //如果此进程不是最后一个进程,则将剩余部分传递给下一个进程
190 if (pID != pSize - 1)
191 MPI_Send(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID + 1, 0, MPI_COMM_WORLD);
192
193 //排序当前进程负责的段
194 sort();
195 //归约结果,因为最终结果初始皆为零,故采用求和的形式归约当前结果到最终结果
196 MPI_Reduce(result, finalResult, MAX_LENGTH_WITH_MARK, MPI_INT, MPI_SUM, 0,
197 MPI_COMM_WORLD);
198 }
199
200 //入口,主函数
201 int main(int argc, char* argv[]) {
202 MPI_Status status; //无用
203
204 MPI_Init(&argc, &argv); //初始化
205 MPI_Comm_rank(MPI_COMM_WORLD, &pID); //取得当前进程的ID
206 MPI_Comm_size(MPI_COMM_WORLD, &pSize); //取得总进程数
207
208 //根进程负责输入
209 if (!pID)
210 read();
211
212 //广播输入数数组的长度
213 MPI_Bcast(&length, 1, MPI_INT, 0, MPI_COMM_WORLD);
214 //准备:计算各进程常量
215 prepare();
216
217 //从根进程启动,处理第一段
218 if (!pID)
219 process();
220 else {
221 //其余进程等待上一进程传递的数据
222 MPI_Recv(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID - 1, 0, MPI_COMM_WORLD,
223 &status);
224 //收到数据后进行处理
225 process();
226 }
227
228 //根进程负责输出
229 if (!pID)
230 output();
231
232 //结束
233 MPI_Finalize();
234 return 0;
235 }
236
2 // Name : 基于分段的平行排序
3 // Author : 袁冬(2107020046)
4 // Copyright : 中国海洋大学
5 // LastUpdate : 2007.10.03
6 // Develop Evrionment : Windows Server 2003 SP2
7 // + Eclipse 3.3.1 + CDT 4.0.1
8 // + MinGW 3.81 + GDB 6.6
9 // + mpich2-1.0.6-win32-ia32
10 //=================================================================
11
12 /*
13 * 算法描述: 分段 -> 段内排序 -> 归约结果。
14 * 1,根进程读取输入,将元素个数广播给各个进程。
15 * 2,然后各进程计算段长度和段偏移。
16 * 3,然后根进程选择第一个段,标记站位符。
17 * 4,跟进程将剩余元素发送给下一进程,下一进程选择段的同时,根进程排序。
18 * 5,下一进程继续此过程,直到最后一个进程,所有元素都进行排序。
19 * 6,进程将排序好的元素,按照段偏移归约给根进程。
20 * 7,根进程输入结果。
21 *
22 * 时间复杂度计算:
23 * 设共有进程p,共有元素n
24 * 则段数为p,每段长度为m = n / p
25 * 最长时间为从根进程分段开始,至末进程规约为止,又注意到,分段时串行进行的,故
26 *
27 * t = 分段时间 * 段数 + 最后一段的排序时间
28 *
29 * 用大欧米伽表示法表示
30 *
31 * O(分段时间) = m * n = n * n / p
32 * O(最后一段的排序时间) = m * m
33 *
34 * 所以
35 *
36 * O(t) = n * n / p * p + m * m
37 * = n * n + m * m;
38 * = n * n
39 *
40 * 所以,此算法排序时间复杂度为n的平方,和起泡排序的时间复杂度相同。
41 *
42 * 分析与优化:
43 * 从时间复杂度的分析可以知道,算法的瓶颈在于分段算法,因为该算法从本质上讲,是串行进行的。
44 * 因个人水平有限,分段没有找到并行算法,导致整个算法为伪并行。
45 * 但是有一个办法可以将分算法的时间减半,
46 * 即从最大和最小两边开始,分别进行分组,可以使时间复杂度减低一半,但总体时间复杂度的O认为n*n。
47 *
48 * 另外,在“取得当前段”的算法中,如果每次循环i时,段索引没有改变,再下一轮时,可以不再遍历。
49 * 相当于加入缓存,估计此缓存命中几率比较高,可以较大幅度的改善时间复杂度。
50 *
51 */
52
53 //#define DEBUG 1 //调试符号
54
55 #define TRUE 1
56 #define FALSE 0
57 #define Bool int
58
59 #define MAX_LENGTH 100 //允许的最大元素个数
60 #define MAX_LENGTH_WITH_MARK 200 //允许的最大元素数及其对应的标记的长度,此值应为MAX_LENGTH的两倍
61
62 #include "mpi.h"
63 #include <stdio.h>
64 #include <stdlib.h>
65
66 int pID, pSize; //pID:当前进程ID,pSize:总的进程数
67 int input[MAX_LENGTH][2], length, packageSize; //input:输入,length:输入的元素个数,packageSize:段大小
68 int result[MAX_LENGTH], finalResult[MAX_LENGTH], packageIndex[MAX_LENGTH]; //result:当前结果,finalResult:最终结果,packageIndex:段值索引
69 int resultOffset; //resultOffset:短偏移
70
71 //读取输入
72 void read() {
73 int i;
74 //读取元素个数
75 printf("Input the length of the array:");
76 fflush(stdout);
77 scanf("%d", &length);
78
79 //读取元素
80 printf("Input the element of the array total %d:", length);
81 fflush(stdout);
82 for (i = 0; i < length; i++) {
83 //读取元素,经对应站位标记标记为0
84 scanf("%d", &input[i][0]);
85 input[i][1] = 0;
86 }
87 }
88 //输出
89 void output() {
90 int i;
91 for (i = 0; i < length; i++)
92 printf("%d ", finalResult[i]);
93 fflush(stdout);
94 }
95
96 //准备:计算进程常量
97 int prepare() {
98 packageSize = length / pSize; //段大小
99 resultOffset = packageSize * pID; //结果起始偏移值,表示该段位于整体的哪个部分
100
101 //对于最后一个进程,需要修正段大小,包含所有余下的元素
102 if (pID == pSize - 1)
103 packageSize += length % pSize;
104
105 #ifdef DEBUG
106 //调试信息:段大小
107 printf("resultOffset: %d, From %d.\n", resultOffset, pID);
108 #endif
109 }
110
111 //分段,取得当前进程负责的段
112 void findCurrentRange() {
113 int i, j = 0, maxIndex, beginIndex = 0;
114
115 //填充默认的值
116 for (i = 0; i < packageSize; i++) {
117 while (input[beginIndex][1])
118 beginIndex++;
119 packageIndex[i] = beginIndex;
120 beginIndex++;
121 }
122
123 #ifdef DEBUG
124 //调试信息:默认值
125 for (i = 0; i < packageSize; i++)
126 printf("%d", packageIndex[i]);
127 printf(" From %d\n", pID);
128 #endif
129
130 //查找所有元素,找到最小的packageSize个元素,取得其索引值
131 for (i = beginIndex; i < length; i++) {
132 //忽略被其他进程占用的元素
133 if (input[i][1])
134 continue;
135
136 //查找比当前元素索引中最大的元素的索引
137 maxIndex = 0;
138 for (j = 1; j < packageSize; j++)
139 if (input[packageIndex[j]][0] > input[packageIndex[maxIndex]][0])
140 maxIndex = j;
141
142 //如果元素索引中的最大的小于当前元素,则替换
143 if (input[packageIndex[maxIndex]][0] > input[i][0])
144 packageIndex[maxIndex] = i;
145 }
146 #ifdef DEBUG
147 //调试信息:当前段索引,用于判断是否取得了正确的段索引
148 for (i = 0; i < packageSize; i++)
149 printf("%d", packageIndex[i]);
150 printf(" From %d\n", pID);
151 #endif
152
153 //将索引转化为值,存放在result中,并标记输入信息,表明已占用
154 for (j = 0; j < packageSize; j++) {
155 result[resultOffset + j] = input[packageIndex[j]][0];
156 input[packageIndex[j]][1] = 1;
157 }
158
159 #ifdef DEBUG
160 //调试信息:排序前的当前段,用于判断是否取得了正确的段
161 for (i = 0; i < length; i++)
162 printf("%d", result[i]);
163 printf(" From %d\n", pID);
164 #endif
165 }
166 //排序一个段
167 void sort() {
168 //段内起泡
169 int i, j, temp;
170 for (i = 0; i < packageSize; i++)
171 for (j = 0; j < packageSize; j++) {
172 if (result[resultOffset + i] < result[resultOffset + j]) {
173 temp = result[resultOffset + i];
174 result[resultOffset + i] = result[resultOffset + j];
175 result[resultOffset + j] = temp;
176 }
177 }
178 #ifdef DEBUG
179 //调试信息:排序后的当前段
180 for (i = 0; i < length; i++)
181 printf("%d", result[i]);
182 printf(" From %d\n", pID);
183 #endif
184 }
185 //主处理进程
186 void process() {
187 //取得该进程负责的段
188 findCurrentRange();
189 //如果此进程不是最后一个进程,则将剩余部分传递给下一个进程
190 if (pID != pSize - 1)
191 MPI_Send(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID + 1, 0, MPI_COMM_WORLD);
192
193 //排序当前进程负责的段
194 sort();
195 //归约结果,因为最终结果初始皆为零,故采用求和的形式归约当前结果到最终结果
196 MPI_Reduce(result, finalResult, MAX_LENGTH_WITH_MARK, MPI_INT, MPI_SUM, 0,
197 MPI_COMM_WORLD);
198 }
199
200 //入口,主函数
201 int main(int argc, char* argv[]) {
202 MPI_Status status; //无用
203
204 MPI_Init(&argc, &argv); //初始化
205 MPI_Comm_rank(MPI_COMM_WORLD, &pID); //取得当前进程的ID
206 MPI_Comm_size(MPI_COMM_WORLD, &pSize); //取得总进程数
207
208 //根进程负责输入
209 if (!pID)
210 read();
211
212 //广播输入数数组的长度
213 MPI_Bcast(&length, 1, MPI_INT, 0, MPI_COMM_WORLD);
214 //准备:计算各进程常量
215 prepare();
216
217 //从根进程启动,处理第一段
218 if (!pID)
219 process();
220 else {
221 //其余进程等待上一进程传递的数据
222 MPI_Recv(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID - 1, 0, MPI_COMM_WORLD,
223 &status);
224 //收到数据后进行处理
225 process();
226 }
227
228 //根进程负责输出
229 if (!pID)
230 output();
231
232 //结束
233 MPI_Finalize();
234 return 0;
235 }
236
本文转自冬冬博客园博客,原文链接:http://www.cnblogs.com/yuandong/archive/2007/10/23/BubbleSort_With_MPI.html,如需转载请自行联系原作者