c++11 线程池实现

#include<iostream>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<future>
#include<atomic>
#include<functional>
#include<vector>
#include<queue>
using namespace std;
#define THREAD_MAX_NUM 16

class ThreadPool{
private:
    vector<thread> threads;//线程数组
    using Task = function<void()>;//封装可调用对象
    queue<Task> tasks;//任务队列
    mutex tasks_mutex;
    condition_variable my_cond;
    int size;
    atomic<bool> run;//线程池状态
public:
    ThreadPool(int n){
        n = n<1?1:(n>THREAD_MAX_NUM?THREAD_MAX_NUM:n);
        run.store(true);//线程池开启
        for(int i=0;i<n;i++){
            threads.emplace_back([this]{
                threadMain();
            });
        }
    }
    ~ThreadPool(){
        cout<<"析构函数"<<endl;
        run.store(false);//关闭线程池
        my_cond.notify_all();//最后需要唤醒所有的线程,让它们从wait函数中出来。
        for(auto it=threads.begin();it!=threads.end();it++){
            if(it->joinable()){
                it->join();
            }
        }
    }
    //这是个类成员函数!!!不能在构造函数中写成 threads.emplace_back(threadMain)....我是个菜鸡
    void threadMain(){
        while(run){
            unique_lock<mutex> myMutex(tasks_mutex);
            //获取一个任务,如果任务队列为空,则阻塞在这里
            my_cond.wait(myMutex,[this]{
                return !run||!tasks.empty();
            });
            if(!run&&tasks.empty()){
                cout<<"threadId: "<<this_thread::get_id()<<endl;
                return;
            }
            
            Task task = move(tasks.front());
            tasks.pop();
            task();//执行任务
        }
    }
    template<typename F,typename... Args>
    auto commitTask(F&& f,Args&&... args) ->future<decltype(f(args...))>
    {
        using ResType = decltype(f(args...));
        // 把函数入口及参数,打包(绑定)
        auto task_ptr = make_shared<packaged_task<ResType()>>(
            bind(forward<F>(f),forward<Args>(args)...)
        );
        // function<void()> func = move([task_ptr]{
        //     (*task_ptr)();
        // });
        {
            unique_lock<mutex> myMutex(tasks_mutex);
            tasks.emplace([task_ptr](){
                (*task_ptr)();
            });
        }
        my_cond.notify_one();//唤醒一个线程(随机的一个线程)
        return task_ptr->get_future();
    }
};
int threadfunc(int i){
    cout<<"任务开始执行。。。"<<"threadId: "<<this_thread::get_id()<<endl;
    this_thread::sleep_for(100ms);
    cout<<"任务结束."<<endl;
    return i;
}
int main(){
    ThreadPool pool(4);
    for(int i=0;i<20;i++){
        future<int> res =  pool.commitTask(threadfunc,i);
        cout<<res.get()<<endl;
    }
    // future<int> res =  pool.commitTask(threadfunc,3);
    // cout<<res.get()<<endl;
    return 0;
}

该线程池还有一些需要完善的地方,但是目前已经能实现所需的核心功能。如对此代码有问题可随时交流,一起学习!
代码部分参考了这个博客https://www.cnblogs.com/lzpong/p/6397997.html

上一篇:vscode 在使用F5进行执行或者调试时,优先编译


下一篇:【转载】 vscode如何在最新版本中配置c/c++语言环境中的launch.json和tasks.json?