▶ 使用 OpenMP 和队列数据结构,在各线程之间传递信息
● 代码,使用 critical 指令和 atomic 指令来进行读写保护
// queue.h
#ifndef _QUEUE_H_
#define _QUEUE_H_
// The guard macro must stay on its own line and expand to nothing; otherwise
// it would absorb the struct tag that follows.

// Queue node: the source of a message, the message content, and a pointer
// to the next node.
struct queue_node_s
{
    int src;                     // source of the message
    int mesg;                    // message content
    struct queue_node_s* next_p; // next node in the list
};

// Queue: number of nodes enqueued, number of nodes dequeued, and the
// front and tail pointers of the linked list.
struct queue_s
{
    int enqueued;                 // count of enqueue operations
    int dequeued;                 // count of dequeue operations
    struct queue_node_s* front_p; // node that Dequeue removes next
    struct queue_node_s* tail_p;  // node that Enqueue appends after
};

struct queue_s* Allocate_queue(void);
void Free_queue(struct queue_s* q_p);
void Print_queue(struct queue_s* q_p);
void Enqueue(struct queue_s* q_p, int src, int mesg);
int Dequeue(struct queue_s* q_p, int* src_p, int* mesg_p);
int Search(struct queue_s* q_p, int mesg, int* src_p);
#endif

// queue.c
#include <stdio.h>
#include <stdlib.h>
#include "queue.h"

#ifdef USE_MAIN
// Interactive driver for the queue functions. It is compiled only when
// USE_MAIN is defined; the matching #endif follows this function.
// Commands: e = enqueue, d = dequeue, s = search, p = print,
//           f = free all nodes, q = quit.
int main(void)
{
    char op;
    int src, mesg;
    struct queue_s *q_p = Allocate_queue();

    for (;;)
    {
        printf("Op? (e, d, p, s, f, q)\n");
        // Leave on 'q'/'Q', and also when scanf cannot read a command
        // (EOF or read error), so a closed stdin cannot loop forever.
        if (scanf(" %c", &op) != 1 || op == 'q' || op == 'Q')
            break;

        switch (op)
        {
        case 'e':
        case 'E':
            printf("Src? Mesg?\n");
            if (scanf("%d%d", &src, &mesg) == 2)
                Enqueue(q_p, src, mesg);
            else
                printf("Invalid input\n"); // do not enqueue unread values
            break;
        case 'd':
        case 'D':
            if (Dequeue(q_p, &src, &mesg))
                printf("Dequeued src = %d, mesg = %d\n", src, mesg);
            else
                printf("Queue is empty\n");
            break;
        case 's':
        case 'S':
            printf("Mesg?\n");
            if (scanf("%d", &mesg) != 1)
            {
                printf("Invalid input\n");
                break;
            }
            if (Search(q_p, mesg, &src))
                printf("Found %d from %d\n", mesg, src);
            else
                printf("Didn't find %d\n", mesg);
            break;
        case 'p':
        case 'P':
            Print_queue(q_p);
            break;
        case 'f':
        case 'F':
            Free_queue(q_p);
            break;
        default:
            printf("%c isn't a valid command\n", op);
            printf("Please try again\n");
        }
    }

    // Free_queue releases the nodes; the queue structure itself is freed here.
    Free_queue(q_p);
    free(q_p);
    return 0;
}
#endif struct queue_s* Allocate_queue()// 创建队列
{
struct queue_s *q_p = malloc(sizeof(struct queue_s));
q_p->enqueued = q_p->dequeued = ;
q_p->front_p = q_p->tail_p = NULL;
return q_p;
} void Free_queue(struct queue_s* q_p)// 释放队列
{
struct queue_node_s *curr_p, *temp_p;
for (curr_p = q_p->front_p; curr_p != NULL; temp_p = curr_p, curr_p = curr_p->next_p, free(temp_p));
q_p->enqueued = q_p->dequeued = ;
q_p->front_p = q_p->tail_p = NULL;
} void Print_queue(struct queue_s* q_p)// 打印队列
{
struct queue_node_s *curr_p;
printf("queue = \n");
for (curr_p = q_p->front_p; curr_p != NULL; curr_p = curr_p->next_p)
printf(" src = %d, mesg = %d\n", curr_p->src, curr_p->mesg);
printf("enqueued = %d, dequeued = %d\n", q_p->enqueued, q_p->dequeued);
printf("\n");
} void Enqueue(struct queue_s* q_p, int src, int mesg)// 入队,按信息 src 和 mesg 向队列 q_p 添加结点
{
struct queue_node_s *n_p = malloc(sizeof(struct queue_node_s));
n_p->src = src;
n_p->mesg = mesg;
n_p->next_p = NULL;
if (q_p->tail_p == NULL)
{
q_p->front_p = n_p;
q_p->tail_p = n_p;
}
else
{
q_p->tail_p->next_p = n_p;
q_p->tail_p = n_p;
}
q_p->enqueued++;
} int Dequeue(struct queue_s* q_p, int* src_p, int* mesg_p)// 出队,信息放入 src_p 和 mesg_p
{
struct queue_node_s* temp_p;
if (q_p->front_p == NULL)
return ;
*src_p = q_p->front_p->src;
*mesg_p = q_p->front_p->mesg;
temp_p = q_p->front_p;
if (q_p->front_p == q_p->tail_p)
q_p->front_p = q_p->tail_p = NULL;
else
q_p->front_p = temp_p->next_p;
free(temp_p);
q_p->dequeued++;
return ;
} int Search(struct queue_s* q_p, int mesg, int* src_p)// 查找操作
{
struct queue_node_s *curr_p;
for (curr_p = q_p->front_p; curr_p != NULL; curr_p = curr_p->next_p)
{
if (curr_p->mesg == mesg)
{
*src_p = curr_p->src;
return ;
}
}
return ;
} // omp_msgps.c
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#include "queue.h"

// Messages are drawn from the range [0, MAX_MSG).
// NOTE(review): the literal was missing from the source text; 10000 is an
// assumed value -- confirm.
const int MAX_MSG = 10000;

// Print the usage message for wrong command-line arguments, then exit with
// a failure status.
void Usage(char *prog_name)
{
    fprintf(stderr, "usage: %s <number of threads> <number of messages>\n", prog_name);
    fprintf(stderr, " number of messages = number sent by each thread\n");
    getchar(); // blocks until a character is read from stdin (presumably to keep a console window open)
    exit(EXIT_FAILURE);
}

// Send one random message from thread my_rank to the queue of a randomly
// chosen destination thread.
void Send_msg(struct queue_s* msg_queues[], int my_rank, int thread_count, int msg_number)
{
    int mesg = rand() % MAX_MSG; // deterministic alternative for debugging: -msg_number
    int dest = rand() % thread_count;

    (void)msg_number; // only needed by the alternative payload above

    // Several threads may enqueue into the same destination queue, so the
    // enqueue is serialized.
#   pragma omp critical
    Enqueue(msg_queues[dest], my_rank, mesg);
#   ifdef DEBUG
    printf("Thread %d > sent %d to %d\n", my_rank, mesg, dest);
#   endif
}

// Receive and print one message if the queue q_p is not empty.
// NOTE(review): enqueued is read here without synchronization while other
// threads update it inside the critical section -- confirm this is acceptable.
void Try_receive(struct queue_s* q_p, int my_rank)
{
    int src, mesg;
    int queue_size = q_p->enqueued - q_p->dequeued;

    if (queue_size == 0)
        return;
    else if (queue_size == 1)
    {
        // With a single node, front_p == tail_p, so Dequeue also writes
        // tail_p, which a concurrent Enqueue uses; protect that case.
#       pragma omp critical
        Dequeue(q_p, &src, &mesg);
    }
    else
        Dequeue(q_p, &src, &mesg);
    printf("Thread %d > received %d from %d\n", my_rank, mesg, src);
}

// Return 1 when the queue is empty and all thread_count threads have
// finished sending; return 0 otherwise.
int Done(struct queue_s* q_p, int done_sending, int thread_count)
{
    int queue_size = q_p->enqueued - q_p->dequeued;

    if (queue_size == 0 && done_sending == thread_count)
        return 1;
    return 0;
}

// Usage: <program> <number of threads> <number of messages>
// Every thread owns one queue, sends send_max messages to random queues
// while polling its own, and then drains its queue until all threads have
// finished sending.
int main(int argc, char* argv[])
{
    int thread_count, send_max, done_sending;
    struct queue_s** msg_queues;

    // Each assignment is parenthesized so the comparison applies to the
    // parsed value, not to the value being assigned.
    if (argc != 3
        || (thread_count = (int)strtol(argv[1], NULL, 10)) <= 0
        || (send_max = (int)strtol(argv[2], NULL, 10)) < 0)
        Usage(argv[0]);

    msg_queues = malloc(thread_count * sizeof *msg_queues);
    if (msg_queues == NULL)
    {
        perror("malloc");
        exit(EXIT_FAILURE);
    }
    done_sending = 0;

#   pragma omp parallel num_threads(thread_count) default(none) shared(thread_count, send_max, msg_queues, done_sending)
    {
        int my_rank = omp_get_thread_num(), msg_number;

        srand(my_rank);
        msg_queues[my_rank] = Allocate_queue();

        // Wait until every thread has created its queue before any
        // message is sent or received.
#       pragma omp barrier

        // Every thread sends send_max messages.
        for (msg_number = 0; msg_number < send_max; msg_number++)
        {
            Send_msg(msg_queues, my_rank, thread_count, msg_number);
            Try_receive(msg_queues[my_rank], my_rank);
        }
#       pragma omp atomic
        done_sending++;
#       ifdef DEBUG
        printf("Thread %d > done sending\n", my_rank);
#       endif

        // NOTE(review): done_sending is read here without an atomic read.
        while (!Done(msg_queues[my_rank], done_sending, thread_count))
            Try_receive(msg_queues[my_rank], my_rank);

        // The loop above ends only after every thread has finished sending,
        // so no other thread enqueues into this queue any more.
        Free_queue(msg_queues[my_rank]);
        free(msg_queues[my_rank]);
    }
    free(msg_queues);
    getchar();
    return 0;
}
● 代码,使用 omp 锁来进行读写保护
// queue_lk.h
#ifndef _QUEUE_LK_H_
#define _QUEUE_LK_H_
#include <omp.h>

// Queue node: the source of a message, the message content, and a pointer
// to the next node.
struct queue_node_s
{
    int src;                     // source of the message
    int mesg;                    // message content
    struct queue_node_s* next_p; // next node in the list
};

// Queue: same layout as the lock-free header plus one omp_lock_t per queue.
struct queue_s
{
    omp_lock_t lock;              // set by callers around Enqueue/Dequeue
    int enqueued;                 // count of enqueue operations
    int dequeued;                 // count of dequeue operations
    struct queue_node_s* front_p; // node that Dequeue removes next
    struct queue_node_s* tail_p;  // node that Enqueue appends after
};

struct queue_s* Allocate_queue(void);
void Free_queue(struct queue_s* q_p);
void Print_queue(struct queue_s* q_p);
void Enqueue(struct queue_s* q_p, int src, int mesg);
int Dequeue(struct queue_s* q_p, int* src_p, int* mesg_p);
int Search(struct queue_s* q_p, int mesg, int* src_p);
#endif

// queue_lk.c
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#include "queue_lk.h"

#ifdef USE_MAIN
// Interactive driver for the lock-protected queue. It is compiled only when
// USE_MAIN is defined; the matching #endif follows this function.
// Commands: e = enqueue, d = dequeue, s = search, p = print,
//           f = free all nodes, q = quit.
int main(void)
{
    char op;
    int src, mesg, not_empty;
    struct queue_s *q_p = Allocate_queue();

    for (;;)
    {
        printf("Op? (e, d, p, s, f, q)\n");
        // Leave on 'q'/'Q', and also when scanf cannot read a command
        // (EOF or read error), so a closed stdin cannot loop forever.
        if (scanf(" %c", &op) != 1 || op == 'q' || op == 'Q')
            break;

        switch (op)
        {
        case 'e':
        case 'E':
            printf("Src? Mesg?\n");
            if (scanf("%d%d", &src, &mesg) != 2)
            {
                printf("Invalid input\n"); // do not enqueue unread values
                break;
            }
            omp_set_lock(&q_p->lock); // DIFF
            Enqueue(q_p, src, mesg);
            omp_unset_lock(&q_p->lock);
            break;
        case 'd':
        case 'D':
            // DIFF: dequeue while holding the lock; the empty-queue test on
            // the result is done only after the lock has been released.
            omp_set_lock(&q_p->lock);
            not_empty = Dequeue(q_p, &src, &mesg);
            omp_unset_lock(&q_p->lock);
            if (not_empty)
                printf("Dequeued src = %d, mesg = %d\n", src, mesg);
            else
                printf("Queue is empty\n");
            break;
        case 's':
        case 'S':
            printf("Mesg?\n");
            if (scanf("%d", &mesg) != 1)
            {
                printf("Invalid input\n");
                break;
            }
            if (Search(q_p, mesg, &src))
                printf("Found %d from %d\n", mesg, src);
            else
                printf("Didn't find %d\n", mesg);
            break;
        case 'p':
        case 'P':
            Print_queue(q_p);
            break;
        case 'f':
        case 'F':
            omp_set_lock(&q_p->lock); // DIFF
            Free_queue(q_p);
            omp_unset_lock(&q_p->lock);
            break;
        default:
            printf("%c isn't a valid command\n", op);
            printf("Please try again\n");
        }
    }

    // Free_queue releases only the nodes; the lock and the queue structure
    // are released here.
    Free_queue(q_p);
    omp_destroy_lock(&q_p->lock); // DIFF
    free(q_p);
    return 0;
}
#endif struct queue_s* Allocate_queue()
{
struct queue_s *q_p = malloc(sizeof(struct queue_s));
q_p->enqueued = q_p->dequeued = ;
q_p->front_p = q_p->tail_p = NULL;
omp_init_lock(&q_p->lock);// DIFF
return q_p;
} void Free_queue(struct queue_s* q_p)
{
struct queue_node_s *curr_p, *temp_p;
for (curr_p = q_p->front_p; curr_p != NULL; temp_p = curr_p, curr_p = curr_p->next_p, free(temp_p));
q_p->enqueued = q_p->dequeued = ;
q_p->front_p = q_p->tail_p = NULL;
} void Print_queue(struct queue_s* q_p)
{
struct queue_node_s *curr_p;
printf("queue = \n");
for (curr_p = q_p->front_p; curr_p != NULL; curr_p = curr_p->next_p)
printf(" src = %d, mesg = %d\n", curr_p->src, curr_p->mesg);
printf("enqueued = %d, dequeued = %d\n", q_p->enqueued, q_p->dequeued);
printf("\n");
} void Enqueue(struct queue_s* q_p, int src, int mesg)
{
struct queue_node_s *n_p = malloc(sizeof(struct queue_node_s));
n_p->src = src;
n_p->mesg = mesg;
n_p->next_p = NULL;
if (q_p->tail_p == NULL)
{
q_p->front_p = n_p;
q_p->tail_p = n_p;
}
else
{
q_p->tail_p->next_p = n_p;
q_p->tail_p = n_p;
}
q_p->enqueued++;
} int Dequeue(struct queue_s* q_p, int* src_p, int* mesg_p)
{
struct queue_node_s* temp_p;
if (q_p->front_p == NULL)
return ;
*src_p = q_p->front_p->src;
*mesg_p = q_p->front_p->mesg;
temp_p = q_p->front_p;
if (q_p->front_p == q_p->tail_p)
q_p->front_p = q_p->tail_p = NULL;
else
q_p->front_p = temp_p->next_p;
free(temp_p);
q_p->dequeued++;
return ;
} int Search(struct queue_s* q_p, int mesg, int* src_p)
{
struct queue_node_s *curr_p;
for (curr_p = q_p->front_p; curr_p != NULL; curr_p = curr_p->next_p)
{
if (curr_p->mesg == mesg)
{
*src_p = curr_p->src;
return ;
}
}
return ;
} // omp_msglk.c
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#include "queue_lk.h"

// Messages are drawn from the range [0, MAX_MSG).
// NOTE(review): the literal was missing from the source text; 10000 is an
// assumed value -- confirm.
const int MAX_MSG = 10000;

// Print the usage message for wrong command-line arguments, then exit with
// a failure status.
void Usage(char *prog_name)
{
    fprintf(stderr, "usage: %s <number of threads> <number of messages>\n", prog_name);
    fprintf(stderr, " number of messages = number sent by each thread\n");
    getchar(); // blocks until a character is read from stdin (presumably to keep a console window open)
    exit(EXIT_FAILURE);
}

// Send one random message from thread my_rank to the queue of a randomly
// chosen destination thread, holding that queue's lock during the enqueue.
void Send_msg(struct queue_s* msg_queues[], int my_rank, int thread_count, int msg_number)
{
    // rand() is used because the threads seed the generator with srand();
    // random() would ignore that seed.
    int mesg = rand() % MAX_MSG; // deterministic alternative for debugging: -msg_number
    int dest = rand() % thread_count;
    struct queue_s* q_p = msg_queues[dest];

    (void)msg_number; // only needed by the alternative payload above

    omp_set_lock(&q_p->lock); // DIFF
    Enqueue(q_p, my_rank, mesg);
    omp_unset_lock(&q_p->lock);
#   ifdef DEBUG
    printf("Thread %d > sent %d to %d\n", my_rank, mesg, dest);
#   endif
}

// Receive and print one message if the queue q_p is not empty.
// NOTE(review): enqueued is read here without holding the lock while other
// threads update it under the lock -- confirm this is acceptable.
void Try_receive(struct queue_s* q_p, int my_rank)
{
    int src, mesg;
    int queue_size = q_p->enqueued - q_p->dequeued;

    if (queue_size == 0)
        return;
    else if (queue_size == 1)
    {
        // With a single node, front_p == tail_p, so Dequeue also writes
        // tail_p, which a concurrent Enqueue uses; take the lock.
        omp_set_lock(&q_p->lock); // DIFF
        Dequeue(q_p, &src, &mesg);
        omp_unset_lock(&q_p->lock);
    }
    else
        Dequeue(q_p, &src, &mesg);
    printf("Thread %d > received %d from %d\n", my_rank, mesg, src);
}

// Return 1 when the queue is empty and all thread_count threads have
// finished sending; return 0 otherwise.
int Done(struct queue_s* q_p, int done_sending, int thread_count)
{
    int queue_size = q_p->enqueued - q_p->dequeued;

    if (queue_size == 0 && done_sending == thread_count)
        return 1;
    return 0;
}

// Usage: <program> <number of threads> <number of messages>
// Every thread owns one queue, sends send_max messages to random queues
// while polling its own, and then drains its queue until all threads have
// finished sending.
int main(int argc, char* argv[])
{
    int thread_count, send_max, done_sending;
    struct queue_s** msg_queues;

    // Each assignment is parenthesized so the comparison applies to the
    // parsed value, not to the value being assigned.
    if (argc != 3
        || (thread_count = (int)strtol(argv[1], NULL, 10)) <= 0
        || (send_max = (int)strtol(argv[2], NULL, 10)) < 0)
        Usage(argv[0]);

    msg_queues = malloc(thread_count * sizeof *msg_queues);
    if (msg_queues == NULL)
    {
        perror("malloc");
        exit(EXIT_FAILURE);
    }
    done_sending = 0;

#   pragma omp parallel num_threads(thread_count) default(none) shared(thread_count, send_max, msg_queues, done_sending)
    {
        int my_rank = omp_get_thread_num(), msg_number;

        srand(my_rank);
        msg_queues[my_rank] = Allocate_queue();

        // Wait until every thread has created its queue before any
        // message is sent or received.
#       pragma omp barrier

        // Every thread sends send_max messages.
        for (msg_number = 0; msg_number < send_max; msg_number++)
        {
            Send_msg(msg_queues, my_rank, thread_count, msg_number);
            Try_receive(msg_queues[my_rank], my_rank);
        }
#       pragma omp atomic
        done_sending++;
#       ifdef DEBUG
        printf("Thread %d > done sending\n", my_rank);
#       endif

        // NOTE(review): done_sending is read here without an atomic read.
        while (!Done(msg_queues[my_rank], done_sending, thread_count))
            Try_receive(msg_queues[my_rank], my_rank);

        // The loop above ends only after every thread has finished sending,
        // and Send_msg releases the lock before returning, so no thread
        // holds or will request this queue's lock any more.
        Free_queue(msg_queues[my_rank]);
        omp_destroy_lock(&msg_queues[my_rank]->lock);
        free(msg_queues[my_rank]);
    }
    free(msg_queues);
    getchar();
    return 0;
}