半同步半异步线程池框架代码实现

SyncTaskQueue.h
#pragma once
#include <list>
#include <mutex>
#include <condition_variable>
#include <iostream>

template <typename TASK>
class SyncTaskQueue//队列内部实现加锁,保证操作同步
{ //这个队列是被线程池使用,因此具体实例在线程池中定义
public:
    SyncTaskQueue(int max_size) :max_size_(max_size) {


    }
    ~SyncTaskQueue() {//何时析构   如果有个操作在一个线程中阻塞, 对象无法析构
        Stop();
        std::cout << "SyncTaskQueue destruction" << std::endl;
    }
    void  Stop() {//退出循环
        stop_ = true;
        not_empty_cond_.notify_one();
        not_full_cond_.notify_one();
        std::cout << "SyncTaskQueue Stop" << std::endl;
    }

    bool IsFull() {
        std::lock_guard<std::mutex>  locker;
        return list_task_.size() == max_size_;

    }

    bool IsEmpty() {
        std::lock_guard<std::mutex> locker;
        return list_task_.size() == 0;
    }

    void Push(TASK &&data) {
        std::unique_lock<std::mutex> locker(mutex_);
        while (Full() && !stop_) {//避免多次获取互斥锁
            std::cout << "task queue is full, wait" << std::endl;//阻塞
            //满的时候等待, 阻塞等待消费
            not_full_cond_.wait_for(locker, std::chrono::milliseconds(500));
        }
        if (stop_) {
            return;
        }
        if (!Full()) {
            list_task_.push_back(std::forward<TASK>(data));//为什么需要std::forward,保证右值,移动拷贝?
            not_empty_cond_.notify_one();//not empty cond signal   
        }

    }

    void Pop(TASK& data){//没有用返回值的形式
        std::unique_lock<std::mutex> locker(mutex_);
        while (Empty() && !stop_) {
            std::cout << "task queue is empty, wait" << std::endl;//阻塞
            not_empty_cond_.wait_for(locker, std::chrono::milliseconds(500));
        }
        if (!Empty()) {
            data = list_task_.front();
            list_task_.pop_front();//list  pop操作分为2步
            not_full_cond_.notify_one();//not full cond signal        
        }
    }

private:
    bool Full() {
        return list_task_.size() == max_size_;
    }
    bool Empty() {
        return list_task_.size() == 0;
    }
    std::mutex   mutex_;
    int max_size_;
    std::atomic<bool>   stop_ = false;
    std::condition_variable   not_full_cond_;//没有满的时候激发
    std::condition_variable   not_empty_cond_;
    std::list<TASK>  list_task_;
};

ThreadPool.h
#pragma once
#include <list>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>
#include "SyncTaskQueue.h"
#include <mutex>
#include <condition_variable>
#include <iostream>

class ThreadPool {
public:
    using Task = std::function<void()>;//使用别名
    ThreadPool();
    ~ThreadPool();

    void Stop();
    void AddTask(Task &&task);
private:
    void Start(int num_thread);
    void RunThread();
    void StopThread();

private:
    //多个线程对象容器,方便管理
    std::list<std::shared_ptr<std::thread>> thread_group_;
    int thread_num_;//线程数
    SyncTaskQueue<Task>  queue_;//任务队列
    std::atomic<bool> stop_ = false;//需要包含头文件atomic
    std::once_flag  flag_;
};

ThreadPool.cpp
#include "ThreadPool.h"


ThreadPool::ThreadPool():queue_(10)
{//构造函数  Start私有化  保证也只能执行一次
    thread_num_ = std::thread::hardware_concurrency();
    Start(thread_num_);
}
ThreadPool::~ThreadPool() 
{
   Stop();
}

void ThreadPool::Stop() {//保证stop 只有一次
    std::call_once(flag_, [this] {StopThread(); });
}

void ThreadPool::AddTask(Task &&task) {
    queue_.Push(std::forward<Task>(task));
}

void ThreadPool::Start(int num_thread) {
    thread_num_ = num_thread;
    std::cout << "thread pool start" << std::endl;
    for (int i = 0; i < thread_num_; i++) {
        thread_group_.push_back(std::make_shared<std::thread>(&ThreadPool::RunThread, this));//创建线程的过程中将线程函数传进去        
        std::cout << "thread " << thread_group_.back()->get_id() << " create " << std::endl;
    }
#if 0
    for (auto thread : thread_group_) {
        thread->get_id();
    }
#endif
    
}

//所有的子线程都会从任务队列里面去取任务执行
void ThreadPool::RunThread() {//线程从任务队列中取任务 
    //多个线程里面只有队列任务共享的,stop_数据是共享的
    while (!stop_) {//如果没有停止,则一直在while循环
        Task  task_object;
        queue_.Pop(task_object);//如果没有数据,会自动阻塞
        if (stop_) {//如果时停止,则直接return
            return;
        }
        task_object();//取出任务执行,本线程不结束,继续从队列里面取任务执行
        std::cout << "thread id " << std::this_thread::get_id() << " exec one task" << std::endl;

    }
}

void ThreadPool::StopThread() {
    stop_ = true;
    queue_.Stop();//任务队列可能阻塞,需要先停止
    //等待线程池中的所有线程执行结束
    for (auto it = thread_group_.begin(); it != thread_group_.end(); it++) {
        (*it)->join();
    }
    thread_group_.clear();//线程对象列表清除
    std::cout << "thread pool stop" << std::endl;
}

void test_ThreadPool()
{

    ThreadPool  thread_pool;

    std::thread  thread1([&thread_pool] {
        auto id = std::this_thread::get_id();
        std::cout << "thread id " << id << " add task " << std::endl;
        thread_pool.AddTask([id]() {        
            std::cout << "thread id " << id << " exec task " <<std::endl;
        });
    });
       
    std::thread  thread2([&thread_pool] {
        auto id = std::this_thread::get_id();
        std::cout << "thread id " << id << " add task " << std::endl;
        thread_pool.AddTask([id]() {
            std::cout << "thread id " << id << " exec task " << std::endl;
        });
    });

    std::thread  thread3([&thread_pool] {
        auto id = std::this_thread::get_id();
        std::cout << "thread id " << id << "add task" << std::endl;
        thread_pool.AddTask([id]() {
            std::cout << "thread id " << id << "exec task" << std::endl;
        });
    });

    //过完2s再结束
    std::this_thread::sleep_for(std::chrono::seconds(2));
    thread_pool.Stop();//线程池结束
    thread1.join();
    thread2.join();
    thread3.join();//同步线程函数执行完
}

  

上一篇:ThreadPool(线程池)介绍


下一篇:如果设置最合理的ThreadPool大小与合理的BlockingQueue大小