多线程——(2.线程安全)

线程安全

概念

多线程对于临界资源的访问操作是安全的。

实现

同步互斥

互斥:
通过对临界资源同一时间的唯一访问保证访问安全。

同步:
通过条件判断让线程对临界资源的访问更加合理。

互斥的实现

互斥锁

原理

本质上是一个0、1计数器,标记临界资源的两种状态。

  1. 访问资源之前加锁,通过计数器判断能否访问,不能则阻塞。
  2. 访问资源之后解锁,通过计数器标记资源为可访问,唤醒阻塞。

注意:
互斥锁本身也是临界资源,但是互斥锁自身通过原子操作保证自己安全。

操作流程

1.定义互斥锁

pthread_mutex_t mutex;

2.初始化互斥锁

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
参数:
mutex:互斥锁地址。类型是 pthread_mutex_t 。
attr:设置互斥量的属性,通常可采用默认属性,即可将 attr 设为 NULL。
返回值:
成功:0,成功申请的锁默认是打开的。
失败:非 0 错误码

3.访问临界资源之前加锁

int pthread_mutex_lock(pthread_mutex_t *mutex);
参数:
mutex:互斥锁地址。
返回值:
成功:0
失败:非 0 错误码
int pthread_mutex_trylock(pthread_mutex_t *mutex);
调用该函数时,若互斥锁未加锁,则上锁,返回 0;
若互斥锁已加锁,则函数直接返回失败,即 EBUSY。

4.访问临界资源之后解锁

int pthread_mutex_unlock(pthread_mutex_t * mutex);
参数:
互斥锁地址。
返回值:
成功:0
失败:非 0 错误码

5销毁互斥锁释放资源

int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数:
互斥锁地址。
返回值:
成功:0
失败:非 0 错误码

代码示例

  1 #include<stdio.h>
  2 #include<stdlib.h>
  3 #include<pthread.h>
  4 #include<unistd.h>
  5 int tickets=100;
  6 
  7 void* scalpers(void* arg)
  8 {
  9   pthread_mutex_t* mutex = (pthread_mutex_t*)arg;
 10   while(1)
 11   {
 12     pthread_mutex_lock(mutex);
 13     if(tickets>0)
 14     {
 15       usleep(1);
 16       printf("我抢到了一张票:%d\n",tickets);
 17       tickets--;
 18       pthread_mutex_unlock(mutex);
 19     }
 20     else  
 21     {
 22       pthread_mutex_unlock(mutex);                                                                                                                                  
 23       pthread_exit(NULL);
 24     }
 25   }
 26   return NULL;
 27 }
 28 
 29 int main()
 30 {
 31   pthread_mutex_t mutex;
 32 
 33   int ret;
 34   pthread_t tid[5];
 35   pthread_mutex_init(&mutex,NULL);
 36   for(int i =0;i<5;i++)
 37   {
 38     ret =pthread_create(&tid[i],NULL,scalpers,&mutex);
 39     if(ret!=0)
 40     {
 41       printf("pthread creat error.\n");
 42       return -1;
 43     }
 44   }
 45   for(int i =0;i<5;i++)
 46   {
 47     pthread_join(tid[i],NULL);
 48   }
 49   pthread_mutex_destroy(&mutex);
 50   return 0;
 51 }

死锁

程序流程无法推进,卡死。

四个必要条件

  1. 互斥条件
    进程要求对所分配的资源进行排它性控制,即在一段时间内某资源仅为一进程所占用。
  2. 不可剥夺条件
    当进程因请求资源而阻塞时,对已获得的资源保持不放。
  3. 请求与保持条件
    进程已获得的资源在未使用完之前,不能剥夺,只能在使用完时由自己释放。
  4. 环路等待条件
    在发生死锁时,必然存在一个进程——资源的环形链。

避免

银行家算法…………

预防

  1. 保证加解锁顺序一致。
  2. 若请求不到新锁,则释放已有的锁,

同步的实现

条件变量

原理

pcb等待队列、阻塞/唤醒阻塞接口。

  1. 当前线程对临界资源访问不合理的,则调用阻塞接口阻塞线程
  2. 若其他线程促使当前线程对资源获取合理,则调用唤醒接口。

注意:
条件变量本身并不确定资源什么时候获取合理,因此资源是否获取合理需要程序员自行判断。

操作流程

1.定义条件变量

pthread_cond_t  cond; 

2.初始化条件变量

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
参数:
cond:条件变量地址 
attr:条件变量属性,通常为默认值,传NULL即可

3.如果资源获取不合理,阻塞

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); 
函数作用:
   1.不满足条件则阻塞
   2.解锁(相当于pthread_mutex_unlock(&mutex);
     (1.2)两步为一个原子操作。
   3.当被唤醒,pthread_cond_wait函数返回时,唤醒阻塞并重新加锁pthread_mutex_lock(&mutex);

4.使得资源获取合理,唤醒阻塞

int pthread_cond_signal(pthread_cond_t *cond); 
唤醒至少一个阻塞在条件变量上的线程  
int pthread_cond_broadcast(pthread_cond_t *cond); 
唤醒全部阻塞在条件变量上的线程 

5.销毁条件变量

int pthread_cond_destroy(pthread_cond_t *cond); 

注意:
条件变量要搭配互斥锁使用。
临界资源是其他线程促使条件满足,其中的条件是否满足也是临界资源的判断,所以要加锁。

代码示例

厨师做饭,客人吃饭。

    1 #include <stdio.h>
    2 #include <unistd.h>
    3 #include <stdlib.h>
    4 #include <pthread.h>
    5 pthread_mutex_t mutex;
    6 pthread_cond_t cond_customer;
    7 pthread_cond_t cond_cooker;
    8 int bowl = 1;//1有0没有
    9 
   10 void *cooker(void *arg)
   11 {
   12   while(1)
   13   {
   14     pthread_mutex_lock(&mutex);
   15     while(bowl==1)
   16     {
   17       pthread_cond_wait(&cond_cooker,&mutex);
   18     }
   19     printf("本大厨做了碗蛋炒饭~~~\n");                                                                                                                            
   20     bowl++;
   21     pthread_cond_signal(&cond_customer);
   22     pthread_mutex_unlock(&mutex);
   23   }
   24   return NULL;
   25 }
   26 
   27 void* customer(void *arg)
   28 {
   29   while(1)
   30   {
   31     pthread_mutex_lock(&mutex);
   32     while(bowl==0)
   33     {
   34       pthread_cond_wait(&cond_customer,&mutex);
   35     }
   36     printf("太好吃了!!\n");
   37     bowl--;
   38     pthread_cond_signal(&cond_cooker);
   39     pthread_mutex_unlock(&mutex);
   40   }                                                                                                                                                               
   41   return NULL;
   42 }
   43 int main()
   44 {
   45   pthread_t cotid,cutid;
   46   int ret;
   47   pthread_mutex_init(&mutex,NULL);
   48   pthread_cond_init(&cond_cooker,NULL);
   49   pthread_cond_init(&cond_customer,NULL);
   50   for(int i =0;i<4;i++)
   51   {
   52     ret = pthread_create(&cotid,NULL,cooker,NULL);
   53     if(ret!=0)
   54     {
   55       printf("pthread create error\n");
   56       return -1;
   57     }
   58   }
   59   for(int i =0;i<4;i++)
   60   {
   61     ret = pthread_create(&cutid,NULL,customer,NULL);
   62     if(ret!=0)
   63     {
   64       printf("pthread create error\n ");
   65       return -1;
   66     }
   67   }
   68   pthread_join(cutid,NULL);
   69   pthread_join(cotid,NULL);
   70   pthread_mutex_destroy(&mutex);
   71   pthread_cond_destroy(&cond_cooker);
   72   pthread_cond_destroy(&cond_customer);
   73   return 0;
   74 }                                      

注意事项

  1. 条件变量使用过程中,条件的判断应使用循环操作
  2. 条件变量使用过程中,若有多种角色则需要使用多个条件变量不同的角色分开等待,分开唤醒防止唤醒错误。

生产者消费者模型

在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。大概的结构如下图。
多线程——(2.线程安全)

优点

  1. 解耦合
    如果让生产者直接调用消费者,那么生产者对于消费者就会产生依赖。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
  2. 支持忙闲不均
    未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
  3. 支持并发
    生产者和消费者可以是两个独立的并发主体。

实现

生产者与生产者:互斥
消费者与消费者:互斥
生产者与消费者:同步+互斥

代码实现

    1 #include<iostream>                                                                                                                                            
    2 #include<queue>
    3 #include<pthread.h>
    4 #include<stdio.h>
    5 
    6 #define MAX_QUEUE 5
    7 #define COUNT 5
    8 
    9 class BlockQueue
   10 {
   11 private:
   12     int _capacity;
   13     pthread_mutex_t _mutex;
   14     pthread_cond_t cond_pro;
   15     pthread_cond_t cond_cus;
   16    std:: queue<int> _queue;
   17 public:
   18     BlockQueue(int cap = MAX_QUEUE)
   19     :_capacity(cap)
   20     {
   21       pthread_mutex_init(&_mutex,NULL);
   22       pthread_cond_init(&cond_pro,NULL);
   23       pthread_cond_init(&cond_cus,NULL);
   24     }
   25     ~BlockQueue()
   26     {
   27       pthread_mutex_destroy(&_mutex);
   28       pthread_cond_destroy(&cond_pro);
   29      pthread_cond_destroy(&cond_cus); 
   30     }
   31 
   32     bool Push(int data)
   33     {
   34       pthread_mutex_lock(&_mutex);
   35       while(_queue.size()==_capacity)
   36       {
   37         pthread_cond_wait(&cond_pro,&_mutex);
   38       }
   39       _queue.push(data);
   40       pthread_cond_signal(&cond_cus);
   41       pthread_mutex_unlock(&_mutex);
   42       return true;
   43     }
   44     bool Pop(int* data)
   45     {
   46       pthread_mutex_lock(&_mutex);
   47       while(_queue.empty())
   48       {
   49         pthread_cond_wait(&cond_cus,&_mutex);
   50       }
   51       *data = _queue.front();
   52       _queue.pop();
   53       pthread_cond_signal(&cond_pro);
   54       pthread_mutex_unlock(&_mutex);
   55       return true;
   56     }
   57 };
   58 
   59 
   60 void* producter(void* arg)
   61 {                                                                                                                                                                 
   62   BlockQueue* q=(BlockQueue*)arg;
   63   int i =0;
   64   while(1)
   65   {
   66     q->Push(i);
   67     printf("%p-push data:%d\n",pthread_self(),i++);
   68 
   69   }
   70   return NULL;
   71 
   72 
   73 }
   74 void* customer(void* arg)
   75 {
   76   BlockQueue* q=(BlockQueue*)arg;
   77   while(1)
   78   {
   79     int data;
   80     q->Pop(&data);
   81     printf("%p-get data:%d\n",pthread_self(),data);
   82   }
   83   return NULL;
   84 
   85 }
   86 
   87 
   88 int main()
   89 {
   90   BlockQueue q;
   91   int ret;
   92   pthread_t ptid[COUNT],ctid[COUNT];
   93   for(int i =0;i<COUNT;i++)
   94   {
   95     ret = pthread_create(&ptid[i],NULL,producter,&q);                                                                                                             
   96     if(ret!=0)
   97     {
   98       printf("pthread create error\n");
   99       return -1;
  100     }
  101   }
  102   for(int i =0;i<COUNT;i++)
  103   {
  104     ret =pthread_create(&ctid[i],NULL,customer,&q);
  105     if(ret!=0)
  106     {
  107       printf("pthread create error\n");
  108       return -1;
  109     }
  110   }
  111   for(int i =0;i<COUNT;i++)
  112   {
  113     pthread_join(ptid[i],NULL);
  114     pthread_join(ctid[i],NULL);
  115   }
  116 return 0;
  117 }                  

信号量

本质

计数器+pcb等待队列

操作

  1. p计数-1,计数<0阻塞
  2. v计数+1,唤醒阻塞

同步实现

通过计数器对资源计数
获取资源之前P,产生资源之后P

互斥实现

初始化值为1,表示只有一个资源
访问资源前进行P,访问资源完后V

操作流程

1.定义信号量

sem_t sem;

2.初始化信号量

int sem_init __P ((sem_t *__sem, int __pshared, unsigned int __value));
sem为指向信号量结构的一个指针;
pshared不为0时此信号量在进程间共享,否则只能为当前进程的所有线程共享;
value给出了信号量的初始值。

3.P

sem_wait( sem_t *sem )
阻塞当前线程直到信号量sem的值大于0,解除阻塞后将sem的值减一,表明公共资源经使用后减少

4.V

sem_post( sem_t *sem )
用来增加信号量的值

5.销毁信号量

sem_destroy(sem_t *sem)
用来释放信号量sem,属于无名信号量

代码示例

信号量实现生产消费模型

    1 #include <iostream>
    2 #include <vector>
    3 #include <pthread.h>
    4 #include<stdio.h>
    5 #include <semaphore.h>
    6 
    7 using namespace std;
    8 
    9 #define MAX_QUEUE 5
   10 
   11 class RingQueue                                                                                                                                                   
   12 {
   13   private:
   14     int _capacity;
   15     int _read;
   16     int _write;
   17     vector<int> arr;
   18     sem_t _lock;//实现互斥
   19     sem_t _idle;//空闲节点数量
   20     sem_t _data;//数据节点数量
   21   public:
   22     RingQueue(int cap = MAX_QUEUE)
   23       :_capacity(cap)
   24       ,_read(0)
   25       ,_write(0)
   26       ,arr(cap)
   27     {
   28       sem_init(&_lock,0,1);
   29       sem_init(&_idle,0,cap);
   30       sem_init(&_data,0,0);
   31     }
   32     ~RingQueue()
   33     {
   34       sem_destroy(&_lock);
   35       sem_destroy(&_idle);
   36       sem_destroy(&_data);
   37     }
   38     bool Push(int data)
   39     {
   40       sem_wait(&_idle);
   41       sem_wait(&_lock);
   42       arr[_write]=data;
   43       _write=(_write+1)%_capacity; 
   44       sem_post(&_lock);
   45       sem_post(&_data);
   46      return true;
   47     }     
   48     bool Pop(int* data)
   49     {
   50       sem_wait(&_data);
   51       sem_wait(&_lock);
   52       *data=arr[_read];
   53       _read=(_read+1)%_capacity;
   54       sem_post(&_lock);
   55       sem_post(&_idle);
   56       return true;
   57     }    
   58 };
   59 
   60 void *productor(void *arg)
   61 {
   62   RingQueue *q = (RingQueue*)arg;
   63   int i = 0;
   64   while(1) {
   65     q->Push(i);
   66 
   67     printf("%p-push data:%d\n", pthread_self(), i++);
   68 
   69   }
   70   return NULL;
   71 
   72 }
   73 void *customer(void *arg)
   74 {
   75       RingQueue *q = (RingQueue*)arg;
   76       while(1) {
   77         int data;
   78         q->Pop(&data);
   79 
   80         printf("%p-get data:%d\n", pthread_self(), data);
   81                                     
   82       }
   83           return NULL;
   84 
   85 }
   86 int main (int argc, char *argv[])
   87 {
   88   RingQueue q;
   89   int count = 4, ret;
   90   pthread_t ptid[4], ctid[4];
   91   for (int i = 0; i < count; i++) {
   92     ret = pthread_create(&ptid[i],NULL,productor,&q);
   93     if (ret != 0) {
   94       printf("thread create error\n");
   95       return -1;
   96 
   97     }
   98 
   99   }
  100   for (int i = 0; i < count; i++) {
  101     ret = pthread_create(&ctid[i],NULL,customer,&q);
  102     if (ret != 0) {
  103       printf("thread create error\n");
  104       return -1;
  105 
  106     }
  107 
  108   }
  109   for (int i = 0; i < count; i++) {
  110     pthread_join(ptid[i], NULL);
  111     pthread_join(ctid[i], NULL);
  112 
  113   }                                                                                                                                                               
  114   return 0;
  115 
  116 }
上一篇:Undefined reference to‘pthread_create‘


下一篇:linux的线程与多线程