【LINUX】多线程(生产者消费者模型,POXIS信号量)

多线程

生产者消费者模型

为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生活中很常见这种例子:
在超市中,生产方会通过与超市交涉,将商品输送给超市,消费者在看到超市有自己的东西就回去消费,如果超市没有,消费者则会去发信号通知超市,超市就会去催促生产者,这就是生活着简单的例子。

生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

【LINUX】多线程(生产者消费者模型,POXIS信号量)
我们可以通过这种图更加直观的理解:

一般生产者和消费者在LINUX中会用进程线程来表示。

在这张图中可以明显看到有三层关系。

1.生产者-生产者
2.消费者-消费者
3.消费者-生产者

  • 第一层:
    • 生产者之间存在的是互斥,因为一个品牌方上架时,另一个品牌商就会去排队,直到第一个结束,才会轮到第二个。
  • 第二层:
    • 与第一层同理。
  • 第三层:
    • 可以理解为同步,当Empty或者Full,就会去互相发信号去提醒,也就是我们所说的条件变量

基于BlockingQueue的生产者消费者模型

BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

【LINUX】多线程(生产者消费者模型,POXIS信号量)

实战演示

线程执行代码

** 消费者通过代码输出数据。**

** 生产者通过代码输入数据**

void *consumer_run(void *arg)
{
      BlockQueue *bq = (BlockQueue*)arg;

      while(true)
      {
        int n=0;
        bq->Get(n);
        cout<<n<<endl;
      }
        
}
void *productor_run(void* arg)
{
  sleep(1);
  BlockQueue *bq=(BlockQueue*)arg;
  int count=0;
  while(1)
  {
    count=count%5+1;
    bq->Put(count);
    cout<<count<<endl;
    sleep(1);
  }
}

BlockQueue创建

首先我们要对消费者和生产者发信号,就需要两个条件变量,并且其关系为同步,所以要保证操作的原子性,必须加锁。

当队列为空时候消费者进来等待并给生产者发信号。
当队列为满时候生产者进来等待并给消费者发信号。

class BlockQueue
{
  private:
    std::queue<int> q;
    size_t cap;
    pthread_mutex_t lock;
    pthread_cond_t c_cond;
    pthread_cond_t p_cond;
    bool Isempty()
    {
      return q.empty();
    }
    bool IsFull()
    {
      if(cap==q.size())
      {
        return true;
      }
      return false;
    }
    void WakeUpProtector()
    {
      printf("wakeup protector\n");
      pthread_cond_signal(&p_cond);

    }
    void WakeUpComsumer()
    {
      printf("wakeup comsumer\n");
      pthread_cond_signal(&c_cond);
    }
    void ProtectorWait()
    {
      printf("waite protector\n");
      pthread_cond_wait(&p_cond,&lock);
    }
    void ConsumerWait()
    {
      printf("wait comsumer\n");
      pthread_cond_wait(&c_cond,&lock);
    }
    void LockQueue()
    {
      pthread_mutex_lock(&lock);
    }
    void UnLockQueue()
    {
      pthread_mutex_unlock(&lock);
    }
  public:
    BlockQueue(size_t _cap)
      :cap(_cap)
    {
      pthread_mutex_init(&lock,NULL);
      pthread_cond_init(&c_cond,NULL);
      pthread_cond_init(&p_cond,NULL);
    }
    void Put(int t)
    {
      LockQueue();
      if(IsFull())
      {
        WakeUpComsumer();
        ProtectorWait();
      }
        q.push(t);
        UnLockQueue();
    }
    void Get(int &t)
    {
      LockQueue();
      if(Isempty())
      {
        WakeUpProtector();
        ConsumerWait();
      }
      t=q.front();
      q.pop();
      UnLockQueue();
    }
    ~BlockQueue()
    {
      pthread_mutex_destroy(&lock);
      pthread_cond_destroy(&c_cond);
      pthread_cond_destroy(&p_cond);

    }
}; 

结果呈现:
通过结果可以看到消费者进来为空,通知生产者,生产者生产满了发信给消费者并阻塞,这样就会提高效率。
【LINUX】多线程(生产者消费者模型,POXIS信号量)

POXIS信号量

基本概念和创建

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()

信号量本质是一个计数器,描述临界资源有效个数的计数器。

临界资源可以看成多份,不冲突,提高效率

这里相当于将临界区划分为多个区域,如果进来一个线程空间就会–对应的就是P()操作,当出去一个线程就会++对应的就是V()操作,当所有的空间都被占用后,其就会让晚来的线程进行等待,直到有剩余空间。
【LINUX】多线程(生产者消费者模型,POXIS信号量)

基于环形队列的生产消费模型

这里环形队列需要用到环有关的信息,如果忘了冠希之前所分析的可以点链接回去复习以下。
链接: 约瑟夫环.

模拟代码实现

#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<semaphore.h>
#include<stdlib.h>
#include<vector>
using namespace std;
class RingQueue
{
private:
  vector<int> q;
  int _cap;
  sem_t data_sem;
  sem_t blank_sem;
  int consumer_step;
  int product_step;
public:
  RingQueue(int cap=10)
    :q(cap),_cap(cap)
  {
    sem_init(&data_sem,0,0);
    sem_init(&blank_sem,0,_cap);
    consumer_step=0;
    product_step=0;
  }
  void PutData(int &data)
  {
    sem_wait(&blank_sem);
    q[consumer_step]=data;
    consumer_step++;
    consumer_step%=_cap;
    sem_post(&data_sem);
  }
  void GetData(int &data)
  {
    sem_wait(&data_sem);
    data=q[product_step];
    product_step++;
    product_step%=_cap;
    sem_post(&blank_sem);
  }
  ~RingQueue()
  {
    sem_destroy(&data_sem);
    sem_destroy(&blank_sem);
  }

};

void* consumer(void* arg)
{
  RingQueue* q=(RingQueue*)arg;
  int data;
  while(1)
  {
    q->GetData(data);
    cout<<"consume data done:"<<data<<endl;
    sleep(1);
  }
}
void* productor(void* arg)
{
  RingQueue *q = (RingQueue*)arg;
  while(1)
  {
    int data=rand()%10;
    q->PutData(data);
    cout<<"Product data done"<<data<<endl;
    sleep(1);
  }
}
int main()
{
  RingQueue rq;
  pthread_t c,p;
  pthread_create(&c,NULL,consumer,(void*)&rq);
  pthread_create(&p,NULL,productor,(void*)&rq);
  pthread_join(c,NULL);
  pthread_join(p,NULL);
}

上一篇:详解Linux线程


下一篇:GDB 调试 .NET 程序实录 - .NET 调用 .so 出现问题怎么解决