◆ 概要
笔者在 《简单的线程池(四)》 中采用了非阻塞的(nonblocking)线程同步方式,在此文中笔者将采用阻塞的(blocking)线程同步方式(参考 《简单的线程池(二)》) 实现相同特性的线程池。
本文不再赘述与 《简单的线程池(二)》、《简单的线程池(四)》 相同的内容。如有不明之处,请参考该博客。
◆ 实现
以下代码给出了此线程池的实现,(blocking_unique_pool.h)
class Thread_Pool {
private:
struct Task_Wrapper { ...
};
atomic<bool> _suspend_;
atomic<bool> _done_;
unsigned _workersize_;
thread* _workers_;
Blocking_Queue<Task_Wrapper>* _workerqueues_; // #1
void work(unsigned index) {
Task_Wrapper task;
while (!_done_.load(memory_order_acquire)) {
_workerqueues_[index].pop(task);
task();
while (_suspend_.load(memory_order_acquire))
std::this_thread::yield();
}
}
void stop() {
size_t remaining = 0;
_suspend_.store(true, memory_order_release);
for (unsigned i = 0; i < _workersize_; ++i)
remaining += _workerqueues_[i].size();
_suspend_.store(false, memory_order_release);
for (unsigned i = 0; i < _workersize_; ++i)
while (!_workerqueues_[i].empty())
std::this_thread::yield();
std::fprintf(stderr, "\n%zu tasks remain before destructing pool.\n", remaining);
_done_.store(true, memory_order_release);
for (unsigned i = 0; i < _workersize_; ++i)
_workerqueues_->push([] {}); // #2
for (unsigned i = 0; i < _workersize_; ++i)
if (_workers_[i].joinable())
_workers_[i].join();
delete[] _workers_;
delete[] _workerqueues_;
}
public:
Thread_Pool() : _suspend_(false), _done_(false) {
try {
_workersize_ = thread::hardware_concurrency();
_workers_ = new thread[_workersize_]();
_workerqueues_ = new Blocking_Queue<Task_Wrapper>[_workersize_]();
for (unsigned i = 0; i < _workersize_; ++i)
_workers_[i] = thread(&Thread_Pool::work, this, i);
} catch (...) {
stop();
throw;
}
}
~Thread_Pool() {
stop();
}
template<class Callable>
future<typename std::result_of<Callable()>::type> submit(Callable c) {
typedef typename std::result_of<Callable()>::type R;
packaged_task<R()> task(c);
future<R> r = task.get_future();
_workerqueues_[std::rand() % _workersize_].push(std::move(task));
return r;
}
};
此线程池采用阻塞式的任务队列存放线程池用户提交的任务(#1)。为了避免发生 《简单的线程池(二)》 中提及的死锁问题,在主线程调用工作线程的 join() 函数之前,向每个任务队列中放入一个假任务(#2),确保各个工作线程都能退出循环等待。
◆ 逻辑
以下类图展现了此线程池的代码主要逻辑结构。它与 《简单的线程池(二)》 中的线程池的区别在于 Thread_Pool 类到 Blocking_Queue<> 类的多重性由 1 变为 1..* 。
线程池用户提交任务与工作线程执行任务的并发过程与 《简单的线程池(二)》 中的一致,此处略。
◆ 验证
验证过程采用了 《简单的线程池(三)》 中定义的的测试用例,对应的测试结果均保存在 [github] cnblogs/15676560 中。
◆ 最后
完整的代码与测试数据请参考 [github] cnblogs/15676560 。