1. 互斥
①引入互斥
先上一份代码,根据代码进入概念
代码:模拟多线程抢票代码,其中票数设为全局变量
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <unistd.h>
using namespace std;
#define NUM 3 //定义的线程创建数
//描述线程的结构体,这里就定义了一个名字
class threadData
{
public:
threadData(int number)
{
threadname = "thread-" + to_string(number);
}
public:
string threadname;
};
int tickets = 100; // 票数 多线程中的共享资源
void *getTickets(void *args)
{
threadData *td = static_cast<threadData *>(args);
const char *name = td->threadname.c_str();
while(true)
{
if(tickets > 0)
{
usleep(1000);
printf("who=%s, get a ticket:%d\n", name, tickets);
tickets--;
}
else
break;
}
printf("%s ... quit\n", name);
return nullptr;
}
int main()
{
vector<pthread_t> tids;
vector<threadData *> thread_datas;
// 创建多线程
for (int i = 0; i < NUM; i++)
{
pthread_t tid;
threadData *td = new threadData(i);
thread_datas.push_back(td);
pthread_create(&tid, nullptr, getTickets, thread_datas[i]);
tids.push_back(tid);
}
for(const auto &wait : tids)
{
pthread_join(wait, nullptr);
}
for(const auto &td : thread_datas)
{
delete td;
}
return 0;
}
运行结果:
-
问题:
- 在讲信号量这一部分时说到,在语言层减一操作并不是安全的。因为转成汇编就是三条(也可能多条)汇编,进程在运行时也随时会被切换
-
tickets
(全局的)是我们上文定义的共享数据,出现了数据不一致问题。原因:无疑多线程并发访问。
-
解释:
-
tickets--
问题,图解:
注:objdump -S tickets
查看tickets的反汇编——取出tickets--
的汇编代码
- 运行结果错误原因:数据不一致
-
注:线程切换时机:线程切换 ——> 内核返回用户会检查时间片,到了就切换。
usleep期间,线程休眠进入内核态,usleep结束唤醒线程,返回用户态继续执行
②线程安全和可重入
概念
- 线程安全:多线程并发访问同一份代码,不会出现不同的结果
- 重入:同一个函数在被不同的执行流调用,当前执行流没有执行完,其它执行流再次进入。
可重入函数:一个函数在重入的情况下,运行结果不会出现任何不同或者问题
不可重入函数:函数被重入,运行结果出现不同或者问题。大部分都是不可重入函数
常见线程安全情况:
- 线程对共享资源,只有读取权限没有写入权限
- 类或者接口对于线程来说是原子性操作
- 多执行流切换,不会导致该接口的运行结果存在二义性
常见线程不安全情况:
- 不保护共享变量和静态变量的函数——调用线程不安全的函数
- 函数状态会被执行流调用影响
常见不可重入的情况:
- 调用malloc和free函数:因为malloc函数是用全局链表来管理堆的,多线程同时调用,可能导致分配出错,内存泄漏和覆盖等问题
- STL容器:STL容器使用动态内存分配、指针等来管理数据结构,多线程同时调用同一个容器进行读写操作,可能会导致数据结构破坏和不一致问题。STL容器在内部会使用全局变量或静态变量来维护一些数据结构,全局变量和静态变量可能导致竞态条件和数据不一致问题。
- 调用标准IO库函数:底层实现使用了全局的数据结构
- 使用静态数据结构的函数
常见可重入的情况:
- 不使用全局变量或静态变量
- 不使用malloc和new
- 使用本地数据,或者说把全局数据拷贝到本地使用
线程安全和可重入的联系
联系:
- 函数可重入,则线程安全
- 不可重入的函数,就不能多个线程并发访问
- 一个函数有全局变量或者静态变量那这个函数即是不可重入也是线程不安全
③互斥和周边概念
如何解决上述问题?
先引入一批周边概念:
临界资源:多线程执行流共享的资源(eg:受锁保护的资源)
临界区:每个线程内部,访问临界资源的代码。eg:
原子性:操作只有两态,要么完成,要么未完成。
共享变量:在线程间共享的变量
互斥: 任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,对临界资源有保护作用。简单点说,对共享数据的任何访问,保证任何时候只有一个执行流
解决问题的三个方面
- 代码必须要有互斥(下面讲)行为:当代码进入临界区执行时,不允许其它线程进入该临界区
- 多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只允许一个线程进入该临界区
- 如果线程不在临界区执行,该线程不能阻止其它线程进入临界区
要做到这三点,本质上就是需要一把锁——Linux中叫互斥量
④锁——互斥量
锁对应着上文的解决问题的三点,也就是锁的作用
加锁的本质: 用时间换安全
加锁的表现: 线程在临界区的代码串行执行
所以加锁原则: 尽量保证临界区的代码,越少越好
接口使用
1. 锁的初始化和销毁
库提供的数据类型:pthread_mutex_t
定义锁
初始化互斥量有两种方式:动态分配和静态分配
动态分配:
头文件:
#include <pthread.h>
函数声明:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
参数:
1. restrict mutex:指向需要初始化的互斥锁变量的指针
2. restrict attr:设置互斥锁属性,一般设置nullptr
返回值:
成功返回0,错误返回错误码
///
静态分配:
头文件:
#include <pthread.h>
使用:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
- 动态分配:运行时通过内存分配函数从堆内存中分配的。需要锁的时候动态创建锁,不需要时释放锁——释放内存
- 静态分配:编译时就为锁变量分配固定的内存空间,通常放在栈内存或全局数据区,不需要手动释放内存,锁的生命周期和程序一样。
销毁互斥量:
头文件:
#include <pthread.h>
函数声明:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数:
mutex:指向需要销毁的互斥锁变量的指针
返回值:
成功返回0,错误返回错误码
注:
- 不要销毁一个已经加锁的互斥量。
- 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要销毁
- 已经销毁的互斥量,确保后面没有再次对该互斥量加锁
2. 加锁和解锁
头文件:
#include <pthread.h>
函数声明:
int pthread_mutex_lock(pthread_mutex_t *mutex); //加锁
int pthread_mutex_unlock(pthread_mutex_t *mutex); //解锁
参数:
mutex:指向要加锁的互斥锁
返回值:
成功返回0,失败返回错误码
加锁和解锁的位置:
3. 使用锁修改抢票代码
- 测试代码1:
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <unistd.h>
using namespace std;
#define NUM 3
//全局的锁——静态分配
// pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock;
class threadData
{
public:
threadData(int number)
{
threadname = "thread-" + to_string(number);
}
public:
string threadname;
};
int tickets = 100; // 票数
void *getTickets(void *args)
{
threadData *td = static_cast<threadData *>(args);
const char *name = td->threadname.c_str();
while(true)
{
//申请锁成功,才能往后继续执行,不成功挂起等待
pthread_mutex_lock(&lock);
if(tickets > 0)
{
usleep(1000);
printf("who=%s, get a ticket:%d\n", name, tickets);
tickets--;
pthread_mutex_unlock(&lock);
}
else
{
pthread_mutex_unlock(&lock);
break;
}
usleep(1000);
}
printf("%s ... quit\n", name);
return nullptr;
}
int main()
{
//动态分配
pthread_mutex_init(&lock, nullptr);
vector<pthread_t> tids;
vector<threadData *> thread_datas;
// 创建多线程
for (int i = 0; i < NUM; i++)
{
pthread_t tid;
threadData *td = new threadData(i);
thread_datas.push_back(td);
pthread_create(&tid, nullptr, getTickets, thread_datas[i]);
tids.push_back(tid);
}
for(const auto &wait : tids)
{
pthread_join(wait, nullptr);
}
for(const auto &td : thread_datas)
{
delete td;
}
pthread_mutex_destroy(&lock);
return 0;
}
运行结果:结果正常
注:如果刚解锁就立刻再去申请,可能导致整个票都是一个执行流抢的,使用我在释放完票之后就usleep(也可以当作抢完票之后,还需要和信息绑定什么的工作)。
原因:唤醒线程的成本比这边刚解锁那边就申请锁的成本更高,所以导致别的线程没有机会
- 测试代码2:把锁封装在线程类描述的结构体中
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <unistd.h>
using namespace std;
#define NUM 3
//全局的锁——静态分配
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
class threadData
{
public:
threadData(int number, pthread_mutex_t *mutex)
{
threadname = "thread-" + to_string(number);
lock = mutex;
}
public:
string threadname;
pthread_mutex_t *lock;
};
int tickets = 100; // 票数
void *getTickets(void *args)
{
threadData *td = static_cast<threadData *>(args);
const char *name = td->threadname.c_str();
while(true)
{
//申请锁成功,才能往后继续执行,不成功挂起等待
pthread_mutex_lock(td->lock);
if(tickets > 0)
{
usleep(1000);
printf("who=%s, get a ticket:%d\n", name, tickets);
tickets--;
pthread_mutex_unlock(td->lock);
}
else
{
pthread_mutex_unlock(td->lock);
break;
}
usleep(1000);
}
printf("%s ... quit\n", name);
return nullptr;
}
int main()
{
vector<pthread_t> tids;
vector<threadData *> thread_datas;
// 创建多线程
for (int i = 0; i < NUM; i++)
{
pthread_t tid;
threadData *td = new threadData(i, &lock);
thread_datas.push_back(td);
pthread_create(&tid, nullptr, getTickets, thread_datas[i]);
tids.push_back(tid);
}
for(const auto &wait : tids)
{
pthread_join(wait, nullptr);
}
for(const auto &td : thread_datas)
{
delete td;
}
return 0;
}
运行结果:正确
- 测试代码3:RAII风格
封装的一个类文件:LockGuard.hpp
#pragma once
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *lock)
: lock_(lock)
{}
void Lock()
{
pthread_mutex_lock(lock_);
}
void Unlock()
{
pthread_mutex_unlock(lock_);
}
~Mutex()
{}
private:
pthread_mutex_t *lock_;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock)
:mutex_(lock)
{}
~LockGuard()
{
mutex_.Unlock();
}
private:
Mutex mutex_;
};
tickets.cc
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include "LockGuard.hpp"
using namespace std;
#define NUM 3
//全局的锁——静态分配
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
class threadData
{
public:
threadData(int number)
{
threadname = "thread-" + to_string(number);
}
public:
string threadname;
};
int tickets = 100; // 票数
void *getTickets(void *args)
{
threadData *td = static_cast<threadData *>(args);
const char *name = td->threadname.c_str();
while(true)
{
//花括号框起来,花括号结束,类对象释放
{
LockGuard lockGuard(&lock);
if(tickets > 0)
{
usleep(1000);
printf("who=%s, get a ticket:%d\n", name, tickets);
tickets--;
}
else
{
break;
}
}
usleep(1000);
}
printf("%s ... quit\n", name);
return nullptr;
}
int main()
{
vector<pthread_t> tids;
vector<threadData *> thread_datas;
// 创建多线程
for (int i = 0; i < NUM; i++)
{
pthread_t tid;
threadData *td = new threadData(i);
thread_datas.push_back(td);
pthread_create(&tid, nullptr, getTickets, thread_datas[i]);
tids.push_back(tid);
}
for(const auto &wait : tids)
{
pthread_join(wait, nullptr);
}
for(const auto &td : thread_datas)
{
delete td;
}
return 0;
}
运行结果:
锁的原理
- 因为锁本身就是共享资源,所有就要保证申请和释放锁是原子性操作。
- 但是连
tickets--
这种自减操作都不是原子的,锁应该如何保证申请和释放都是原子的呢?但是自减操作被编译成的每一条汇编都是原子性的。- 为了实现互斥锁,大多数体系结构(X86…)都提供了swap或exchange指令,该指令的作用把寄存器和内存单元的数据做交换,由于只有一条指令所以是原子的。
- 注:在多处理器上,访问内存的总线周期也有前有后,所以一个处理器的交换(swap)指令执行时,另一个处理器的交换指令只能等待总线周期
内存总线: 处理器和内存之间传输数据和指令的通道
lock伪代码介绍:
模拟俩线程申请锁,图解:
unlock伪代码:解锁只需要把锁还回去,不需要做别的事,下次申请锁的时候,al寄存器会被赋值为0
小结
- 锁本身就是共享资源,所以申请锁和释放锁都被设计成原子性操作
- 一个持有锁访问临界区的线程,对于其它线程来说该线程访问临界区的过程是原子性的。
注:在临界区中,线程也是会被切换的,但是是持有锁被切走的,就算该线程不在,别的线程也没有能进入临界区访问临界资源的 - 一个线程持有锁之后,其它线程再访问锁就进入挂起阻塞状态。线程对于锁的竞争力不同,离锁更近的竞争力更强
- 纯互斥环境,如果锁的分配不够合理,就容易导致其它线程的饥饿问题。因为离锁近的竞争力更强。
- 让所有线程获取锁按照一定的顺序,按照一定的顺序获取资源就是同步
⑤死锁
概念:指在多线程和多进程环境中,各个进程(或线程)均占有不会释放的资源,但又互相申请被其它进程(或线程)所占用也不会释放的资源,而处于一种永久等待的状态。
一个锁也可能产生死锁:eg:线程(或进程)申请完锁再次申请该锁,本来锁就已经在这个执行流上了,在申请肯定也就申请不到了,就会导致死锁
死锁的四个必要条件:一前提+两原则+一重要
- 互斥条件(前提):一个资源每次只能被一个执行流使用
- 请求与保持条件(原则):一个执行流因为请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件(原则):一个执行流已获得的资源,在未使用完之前,不能强行剥夺
- 循环等待条件(重要):若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁:破坏四个必要条件中的任意一个
- 破坏请求与保持条件:例如可以使用
pthread_mutex_trylock
和pthread_mutex_lock
功能一样,也是申请锁资源,申请成功返回0,但是失败直接返回错误码,而不是进行挂起等待,这时就可以释放自身的锁,满足别的执行流 - 破坏不剥夺条件:当线程(或进程)无法获得自己需要的资源,那就直接释放这个获得不了的资源,来满足当前执行流
- 破坏循环等待条件:按照一定的顺序进行加锁
- 资源一次性分配,就不需要等待其它资源,可以直接执行了
- 前提条件,因为有互斥环境才有的互斥条件,所以可以尝试上述几种方式。
2. 同步——条件变量
在对锁进行小结时也说到,如果一个线程对锁的竞争力很强,其它线程竞争不到锁,那么其它线程就处于饥饿状态。所以就需要线程获取锁按照一定的顺序——同步
①相关概念
1. 条件变量: 当一个线程互斥的访问某一个变量的时候,在其它线程改变状态之前,该线程什么都做不了,就如同上述饥饿状态的线程,这种情况就需要条件变量
2. 同步: 在保证数据安全的前提下,让线程按照某种特定的顺序访问临界资源,从而有效的避免饥饿问题
3. 静态条件: 因为时序问题,出现的结果异常
Linux中条件变量是一种常用的同步机制,条件变量需要依赖于锁的使用,保护共享资源的访问顺序
②代码展现
通过代码介绍该机制
介绍接口:
以下介绍的条件变量的接口,头文件和返回值一样,定义条件变量
头文件:
#include <pthread.h>
返回值:
成功返回0,失败返回错误码
定义条件变量:
pthread_cond_t:库提供的数据类型
1. 条件变量的初始化和销毁:和锁的接口很相像——简要介绍
条件变量初始化:动态分配
函数声明:
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
参数:
1. restrict cond:指向要初始化的条件变量
2. restrict attr:设置条件变量的属性,一般置nullptr
条件变量初始化:静态分配
使用:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
条件变量的销毁:
函数声明:
int pthread_cond_destroy(pthread_cond_t *cond);
参数:
cond:指向要销毁的条件变量
2. 等待条件满足
导致线程等待条件变量,线程等待时自动释放锁,并在接收到唤醒信号时,重新获取锁并继续执行
函数声明:
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
参数:
restrict cond:指向一个条件变量,线程在这个条件变量上等待
restrict mutex:指向锁,用来释放和获取锁。注:线程不能带着锁进行等待,别的线程就没办法进来等待了
3. 唤醒等待线程
发送唤醒信号,通知等待的线程
函数声明:
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
参数:
cond:指向一个条件变量
作用:
1. pthread_cond_broadcast:发送信号通知一个正在等待条件变量的线程,只唤醒一个
2. pthread_cond_signal:发送信号通知所有正在等待条件变量的线程,全部唤醒
代码展现:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
int g_val = 0; //共享资源
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void *start_routine(void *args)
{
pthread_detach(pthread_self()); //直接分离线程,不进行等待
//注:传参的时候,我传的不是i的地址,而是强转直接拷贝的形式,因为如果主线程一瞬间跑完创建五个线程
//当新创建的线程刚想要拿这个参数时,会发现这个i是5。因为传的是地址,所以导致数据不一致问题
uint64_t number = (uint64_t)args;
std::cout << "pthread-" << number << " create success!!!" << std::endl;
while(true)
{
pthread_mutex_lock(&mutex);
//这里仅仅只是为了说明为什么 pthread_cond_wait 要在pthread_mutex_lock和pthread_mutex_unlock中间
// //假如我们规定共享资源g_val大于10,小于20这个区间时,只能由主线程访问,别的线程只能进行等待
// if(g_val > 10 && g_val < 20)
// {
// //让线程进行等待,肯定是因为某种条件不满足。这里临界资源g_val不满足条件,所以要进行等待。
// // but判断也是访问临界资源,所以判断必须在加锁之后
// pthread_cond_wait(&cond, &mutex); //线程等待的时候会自动释放锁,被唤醒时会再持有锁
// }
pthread_cond_wait(&cond, &mutex);
std::cout << "pthread-" << number << ", g_val:" << g_val++ << std::endl;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
//创建五个线程
for(uint64_t i = 0; i < 5; i++) //unsigned long int ——> uint64_t
{
pthread_t tid;
pthread_create(&tid, nullptr, start_routine, (void*)i);
usleep(1000);
}
sleep(2);
std::cout << "main thread begin ctrl:" << std::endl;
while (true)
{
sleep(1);
pthread_cond_signal(&cond); //唤醒在cond中等待的线程,默认时对第一个
std::cout << "signal one thread..." << std::endl;
}
return 0;
}
运行结果:
因为我在创建线程的时候使用了usleep
所以是按照顺序创建的,也可以依次性创建,但是本质线程都可以按照某种特定的顺序访问临界资源。
有很多文字叙述就直接在代码中,借助代码描述了
3. 生产消费者模型
①理论
生产消费者模型的优点:
1. 解耦
2. 支持并发
3. 支持忙闲不均
②基于阻塞队列的实现
阻塞队列:在介绍生产消费者模型理论中有个仓库(特定结构的内存空间),其中阻塞队列是多线程编程常用于实现生产消费者模型的数据结构(特定结构的内存空间)。
理论:当阻塞队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素。当队列为满时,往队列生产元素的操作也会被阻塞,直到有元素被从队列中取出。
实现: 单线程环境 包含了四个文件BlcokQueue.hpp Task.hpp makefile main.cc
注:C++程序的源文件可以以.cc/.cxx和.cpp为后缀,.hpp后缀代表这个文件声明和定义放在一起
BlcokQueue.hpp:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template<class T>
class BlockQueue
{
public:
BlockQueue(int maxcap = defaultnum)
:_maxcap(maxcap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_c_cond, nullptr);
pthread_cond_init(&_p_cond, nullptr);
}
T pop()
{
pthread_mutex_lock(&_mutex);
//判断:判断临界资源条件是否满足,也是访问临界资源。所以要在加锁之后
//while:防止线程被伪唤醒的情况
while(_q.size() == 0)
{
//伪唤醒:比如说那边生产者把队列生产满了,就要唤醒消费者线程进行消费,
//但是一不小心多唤醒了消费者线程,但是实际只有一个线程能再持有锁,然后执行
//其它被唤醒的线程就被阻塞在这里,当持有锁的线程释放了锁,而在这里等待锁的线程
//抢到了锁资源,那就不会再去执行上面临界资源是否满足的判断条件,可能就会出现问题
//所以就需要循环判断
pthread_cond_wait(&_c_cond, &_mutex);
}
//走到这里,要么线程被唤醒了,要么队列不为空,可以直接访问
T out = _q.front();
_q.pop();
//消费数据了,所以可以有空位置可以唤醒生产者生产数据了
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
return out;
}
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
//while:防止线程被伪唤醒的情况,上面注释对伪唤醒做了介绍
while(_q.size() == _maxcap)
{
pthread_cond_wait(&_p_cond, &_mutex);
}
//走到这里,要么线程被唤醒了,要么队列不满,可以直接访问
_q.push(in);
//生产数据了,所以可以有数据被消费,可以唤醒消费者消费数据了
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_c_cond);
pthread_cond_destroy(&_p_cond);
}
private:
static const int defaultnum;
private:
std::queue<T> _q;
int _maxcap; //阻塞队列最大容量
pthread_mutex_t _mutex;
pthread_cond_t _c_cond; //消费者条件变量
pthread_cond_t _p_cond; //生产者条件变量
};
template<class T>
const int BlockQueue<T>::defaultnum = 20;
Task.hpp:
#pragma once
#include <iostream>
#include <string>
std::string opers = "+-*/%";
enum //errno
{
DIV_ZERO_ERR=1,
MOD_ZERO_ERR,
UNKNOW_ERR
};
//任务结构体
class Task
{
public:
Task(int x, int y, char oper)
:_data1(x),_data2(y),_oper(oper),_result(0), _exitcode(0)
{}
void run()
{
switch(_oper)
{
case '+':
_result = _data1 + _data2;
break;
case '-':
_result = _data1 - _data2;
break;
case '*':
_result = _data1 * _data2;
break;
case '/':
{
if(_data2 == 0)
_exitcode = DIV_ZERO_ERR;
else
_result = _data1 / _data2;
}
break;
case '%':
{
if(_data2 == 0)
_exitcode = MOD_ZERO_ERR;
else
_result = _data1 % _data2;
}
break;
default:
_exitcode = UNKNOW_ERR;
break;
}
}
//仿函数 —— 用着玩,以防忘记
void operator()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(_data1);
r += _oper;
r += std::to_string(_data2);
r += '=';
r += std::to_string(_result);
r += ", [code:";
r += std::to_string(_exitcode);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(_data1);
r += _oper;
r += std::to_string(_data2);
r += "=?";
return r;
}
~Task()
{}
private:
int _data1;
int _data2;
char _oper;
int _result;
int _exitcode;
};
makefile:
blockQueue:main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm blockQueue
main.cc:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <unistd.h>
void *consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while(true)
{
//获取任务 + 处理任务
Task t = bq->pop();
t();
std::cout << "处理任务:" << t.GetResult() << ", thread_id is" << pthread_self() << std::endl;
}
}
void *Producer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
int len = opers.size();
while(true)
{
//生产数据
int data1 = rand() % 10 + 1; //范围[1, 10]
usleep(1000);
int data2 = rand() % 10; //范围[0, 9]
char op = opers[rand() % len];
Task t(data1, data2, op);
// 传递
bq->push(t);
std::cout << "生产了一个任务:" << t.GetTask() << ", thread_id is" << pthread_self() << std::endl;
sleep(1);
}
}
int main()
{
srand(time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c, p;
pthread_create(&p, nullptr, Producer, bq);
pthread_create(&c, nullptr, consumer, bq);
pthread_join(p, nullptr);
pthread_join(c, nullptr);
delete bq;
return 0;
}
运行结果:
实现: 多线程环境 上面四个文件只需要更改main.cc
。
int main()
{
srand(time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[3], p[5];
for (int i = 0; i < 3; i++)
{
pthread_create(c + i, nullptr, consumer, bq);
}
for (int i = 0; i < 5; i++)
{
pthread_create(p + i, nullptr, Producer, bq);
}
for (int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 5; i++)
{
pthread_join(p[i], nullptr);
}
delete bq;
return 0;
}
运行结果:
注:
- 伪唤醒:多线程环境比如说那边生产者把队列生产满了,就要唤醒消费者线程进行消费,但是一不小心唤醒了多个消费者线程,但是实际只有一个线程能再持有锁然后执行,其它被唤醒的线程就被阻塞在这里,当持有锁的线程释放了锁,而在这里等待锁的线程如果抢到了锁资源,那就不会再去执行上面临界资源是否满足的判断条件,可能就会出现问题。所以就需要循环判断。我在BlcokQueue.hpp文件中会引发伪唤醒的地方进行的标注。
- 发现:多线程和单线程的生产消费模型,就修改了创建线程和等待线程的部分,原因是因为就是用了一把互斥锁,三种关系中都有互斥关系,所以每次只允许一个线程访问共享内存。
③基于环形队列的实现
POSIX信号量
在前面我们讲到了System V信号量,其和POSIX信号量的功能作用和原理是相同的,都是用于同步操作,达到无冲突访问共享资源的目的。这里主要介绍POSIX信号量的接口
信号量本质是一把计数器,计数器本质就是描述资源的数量。申请信号量时,就间接对共享资源做判断了。P()操作成功,所访问的资源一定是就绪的,P()操作失败就在信号量等待
信号量和锁:
- 锁:加锁——持有锁就代表这个资源可以访问,但是只允许一个执行流访问,因为这个资源只能看作一份。上文的例子中,一般在持有锁时还要对条件变量判断一次,然后在执行下面的内容,那是因为使用这一份资源是有条件的,如果仅仅是对这份资源进行只读操作,自然不需要判断
- 信号量:二元信号量就是锁
接口介绍:
POSIX信号量提供的一套接口头文件和返回值相同。使用时Link with -pthread
头文件:
#include <semaphore.h>
返回值:
成功返回0,失败返回-1。设置错误码
1. 初始化和销毁信号量
函数声明:
int sem_init(sem_t *sem, int pshared, unsigned int value); //初始化信号量
int sem_destroy(sem_t *sem); //销毁信号量
参数:
1. sem:指向信号量的指针
2. pshared:0表示线程间共享,非0表示进程间共享
3. value:用于初始化信号量的值(用户设置)
///
2. 等待和发布信号量
函数声明:
int sem_wait(sem_t *sem); //等待信号量 ——> P()操作 将信号量的值减一
int sem_post(sem_t *sem); //发布信号量 ——> V()操作 将信号量的值加一
参数:
sem:指向信号量的指针
出错时:信号量的值不改变
代码实现
环形队列: 对循环队列有疑问的建议看一下我的这篇用C实现的简单环形队列(这篇内容使用链表进行画图分析,底层实现使用的是数组实现)
环形队列和消费者生产者结合分析:
代码实现: 包含了四个文件Main.cxx Makefile RingQueue.hpp Task.hpp
其中Task.hpp和上文中的阻塞队列代码实现是一样的,就不贴上来了
RingQueue.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
#include <ctime>
const static int defaultcap = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = defaultcap)
: _ringqueue(cap), _cap(cap), _c_step(0), _p_step(0)
{
sem_init(&_cdata_sem, 0, 0);
sem_init(&_pspace_sem, 0, cap);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
// 输入
void Push(const T &in)
{
//方案1
// Lock(_p_mutex);
// P(_pspace_sem);
//先P()后Lock的原因:更优
//1. 多线程环境中,P操作是原子的,申请成功代表有资源给该线程
// 像方案1中只有一个能申请锁,然后持有锁再申请信号量。其余线程阻塞在锁着里等待,
// 而方案2可以在持有锁的线程中执行的时候,其它线程也可以正常申请信号量,等待所资源即可
//2. 临界区的代码越少越好。P不需要保护
//所以:方案2可以让申请信号量和申请锁的时间变成并行
//方案2
P(_pspace_sem);
Lock(_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _cap;
Unlock(_p_mutex);
V(_cdata_sem);
}
// 输出
void Pop(T *out)
{
P(_cdata_sem);
Lock(_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _cap;
Unlock(_c_mutex);
V(_pspace_sem);
}
~RingQueue()
{
sem_destroy(&_cdata_sem);
sem_destroy(&_pspace_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
private:
std::vector<T> _ringqueue;
int _cap;
int _c_step; // 消费者下标
int _p_step; // 生产者下标
sem_t _cdata_sem; // 消费者关注的数据资源
sem_t _pspace_sem; // 生产者关注的空间资源
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
Makefile:
ringqueue:Main.cxx
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm ringqueue
Main.cxx:
#include "Task.hpp"
#include "RingQueue.hpp"
struct ThreadData
{
RingQueue<Task> *rq;
std::string threadname;
};
void *Producer(void *args)
{
ThreadData *td = static_cast<ThreadData *>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
int len = opers.size();
while (true)
{
// 1. 获取资源
int data1 = rand() % 10 + 1;
int data2 = rand() % 10;
char oper = opers[rand() % len];
Task t(data1, data2, oper);
// 2. 生产数据
rq->Push(t);
std::cout << "Producer data done, data is:" << t.GetTask() << ", name:" << name << std::endl;
}
delete td;
return nullptr;
}
void *Consumer(void *args)
{
ThreadData *td = static_cast<ThreadData *>(args);
RingQueue<Task> *rq = td->rq;
std::string name = td->threadname;
while (true)
{
usleep(1000);
// 1. 消费数据
Task t; // 这里Task类要加一个默认构造,要不然不能构造Task类型对像,Task.hpp文件也只有这一个变化
rq->Pop(&t);
// 2. 处理数据
t();
std::cout << "Consumer get data done, data is:" << t.GetResult() << ", name:" << name << std::endl;
}
delete td;
return nullptr;
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>();
pthread_t c[3], p[5];
for (int i = 0; i < 3; i++)
{
ThreadData *td = new ThreadData;
td->rq = rq;
td->threadname = "Consumer-" + std::to_string(i);
pthread_create(c + i, nullptr, Consumer, td);
}
for (int i = 0; i < 2; i++)
{
ThreadData *td = new ThreadData;
td->rq = rq;
td->threadname = "Producer-" + std::to_string(i);
pthread_create(p + i, nullptr, Producer, td);
}
for (int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 2; i++)
{
pthread_join(p[i], nullptr);
}
delete rq;
return 0;
}
运行结果:
注:加锁和申请信号量谁前谁后,哪一个更好——方案2更优(在对应的代码区域也进行了标识)
方案1:
Lock(_p_mutex);
P(_pspace_sem);
//先P()后Lock:更优
//1. 多线程环境中,P操作是原子的,申请成功代表有资源给该线程
// 像方案1中只有一个能申请锁,然后持有锁再申请信号量。其余线程阻塞在锁着里等待,
// 而方案2可以在持有锁的线程中执行的时候,其它线程也可以正常申请信号量,等待所资源即可
//2. 临界区的代码越少越好。P不需要保护
//所以:方案2可以让申请信号量和申请锁的时间变成并行
方案2:
P(_pspace_sem);
Lock(_p_mutex);