文章目录
线程池的实现原理:
在并发程序设计中,由于线程的反复创建于销毁是非常消耗时间的,在存在大量的线程的创建于销毁的程序中,我们可以事先创建出一部分线程,然后管理这些线程去处理我们的任务,这样可以节省一大部分反复创建与销毁的时间开销,线程池的好处这里不多说了,看一下muduo 网络库对线程池的处理
构造函数
ThreadPool::ThreadPool(const string& nameArg)
: mutex_(),
//线程锁
notEmpty_(mutex_),
notFull_(mutex_),
name_(nameArg),
//线程池处理的最大的任务数
maxQueueSize_(0),
//标识线程池是否正在运行
running_(false)
{
}
析构函数
ThreadPool::~ThreadPool()
{
//如果线程池正在运行,停止线程池
if (running_)
{
stop();
}
}
start 线程池的启动函数
void ThreadPool::start(int numThreads)
{
//判断线程池是否为空
assert(threads_.empty());
//线程池的运行状态设置为true
running_ = true;
//设置线程池的线程的个数
threads_.reserve(numThreads);
//创建numsThreads 个线程,并启动这些线程
//这些线程在没有任务的时候进入睡眠状态,在Thread类中可以看出
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1);
threads_.emplace_back(new muduo::Thread(
std::bind(&ThreadPool::runInThread, this), name_+id));
//启动线程,在Thread类中,启动一个线程
threads_[i]->start();
}
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_();
}
}
stop 停止线程池
//用于回收所有的线程
void ThreadPool::stop()
{
{
MutexLockGuard lock(mutex_);
running_ = false;
//唤醒所有的线程
notEmpty_.notifyAll();
}
for (auto& thr : threads_)
{
thr->join();
}
}
run 线程池中添加一个任务
void ThreadPool::run(Task task)
{
if (threads_.empty())
{
//如果没有子线程,就在主线程中执行该任务
task();
}
else
{
//对线程池队列加锁
MutexLockGuard lock(mutex_);
//如果任务队列满了,即线程池中的任务数达到了上限,就调用wait阻塞,直到可以添加任务位置
while (isFull())
{
notFull_.wait();
}
assert(!isFull());
//将任务放入队列
queue_.push_back(std::move(task));
//唤醒一个线程取执行任务
notEmpty_.notify();
}
}
take 线程池中的线程从任务队列取一个线程
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
//如果任务队列为空,并且线程池处于运行状态
//则线程池中的线程wait 进入睡眠状态
while (queue_.empty() && running_)
{
//当前的子线程处于睡眠的状态
notEmpty_.wait();
}
Task task;
//任务队列不为空的时候
if (!queue_.empty())
{
//获取队列首部的任务
task = queue_.front();
//出队列一个任务
queue_.pop_front();
if (maxQueueSize_ > 0)
{
//此时任务队列肯定不满,可以添加新的任务
notFull_.notify();
}
}
//将获得的任务返回执行
return task;
}
runInThread 线程池中的线程的执行的区域
void ThreadPool::runInThread()
{
try
{
if (threadInitCallback_)
{
threadInitCallback_();
}
while (running_)
{
//Task task(take()); 返回一个任务
Task task(take());
if (task)
{
//执行任务队列获得的任务
task();
}
}
}
catch (const Exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw; // rethrow
}
}