(原创)C++半同步半异步线程池2

(原创)C++半同步半异步线程池

c++11 boost技术交流群:296561497,欢迎大家来交流技术。

线程池可以高效的处理任务,线程池中开启多个线程,等待同步队列中的任务到来,任务到来多个线程会抢着执行任务,当到来的任务太多,达到上限时需要等待片刻,任务上限保证内存不会溢出。线程池的效率和cpu核数相关,多核的话效率更高,线程数一般取cpu数量+2比较合适,否则线程过多,线程切换频繁反而会导致效率降低。

线程池有两个活动过程:1.外面不停的往线程池添加任务;2.线程池内部不停的取任务执行。活动图如下:

(原创)C++半同步半异步线程池2

线程池中的队列是用的上一篇博文中的同步队列。具体代码:

 
#include<vector>
#include<thread>
#include<functional>
#include<memory>
#include <atomic>
#include"SyncQueue.hpp" const int MaxTaskCount = 100;
class ThreadPool
{
public:
using Task = std::function<void()>;
ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
{
Start(numThreads);
} ~ThreadPool(void)
{
//如果没有停止时则主动停止线程池
Stop();
} void Stop()
{
std::call_once(m_flag, [this]{StopThreadGroup(); }); //保证多线程情况下只调用一次StopThreadGroup
} void AddTask(Task&&task)
{
m_queue.Put(std::forward<Task>(task));
} void AddTask(const Task& task)
{
m_queue.Put(task);
} private:
void Start(int numThreads)
{
m_running = true;
//创建线程组
for (int i = 0; i <numThreads; ++i)
{
m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
}
} void RunInThread()
{
while (m_running)
{
//取任务分别执行
std::list<Task> list;
m_queue.Take(list); for (auto& task : list)
{
if (!m_running)
return; task();
}
}
} void StopThreadGroup()
{
m_queue.Stop(); //让同步队列中的线程停止
m_running = false; //置为false,让内部线程跳出循环并退出 for (auto thread : m_threadgroup) //等待线程结束
{
if (thread)
thread->join();
}
m_threadgroup.clear();
} std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
SyncQueue<Task> m_queue; //同步队列
atomic_bool m_running; //是否停止的标志
std::once_flag m_flag;
};
 

上面的代码中用到了同步队列SyncQueue,它的实现在这里。测试代码如下:

 
void TestThdPool()
{
ThreadPool pool;bool runing = true; std::thread thd1([&pool,&runing]{
while(runing)
{
cout<<"produce "<<this_thread::get_id()<< endl; pool.AddTask([]{
std::cout <<"consume "<<this_thread::get_id()<< endl;
});
}
}); this_thread::sleep_for(std::chrono::seconds(10));
runing = false;
pool.Stop(); thd1.join();
getchar();
}
 

上面的测试代码中,thd1是生产者线程,线程池内部会不断消费生产者产生的任务。在需要的时候可以提前停止线程池,只要调用Stop函数就行了。

一点梦想:尽自己一份力,让c++的世界变得更美好!
上一篇:(并发编程)RLock(与死锁现象),Semaphore,Even事件,线程Queue


下一篇:使用C++11实现一个半同步半异步线程池