【实验目的】
- 掌握并灵活使用线程机制
- 掌握并能够灵活使用同步互斥机制
- 了解并能够较灵活地使用IO技术
【实验要求】
● 基于线程的生产者-消费者的合作问题
– 其中(生产者)从外设获取数据进行生产
– 另外(消费者)消费后进行输出,并存储输出结果。
●在Linux环境下使用POSIX库进行设计实现
●鼓励使用QT进行图形化显示
●根据情况决定是否进行答辩
●可以2人一组,但不能超过2人,在报告中必须要有明确分工
【问题描述】
●完成N个生产者和M个消费者线程之间的并发控制,N、M不低于30,数据发送和接收缓冲区尺寸不小于20个(每个产品占据一个)。
●其中生产者线程1、3、5、7、9生产的产品供所有奇数编号的消费者线程消费,只有所有奇数编号的消费者线程都消费后,该产品才能从缓冲区中撤销。
●其中生产者线程2、4、6、8、10生产的产品所有偶数编号的消费者线程都可消费,任一偶数编号消费者线程消费该消息后,该产品都可从缓冲区中撤销。
●其中11-20号生产者线程生产的产品仅供对应编号的消费者线程消费。
●其他编号生产者线程生产的产品可由任意的消费者线程消费。
●每个生产线程生产30个消息后结束运行。如果一个消费者线程没有对应的生产者线程在运行后,也结束运行。所有生产者都停止生产后,如果消费者线程已经没有 可供消费的产品,则也退出运行。
【小组分工】
问题的分析有两人共同完成;生产者和第二类,第四类主要由ZX负责。我负责第一、三类消费者的代码编写,以及代码的整理。
【实验原理】
初始化互斥锁:
pthread_mutex_init() int pthread_mutex_init(pthread_mutex_t *mp, const pthread_mutex_mutexattr *mattr);
如果互斥锁已初始化,则它会处于未锁定状态。互斥锁可以位于进程之间共享的内存中或者某个进程的专用内存中。
锁定互斥锁
使用pthread_mutex_lock() 可以锁定mutex 所指向的互斥锁。
pthread_mutex_lock 语法
int pthread_mutex_lock(pthread_mutex_t *mutex);
pthread_mutex_lock 返回值
pthread_mutex_lock() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。
解除锁定互斥锁
使用pthread_mutex_unlock() 可以解除锁定mutex 所指向的互斥锁。
销毁条件变量
使用cond_destroy() 可以销毁与cv 所指向的条件变量相关联的状态。用来存储该条件变量的空间不会释放。
信号量
Sem_init()/*初始化信号量*/ Sem_wait()/*wait,p原语*/ Sem_post()/*signal v原语*/
【实验分析】
首先应该清楚生产者之间是互斥的关系,消费者之间也是互斥的关系。而生产者与消费者之间是同步的关系。
本问题的难点在于不同编号的消费者与不同编号的消费者之间复杂的业务逻辑关系。只要搞清楚了各个生产者与消费者之间的业务逻辑关系,然后再把同步互斥问题解决,问题就基本得以解决了。
为了解决对产品缓冲区的读写互斥问题,必须定义一个互斥量。在本方案中定义了mutex_buffer。不管是生产者还是消费者,只要是要对产品缓冲区进行读写操作,都要对其上锁以实现互斥。这样,在任一时刻,只有一个线程可以对缓冲区进行操作。
为了解决生产者与消费者之间的同步问题,本方案中采用了信号量的同步机制。定义了两个信号量buffer_full、buffer_empty。Buffer_full初始化为0,用来表示可以消费的产品数量,buffer_empty初始化为BUFFER_SIZE(宏定义,表示产品缓冲区大小)。当buffer_full大于0的时候,表示有产品可以消费,对应的消费者可以对该产品进行消费;当buffer_full小于或等于零的时候表示没有可供消费的产品,这时候消费者发生阻塞,等待生产者生产产品。同理,当buffer_empty大于零的时候表示有可以供生产者生产产品的缓冲区空间,生产者可以生产产品;当buffer_empty等于零的时候,表示缓冲区是满的,生产者这时候会等待消费者将产品消费掉。
生产者与消费者之间业务逻辑的处理。因为在问题描述中,不同编号的生产者生产的产品所供应的消费者对象不同,为了解决这个问题,我在让生产者生产产品的时候把自己的编号写入到了产品里面。这样就必须在产品的结构体中定义一个变量用来记录该产品的生产商(在本方案中使用了int 型的变量num来表示生产商的)。定义这个变量的好处就是生产者不需要分类了。生产者的唯一工作就是生产产品,并将自己的编号写入到产品上。生产的产品该有谁来消费需要消费者自己来判断。这样就需要对消费者进行分类。这里把消费者分为了4类:是奇数号,但编号不属于11-20的消费者;是偶数号,但编号不属于11-20的消费者;是奇数号,且编号属于11-20的消费者;是偶数号,且编号属于11-20的消费者。对消费者如此分类与问题的业务逻辑有关。对于第一类消费者,1,3,5,7,9,21-30号生产者生产的产品它都可以消费。对于第二类消费者,2,4,6,8,10,21-30号生产者生产的产品它可以消费。对于第三类消费者,它不仅可以消费第一类消费者可以消费的产品,而且它还可以消费与自己的编号相同的产品。对于第四类消费者,它不仅可以消费第二类消费者可以消费的产品,而且它还可以消费与自己编号相同的产品。
产品的撤销问题。对于1,3,5,7,9号线程生产的产品,必须所有的奇数号的消费者都消费以后才能从缓冲区撤销(所谓从缓冲区撤销,就是将read指针向前移动一个单位,即read++)。这样在第一,三类缓冲区中需要进行一次判断,如果该产品是1,3,5,7,9号产品,那么该产品被自己消费后不能从缓冲区撤销。那么,消费者该怎样判断该产品是否已经被所有的奇数号消费者消费过了呢?我的解决办法是在在产品的结构体中定义了一个位标识符。该位标识符是一个无符号的三十二位的整形变量bitsign。产品生产出来以后该变量要被初始化为0。消费者对1,3,5,7,9号产品进行消费的时候先判断一下bitsign的1,3,5,7,9,···,29号位置上是否全都标记为1了。如果是,那么就说明已经被所有的奇数号消费者消费过,直接将其从缓冲区撤销就好了。如果不是,再判断在自己编号所在的位置是否已经置一,如果没有就说明自己没有消费过。这样就将产品的内容读出来,然后在bitsign相应的位置上写1。但是这时候不能将产品从缓冲区撤销。对于第二、四类消费者,因为业务逻辑要求只要有偶数消费者消费过该产品,产品就可以从缓冲区撤销这样就不需要bitsign了。只要读完产品然后从缓冲区撤销就可以了。
线程的撤销问题。生产者的撤销是很容易的,只要生产者线程生产完了三十个产品就可以结束了。消费者线程结束比较棘手。我们有两个解决方案。方案一:设置一个无符号的长整形变量dead,在生产者生产完30个产品,即将结束之前要在与自己的编号相对应的dead标志位上置位为1,表示该线程结束。每个消费者线程在读取缓冲区之前的数据之前,先检查dead的标记位如果对应的标记位为1,那么就表明这个消费者线程需要结束了。该方案缺陷,消费者无法保证已经把产品缓冲区的产品已经全部读取完毕。方案二:换一个思路,让消费者在进入临界区之前在dead对应的位置上置位1表明消费者有可能发生阻塞,当消费者已经进入临界区了,再将dead标记位对应的位置清零。再创建一个线程dead checker,如果每隔一段时间就对dead进行一次检测,如果检测到相应的位置上都已置位为1,那么就表明消费者需要结束了。Dead checker已经将所有的消费者线程撤销了,就把自己结束。由于时间有限,操作系统还没有复习,之前也没有想好,就用了方案一,后来测试以后不对,就想了方案二,但是还没有代码实现,也不知道对不对。有机会的话可以再实现。
【代码分析】
由于代码繁杂,这里只对关键代码分析。所有代码请看随设计报告附带的代码。
struct Product { int num;/*用于表示是哪一个生产者生产的*/ int data;/*生产的数据*/ unsigned int bitsign;/*位标识符,编号为num的消费者消费该产品后,在相应的位置写如1,标记为已经读过该产品*/ };
上面是定义的产品的结构体。
struct Product product[BUFFER_SIZE];
上面是定义的产品的缓冲区。
int write_pos;/*写指针,生产者每生产一个产品指针加一*/
int read_pos;/*读指针,每当有一个产品撤销,指针+1*/
write_pos 是消费者当前需要消费的产品的指针。
pthread_mutex_t mutex_buffer;
pthread_mutex_t mutex_dead;
这是定义的互斥量,mutex_buffer是对产品缓冲区上锁时使用的。
Mutex_dead是标记线程结束时使用的。
sem_t buffer_empty;
sem_t buffer_full;
这是定义的两个信号量。Buffer_empty代表了空的缓冲区个数。Buffer_full代表了产品的个数。这两个信号量用来控制生产者和消费者线程之间的同步。
void put(int number);是生产者生产产品的函数,number是该生产者的线程号。
【结果分析】
第一类消费者:
截图如下:
第一类消费者
从上图我们可以看到,一号生产者生产的产品已经被所有的奇数好消费者消费过,并且最后一个消费者已经将这个产品从缓冲区撤销了。
对于第二类消费者:
第二类消费者
我们从截图看到,二号生产者生产的产品被22号消费者消费过了后,将产品从缓冲区撤销了。接下来三号生产者生产了产品,被15号消费者消费过了。
第三四类消费者:
第三、四类消费者可以消费对应编号的产品。而且对应编号的产品只能被对应得消费者消费。截图如下:
第三、四类消费者
【性能分析】
本方案存在效率低下的问题。分析如下:
因为产品是按照一定的顺序被消费的,例如生产者在缓冲区生产了一号、二号产品,但是read_pos指针现在只有所有的奇数号消费者消费过一号产品后才能指向第二号产品。即产品是按照FIFO的顺序被访问的。这样就使得系统的效率大打折扣。这是系统应该改进的一个方面。
【存在问题】
消费者线程无法结束。这是本方案有待解决的一个问题。
【解决办法】
打算试一下前面提到的方案二,让消费者在进入临界区之前在dead对应的位置上置位1表明消费者有可能发生阻塞,当消费者已经进入临界区了,再将dead标记位对应的位置清零。再创建一个线程dead checker,如果每隔一段时间就对dead进行一次检测,如果检测到相应的位置上都已置位为1,那么就表明消费者需要结束了。Dead checker已经将所有的消费者线程撤销了,就把自己结束。
时间有限,还要复习操作系统,本方案留在寒假解决。
【领悟感想】
在本次课程设计中,刚开始研究本题目时感觉无从下手。经过认真的研究才渐渐有了眉目。我们的感想是,在写代码之前一定要先想好解决各种问题的方案。产品的结构体需要定义那些变量?线程号应该怎么产生?要对生产者分类还是对消费者分类?怎样分类?这些都是很关键的问题。
在本次课程设计中,我收获了很多知识和快乐。对互斥锁和信号量的理解加深了。更重要的是我获得了快乐,我认为快乐大体分为两类:消费型的快乐和创造型的快乐。消费型的快乐,比如吃一次大餐,获得了更多的社会资源···,这些可以使你快乐,但这些不能让你得到真正的满足,只能让你为了获得更多的快乐而变得更加贪婪。而创造型的快乐却更能给自己带来满足感和成就感,而且这种快乐是持续的,有更大的意义。
【参考文献】
《Operating System Concept》 by Abraham
《Linux 多线程编程手册》 SUN microsystem
《Linux 高级程序设计》 杨宗德 等
【附带源码】
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <pthread.h> 4 #include <string.h> 5 #include <math.h> 6 #include <semaphore.h> 7 8 #define BUFFER_SIZE 12 9 #define THREAD_NUM 30 10 11 struct Product 12 { 13 int num;/*用于表示是哪一个生产者生产的*/ 14 int data;/*生产的数据*/ 15 unsigned int bitsign;/*位标识符,编号为num的消费者消费该产品后,在相应的位置写如1,标记为已经读过该产品*/ 16 }; 17 18 struct Product product[BUFFER_SIZE]; 19 FILE *fp; 20 int write_pos;/*写指针,生产者每生产一个产品指针加一*/ 21 int read_pos;/*读指针,每当有一个产品撤销,指针+1*/ 22 int producer_num;/*生产者线程的编号*/ 23 int consumer_num;/*消费者线程的编号*/ 24 unsigned int dead;/*生产者线程撤销标记位,生产者进程结束时在相应的位置上置一以表示死亡*/ 25 pthread_t producer_id[THREAD_NUM]; 26 pthread_t consumer_id[THREAD_NUM]; 27 28 pthread_mutex_t mutex_buffer; 29 pthread_mutex_t mutex_dead; 30 pthread_mutex_t mutex_producer_num; 31 pthread_mutex_t mutex_consumer_num; 32 pthread_cond_t notempty; 33 pthread_cond_t notfull; 34 sem_t buffer_empty; 35 sem_t buffer_full; 36 37 38 void init(); 39 void *produce(); 40 void *consume(); 41 void create_producer(); 42 void create_consumer(); 43 void join(); 44 void put(int number); 45 void get(int number); 46 47 int main() 48 { 49 init(); 50 create_producer(); 51 create_consumer(); 52 join(); 53 //pthread_t ta,tb; 54 //pthread_create(&ta,NULL,produce,0); 55 //pthread_create(&tb,NULL,consume,0); 56 //pthread_join(ta,NULL); 57 //pthread_join(tb,NULL); 58 return 0; 59 } 60 61 void init() 62 { 63 dead = 0; 64 producer_num = 0; 65 consumer_num = 0; 66 write_pos = 0; 67 read_pos = 0; 68 pthread_mutex_init(&mutex_buffer,NULL); 69 pthread_mutex_init(&mutex_dead,NULL); 70 sem_init(&buffer_empty,0,BUFFER_SIZE); 71 sem_init(&buffer_full,0,0); 72 pthread_cond_init(¬empty,NULL); 73 pthread_cond_init(¬full,NULL); 74 } 75 76 void *produce() 77 { 78 /*为每个生产者编号*/ 79 int number; 80 pthread_mutex_lock(&mutex_producer_num); 81 producer_num ++; 82 number = producer_num; 83 pthread_mutex_unlock(&mutex_producer_num); 84 put(number); 85 } 86 87 void *consume() 88 { 89 /*为每个消费者编号*/ 90 int number; 91 pthread_mutex_lock(&mutex_consumer_num); 92 consumer_num++; 93 number = consumer_num; 94 pthread_mutex_unlock(&mutex_consumer_num); 95 get(number); 96 } 97 98 void create_producer() 99 { 100 //sleep(1); 101 int i; 102 for(i = 0; i<THREAD_NUM; i++) 103 { 104 pthread_create(&(producer_id[i]), NULL, produce, 0); 105 } 106 } 107 void join() 108 { 109 int i; 110 for(i=0;i<THREAD_NUM;i++) 111 { 112 pthread_join(producer_id[i],NULL); 113 pthread_join(consumer_id[i],NULL); 114 } 115 } 116 void create_consumer() 117 { 118 int i; 119 for(i = 0; i<THREAD_NUM; i++) 120 { 121 pthread_create( (consumer_id+i), NULL, consume, 0 ); 122 } 123 } 124 void put(int number) 125 { 126 int i; 127 for(i=0;i<30;i++) 128 { 129 sem_wait(&buffer_empty); 130 pthread_mutex_lock(&mutex_buffer); 131 (product+write_pos)->num = number; 132 (product+write_pos)->data = rand()%100; 133 (product+write_pos)->bitsign = 0; 134 write_pos++; 135 if(write_pos>=BUFFER_SIZE) 136 { 137 write_pos = 0; 138 } 139 pthread_mutex_unlock(&mutex_buffer); 140 sem_post(&buffer_full); 141 } 142 //sleep(1); 143 pthread_mutex_lock(&mutex_dead); 144 dead = dead | (1<<number); 145 pthread_mutex_unlock(&mutex_dead); 146 } 147 148 void get(int number) 149 { 150 if((number%2==1) && (number<11 || number>20) )/*第一类消费者*/ 151 { 152 while(1) 153 { 154 sleep(1); 155 pthread_mutex_lock(&mutex_dead); 156 if((dead & 0x7fe002aa)==0x7fe002aa)/*对应的1,3,5,7,9,21-30号生产者已经结束运行*/ 157 { 158 printf("consumer number %d cancel\n",number); 159 pthread_mutex_unlock(&mutex_dead); 160 pthread_cancel(pthread_self()); 161 } 162 pthread_mutex_unlock(&mutex_dead); 163 sem_wait(&buffer_full); 164 pthread_mutex_lock(&mutex_buffer); 165 if( (((product+read_pos)->num)%2==1 && ((product+read_pos)->num)<10) 166 ||((product+read_pos)->num)>20 )/*是自己需要的产品*/ 167 { 168 if(((product+read_pos)->num)>20) 169 { 170 printf("我是%d号消费者,我消费的产品:\n",number); 171 printf("产品号:%d, 产品内容%d\n",(product+read_pos)->num,(product+read_pos)->data); 172 read_pos++;/*产品从缓冲区撤销*/ 173 if(read_pos >= BUFFER_SIZE) 174 { 175 read_pos = 0; 176 } 177 pthread_mutex_unlock(&mutex_buffer); 178 sem_post(&buffer_empty); 179 } 180 else if( (((product+read_pos)->bitsign) & 0x2aaaaaaa)== 0x2aaaaaaa )/*奇数号消费者都消费过*/ 181 { 182 printf("奇数号消费者都消费过,产品撤销\n"); 183 read_pos++;/*产品从缓冲区撤销*/ 184 if(read_pos >= BUFFER_SIZE) 185 { 186 read_pos = 0; 187 } 188 pthread_mutex_unlock(&mutex_buffer); 189 sem_post(&buffer_empty); 190 } 191 else 192 { 193 if( (((product+read_pos)->bitsign) & (1<<number))==(1<<number) )/*自己消费过*/ 194 { 195 pthread_mutex_unlock(&mutex_buffer); 196 sem_post(&buffer_full); 197 } 198 else 199 { 200 printf("我是%d号消费者,我消费的产品:\n",number); 201 printf("产品号:%d, 产品内容%d\n",(product+read_pos)->num,(product+read_pos)->data); 202 ((product+read_pos)->bitsign) = ((product+read_pos)->bitsign)|(1<<number); 203 pthread_mutex_unlock(&mutex_buffer); 204 sem_post(&buffer_full); 205 } 206 } 207 } 208 else 209 { 210 pthread_mutex_unlock(&mutex_buffer); 211 sem_post(&buffer_full); 212 } 213 } 214 } 215 else if( (number%2==0)&&(number<11 || number>20) )/*第二类消费者*/ 216 { 217 while(1) 218 { 219 sleep(1); 220 pthread_mutex_lock(&mutex_dead); 221 if((dead & 0x7fe00554)==0x7fe00554 ) 222 { 223 pthread_mutex_unlock(&mutex_dead); 224 printf("consumer number %d cancel\n",number); 225 pthread_cancel(pthread_self()); 226 } 227 pthread_mutex_unlock(&mutex_dead); 228 sem_wait(&buffer_full); 229 pthread_mutex_lock(&mutex_buffer); 230 if( ((((product+read_pos)->num)%2==0)&&((product+read_pos)->num)<11) 231 || ((product+read_pos)->num>20) )/*产品编号是偶数,并且小于11;或者产品编号大于20*/ 232 { 233 printf("我是%d号消费者,我消费的产品内容:\n",number); 234 printf("产品号:%d, 产品内容%d\n",(product+read_pos)->num,(product+read_pos)->data); 235 read_pos++; 236 if(read_pos>=BUFFER_SIZE) 237 { 238 read_pos = 0; 239 } 240 pthread_mutex_unlock(&mutex_buffer); 241 sem_post(&buffer_empty); 242 } 243 else 244 { 245 pthread_mutex_unlock(&mutex_buffer); 246 sem_post(&buffer_full); 247 } 248 } 249 } 250 else if( (number%2==1)&&(number>10 && number<21) )/*第三类消费者*/ 251 { 252 while(1) 253 { 254 sleep(1); 255 pthread_mutex_lock(&mutex_dead); 256 if( ((dead & 0x7fe002aa)==0x7fe002aa)&&( (dead & (1<<number))==(1<<number)) ) 257 /*对应的1,3,5,7,9,21-30号生产者,和只给自己生产产品的已经结束运行*/ 258 { 259 pthread_mutex_unlock(&mutex_dead); 260 printf("consumer number %d cancel\n",number); 261 pthread_cancel(pthread_self()); 262 } 263 pthread_mutex_unlock(&mutex_dead); 264 sem_wait(&buffer_full); 265 pthread_mutex_lock(&mutex_buffer); 266 if( (((product+read_pos)->num)%2==1 && ((product+read_pos)->num)<10) 267 ||((product+read_pos)->num)>20 || ((product+read_pos)->num)==number )/*是自己需要的产品*/ 268 { 269 if(((product+read_pos)->num)==number) 270 { 271 printf("我是%d号消费者,我消费的产品:\n",number); 272 printf("产品号:%d, 产品内容%d\n",(product+read_pos)->num,(product+read_pos)->data); 273 read_pos++;/*产品从缓冲区撤销*/ 274 if(read_pos >= BUFFER_SIZE) 275 { 276 read_pos = 0; 277 } 278 pthread_mutex_unlock(&mutex_buffer); 279 sem_post(&buffer_empty); 280 } 281 else if(((product+read_pos)->num)>20) 282 { 283 printf("我是%d号消费者,我消费的产品:\n",number); 284 printf("产品号:%d, 产品内容%d\n",(product+read_pos)->num,(product+read_pos)->data); 285 read_pos++;/*产品从缓冲区撤销*/ 286 if(read_pos >= BUFFER_SIZE) 287 { 288 read_pos = 0; 289 } 290 pthread_mutex_unlock(&mutex_buffer); 291 sem_post(&buffer_empty); 292 } 293 else if( (((product+read_pos)->bitsign) & 0x2aaaaaaa)== 0x2aaaaaaa )/*奇数号消费者都消费过*/ 294 { 295 printf("奇数号消费者都消费过,产品撤销\n"); 296 read_pos++; 297 if(read_pos >= BUFFER_SIZE) 298 { 299 read_pos = 0; 300 } 301 pthread_mutex_unlock(&mutex_buffer); 302 sem_post(&buffer_empty); 303 } 304 else 305 { 306 if( (((product+read_pos)->bitsign) & (1<<number))==(1<<number) )/*自己消费过*/ 307 { 308 printf("我是%d号消费者,我已经消费过该产品\n",number); 309 pthread_mutex_unlock(&mutex_buffer); 310 sem_post(&buffer_full); 311 } 312 else 313 { 314 printf("我是%d号消费者,我消费的产品:\n",number); 315 printf("产品号:%d, 产品内容%d\n",(product+read_pos)->num,(product+read_pos)->data); 316 ((product+read_pos)->bitsign) = ((product+read_pos)->bitsign)|(1<<number); 317 pthread_mutex_unlock(&mutex_buffer); 318 sem_post(&buffer_full); 319 } 320 } 321 } 322 else 323 { 324 pthread_mutex_unlock(&mutex_buffer); 325 sem_post(&buffer_full); 326 } 327 } 328 } 329 else/*第四类消费者*/ 330 { 331 while(1) 332 { 333 sleep(1); 334 pthread_mutex_lock(&mutex_dead); 335 if((dead & 0x7fe00554)==0x7fe00554 ) 336 { 337 pthread_mutex_unlock(&mutex_dead); 338 printf("consumer number %d cancel\n",number); 339 pthread_cancel(pthread_self()); 340 } 341 pthread_mutex_unlock(&mutex_dead); 342 sem_wait(&buffer_full); 343 pthread_mutex_lock(&mutex_buffer); 344 if( ((((product+read_pos)->num)%2==0)&&((product+read_pos)->num)<11) 345 || ((product+read_pos)->num>20) || (product+read_pos)->num == number ) 346 /*产品编号是偶数,并且小于11;或者产品编号大于20;或者产品号与自己的号码相同*/ 347 { 348 printf("我是%d号消费者,我消费的产品内容:\n",number); 349 printf("产品号:%d, 产品内容%d\n",(product+read_pos)->num,(product+read_pos)->data); 350 read_pos++; 351 if(read_pos>=BUFFER_SIZE) 352 { 353 read_pos = 0; 354 } 355 pthread_mutex_unlock(&mutex_buffer); 356 sem_post(&buffer_empty); 357 } 358 else 359 { 360 pthread_mutex_unlock(&mutex_buffer); 361 sem_post(&buffer_full); 362 } 363 } 364 } 365 }