线程安全
概念
多线程对于临界资源的访问操作是安全的。
实现
同步互斥
互斥:
通过对临界资源同一时间的唯一访问保证访问安全。
同步:
通过条件判断让线程对临界资源的访问更加合理。
互斥的实现
互斥锁
原理
本质上是一个0、1计数器,标记临界资源的两种状态。
- 访问资源之前加锁,通过计数器判断能否访问,不能则阻塞。
- 访问资源之后解锁,通过计数器标记资源为可访问,唤醒阻塞。
注意:
互斥锁本身也是临界资源,但是互斥锁自身通过原子操作保证自己安全。
操作流程
1.定义互斥锁
pthread_mutex_t mutex;
2.初始化互斥锁
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
参数:
mutex:互斥锁地址。类型是 pthread_mutex_t 。
attr:设置互斥量的属性,通常可采用默认属性,即可将 attr 设为 NULL。
返回值:
成功:0,成功申请的锁默认是打开的。
失败:非 0 错误码
3.访问临界资源之前加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
参数:
mutex:互斥锁地址。
返回值:
成功:0
失败:非 0 错误码
int pthread_mutex_trylock(pthread_mutex_t *mutex);
调用该函数时,若互斥锁未加锁,则上锁,返回 0;
若互斥锁已加锁,则函数直接返回失败,即 EBUSY。
4.访问临界资源之后解锁
int pthread_mutex_unlock(pthread_mutex_t * mutex);
参数:
互斥锁地址。
返回值:
成功:0
失败:非 0 错误码
5销毁互斥锁释放资源
int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数:
互斥锁地址。
返回值:
成功:0
失败:非 0 错误码
代码示例
1 #include<stdio.h>
2 #include<stdlib.h>
3 #include<pthread.h>
4 #include<unistd.h>
5 int tickets=100;
6
7 void* scalpers(void* arg)
8 {
9 pthread_mutex_t* mutex = (pthread_mutex_t*)arg;
10 while(1)
11 {
12 pthread_mutex_lock(mutex);
13 if(tickets>0)
14 {
15 usleep(1);
16 printf("我抢到了一张票:%d\n",tickets);
17 tickets--;
18 pthread_mutex_unlock(mutex);
19 }
20 else
21 {
22 pthread_mutex_unlock(mutex);
23 pthread_exit(NULL);
24 }
25 }
26 return NULL;
27 }
28
29 int main()
30 {
31 pthread_mutex_t mutex;
32
33 int ret;
34 pthread_t tid[5];
35 pthread_mutex_init(&mutex,NULL);
36 for(int i =0;i<5;i++)
37 {
38 ret =pthread_create(&tid[i],NULL,scalpers,&mutex);
39 if(ret!=0)
40 {
41 printf("pthread creat error.\n");
42 return -1;
43 }
44 }
45 for(int i =0;i<5;i++)
46 {
47 pthread_join(tid[i],NULL);
48 }
49 pthread_mutex_destroy(&mutex);
50 return 0;
51 }
死锁
程序流程无法推进,卡死。
四个必要条件
- 互斥条件
进程要求对所分配的资源进行排它性控制,即在一段时间内某资源仅为一进程所占用。 - 不可剥夺条件
当进程因请求资源而阻塞时,对已获得的资源保持不放。 - 请求与保持条件
进程已获得的资源在未使用完之前,不能剥夺,只能在使用完时由自己释放。 - 环路等待条件
在发生死锁时,必然存在一个进程——资源的环形链。
避免
银行家算法…………
预防
- 保证加解锁顺序一致。
- 若请求不到新锁,则释放已有的锁,
同步的实现
条件变量
原理
pcb等待队列、阻塞/唤醒阻塞接口。
- 当前线程对临界资源访问不合理的,则调用阻塞接口阻塞线程
- 若其他线程促使当前线程对资源获取合理,则调用唤醒接口。
注意:
条件变量本身并不确定资源什么时候获取合理,因此资源是否获取合理需要程序员自行判断。
操作流程
1.定义条件变量
pthread_cond_t cond;
2.初始化条件变量
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
参数:
cond:条件变量地址
attr:条件变量属性,通常为默认值,传NULL即可
3.如果资源获取不合理,阻塞
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
函数作用:
1.不满足条件则阻塞
2.解锁(相当于pthread_mutex_unlock(&mutex);
(1.2)两步为一个原子操作。
3.当被唤醒,pthread_cond_wait函数返回时,唤醒阻塞并重新加锁pthread_mutex_lock(&mutex);
4.使得资源获取合理,唤醒阻塞
int pthread_cond_signal(pthread_cond_t *cond);
唤醒至少一个阻塞在条件变量上的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
唤醒全部阻塞在条件变量上的线程
5.销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
注意:
条件变量要搭配互斥锁使用。
临界资源是其他线程促使条件满足,其中的条件是否满足也是临界资源的判断,所以要加锁。
代码示例
厨师做饭,客人吃饭。
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <stdlib.h>
4 #include <pthread.h>
5 pthread_mutex_t mutex;
6 pthread_cond_t cond_customer;
7 pthread_cond_t cond_cooker;
8 int bowl = 1;//1有0没有
9
10 void *cooker(void *arg)
11 {
12 while(1)
13 {
14 pthread_mutex_lock(&mutex);
15 while(bowl==1)
16 {
17 pthread_cond_wait(&cond_cooker,&mutex);
18 }
19 printf("本大厨做了碗蛋炒饭~~~\n");
20 bowl++;
21 pthread_cond_signal(&cond_customer);
22 pthread_mutex_unlock(&mutex);
23 }
24 return NULL;
25 }
26
27 void* customer(void *arg)
28 {
29 while(1)
30 {
31 pthread_mutex_lock(&mutex);
32 while(bowl==0)
33 {
34 pthread_cond_wait(&cond_customer,&mutex);
35 }
36 printf("太好吃了!!\n");
37 bowl--;
38 pthread_cond_signal(&cond_cooker);
39 pthread_mutex_unlock(&mutex);
40 }
41 return NULL;
42 }
43 int main()
44 {
45 pthread_t cotid,cutid;
46 int ret;
47 pthread_mutex_init(&mutex,NULL);
48 pthread_cond_init(&cond_cooker,NULL);
49 pthread_cond_init(&cond_customer,NULL);
50 for(int i =0;i<4;i++)
51 {
52 ret = pthread_create(&cotid,NULL,cooker,NULL);
53 if(ret!=0)
54 {
55 printf("pthread create error\n");
56 return -1;
57 }
58 }
59 for(int i =0;i<4;i++)
60 {
61 ret = pthread_create(&cutid,NULL,customer,NULL);
62 if(ret!=0)
63 {
64 printf("pthread create error\n ");
65 return -1;
66 }
67 }
68 pthread_join(cutid,NULL);
69 pthread_join(cotid,NULL);
70 pthread_mutex_destroy(&mutex);
71 pthread_cond_destroy(&cond_cooker);
72 pthread_cond_destroy(&cond_customer);
73 return 0;
74 }
注意事项
- 条件变量使用过程中,条件的判断应使用循环操作
- 条件变量使用过程中,若有多种角色则需要使用多个条件变量不同的角色分开等待,分开唤醒防止唤醒错误。
生产者消费者模型
在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。大概的结构如下图。
优点
- 解耦合
如果让生产者直接调用消费者,那么生产者对于消费者就会产生依赖。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。 - 支持忙闲不均
未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。 - 支持并发
生产者和消费者可以是两个独立的并发主体。
实现
生产者与生产者:互斥
消费者与消费者:互斥
生产者与消费者:同步+互斥
代码实现
1 #include<iostream>
2 #include<queue>
3 #include<pthread.h>
4 #include<stdio.h>
5
6 #define MAX_QUEUE 5
7 #define COUNT 5
8
9 class BlockQueue
10 {
11 private:
12 int _capacity;
13 pthread_mutex_t _mutex;
14 pthread_cond_t cond_pro;
15 pthread_cond_t cond_cus;
16 std:: queue<int> _queue;
17 public:
18 BlockQueue(int cap = MAX_QUEUE)
19 :_capacity(cap)
20 {
21 pthread_mutex_init(&_mutex,NULL);
22 pthread_cond_init(&cond_pro,NULL);
23 pthread_cond_init(&cond_cus,NULL);
24 }
25 ~BlockQueue()
26 {
27 pthread_mutex_destroy(&_mutex);
28 pthread_cond_destroy(&cond_pro);
29 pthread_cond_destroy(&cond_cus);
30 }
31
32 bool Push(int data)
33 {
34 pthread_mutex_lock(&_mutex);
35 while(_queue.size()==_capacity)
36 {
37 pthread_cond_wait(&cond_pro,&_mutex);
38 }
39 _queue.push(data);
40 pthread_cond_signal(&cond_cus);
41 pthread_mutex_unlock(&_mutex);
42 return true;
43 }
44 bool Pop(int* data)
45 {
46 pthread_mutex_lock(&_mutex);
47 while(_queue.empty())
48 {
49 pthread_cond_wait(&cond_cus,&_mutex);
50 }
51 *data = _queue.front();
52 _queue.pop();
53 pthread_cond_signal(&cond_pro);
54 pthread_mutex_unlock(&_mutex);
55 return true;
56 }
57 };
58
59
60 void* producter(void* arg)
61 {
62 BlockQueue* q=(BlockQueue*)arg;
63 int i =0;
64 while(1)
65 {
66 q->Push(i);
67 printf("%p-push data:%d\n",pthread_self(),i++);
68
69 }
70 return NULL;
71
72
73 }
74 void* customer(void* arg)
75 {
76 BlockQueue* q=(BlockQueue*)arg;
77 while(1)
78 {
79 int data;
80 q->Pop(&data);
81 printf("%p-get data:%d\n",pthread_self(),data);
82 }
83 return NULL;
84
85 }
86
87
88 int main()
89 {
90 BlockQueue q;
91 int ret;
92 pthread_t ptid[COUNT],ctid[COUNT];
93 for(int i =0;i<COUNT;i++)
94 {
95 ret = pthread_create(&ptid[i],NULL,producter,&q);
96 if(ret!=0)
97 {
98 printf("pthread create error\n");
99 return -1;
100 }
101 }
102 for(int i =0;i<COUNT;i++)
103 {
104 ret =pthread_create(&ctid[i],NULL,customer,&q);
105 if(ret!=0)
106 {
107 printf("pthread create error\n");
108 return -1;
109 }
110 }
111 for(int i =0;i<COUNT;i++)
112 {
113 pthread_join(ptid[i],NULL);
114 pthread_join(ctid[i],NULL);
115 }
116 return 0;
117 }
信号量
本质
计数器+pcb等待队列
操作
- p计数-1,计数<0阻塞
- v计数+1,唤醒阻塞
同步实现
通过计数器对资源计数
获取资源之前P,产生资源之后P
互斥实现
初始化值为1,表示只有一个资源
访问资源前进行P,访问资源完后V
操作流程
1.定义信号量
sem_t sem;
2.初始化信号量
int sem_init __P ((sem_t *__sem, int __pshared, unsigned int __value));
sem为指向信号量结构的一个指针;
pshared不为0时此信号量在进程间共享,否则只能为当前进程的所有线程共享;
value给出了信号量的初始值。
3.P
sem_wait( sem_t *sem )
阻塞当前线程直到信号量sem的值大于0,解除阻塞后将sem的值减一,表明公共资源经使用后减少
4.V
sem_post( sem_t *sem )
用来增加信号量的值
5.销毁信号量
sem_destroy(sem_t *sem)
用来释放信号量sem,属于无名信号量
代码示例
信号量实现生产消费模型
1 #include <iostream>
2 #include <vector>
3 #include <pthread.h>
4 #include<stdio.h>
5 #include <semaphore.h>
6
7 using namespace std;
8
9 #define MAX_QUEUE 5
10
11 class RingQueue
12 {
13 private:
14 int _capacity;
15 int _read;
16 int _write;
17 vector<int> arr;
18 sem_t _lock;//实现互斥
19 sem_t _idle;//空闲节点数量
20 sem_t _data;//数据节点数量
21 public:
22 RingQueue(int cap = MAX_QUEUE)
23 :_capacity(cap)
24 ,_read(0)
25 ,_write(0)
26 ,arr(cap)
27 {
28 sem_init(&_lock,0,1);
29 sem_init(&_idle,0,cap);
30 sem_init(&_data,0,0);
31 }
32 ~RingQueue()
33 {
34 sem_destroy(&_lock);
35 sem_destroy(&_idle);
36 sem_destroy(&_data);
37 }
38 bool Push(int data)
39 {
40 sem_wait(&_idle);
41 sem_wait(&_lock);
42 arr[_write]=data;
43 _write=(_write+1)%_capacity;
44 sem_post(&_lock);
45 sem_post(&_data);
46 return true;
47 }
48 bool Pop(int* data)
49 {
50 sem_wait(&_data);
51 sem_wait(&_lock);
52 *data=arr[_read];
53 _read=(_read+1)%_capacity;
54 sem_post(&_lock);
55 sem_post(&_idle);
56 return true;
57 }
58 };
59
60 void *productor(void *arg)
61 {
62 RingQueue *q = (RingQueue*)arg;
63 int i = 0;
64 while(1) {
65 q->Push(i);
66
67 printf("%p-push data:%d\n", pthread_self(), i++);
68
69 }
70 return NULL;
71
72 }
73 void *customer(void *arg)
74 {
75 RingQueue *q = (RingQueue*)arg;
76 while(1) {
77 int data;
78 q->Pop(&data);
79
80 printf("%p-get data:%d\n", pthread_self(), data);
81
82 }
83 return NULL;
84
85 }
86 int main (int argc, char *argv[])
87 {
88 RingQueue q;
89 int count = 4, ret;
90 pthread_t ptid[4], ctid[4];
91 for (int i = 0; i < count; i++) {
92 ret = pthread_create(&ptid[i],NULL,productor,&q);
93 if (ret != 0) {
94 printf("thread create error\n");
95 return -1;
96
97 }
98
99 }
100 for (int i = 0; i < count; i++) {
101 ret = pthread_create(&ctid[i],NULL,customer,&q);
102 if (ret != 0) {
103 printf("thread create error\n");
104 return -1;
105
106 }
107
108 }
109 for (int i = 0; i < count; i++) {
110 pthread_join(ptid[i], NULL);
111 pthread_join(ctid[i], NULL);
112
113 }
114 return 0;
115
116 }