基于c++11 thread的scheduler实现

这是一个简单的 scheduler，用于在同一个线程上下文（thread context）中按提交顺序依次执行不同的任务，以避免某些情况下复杂的竞争条件。

基于c++11 thread的scheduler实现
 1 #pragma once
 2 #include <string>
 3 #include <thread>
 4 #include <mutex>
 5 #include <condition_variable>
 6 #include <list>
 7 #include <boost/noncopyable.hpp>
 8 
 9 using namespace std;
10 
// Command kinds stored in callback_t::cmd_type.
enum
{
    CALLBACK_FUCN,  // value 0: an ordinary task record
    SHUTDOWN        // value 1: tells the scheduler thread to leave its loop
};
15 
// Forward declaration so the handler signature can refer to the task record.
struct callback_t;

// Signature of a task handler: receives the task record by reference and
// returns an int.
using callback_func = int (*)(callback_t &);

// One unit of work handed to the scheduler.
// The member order is part of the interface: the record is filled in with
// positional aggregate initialisation elsewhere.
struct callback_t
{
    int cmd_type;            // command kind (CALLBACK_FUCN or SHUTDOWN)
    void *param;             // opaque user pointer, interpreted by the handler
    callback_func callback;  // handler to invoke with this record
};
25 
26 class schedu:boost::noncopyable
27 {
28 public:
29     schedu(string &name);
30     virtual ~schedu();
31     void schedu_callback(struct callback_t &callback);
32 
33 private:
34     void schedu_func();
35 
36 private:
37     string _name;
38     list<struct callback_t> _callback_list;
39     mutex _callback_list_lock;
40     bool _shutting_down;
41 
42     mutex condition_lock;
43     condition_variable condition;
44     thread th;
45 };
基于c++11 thread的scheduler实现
基于c++11 thread的scheduler实现
 1 #include "schedu.h"
 2 #include <iostream>
 3 
 4 void schedu::schedu_func()
 5 {
 6     for (;;){
 7         if (_callback_list.empty()){
 8             unique_lock<mutex> lock(condition_lock);
 9             condition.wait(lock);
10             std::cout << "[scheduler] wait..." << std::endl;
11         }else{
12             _callback_list_lock.lock();
13             callback_t c = _callback_list.front();
14             _callback_list.pop_front();
15             _callback_list_lock.unlock();
16             if (c.cmd_type == SHUTDOWN)
17                 break;
18             else
19                 c.callback(c);
20         }
21     }
22 }
23 
24 schedu::schedu(string &name)
25 :_name(name), th(&schedu::schedu_func,this), _shutting_down(false)
26 {
27     std::cout << "[scheduler] " << _name << " created." << std::endl;
28 }
29 
30 schedu::~schedu()
31 {
32     callback_t callback = { SHUTDOWN, NULL, NULL };
33     schedu_callback(callback);
34     _shutting_down = true;
35     th.join();
36     std::cout << "[scheduler] " << _name << " destroied." << std::endl;
37 }
38 
39 void schedu::schedu_callback(struct callback_t &callback)
40 { 
41     if (_shutting_down){
42         std::cout << "[scheduler] " << _name << " is shutting down..." << std::endl;
43         return;
44     }
45 
46     {
47         lock_guard<mutex> lock(_callback_list_lock);
48         _callback_list.push_back(callback);
49     }
50     unique_lock<mutex> lock(condition_lock);
51     condition.notify_one();
52 }
基于c++11 thread的scheduler实现

测试程序如下:

基于c++11 thread的scheduler实现
 1 #include "schedu.h"
 2 #include <iostream>
 3 #include <chrono>
 4 using namespace std;
 5 
 6 struct func_param
 7 {
 8     int i;
 9     int j;
10 };
11 
12 int func1(callback_t &callback)
13 {
14     this_thread::sleep_for(chrono::seconds(1));
15     func_param *param = (func_param*)callback.param;
16     param->i += 1;
17     cout << "hello world! i = " << param->i << endl;
18     return 1;
19 }
20 
21 int func2(callback_t &callback)
22 {
23     func_param *param = (func_param*)callback.param;
24     param->j += 1;
25     cout << "hello run! j = " << param->j << endl;
26     return 1;
27 }
28 
29 func_param param;
30 int main()
31 {
32     schedu scheduler(string("test_scheduler"));
33 
34     param.i = 0;
35     param.j = 0;
36     for (int i = 0; i < 10; i++){
37         callback_t callback;
38         callback.cmd_type = CALLBACK_FUCN;
39         callback.callback = func1;
40         callback.param = &param;
41         scheduler.schedu_callback(callback);
42 
43         callback.callback = func2;
44         scheduler.schedu_callback(callback);
45     }
46     int a;
47     cin >> a;
48     return a;
49 }
基于c++11 thread的scheduler实现

执行结果:
基于c++11 thread的scheduler实现

基于c++11 thread的scheduler实现

上一篇:LinuxC语言读取文件,分割字符串,存入链表,放入另一个文件


下一篇:Effective C++ 解析 1