#include <iostream>
#include <functional>
#include <queue>
#include <atomic>
#include <mutex>
#include <thread>
#include <future>
#include <memory>
#include <chrono>
#include <list>
using namespace std;
//状态类别
enum class Status
{
//停止
STOP,
//开始
START
};
/**
* @brief 用于初步封装线程
*
*/
class ThreadPackage
{
private:
//线程需要执行的目标函数封装
shared_ptr<function<void()>> _fun = nullptr;
//所属线程的智能指针
shared_ptr<thread> t1 = nullptr;
public:
//等待释放的线程
std::mutex _wait_release_m;
//初始线程状态
Status _thread_status = Status::STOP;
//默认构造函数
ThreadPackage(){};
//析构函数
~ThreadPackage();
//设置目标功能函数
void setWork(shared_ptr<function<void()>> fun);
//开始执行
void start();
};
ThreadPackage::~ThreadPackage()
{
//查看是否执行完成,未执行完成则等待完成
if (t1->joinable())
{
t1->join();
}
};
void ThreadPackage::start()
{
//初始化状态
_thread_status = Status::START;
// 封装线程
t1 = make_shared<thread>([this]() {
std::unique_lock<std::mutex> lk(_wait_release_m);
this->_thread_status = Status::START;
(*(this->_fun))();
this->_thread_status = Status::STOP;
});
}
void ThreadPackage::setWork(shared_ptr<function<void()>> fun)
{
this->_fun = fun;
}
class ThreadPool
{
public:
ThreadPool(int pool_size);
ThreadPool(){};
~ThreadPool(){};
// 添加任务
template <typename F, typename... Args>
decltype(auto) add_task(F &&func, Args &&...args);
private:
//检查线程是否执行完成
shared_ptr<thread> t_check;
//用于释放已结束的线程
shared_ptr<thread> t_realse;
//添加任务到执行队列的线程
shared_ptr<thread> t_dispatch;
//同时执行的最大线程数
std::atomic<int> _pool_size;
//线程池状态
Status _status = Status::STOP;
//待执行任务队列
queue<function<void()>> _wait_work_q;
//等待添加到待执行任务队列锁
std::mutex _wait_add_work_m;
//等待添加到待执行任务队列的队列
queue<function<void()>> _add_wait_work_q;
//正在工作线程列表的锁
std::mutex _working_m;
//执行的线程链表
list<shared_ptr<ThreadPackage>> _starting_l;
//等待释放列表锁
std::mutex _wait_release_m;
//等待释放的线程链表
list<shared_ptr<ThreadPackage>> _wait_release_l;
//释放函数
void release();
//添加任务到执行队列的函数
void dispatch();
//检查线程是否执行完成的函数
void check();
};
void ThreadPool::release()
{
while (true)
{
{
std::unique_lock<std::mutex> lk(_wait_release_m);
for (auto i = _wait_release_l.begin(); i != _wait_release_l.end(); ++i)
{
if ((*i)->_thread_status == Status::STOP)
{
std::unique_lock<std::mutex> lk((*i)->_wait_release_m);
_wait_release_l.erase(i);
break;
}
}
}
std::this_thread::yield();
}
}
void ThreadPool::dispatch()
{
while (true)
{
if (_wait_work_q.size() > 0 && _starting_l.size() < _pool_size)
{
auto _temp_wait_task = _wait_work_q.front();
_wait_work_q.pop();
auto _temp_thread_package = std::make_shared<ThreadPackage>();
_temp_thread_package->setWork(make_shared<function<void()>>(_temp_wait_task));
_temp_thread_package->start();
std::unique_lock<std::mutex> lk(_working_m);
_starting_l.emplace_back(_temp_thread_package);
}
if (_add_wait_work_q.size() > 0)
{
{
std::unique_lock<std::mutex> lk(_wait_add_work_m);
auto _temp_add_wait_task = _add_wait_work_q.front();
_add_wait_work_q.pop();
_wait_work_q.push(_temp_add_wait_task);
}
}
std::this_thread::yield();
}
}
void ThreadPool::check()
{
while (true)
{
{
std::unique_lock<std::mutex> lk(_working_m);
for (auto i = _starting_l.begin(); i != _starting_l.end(); ++i)
{
if ((*i)->_thread_status == Status::STOP)
{
std::unique_lock<std::mutex> lk(_wait_release_m);
_wait_release_l.emplace_back(*i);
i = _starting_l.erase(i);
break;
}
}
}
std::this_thread::yield();
}
}
ThreadPool::ThreadPool(int pool_size)
{
this->_pool_size = pool_size;
t_check = make_shared<thread>(bind(&ThreadPool::check, this));
t_realse = make_shared<thread>(bind(&ThreadPool::release, this));
t_dispatch = make_shared<thread>(bind(&ThreadPool::dispatch, this));
}
/**
* @brief 主要核心思想就这一个模板函数,用于将原始函数封装成
*
* @tparam F 目标函数类型
* @tparam Args 函数的参数类型
* @param func 目标函数
* @param args 目标参数
* @return decltype(auto) 自动推导返回值
*/
template <typename F, typename... Args>
decltype(auto) ThreadPool::add_task(F &&func, Args &&...args)
{
//推导最后的返回值类型
using res_type = typename std::result_of<F(Args...)>::type;
//封装为packaged_task的智能指针类型
auto task = std::make_shared<std::packaged_task<res_type()>>(
std::bind(std::forward<F>(func), std::forward<Args>(args)...));
//获取用于异步胡获取返回值的future
std::future<res_type> res = task->get_future();
{
//尝试获取待添加队列锁
std::unique_lock<std::mutex> lk(_wait_add_work_m);
//添加到待添加任务队列
_add_wait_work_q.emplace([task]() { (*task)(); });
}
return res;
}
int wc(int n)
{
int all = 1;
for (int i = 1; i < 100000000; ++i)
{
all *= i;
all %= 100000007;
}
cout << n << " ";
return all;
}
int main()
{
vector<future<int>> temp;
auto s1 = clock();
ThreadPool pt(10);
for (int i = 0; i < 8; ++i)
{
temp.emplace_back(pt.add_task(wc, 1));
}
auto s2 = clock();
for (int i = 0; i < temp.size(); ++i)
{
temp[i].get();
}
auto s3 = clock();
cout << endl;
for (int i = 0; i < 8; ++i)
{
wc(2);
}
auto s4 = clock();
cout << endl;
thread t1(wc, 3), t2(wc, 3), t3(wc, 3), t4(wc, 3), t5(wc, 3), t6(wc, 3), t7(wc, 3), t8(wc, 3);
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
t8.join();
auto s5 = clock();
cout << "end\n";
std::cout << (s2 - s1) << "\t" << (s3 - s1) << "\t" << (s4 - s3) << "\t" << (s5 - s4) << std::endl;
while (true)
{
std::this_thread::yield();
}
getchar();
return 0;
}