SyncTaskQueue.h #pragma once #include <list> #include <mutex> #include <condition_variable> #include <iostream> template <typename TASK> class SyncTaskQueue//队列内部实现加锁,保证操作同步 { //这个队列是被线程池使用,因此具体实例在线程池中定义 public: SyncTaskQueue(int max_size) :max_size_(max_size) { } ~SyncTaskQueue() {//何时析构 如果有个操作在一个线程中阻塞, 对象无法析构 Stop(); std::cout << "SyncTaskQueue destruction" << std::endl; } void Stop() {//退出循环 stop_ = true; not_empty_cond_.notify_one(); not_full_cond_.notify_one(); std::cout << "SyncTaskQueue Stop" << std::endl; } bool IsFull() { std::lock_guard<std::mutex> locker; return list_task_.size() == max_size_; } bool IsEmpty() { std::lock_guard<std::mutex> locker; return list_task_.size() == 0; } void Push(TASK &&data) { std::unique_lock<std::mutex> locker(mutex_); while (Full() && !stop_) {//避免多次获取互斥锁 std::cout << "task queue is full, wait" << std::endl;//阻塞 //满的时候等待, 阻塞等待消费 not_full_cond_.wait_for(locker, std::chrono::milliseconds(500)); } if (stop_) { return; } if (!Full()) { list_task_.push_back(std::forward<TASK>(data));//为什么需要std::forward,保证右值,移动拷贝? not_empty_cond_.notify_one();//not empty cond signal } } void Pop(TASK& data){//没有用返回值的形式 std::unique_lock<std::mutex> locker(mutex_); while (Empty() && !stop_) { std::cout << "task queue is empty, wait" << std::endl;//阻塞 not_empty_cond_.wait_for(locker, std::chrono::milliseconds(500)); } if (!Empty()) { data = list_task_.front(); list_task_.pop_front();//list pop操作分为2步 not_full_cond_.notify_one();//not full cond signal } } private: bool Full() { return list_task_.size() == max_size_; } bool Empty() { return list_task_.size() == 0; } std::mutex mutex_; int max_size_; std::atomic<bool> stop_ = false; std::condition_variable not_full_cond_;//没有满的时候激发 std::condition_variable not_empty_cond_; std::list<TASK> list_task_; }; ThreadPool.h #pragma once #include <list> #include <thread> #include <functional> #include <memory> #include <atomic> #include "SyncTaskQueue.h" #include <mutex> #include <condition_variable> #include <iostream> class ThreadPool { public: using Task = std::function<void()>;//使用别名 ThreadPool(); ~ThreadPool(); void Stop(); void AddTask(Task &&task); private: void Start(int num_thread); void RunThread(); void StopThread(); private: //多个线程对象容器,方便管理 std::list<std::shared_ptr<std::thread>> thread_group_; int thread_num_;//线程数 SyncTaskQueue<Task> queue_;//任务队列 std::atomic<bool> stop_ = false;//需要包含头文件atomic std::once_flag flag_; }; ThreadPool.cpp #include "ThreadPool.h" ThreadPool::ThreadPool():queue_(10) {//构造函数 Start私有化 保证也只能执行一次 thread_num_ = std::thread::hardware_concurrency(); Start(thread_num_); } ThreadPool::~ThreadPool() { Stop(); } void ThreadPool::Stop() {//保证stop 只有一次 std::call_once(flag_, [this] {StopThread(); }); } void ThreadPool::AddTask(Task &&task) { queue_.Push(std::forward<Task>(task)); } void ThreadPool::Start(int num_thread) { thread_num_ = num_thread; std::cout << "thread pool start" << std::endl; for (int i = 0; i < thread_num_; i++) { thread_group_.push_back(std::make_shared<std::thread>(&ThreadPool::RunThread, this));//创建线程的过程中将线程函数传进去 std::cout << "thread " << thread_group_.back()->get_id() << " create " << std::endl; } #if 0 for (auto thread : thread_group_) { thread->get_id(); } #endif } //所有的子线程都会从任务队列里面去取任务执行 void ThreadPool::RunThread() {//线程从任务队列中取任务 //多个线程里面只有队列任务共享的,stop_数据是共享的 while (!stop_) {//如果没有停止,则一直在while循环 Task task_object; queue_.Pop(task_object);//如果没有数据,会自动阻塞 if (stop_) {//如果时停止,则直接return return; } task_object();//取出任务执行,本线程不结束,继续从队列里面取任务执行 std::cout << "thread id " << std::this_thread::get_id() << " exec one task" << std::endl; } } void ThreadPool::StopThread() { stop_ = true; queue_.Stop();//任务队列可能阻塞,需要先停止 //等待线程池中的所有线程执行结束 for (auto it = thread_group_.begin(); it != thread_group_.end(); it++) { (*it)->join(); } thread_group_.clear();//线程对象列表清除 std::cout << "thread pool stop" << std::endl; } void test_ThreadPool() { ThreadPool thread_pool; std::thread thread1([&thread_pool] { auto id = std::this_thread::get_id(); std::cout << "thread id " << id << " add task " << std::endl; thread_pool.AddTask([id]() { std::cout << "thread id " << id << " exec task " <<std::endl; }); }); std::thread thread2([&thread_pool] { auto id = std::this_thread::get_id(); std::cout << "thread id " << id << " add task " << std::endl; thread_pool.AddTask([id]() { std::cout << "thread id " << id << " exec task " << std::endl; }); }); std::thread thread3([&thread_pool] { auto id = std::this_thread::get_id(); std::cout << "thread id " << id << "add task" << std::endl; thread_pool.AddTask([id]() { std::cout << "thread id " << id << "exec task" << std::endl; }); }); //过完2s再结束 std::this_thread::sleep_for(std::chrono::seconds(2)); thread_pool.Stop();//线程池结束 thread1.join(); thread2.join(); thread3.join();//同步线程函数执行完 }