[Linux]生产者消费者模型

概述

生产者消费者模型:一种典型的设计模式,是人们根据典型场景设计的解决方案

应用场景:应用于有大量的数据产生和进行处理的场景

具体实现:具有多个生产者和消费者线程来生产和处理庞大的数据量,但是生产者和消费者并不直接交互,而是通过中间的缓冲区进行协调工作;生产者产生数据存入缓冲区,消费者从缓冲区中拿到数据后进行处理。

[Linux]生产者消费者模型

优点:
解耦合:分离生产和消费,当数据的处理方式发生改变时,仅仅改变消费者即可
支持忙闲不均:如果生产的数据量过大,可以先存放在缓冲区中,让消费者慢慢处理
支持并发:可以开多个生产者线程或者消费者线程进行工作

为了能够支持并发,必须保证线程安全:
1、每个生产者之间应该保持互斥关系;每个消费者之间也应该保持互斥关系;
2、生产者与消费者之间应该保持同步与互斥关系

实现

底层实现:生产者线程和消费者线程(一个入队,一个出队) + 保证线程安全的队列作为缓冲区

1、实现线程安全队列
封装一个线程安全的队列类:class BlockQueue—阻塞队列
有空闲结点可以数据入队,没有则阻塞入队线程;
有数据结点可以数据出队,没有则阻塞出队线程;

思路:

class BlockQueue{
private:
 	int capacity;				//容量
 	std::queue<int> _queue;		//底层封装一个队列
 	pthread_mutex_t mutex;		//互斥锁
 	pthread_cond_t cond_pro;	//生产者条件变量
 	pthread_cond_t cond_cus;	//消费者条件变量
public:
 	BlockQueue(int capacity=5);	//构造函数
 	bool push(int data);		//入队
 	bool pop(int* data);		//出队
 	~BlockQueue();				//析构
};

2、创建线程进行数据入队、出队

完整代码:

#include<iostream>
#include<queue>
#include<pthread.h>
#include<cstdio>
#include<unistd.h>


class BlockQueue{
  private:
    int capacity;               //容量
    std::queue<int> _queue;     //底层封装一个队列
    pthread_mutex_t mutex;      //互斥锁
    pthread_cond_t cond_pro;    //生产者条件变量
    pthread_cond_t cond_cus;    //消费者条件变量
public:

    BlockQueue(int cap=5):capacity(cap){
      pthread_mutex_init(&mutex,NULL);
      pthread_cond_init(&cond_pro,NULL);
      pthread_cond_init(&cond_cus,NULL);

    }

    bool Push(int data){
     pthread_mutex_lock(&mutex);
     while(_queue.size()==capacity){
       pthread_cond_wait(&cond_pro,&mutex);
     }
     _queue.push(data);
     pthread_cond_signal(&cond_cus);
     pthread_mutex_unlock(&mutex);
      return true;
    }

    bool Pop(int* data){
      pthread_mutex_lock(&mutex);
      while(_queue.empty()){
        pthread_cond_wait(&cond_cus,&mutex);
      }
      *data=_queue.front();
      _queue.pop();
      pthread_cond_signal(&cond_pro);
      pthread_mutex_unlock(&mutex);
      return true;
    } 

    ~BlockQueue(){

      pthread_mutex_destroy(&mutex);
      pthread_cond_destroy(&cond_pro);
      pthread_cond_destroy(&cond_cus);
    }              
};

void* productor(void* arg){
  BlockQueue* q=(BlockQueue*)arg;
  int i=0;
  while(1){
    q->Push(i);
    printf("生产者入队数据:%d\n",i++);
  }
  return NULL;
}
void* customer(void* arg){
  BlockQueue* q=(BlockQueue*)arg;
  while(1){
    int data;
    q->Pop(&data);
    printf("消费者出队数据:%d\n",data);
  }
  return NULL;
}

int main(){
  BlockQueue q;
  pthread_t ptid[4],ctid[4];
  int ret;
  for(int i=0;i<4;i++){
    ret=pthread_create(&ptid[i],NULL,productor,&q);
    if(ret!=0){
      printf("create thread error\n");
      return -1;
    }
  } 

  for(int i=0;i<4;i++){
    ret=pthread_create(&ctid[i],NULL,customer,&q);
    if(ret!=0){
      printf("create thread error\n");
      return -1;
    }
  }

  for(int i=0;i<4;i++){
    pthread_join(ptid[i],NULL);
    pthread_join(ctid[i],NULL);
  }
  return 0;
}


部分结果:
[Linux]生产者消费者模型

注意:
这里我们定义的容量是5,所以正常情况下应该是入队5次,出队5次交替进行,但是由于cus和pro线程中的操作是非原子操作,所以时间片轮转时可能会打断之前未完成的操作。

上一篇:linux的线程与多线程


下一篇:详解Linux线程