目录
前言
基于环形队列的生产消费模型
原理
普通实现
初步大纲:
核心细节:
进阶实现
思考总结
线程池
预先梳理
初步框架
核心内容
日志系统
线程安全与可重入
死锁
线程安全的单例模式
其他常见的各种锁
读者写者问题
全部代码
前言
线程的最后一部分内容,为保持思路连贯推荐大家参考往期文章:秒懂Linux之线程(中)-****博客
秒懂Linux之线程-****博客
基于环形队列的生产消费模型
原理
我们继续延用生产消费模型搭配环形队列来说明信号量原理~
之前我们把队列当作整体资源用锁来保护,现在我们把整体资源拆分成多个小份放在环形队列中,用信号量进行保护~
单生产单消费演示机制:
当然可以,在该条件下生产不用顾虑队列已满,消费也不用担心队列已空。是可以做到生产与消费的并发的~是不同于阻塞队列的,看似并发,实际上仍是依靠提醒(signal)机制。
模型伪代码:
可以看出生产者与消费者二者之间的资源都是向对方申请的~
普通实现
初步大纲:
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int cap) : _cap(cap), _ring_queue(cap), _productor_step(0), _comsumer_step(0)
{
sem_init(&_room_sem, 0, _cap);
sem_init(&_data_sem, 0, 0);
}
~RingQueue()
{
sem_destroy(&_room_sem);
sem_destroy(&_data_sem);
}
// 入队列
void Enqueue(const T &in)
{
// 生产行为
P(_room_sem);
// 申请成功一定有空间资源
_ring_queue[_productor_step++] = in;
_productor_step %= _cap;
V(_data_sem);
}
// 出队列
void Pop(T *out)
{
// 消费行为
P(_data_sem);
*out = _ring_queue[_comsumer_step++];
_comsumer_step %= _cap;
V(_room_sem);
}
private:
// 1.环形队列
int _cap; // 容量上限
std::vector<T> _ring_queue;
// 2.生产和消费的下标
int _productor_step;
int _comsumer_step;
// 3.定义信号量
sem_t _room_sem; // 生产者
sem_t _data_sem; // 消费者
};
using namespace MyThread;
int apple = 10;
//生产者生产
void PdRun(RingQueue<int> &rq)
{
int cnt = 10;
while(true)
{
rq.Enqueue(cnt);
std::cout << "Productor : " << cnt << std::endl;
cnt--;
sleep(1);
}
}
//消费者消费
void CsRun(RingQueue<int> &rq)
{
while(true)
{
int data =0;
rq.Pop(&data);
std::cout << "Consumer : " << data << std::endl;
sleep(1);
}
}
//代码复用
void StartComm(std::vector<Thread<RingQueue<int>>> *threads, int num, RingQueue<int> &bq, func_t<RingQueue<int>> func)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
threads->emplace_back(func, bq, name);
threads->back().start();
}
}
void StartProductor(std::vector<Thread<RingQueue<int>>> *threads, int num, RingQueue<int> &bq)
{
StartComm(threads, num, bq, PdRun);
}
void StartConsumer(std::vector<Thread<RingQueue<int>>> *threads, int num, RingQueue<int> &bq)
{
StartComm(threads, num, bq, CsRun);
}
void WaitThread(std::vector<Thread<RingQueue<int>>> &threads)
{
for (auto &e : threads)
{
e.join();
}
}
int main()
{
//用RingQueue来包装资源——资源最大容量10
RingQueue<int> *rq = new RingQueue<int>(10);
//采用模拟包装的线程:Thread
std::vector<Thread<RingQueue<int>>> threads;
//std::vector<Thread<int>> threads;
//生产者函数
StartProductor(&threads,1,*rq);
//消费者函数
StartConsumer(&threads,1,*rq);
//回收函数
WaitThread(threads);
return 0;
}
核心细节:
进入多生产者,多消费者模式~
只要是涉及到了多线程访问同一执行流都是要加锁的,这里我们分别在生产者与消费者这各加了一把锁,让竞争锁成功的消费与生产并发运行~
其中有个优化:先申请信号量再竞争锁~因为申请信号量是看资源的,人人都可以申请后面拿着信号量去抢锁就好了,如果是先竞争锁那在竞争失败时就会一直等待(太闲了)。这就好比排队去电影院买票(先竞争锁)和先买票再去排队(先申请信号量)~
这里在多线程时可能会出现bug:(个人见解)start启动线程走线程运行的函数还没来得及返回(A执行流)时当前执行流就再次走循环了,这时候的back()已经变更,造成数据错误~很诡异但不得不注意~只能把start启动单独拎出来~
进阶实现
我们使用生产消费模型肯定不是简单用<int>,而是要让生产者生产任务(task),消费者去执行任务(task)。
//生产者生产
void PdRun(RingQueue<Task> &rq,std::string name)
{
sleep(1);
int cnt = 10;
while(true)
{
// 生产任务
rq.Enqueue(Download);
std::cout << "Productor handler task: " << "[" << name << "]" << std::endl;
sleep(1);
}
}
//消费者消费
void CsRun(RingQueue<Task> &rq,std::string name)
{
sleep(1);
while(true)
{
//1.消费任务
Task t;
rq.Pop(&t);
std::cout << "Consumer handler task: " << "[" << name << "]" << std::endl;
//2.处理任务
t();
sleep(1);
}
}
#pragma once
#include <iostream>
#include <functional>
using Task = std::function<void()>;
void Download()
{
std::cout << "this is a download task" << std::endl;
}
思考总结
之前也思考了一下为何在有锁的情况下还要考虑信号量这种选择呢?也许仅仅是相较锁而言它省略了判断这一步?锁在队列为空为满时要去等待,而信号量只要申请成功意味着资源一定能拿到,不像锁那样申请成功进入临界区还得判断有无资源~不仅如此,当信号量计数>0时还会自动唤醒所等待的线程,从我们所学的视角来看信号量似乎替代了使用锁时候的排队(wait)与铃铛(signal),不过信号量有一点是无法避免的!在多线程进入同一执行流的情况下信号量仍是让它们并发运行的,而不像锁那样变成串行,这也是锁的必要性存在(多线程,例如多生产多消费模型)。因此在我看来,信号量与锁的结合可以有效优化代码结构~(省略wait与signal挺香的)~
线程池
线程池主要有两点:线程预先创建&&线程总数固定
预先梳理
对于线程池而言,它就是把之前写在main函数里面的那些方法都封装在我们自定义的线程池类中~
欧克,我们现在先不要把它们当作是线程池中的成员函数然后深入去理解它们~
初步框架
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
// #include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"
using namespace MyThread;
const static int gthreadnum = 3;
template<class T>
class ThreadPool
{
public:
ThreadPool(int threadnum = gthreadnum) : _threadnum(threadnum),_waitnum(0), _isrunning(false)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
}
void PoolRun()
{
while(true)
{
std::cout<<"I am threadpool"<<std::endl;
sleep(1);
}
}
void InitThreadPool()
{
//指向构建出所有的线程,并不启动
for(int num = 0;num<_threadnum;num++)
{
std::string name = "thread-"+std::to_string(num+1);
_threads.emplace_back(PoolRun,name);
}
}
void Allstart()
{
for(auto &e:_threads)
{
//线程启动
e.start();
}
}
void Wait()
{
for(auto&e:_threads)
{
//线程回收
e.join();
}
}
void Stop()
{
LockQueue();
_isrunning = false;
//强制停止线程池,但要唤醒所有线程去处理完剩下的任务
ThreadWakeupAll();
UnlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;
//收集自定义类(线程)
std::vector<Thread> _threads;
std::queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
int _waitnum;
bool _isrunning;//判断线程池是否退出
};
数据不再放到MyThread中,而是放在线程池类中的队列里~
解决隐藏this问题:
//本来是作为普通函数的,现在被封装进线程池成了成员函数,那就得处理this问题
//这里我们不用静态而是用另外一种方法:bind
void PoolRun()
{
while(true)
{
std::cout<<"I am threadpool"<<std::endl;
sleep(1);
}
}
void InitThreadPool()
{
//指向构建出所有的线程,并不启动
for(int num = 0;num<_threadnum;num++)
{
std::string name = "thread-"+std::to_string(num+1);
//死绑参数,默认Pool的第一个参数就是this,不用我们再传递
_threads.emplace_back(std::bind(&ThreadPool::PoolRun,this,std::placeholders::_1),name);
}
_isrunning = true;
}
以后我们在emplace时不用再传递数据参数了,因为把数据放到当前线程池类里~
void PoolRun(std::string name)
{
while(true)
{
std::cout<<"I am threadpool :"<<name<<std::endl;
sleep(1);
}
}
核心内容
在线程池类中处理任务:
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task() {}
Task(int a, int b) : _a(a), _b(b), _result(0)
{
}
void Excute()
{
_result = _a + _b;
}
std::string ResultToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
}
std::string DebugToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
}
void operator()()
{
Excute();
}
private:
int _a;
int _b;
int _result;
};
void PoolRun(std::string name)
{
while (true)
{
// 1.保证线程池中的任务队列安全
LockQueue();
// 2.队列中不一定有数据
while (_task_queue.empty()&&_isrunning)
{
_waitnum++;
ThreadSleep(); // 去排队
_waitnum--;
}
//2.1 如果线程池不运行&&任务队列为空
if(_task_queue.empty()&&!_isrunning)
{
UnlockQueue();//解锁
std::cout<<name<<"quit..."<<std::endl;
break;//直接退出,不拿任务了
}
//2.2 在任务队列不为空的情况下,无论线程池是否还在运行,都要把任务给做完
// 3.获取任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
// 4.处理任务,为该线程私有
t();
}
}
void InitThreadPool()
{
// 指向构建出所有的线程,并不启动
for (int num = 0; num < _threadnum; num++)
{
std::string name = "thread-" + std::to_string(num + 1);
// 死绑参数,默认Pool的第一个参数就是this,不用我们再传递
_threads.emplace_back(std::bind(&ThreadPool::PoolRun, this, std::placeholders::_1), name);
}
}
在类外传输任务进队列:
//任务入队列
bool Enqueue(const T &t)
{
bool ret = false;
LockQueue();
if (_isrunning)//判断线程池是否运行
{
// 从外界生产任务进队列
_task_queue.push(t);
if (_waitnum > 0)
{
ThreadWakeup();
}
ret = true;
}
UnlockQueue();
return ret;
}
int main()
{
//线程池:数据在队列中
std::unique_ptr<ThreadPool<Task>> tp = std::make_unique<ThreadPool<Task>>();
//封装线程池(本质上还是用的Mythread),在线程池中进行线程的相关操作
//线程构建初始化
tp->InitThreadPool();
//线程启动
tp->Allstart();
int tasknum = 10;
while(tasknum)
{
int a = rand() % 10 + 1;
usleep(1234);
int b = rand() % 5 + 1;
Task t(a, b);
tp->Enqueue(t);
sleep(1);
tasknum--;
}
tp->Stop();
//线程回收
tp->Wait();
return 0;
}
最终效果:
日志系统
下面我们再来往线程池中添加日志的效果~
bool gIsSave = false;
// 将内容写入的文件名
const std::string logname = "log.txt";
// 1.日志是有类别等级的
enum Level
{
DEBUG = 0,
INFO,
WARNING,
ERROR,
FATAL
};
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unknown";
}
}
std::string GetTimeString()
{
// 获取时间必要的变量
time_t curr_time = time(nullptr);
struct tm *format_time = localtime(&curr_time);
if (format_time == nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
format_time->tm_year + 1900,
format_time->tm_mon + 1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);
return time_buffer;
}
// 文件部分:把日志内容往文件里打印
void SaveFile(const std::string &filename, const std::string &message)
{
std::ofstream out(filename, std::ios::app);
if (!out.is_open())
{
return;
}
// 往文件打印日志内容
out << message;
out.close();
}
// 2.日志是有格式的
// 日志类别等级 时间 代码所在的文件名/行数 日志的内容
// 文件名 行数 打印方向 日志等级 日志内容
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{
// 把获取到的枚举常量转化成对应的等级字符串
std::string levelstr = LevelToString(level);
// 时间函数
std::string timestr = GetTimeString();
pid_t selfid = getpid();
char buffer[1024];
va_list arg;
va_start(arg, format);
// 适配可变参数,帮助我们进行多种类型的转化
vsnprintf(buffer, sizeof(buffer), format, arg);
va_end(arg);
std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
"[" + std::to_string(selfid) + "]" +
"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
// true为向显示屏打印
if (!issave)
{
std::cout << message;
}
// false为向文件打印
else
{
SaveFile(logname, message);
}
}
//简化一下操作
// C99新特性__VA_ARGS__
#define LOG(level, format, ...) \
do \
{ \
LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
} while (0)
#define EnableFile() \
do \
{ \
gIsSave = true; \
} while (0)
#define EnableScreen() \
do \
{ \
gIsSave = false; \
} while (0)
结合线程池来观察日志效果~
int main()
{
srand(time(nullptr) ^ getpid() ^ pthread_self());
EnableScreen(); // 开启日志显示器打印功能
// EnableFile();
//线程池:数据在队列中
std::unique_ptr<ThreadPool<Task>> tp = std::make_unique<ThreadPool<Task>>(5);
//封装线程池(本质上还是用的Mythread),在线程池中进行线程的相关操作
//线程构建初始化
tp->InitThreadPool();
//线程启动
tp->Allstart();
int tasknum = 10;
while(tasknum)
{
int a = rand() % 10 + 1;
usleep(1234);
int b = rand() % 5 + 1;
Task t(a, b);
LOG(INFO, "main thread push task: %s", t.DebugToString().c_str());
tp->Enqueue(t);
tasknum--;
sleep(1);
}
sleep(5);
//强制停止线程池
tp->Stop();
//线程回收
tp->Wait();
return 0;
}
线程安全与可重入
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
死锁
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
我们只要破坏其中一个死锁就无法构成了~
线程安全的单例模式
关于单例模式的具体细节可以参考这篇文章:秒懂C++之特殊类设计-****博客
//构造私有化
ThreadPool(int threadnum = gthreadnum) : _threadnum(threadnum),_waitnum(0), _isrunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
LOG(INFO, "ThreadPool Construct()");
}
//赋值与拷贝禁用
ThreadPool<T> &operator = (const ThreadPool<T> &) = delete;
ThreadPool(const ThreadPool<T> &) = delete;
public:
static ThreadPool<T> *GetInstance()
{
// 如果是多线程获取线程池对象下面的代码就有问题了!!
// 只有第一次会创建对象,后续都是获取
// 双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全
if (nullptr == _instance) // 保证第二次之后,所有线程,不用在加锁,直接返回_instance单例对象
{
LockGuard lockguard(&_lock);
if (nullptr == _instance)
{
_instance = new ThreadPool<T>();
_instance->InitThreadPool();
_instance->Allstart();
LOG(DEBUG, "创建线程池单例");
return _instance;
}
}
LOG(DEBUG, "获取线程池单例");
return _instance;
}
公有只留Wait、Enqueue、Stop函数,把InitThreadPool、Allstart、PoolRun也私有化~
int main()
{
// 获取单例的可能是多线程啊!!!
// 程序已经加载
LOG(DEBUG, "程序已经加载");
sleep(3);
ThreadPool<Task>::GetInstance();
sleep(2);
ThreadPool<Task>::GetInstance();
sleep(2);
ThreadPool<Task>::GetInstance();
sleep(2);
ThreadPool<Task>::GetInstance();
sleep(2);
ThreadPool<Task>::GetInstance()->Wait();
sleep(2);
return 0;
}
不需要构建对象也可以调用线程池的相关函数
其他常见的各种锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
- CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
- 自旋锁,公平锁,非公平锁?
这里我们讲一下自旋锁的相关理解~
其实有点类似我们之前在进程时学习的阻塞等待与非阻塞等待问题~
若是线程在临界区执行的时间较长:推荐其他线程挂起等待(阻塞等待)
若是线程在临界区执行的时间较短:推荐其他线程不要休眠,挂起,阻塞(非阻塞等待),而是一直去抢占锁,直到申请成功。而这个过程称为自旋~
读者写者问题
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
读者写者与生产消费最本质的区别就是:
消费者会把数据拿走,而读者不会!它只会拷贝~
下面通过伪代码来帮助大家理解~
不过pthread_rwlock_t是默认读者优先的,可以会出现写者饥饿问题,如果想要写者优先那可能得修改一下:比如当写者准备竞争时,那么后来的读者都得去等待,而原先的读者则等它们读完释放后再给写者锁。
全部代码
环形队列(信号量) · 276ea56 · 玛丽亚后/keep - Gitee.com
线程池 · f4cd1f1 · 玛丽亚后/keep - Gitee.com