讨论QQ群:135202158
本文技术参考了sourceforge项目c thread pool,链接:http://sourceforge.net/projects/cthpool/
线程池如上一篇随笔(http://www.cnblogs.com/zzqcn/p/3585003.html)提到的内存池一样,也是一种池化策略,在启动时(或者更高级的,运行时按一定策略分配)预先开启N个线程,当没有工作要做时,这些线程处于睡眠中;一旦有工作加入工作队列,其中的某些线程就会醒来,处理这些工作,完成后继续睡眠 。
要实现线程池(只针对本文的简单实现而言),应设计和构建3样东西:
- 含N个线程的线程组
- 工作队列
- 工作线程例程
线程组和工作队列表示如下:
/* * Threads: * * +----------+----------+------+------------+ * | thread 0 | thread 1 | .... | thread n-1 | * +----------+----------+------+------------+ * * Job Queue: * * back front * | | * v v * +-------+ +-------+ +-------+ * | job 0 | -> | job 1 | -> ... -> | job x | * +-------+ +-------+ +-------+ * */
线程组可以用普通数组或者动态分配的数组实现,维数就是池中线程数量,存放的其实是线程ID。工作队列可以直接用C++ queue容器实现。
工作线程例程(线程函数)的大致执行流程如下图所示:
/* * * Each Thread Routine: * Job-Queue * | ... * v | * +-------+ +---------+ EnQueue * +---> | sleep | (No job) | new job | <--------- Client * | +-------+ +---------+ * | | | * | | DeQueue +---------+ * | + <----------- | new job | * | | +---------+ * | v * | +---------+ * | | do work | * | +---------+ * | | * | | * +----<----+ * */
工作队列中没有工作时它就睡眠 ,有工作时苏醒,从队列首部取出(&删除)一个工作,然后开始执行。
另外,我们还需要一个互斥锁L和一个计数信号量S,互斥锁用来同步工作队列的增删操作,计数信号量用来对工作队列中的工作数量进行记录。工作线程会一直等待S,直到它大于0。
下面给出完整代码。
1. threadpool.h
1 /* 2 * Linux线程池的简单实现. 3 * Author: 赵子清 4 * Blog: http://www.cnblogs.com/zzqcn 5 * 6 **/ 7 8 9 10 #ifndef __THREADPOOL_H__ 11 #define __THREADPOOL_H__ 12 13 14 #include <semaphore.h> 15 #include <pthread.h> 16 #include <queue> 17 18 19 20 #define DLPTP_MAX_THREADS 1024 21 22 23 struct tp_job_t 24 { 25 void (*work) (void*); 26 void* arg; 27 }; 28 29 struct tp_threadpool_t 30 { 31 pthread_t* threads; 32 size_t nthreads; 33 std::queue<tp_job_t> jobs; 34 sem_t njobs; 35 pthread_mutex_t lock; 36 bool running; 37 }; 38 39 40 tp_threadpool_t* tp_init(size_t _nthreads); 41 int tp_deinit(tp_threadpool_t* _ptp); 42 void* tp_worker(void* _ptp); 43 int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg); 44 45 46 #endif 47
2. threadpool.cpp
1 /* 2 * Linux线程池的简单实现. 3 * Author: 赵子清 4 * Blog: http://www.cnblogs.com/zzqcn 5 * 6 **/ 7 8 9 10 #include "threadpool.h" 11 12 13 14 tp_threadpool_t* tp_init(size_t _nthreads) 15 { 16 if(_nthreads < 1 || _nthreads > DLPTP_MAX_THREADS) 17 return NULL; 18 19 int err = 0; 20 tp_threadpool_t* ret = NULL; 21 size_t i, j; 22 23 ret = new tp_threadpool_t; 24 if(NULL == ret) 25 return NULL; 26 ret->nthreads = _nthreads; 27 ret->threads = new pthread_t[_nthreads]; 28 if(NULL == ret->threads) 29 { 30 delete ret; 31 return NULL; 32 } 33 ret->running = true; 34 35 err = sem_init(&ret->njobs, 0, 0); 36 if(-1 == err) 37 { 38 delete[] ret->threads; 39 delete ret; 40 return NULL; 41 } 42 43 err = pthread_mutex_init(&ret->lock, NULL); 44 if(err) 45 { 46 sem_destroy(&ret->njobs); 47 delete[] ret->threads; 48 delete ret; 49 return NULL; 50 } 51 52 for(i=0; i<_nthreads; ++i) 53 { 54 err = pthread_create(&ret->threads[i], NULL, tp_worker, (void*)ret); 55 if(err) 56 { 57 ret->running = false; 58 for(j=0; j<i; ++j) 59 { 60 pthread_cancel(ret->threads[j]); 61 pthread_join(ret->threads[j], NULL); 62 } 63 pthread_mutex_destroy(&ret->lock); 64 sem_destroy(&ret->njobs); 65 delete[] ret->threads; 66 delete ret; 67 return NULL; 68 } 69 } 70 71 return ret; 72 } 73 74 75 int tp_deinit(tp_threadpool_t* _ptp) 76 { 77 if(NULL == _ptp) 78 return -1; 79 80 int err = 0; 81 size_t i, j; 82 83 // TODO: if now worker has job to handle, do something then exit 84 while(!_ptp->jobs.empty()); 85 86 _ptp->running = false; 87 88 for(i=0; i<_ptp->nthreads; ++i) 89 { 90 err = sem_post(&_ptp->njobs); /* V, ++ */ 91 if(err) 92 { 93 for(j=i; j<_ptp->nthreads; ++j) 94 pthread_cancel(_ptp->threads[j]); 95 break; 96 } 97 } 98 99 for(i=0; i<_ptp->nthreads; ++i) 100 pthread_join(_ptp->threads[i], NULL); 101 102 pthread_mutex_destroy(&_ptp->lock); 103 sem_destroy(&_ptp->njobs); 104 105 delete[] _ptp->threads; _ptp->threads = NULL; 106 delete _ptp; _ptp = NULL; 107 108 return 0; 109 } 110 111 112 void* tp_worker(void* _ptp) 113 { 114 if(NULL == _ptp) 115 return NULL; 116 117 tp_threadpool_t* p = (tp_threadpool_t*)_ptp; 118 119 while(p->running) 120 { 121 sem_wait(&p->njobs); /* P, -- */ 122 123 if(!p->running) 124 return NULL; 125 126 void (*work) (void*); 127 void* arg; 128 tp_job_t job; 129 130 pthread_mutex_lock(&p->lock); /* LOCK */ 131 132 job = p->jobs.front(); 133 work = job.work; 134 arg = job.arg; 135 p->jobs.pop(); 136 137 pthread_mutex_unlock(&p->lock); /* UNLOCK */ 138 139 work(arg); 140 } 141 142 return NULL; 143 } 144 145 146 int tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg) 147 { 148 if(NULL == _ptp || NULL == _work) 149 return -1; 150 151 tp_job_t job; 152 job.work = _work; 153 job.arg = _arg; 154 155 pthread_mutex_lock(&_ptp->lock); /* LOCK */ 156 _ptp->jobs.push(job); 157 sem_post(&_ptp->njobs); /* V, ++ */ 158 pthread_mutex_unlock(&_ptp->lock); /* UNLOCK */ 159 160 return 0; 161 }
3. 测试程序main.cpp
1 /* 2 * Linux线程池测试. 3 * Author: 赵子清 4 * Blog: http://www.cnblogs.com/zzqcn 5 * 6 **/ 7 8 #include <unistd.h> 9 #include <stdio.h> 10 #include "threadpool.h" 11 12 13 /* task 1 */ 14 void task1(void* _arg) 15 { 16 printf("# Thread working: %u\n", (int)pthread_self()); 17 printf(" Task 1 running..\n"); 18 usleep(5000); 19 } 20 21 22 /* task 2 */ 23 void task2(void* _arg) 24 { 25 printf("# Thread working: %u\n", (int)pthread_self()); 26 printf(" Task 2 running.. "); 27 printf("%d\n", *((int*)_arg)); 28 usleep(5000); 29 } 30 31 32 #define N_THREADS 4 33 34 int main(int argc, char** argv) 35 { 36 tp_threadpool_t* ptp = NULL; 37 int i; 38 39 ptp = tp_init(N_THREADS); 40 if(NULL == ptp) 41 { 42 fprintf(stderr, "tp_init fail\n"); 43 return -1; 44 } 45 46 int a = 32; 47 for(i=0; i<10; ++i) 48 { 49 tp_add_job(ptp, task1, NULL); 50 tp_add_job(ptp, task2, (void*)&a); 51 } 52 53 tp_deinit(ptp); 54 55 return 0; 56 }