文章目录
前言
这段时间看了《C++并发编程实战》的基础内容,想着利用最近学的知识自己实现一个简单的线程池。
思路
个人对线程池的理解是:利用已经创建的固定数量的线程去执行指定的任务,从而避免线程重复创建和销毁带来的额外开销。
C++11中,线程我们可以理解为对应一个thread对象,任务可以理解为要执行的函数,通常是耗时的函数。
我们的任务多少和顺序并非固定的,因此需要有一个方法能添加指定的任务,任务存放的地方应该是一个任务队列,因为我们的线程数量有限,当任务很多时同时执行的任务数量也有限,因此任务需要排队,遵循先来后到的原则。
当要执行一个任务时,意味着先将这个任务从队列取出,再执行相应任务,而“取出”动作的执行者是线程池中的线程,这意味我们的队列需要考虑多个线程在同一队列上执行“取出”操作的问题,实际上,取出任务操作和添加任务操作也不能同时进行,否则会产生竞争条件;另一方面,程序本身如果就是多线程的,多个线程同时添加任务的操作也应该是互斥的。
当没有任务可以执行时,所有线程应该什么也不做,当出现了一个任务时,应该将这个任务分配到任一线程中执行。实现上我们固然可以使用轮询的方式判断当前队列是否有任务,有则取出(即使加了互斥锁似乎也无法避免竞争条件?),但这样会消耗无谓的CPU资源,写轮询周期难以选取。其实,我们可以使用condition_variable代替轮询。
上述任务的创建和取出其实就是经典的生产者消费者模型。
我们将上面的内容都封装在一个类中,取名ThreadPool,用户可以在构造ThreadPool对象时指定线程池大小,之后可以随时添加要执行的任务。
实现
class ThreadPool
{
public:
ThreadPool(int n);
~ThreadPool();
void pushTask(packaged_task<void()> &&task);
private:
vector<thread*> threadPool;
deque<packaged_task<void()>> taskQueue;
void taskConsumer();
mutex taskMutex;
condition_variable taskQueueCond;
};
ThreadPool::ThreadPool(int n)
{
for (int i = 0; i < n; i++)
{
thread *t = new thread(&ThreadPool::taskConsumer,this);
threadPool.push_back(t);
t->detach();
}
}
ThreadPool::~ThreadPool()
{
while (!threadPool.empty())
{
thread *t=threadPool.back();
threadPool.pop_back();
delete t;
}
}
void ThreadPool::pushTask(packaged_task<void()> &&task)
{
{
lock_guard<mutex> guard(taskMutex);
taskQueue.push_back(std::move(task));
}
taskQueueCond.notify_one();
}
void ThreadPool::taskConsumer()
{
while (true)
{
unique_lock<mutex> lk(taskMutex);
taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
packaged_task<void()> task=std::move(taskQueue.front());
taskQueue.pop_front();
lk.unlock();
task();
}
}
这里我使用packaged_task作为任务,每当添加一个任务,就调用condition_variable::notify_one方法,调用condition_variable::wait的线程就会被唤醒,并检查等待条件。这里有个小细节是notify_one在解锁后执行,这样避免线程唤醒后还要等待互斥锁解锁。
使用示例:
void Task1()
{
Sleep(1000);
cout << "Task1"<<endl;
}
void Task5()
{
Sleep(5000);
cout << "Task5" << endl;
}
class Worker
{
public:
void run();
};
void Worker::run()
{
cout << "Worker::run start" << endl;
Sleep(5000);
cout << "Worker::run end" << endl;
}
int main()
{
ThreadPool pool(2);
pool.pushTask(packaged_task<void()>(Task5));
pool.pushTask(packaged_task<void()>(Task1));
pool.pushTask(packaged_task<void()>(Task1));
Worker worker;
pool.pushTask(packaged_task<void()>(bind(&Worker::run,&worker)));
pool.pushTask(packaged_task<void()>([&](){worker.run();}));
Sleep(20000);
}
这个线程池目前有几个缺点:
- 只能传入调用形式为void()形式的函数或可调用对象,不能返回任务执行的值,只能通过其他方式同步任务执行结果(如果有)
- 传入参数较为复杂,必须封装一层packaged_task,调用对象方法时需要使用bind或者lambda表达式的方法封装
以上缺点在当前版本的实现不予解决,日后另写博文优化。