概述
生产者消费者模型:一种典型的设计模式,是人们根据典型场景设计的解决方案
应用场景:应用于有大量的数据产生和进行处理的场景
具体实现:具有多个生产者和消费者线程来生产和处理庞大的数据量,但是生产者和消费者并不直接交互,而是通过中间的缓冲区进行协调工作;生产者产生数据存入缓冲区,消费者从缓冲区中拿到数据后进行处理。
优点:
解耦合:分离生产和消费,当数据的处理方式发生改变时,仅仅改变消费者即可
支持忙闲不均:如果生产的数据量过大,可以先存放在缓冲区中,让消费者慢慢处理
支持并发:可以开多个生产者线程或者消费者线程进行工作
为了能够支持并发,必须保证线程安全:
1、每个生产者之间应该保持互斥关系;每个消费者之间也应该保持互斥关系;
2、生产者与消费者之间应该保持同步与互斥关系
实现
底层实现:生产者线程和消费者线程(一个入队,一个出队) + 保证线程安全的队列作为缓冲区
1、实现线程安全队列
封装一个线程安全的队列类:class BlockQueue—阻塞队列
有空闲结点可以数据入队,没有则阻塞入队线程;
有数据结点可以数据出队,没有则阻塞出队线程;
思路:
class BlockQueue{
private:
int capacity; //容量
std::queue<int> _queue; //底层封装一个队列
pthread_mutex_t mutex; //互斥锁
pthread_cond_t cond_pro; //生产者条件变量
pthread_cond_t cond_cus; //消费者条件变量
public:
BlockQueue(int capacity=5); //构造函数
bool push(int data); //入队
bool pop(int* data); //出队
~BlockQueue(); //析构
};
2、创建线程进行数据入队、出队
完整代码:
#include<iostream>
#include<queue>
#include<pthread.h>
#include<cstdio>
#include<unistd.h>
class BlockQueue{
private:
int capacity; //容量
std::queue<int> _queue; //底层封装一个队列
pthread_mutex_t mutex; //互斥锁
pthread_cond_t cond_pro; //生产者条件变量
pthread_cond_t cond_cus; //消费者条件变量
public:
BlockQueue(int cap=5):capacity(cap){
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&cond_pro,NULL);
pthread_cond_init(&cond_cus,NULL);
}
bool Push(int data){
pthread_mutex_lock(&mutex);
while(_queue.size()==capacity){
pthread_cond_wait(&cond_pro,&mutex);
}
_queue.push(data);
pthread_cond_signal(&cond_cus);
pthread_mutex_unlock(&mutex);
return true;
}
bool Pop(int* data){
pthread_mutex_lock(&mutex);
while(_queue.empty()){
pthread_cond_wait(&cond_cus,&mutex);
}
*data=_queue.front();
_queue.pop();
pthread_cond_signal(&cond_pro);
pthread_mutex_unlock(&mutex);
return true;
}
~BlockQueue(){
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond_pro);
pthread_cond_destroy(&cond_cus);
}
};
void* productor(void* arg){
BlockQueue* q=(BlockQueue*)arg;
int i=0;
while(1){
q->Push(i);
printf("生产者入队数据:%d\n",i++);
}
return NULL;
}
void* customer(void* arg){
BlockQueue* q=(BlockQueue*)arg;
while(1){
int data;
q->Pop(&data);
printf("消费者出队数据:%d\n",data);
}
return NULL;
}
int main(){
BlockQueue q;
pthread_t ptid[4],ctid[4];
int ret;
for(int i=0;i<4;i++){
ret=pthread_create(&ptid[i],NULL,productor,&q);
if(ret!=0){
printf("create thread error\n");
return -1;
}
}
for(int i=0;i<4;i++){
ret=pthread_create(&ctid[i],NULL,customer,&q);
if(ret!=0){
printf("create thread error\n");
return -1;
}
}
for(int i=0;i<4;i++){
pthread_join(ptid[i],NULL);
pthread_join(ctid[i],NULL);
}
return 0;
}
部分结果:
注意:
这里我们定义的容量是5,所以正常情况下应该是入队5次,出队5次交替进行,但是由于cus和pro线程中的操作是非原子操作,所以时间片轮转时可能会打断之前未完成的操作。