使用C++11 开发一个半同步半异步线程池

摘自:《深入应用C++11》第九章

实际中,主要有两种方法处理大量的并发任务,一种是一个请求由系统产生一个相应的处理请求的线程(一对一)

另外一种是系统预先生成一些用于处理请求的进程,当请求的任务来临时,先放入同步队列中,分配一个处理请求的进程去处理任务,

线程处理完任务后还可以重用,不会销毁,而是等待下次任务的到来。(一对多的线程池技术)

线程池技术,能避免大量线程的创建和销毁动作,节省资源,对于多核处理器,由于线程被分派配到多个cpu,会提高并行处理的效率。

线程池技术分为半同步半异步线程池和领导者追随者线程池,下面附上代码:

//SyncQueue.hpp
//同步队列,存放任务
#ifndef SYNCQUEUE_HPP
#define SYNCQUEUE_HPP #include <list>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <utility>
#include <iostream> template<typename T>
class SyncQueue
{
public:
SyncQueue(int maxSize) : m_maxSize(maxSize),m_needStop(false){} void Put(const T& x)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notFull.wait(locker,[this]{ return m_needStop || NotFull();});
if(m_needStop)
return;
m_queue.push_back(x);
m_notEmpty.notify_one();
} void Take(std::list<T>& list)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker,[this]{return m_needStop || NotEmpty();});
if(m_needStop) list = std::move(m_queue); //move semantics,avoid copy.
m_notFull.notify_one();
} void Take(T& x)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker,[this]{return m_needStop || NotEmpty();});
if(m_needStop)
return;
x=m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
} void Stop()
{
{
std::lock_guard<std::mutex> locker(m_mutex);
m_needStop = true;
}
m_notFull.notify_all();
m_notEmpty.notify_all();
} std::size_t Size()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size();
}
private:
bool NotFull()
{
bool full = m_queue.size() >= m_maxSize;
if(full)
std::cout << "the buffer is full,waiting...\n";
return !full;
}
bool NotEmpty()
{
bool empty = m_queue.empty();
if(empty)
std::cout << "the buffer is empty,waiting...\n";
return !empty;
}
private:
std::list<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_notEmpty;
std::condition_variable m_notFull;
int m_maxSize;
bool m_needStop; //stop flag
}; #endif // SYNC_QUEUE_HPP
//ThreadPool.hpp

#ifndef THREAD_POOL_HPP
#define THREAD_POOL_HPP #include <list>
#include <thread>
#include <memory>
#include "SyncQueue.hpp"
#include <functional>
#include <atomic> const int MaxTaskCount = ; class ThreadPool
{
public:
using Task = std::function<void()>; ThreadPool(int numThreads) :
m_taskQueue(MaxTaskCount)
{
Start(numThreads);
} ~ThreadPool(){ Stop();};
void Stop()
{
std::call_once(m_once_flag,[this]{StopThreadGroup();});
} void AddTask(const Task& task)
{
m_taskQueue.Put(task);
} std::size_t SyncQueueSize()
{
return m_taskQueue.Size();
}
private:
void Start(int numThreads)
{
m_running = true; for(int i = ;i < numThreads;++i)
{
m_threadGrop.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread,this));
} } void RunInThread()
{
while(m_running)
{
std::list<Task> list;
m_taskQueue.Take(list);
for(auto& task : list)
{
if(!m_running)
return; task();
}
}
return;
} void StopThreadGroup()
{
m_taskQueue.Stop();
m_running = false;
for(auto thread : m_threadGrop)
{
if(thread)
thread->join();
}
m_threadGrop.clear();
}
private:
std::list<std::shared_ptr<std::thread>> m_threadGrop; //thread group
SyncQueue<Task> m_taskQueue;
std::atomic_bool m_running;
std::once_flag m_once_flag;
}; #endif // THREAD_POOL_HPP

测试:

#include "ThreadPool.hpp"
#include <thread>
#include <iostream>
#include <chrono>
#include <functional>
int main()
{
ThreadPool pool(); //create two threads to handle tasks std::thread thd1([&pool]{
for(int i = ;i < ;i++)
{
auto thdId = std::this_thread::get_id(); pool.AddTask([thdId]{
std::cout<<"thdID1: "<< thdId << std::endl;
});
}
}); std::thread thd2([&pool]{
for(int i = ;i < ;i++)
{
auto thdID = std::this_thread::get_id(); pool.AddTask([thdID]{
std::cout << "thdID2: " << thdID << std::endl;
});
}
}); thd1.join();
thd2.join();
std::this_thread::sleep_for(std::chrono::seconds());
pool.Stop();
return ;
}
上一篇:imx6 18bit display


下一篇:使用pygame制作一个简单的游戏