线程同步问题广泛应用于多种场景,特别是与网络数据收发等耗时操作有关的场景。线程的操作往往比较抽象,线程大多运行在程序的后台,无法直观的查看其运行状态,因此,本文以图解的形式,为读者讲述线程同步的原理,并附以相关例程方便大家调试。
本文讲述一个经典的生产者-消费者线程同步模型,用于描述与后台缓存队列相关线程的同步过程。线程模型以互斥量(pthread_mutex_t)、条件变量为基础(pthread_cond_t),讲述互斥量如何与条件变量相互配合,以生产者-消费者的方式共同维护临界区资源
1、生产者-消费者线程模型
本文主要讨论posix标准下的生产者-消费者线程模型,posix标准多用于类linux相关环境
POSIX: The Portable Operating System Interface (POSIX) is a family of standards specified by the IEEE Computer Society for maintaining compatibility between operating systems. POSIX defines the application programming interface (API), along with command line shells and utility interfaces, for software compatibility with variants of Unix and other operating systems
1.1 线程模型工作原理
生产者-消费者(producer-consumer)问题是一个经典的线程同步问题,它可以描述为两个或者多个线程共同维护同一个临界区资源(critical resource),其中,生成者线程负责(例如从网络接口中抽取数据等)向临界区注入数据,消费者线程负责从临界区抽取数据,并对数据进行处理,。下图为生产者-消费者线程同步模型示意图。
图中上面的process_msg为消费者线程,下面的enqueue_msg为生产者线程,从左到右代表了线程执行的时间线。
我们先来看消费者线程,消费者线程率先获取互斥锁对象(图中的红点表示互斥锁对象),获得了对临界区资源的独占处理权及cpu资源的优先使用权,然后开始执行自己的线程函数。
在消费者的线程函数中,首先检查临界区资源是否满足执行条件,若满足执行条件,则从队列中取出数据执行自己的逻辑。
如果临界区资源不满足执行条件,如队列为空,此时,消费者线程通过在pthread_cond_wait中临时释放互斥锁,并将自己投入休眠状态,等待被生产者线程向临界区注入数据,将自己唤醒并重新获得互斥锁,这时消费者线程会阻塞在pthread_cond_wait调用中。
消费者线程通过在pthread_cond_wait中临时释放互斥锁后,将自己投入休眠状态,此时生成者线程将获得互斥锁,并获得了对临界区资源的独占处理权及cpu资源的优先使用权,然后开始执行自己的线程函数。生成者线程向临界区资源注入数据,如向队列中注入待解码的数据包,然后,通过pthread_cond_signal唤醒消费者线程(图中虚线所示),随即通过unlock_mutex释放互斥锁。
在生成者线程释放互斥锁后,消费者线程已被唤醒,并重新获取互斥锁,再次检查临界区资源,如果满足条件,则执行自己的线程函数,然后释放互斥锁,等待下一次执行。
这里需要说明一下,在实际情况下,并不总是消费者线程优先获得互斥锁,这是由cpu调度决定的。
下面给出一些示例代码来描述这个过程,例程中临界区资源的处理非常简单,只是对一个整型数据做+1处理,在实际的场景中,无论线程对临界区资源的处理有多么复杂的操作,生产者与消费者线程同步的原理与例程中的模型不会有任何差异。这里我们抽象出一个关于生产者-消费者的线程模型,方便大家理解
1.2 例程编译验证
下面给出例程的编译脚步,编译过程非常简单,在源码及Makefile编译脚本目录执行[make]命令,即可完成例程编译,执行下面的操作即可开始例程的运行。
Makefile编译脚本
test: main.c
gcc -o test -g3 main.c -l pthread
clean:
rm test
代码编译运行
make
./test
代码运行结果
./test
consumer_before_lock--------------0
consumer_lock---------------------0
consumer_wait---------------------0
producer_before_lock++++++++++++++++++++++++++++++++0
producer_lock+++++++++++++++++++++++++++++++++++++++0
producer_process_critical_resource++++++++++++++++++0
producer_unlock+++++++++++++++++++++++++++++++++++++1
consumer_revive-------------------1
consumer_wait---------------------1
producer_before_lock++++++++++++++++++++++++++++++++1
producer_lock+++++++++++++++++++++++++++++++++++++++1
producer_process_critical_resource++++++++++++++++++1
producer_unlock+++++++++++++++++++++++++++++++++++++2
consumer_revive-------------------2
consumer_wait---------------------2
producer_before_lock++++++++++++++++++++++++++++++++2
producer_lock+++++++++++++++++++++++++++++++++++++++2
producer_process_critical_resource++++++++++++++++++2
producer_unlock+++++++++++++++++++++++++++++++++++++3
consumer_revive-------------------3
consumer_wait---------------------3
producer_before_lock++++++++++++++++++++++++++++++++3
producer_lock+++++++++++++++++++++++++++++++++++++++3
producer_process_critical_resource++++++++++++++++++3
producer_unlock+++++++++++++++++++++++++++++++++++++4
consumer_revive-------------------4
consumer_wait---------------------4
producer_before_lock++++++++++++++++++++++++++++++++4
producer_lock+++++++++++++++++++++++++++++++++++++++4
producer_process_critical_resource++++++++++++++++++4
producer_unlock+++++++++++++++++++++++++++++++++++++5
consumer_revive-------------------5
consumer_wait---------------------5
producer_before_lock++++++++++++++++++++++++++++++++5
producer_lock+++++++++++++++++++++++++++++++++++++++5
producer_process_critical_resource++++++++++++++++++5
producer_unlock+++++++++++++++++++++++++++++++++++++6
consumer_revive-------------------6
consumer_wait---------------------6
producer_before_lock++++++++++++++++++++++++++++++++6
producer_lock+++++++++++++++++++++++++++++++++++++++6
producer_process_critical_resource++++++++++++++++++6
producer_unlock+++++++++++++++++++++++++++++++++++++7
consumer_revive-------------------7
consumer_wait---------------------7
producer_before_lock++++++++++++++++++++++++++++++++7
producer_lock+++++++++++++++++++++++++++++++++++++++7
producer_process_critical_resource++++++++++++++++++7
producer_unlock+++++++++++++++++++++++++++++++++++++8
consumer_revive-------------------8
consumer_wait---------------------8
producer_before_lock++++++++++++++++++++++++++++++++8
producer_lock+++++++++++++++++++++++++++++++++++++++8
producer_process_critical_resource++++++++++++++++++8
producer_unlock+++++++++++++++++++++++++++++++++++++9
consumer_revive-------------------9
consumer_wait---------------------9
producer_before_lock++++++++++++++++++++++++++++++++9
producer_lock+++++++++++++++++++++++++++++++++++++++9
producer_process_critical_resource++++++++++++++++++9
producer_unlock+++++++++++++++++++++++++++++++++++++10
consumer_revive-------------------10
consumer_process_critical_resource------------------10
consumer_unlock-----------------------0
producer_before_lock++++++++++++++++++++++++++++++++0
producer_lock+++++++++++++++++++++++++++++++++++++++0
producer_process_critical_resource++++++++++++++++++0
producer_unlock+++++++++++++++++++++++++++++++++++++1
consumer_before_lock--------------0
consumer_lock---------------------1
consumer_wait---------------------1
producer_before_lock++++++++++++++++++++++++++++++++1
producer_lock+++++++++++++++++++++++++++++++++++++++1
producer_process_critical_resource++++++++++++++++++1
producer_unlock+++++++++++++++++++++++++++++++++++++2
consumer_revive-------------------2
consumer_wait---------------------2
producer_before_lock++++++++++++++++++++++++++++++++2
producer_lock+++++++++++++++++++++++++++++++++++++++2
producer_process_critical_resource++++++++++++++++++2
producer_unlock+++++++++++++++++++++++++++++++++++++3
consumer_revive-------------------3
consumer_wait---------------------3
可以看到,消费者线程不断的被唤醒,在临界区资源不满足执行条件的情况下,又阻塞到pthread_cond_wait并进入睡眠状态,重新等待生产者pthread_cond_signal的唤醒
更多线程同步的例程见[公众号:断点实验室]的其他文章
1.3 例程源码清单
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
static int val=0;//临界区资源,critical resource
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;//静态初始化互斥锁
pthread_cond_t ready=PTHREAD_COND_INITIALIZER;//静态初始化条件变量
//生产者临界区资源处理函数
void producer_proc(int *cr){
printf("producer_process_critical_resource++++++++++++++++++%d\n\n",val);
*cr+=1;//处理临界区资源,做+1简单处理
}
//消费者临界区资源处理函数
void consumer_proc(int *cr){
printf("consumer_process_critical_resource------------------%d\n\n",val);
*cr=0;//处理临界区资源,这里直接重置为0
}
//生产者线程函数
void *producer_fun(void *p) {
while(1){
printf("producer_before_lock++++++++++++++++++++++++++++++++%d\n\n",val);
pthread_mutex_lock(&lock);//取得互斥锁
printf("producer_lock+++++++++++++++++++++++++++++++++++++++%d\n\n",val);
//处理临界区资源,做+1简单处理
producer_proc(&val);
pthread_cond_signal(&ready);//唤醒等待线程
printf("producer_unlock+++++++++++++++++++++++++++++++++++++%d\n\n",val);
pthread_mutex_unlock(&lock);//释放互斥锁
sleep(1);
}//end for while
return NULL;
}
//消费者线程函数
void *consumer_fun(void *p) {
while(1){
printf("consumer_before_lock--------------%d\n\n",val);
pthread_mutex_lock(&lock);//取得互斥锁
printf("consumer_lock---------------------%d\n\n",val);
while(val<10) {//检查临界区资源是否满足执行条件
printf("consumer_wait---------------------%d\n\n",val);
//暂时释放互斥锁,休眠等待生产者线程发送就绪信号ready,该函数返回时互斥锁再次被锁住
pthread_cond_wait(&ready,&lock);
printf("consumer_revive-------------------%d\n\n",val);
}
//处理临界区资源,这里直接重置为0
consumer_proc(&val);
printf("consumer_unlock-----------------------%d\n\n",val);
pthread_mutex_unlock(&lock);
sleep(1);
}//end for while
return NULL;
}
int main(int argc,char *argv[]) {
pthread_t cid,pid;//线程id
//创建消费者线程
int ret=pthread_create(&cid,NULL,consumer_fun,NULL);
if(ret!=0) {//检查线程创建结果
printf("create consumer thread failed\n");
exit(1);
}
//创建生成者线程
ret=pthread_create(&pid,NULL,producer_fun,NULL);
if(ret!=0) {//检查线程创建结果
printf("create producer thread failed\n");
exit(1);
}
//等待线程结束
pthread_join(cid,NULL);
pthread_join(pid,NULL);
return 0;
}
// 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
// 公众号:断点实验室
// 扫描二维码,关注更多优质原创,内容包括:音视频开发、图像处理、网络、
// Linux,Windows、Android、嵌入式开发等