一、线程池的功能
减少线程的创建和销毁的内存资源消耗。
二、线程池基础结构
- 消息队列
- 任务执行队列
- 管理组件
2.1 简易数据结构
2.1.1 消息队列
使用双向链表实现队列结构,节点消息包括执行的消息函数以及函数参数数据。
1 struct nTask { 2 void (*task_func)(struct nTask *task); 3 void *user_data; 4 5 struct nTask *prev; 6 struct nTask *next; 7 };
2.1.2 任务执行队列
使用双向链表实现队列结构,节点信息包括线程id、线程存活状态、所属管理组件。
struct nWorker { pthread_t threadid; int terminate; struct nManager *manager; struct nWorker *prev; struct nWorker *next; };
2.1.3 管理组件
管理组件管理消息队列、任务执行队列。同时,使用锁和条件变量完成对共享数据的保护操作。
typedef struct nManager { struct nTask *tasks; struct nWorker *workers; pthread_mutex_t mutex; pthread_cond_t cond; } ThreadPool;
2.2 底层数据结构接口实现
2.2.1 向队列头插入节点
#define LIST_INSERT(item, list) do { \ item->prev = NULL; \ item->next = list; \ if ((list) != NULL) (list)->prev = item; \ (list) = item; \ } while(0)
2.2.2 删除队列中指定节点
#define LIST_REMOVE(item, list) do { \ if (item->prev != NULL) item->prev->next = item->next; \ if (item->next != NULL) item->next->prev = item->prev; \ if (list == item) list = item->next; \ item->prev = item->next = NULL; \ } while(0)
2.3 线程任务定义
线程池线程执行相同的线程回调函数,在回调函数内,获取消息队列中待处理的消息并执行。
// callback != task static void *nThreadPoolCallback(void *arg) { struct nWorker *worker = (struct nWorker*)arg; while (1) { pthread_mutex_lock(&worker->manager->mutex); while (worker->manager->tasks == NULL) { if (worker->terminate) break; pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex); } if (worker->terminate) { pthread_mutex_unlock(&worker->manager->mutex); break; } struct nTask *task = worker->manager->tasks; LIST_REMOVE(task, worker->manager->tasks); pthread_mutex_unlock(&worker->manager->mutex); task->task_func(task); } free(worker); }
对共享数据消息队列的操作要在锁中进行。执行任务在锁外进行使锁尽可能小。
三、业务接口实现
3.1 线程池创建函数
此函数参数为线程池对象指针和最大线程数。
互斥锁和条件变量的初始化有静态初始化和动态初始化两种。此处用静态初始化的互斥锁和条件变量和memset来初始化线程池的互斥锁和条件变量。
在进行内存申请时要判断内存申请是否成功。
// API int nThreadPoolCreate(ThreadPool *pool, int numWorkers) { if (pool == NULL) return -1; if (numWorkers < 1) numWorkers = 1; memset(pool, 0, sizeof(ThreadPool)); pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t)); //pthread_mutex_init(&pool->mutex, NULL); pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t)); int i = 0; for (i = 0;i < numWorkers;i ++) { struct nWorker *worker = (struct nWorker*)malloc(sizeof(struct nWorker)); if (worker == NULL) { perror("malloc"); return -2; } memset(worker, 0, sizeof(struct nWorker)); worker->manager = pool; // int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker); if (ret) { perror("pthread_create"); free(worker); return -3; } LIST_INSERT(worker, pool->workers); } // success return 0; }
3.2 线程池销毁函数
线程池销毁主要工作是将线程的存活状态设置为销毁,并广播条件变量通知线程回调函数销毁线程。条件变量要配合锁使用。
// API int nThreadPoolDestory(ThreadPool *pool, int nWorker) { struct nWorker *worker = NULL; for (worker = pool->workers;worker != NULL;worker = worker->next) { worker->terminate; } pthread_mutex_lock(&pool->mutex); pthread_cond_broadcast(&pool->cond); pthread_mutex_unlock(&pool->mutex); pool->workers = NULL; pool->tasks = NULL; return 0; }
3.3 消息添加函数
此函数操作共享数据消息队列,所以需要加锁操作。pthread_cond_signal()函数作用是随机唤醒一个等待条件变量的线程。
// API int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task) { pthread_mutex_lock(&pool->mutex); LIST_INSERT(task, pool->tasks); pthread_cond_signal(&pool->cond); pthread_mutex_unlock(&pool->mutex); }
四、测试样例
#if 1 #define THREADPOOL_INIT_COUNT 20 #define TASK_INIT_SIZE 1000 void task_entry(struct nTask *task) { //type //struct nTask *task = (struct nTask*)task; int idx = *(int *)task->user_data; printf("idx: %d\n", idx); free(task->user_data); free(task); } int main(void) { ThreadPool pool = {0}; nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT); // pool --> memset(); int i = 0; for (i = 0;i < TASK_INIT_SIZE;i ++) { struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask)); if (task == NULL) { perror("malloc"); exit(1); } memset(task, 0, sizeof(struct nTask)); task->task_func = task_entry; task->user_data = malloc(sizeof(int)); *(int*)task->user_data = i; nThreadPoolPushTask(&pool, task); } getchar(); } #endif