一、线程池
大多数的网络服务器,包括Web服务器都具有一个特点,就是单位时间内必须处理数目巨大的连接请求,但是处理时间却是比较短的。在传统的多线程服务器模型中是这样实现的:一旦有个请求到达,就创建一个新的线程,由该线程执行任务,任务执行完毕之后,线程就退出。这就是"即时创建,即时销毁"的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数非常频繁,那么服务器就将处于一个不停的创建线程和销毁线程的状态。这笔开销是不可忽略的,尤其是线程执行的时间非常非常短的情况。
线程池就是为了解决上述问题的,它的实现原理是这样的:在应用程序启动之后,就马上创建一定数量的线程,放入空闲的队列中。这些线程都是处于阻塞状态,这些线程只占一点内存,不占用CPU。当任务到来后,线程池将选择一个空闲的线程,将任务传入此线程中运行。当所有的线程都处在处理任务的时候,线程池将自动创建一定的数量的新线程,用于处理更多的任务。执行任务完成之后线程并不退出,而是继续在线程池中等待下一次任务。当大部分线程处于阻塞状态时,线程池将自动销毁一部分的线程,回收系统资源。
下面是一个简单线程池的实现,这个线程池的代码是我参考网上的一个例子实现的,由于找不到出处了,就没办法注明参考自哪里了。它的方案是这样的:程序启动之前,初始化线程池,启动线程池中的线程,由于还没有任务到来,线程池中的所有线程都处在阻塞状态,当一有任务到达就从线程池中取出一个空闲线程处理,如果所有的线程都处于工作状态,就添加到队列,进行排队。如果队列中的任务个数大于队列的所能容纳的最大数量,那就不能添加任务到队列中,只能等待队列不满才能添加任务到队列中。
主要由两个文件组成一个threadpool.h头文件和一个threadpool.c源文件组成。源码中已有重要的注释,就不加以分析了。
threadpool.h文件:
- struct job
- {
- void* (*callback_function)(void *arg); //线程回调函数
- void *arg; //回调函数参数
- struct job *next;
- };
- struct threadpool
- {
- int thread_num; //线程池中开启线程的个数
- int queue_max_num; //队列中最大job的个数
- struct job *head; //指向job的头指针
- struct job *tail; //指向job的尾指针
- pthread_t *pthreads; //线程池中所有线程的pthread_t
- pthread_mutex_t mutex; //互斥信号量
- pthread_cond_t queue_empty; //队列为空的条件变量
- pthread_cond_t queue_not_empty; //队列不为空的条件变量
- pthread_cond_t queue_not_full; //队列不为满的条件变量
- int queue_cur_num; //队列当前的job个数
- int queue_close; //队列是否已经关闭
- int pool_close; //线程池是否已经关闭
- };
- //================================================================================================
- //函数名: threadpool_init
- //函数描述: 初始化线程池
- //输入: [in] thread_num 线程池开启的线程个数
- // [in] queue_max_num 队列的最大job个数
- //输出: 无
- //返回: 成功:线程池地址 失败:NULL
- //================================================================================================
- struct threadpool* threadpool_init(int thread_num, int queue_max_num);
- //================================================================================================
- //函数名: threadpool_add_job
- //函数描述: 向线程池中添加任务
- //输入: [in] pool 线程池地址
- // [in] callback_function 回调函数
- // [in] arg 回调函数参数
- //输出: 无
- //返回: 成功:0 失败:-1
- //================================================================================================
- int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);
- //================================================================================================
- //函数名: threadpool_destroy
- //函数描述: 销毁线程池
- //输入: [in] pool 线程池地址
- //输出: 无
- //返回: 成功:0 失败:-1
- //================================================================================================
- int threadpool_destroy(struct threadpool *pool);
- //================================================================================================
- //函数名: threadpool_function
- //函数描述: 线程池中线程函数
- //输入: [in] arg 线程池地址
- //输出: 无
- //返回: 无
- //================================================================================================
- void* threadpool_function(void* arg);
- #include "threadpool.h"
- struct threadpool* threadpool_init(int thread_num, int queue_max_num)
- {
- struct threadpool *pool = NULL;
- do
- {
- pool = malloc(sizeof(struct threadpool));
- if (NULL == pool)
- {
- printf("failed to malloc threadpool!\n");
- break;
- }
- pool->thread_num = thread_num;
- pool->queue_max_num = queue_max_num;
- pool->queue_cur_num = 0;
- pool->head = NULL;
- pool->tail = NULL;
- if (pthread_mutex_init(&(pool->mutex), NULL))
- {
- printf("failed to init mutex!\n");
- break;
- }
- if (pthread_cond_init(&(pool->queue_empty), NULL))
- {
- printf("failed to init queue_empty!\n");
- break;
- }
- if (pthread_cond_init(&(pool->queue_not_empty), NULL))
- {
- printf("failed to init queue_not_empty!\n");
- break;
- }
- if (pthread_cond_init(&(pool->queue_not_full), NULL))
- {
- printf("failed to init queue_not_full!\n");
- break;
- }
- pool->pthreads = malloc(sizeof(pthread_t) * thread_num);
- if (NULL == pool->pthreads)
- {
- printf("failed to malloc pthreads!\n");
- break;
- }
- pool->queue_close = 0;
- pool->pool_close = 0;
- int i;
- for (i = 0; i < pool->thread_num; ++i)
- {
- pthread_create(&(pool->pthreads[i]), NULL, threadpool_function, (void *)pool);
- }
- return pool;
- } while (0);
- return NULL;
- }
- int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg)
- {
- assert(pool != NULL);
- assert(callback_function != NULL);
- assert(arg != NULL);
- pthread_mutex_lock(&(pool->mutex));
- while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close))
- {
- pthread_cond_wait(&(pool->queue_not_full), &(pool->mutex)); //队列满的时候就等待
- }
- if (pool->queue_close || pool->pool_close) //队列关闭或者线程池关闭就退出
- {
- pthread_mutex_unlock(&(pool->mutex));
- return -1;
- }
- struct job *pjob =(struct job*) malloc(sizeof(struct job));
- if (NULL == pjob)
- {
- pthread_mutex_unlock(&(pool->mutex));
- return -1;
- }
- pjob->callback_function = callback_function;
- pjob->arg = arg;
- pjob->next = NULL;
- if (pool->head == NULL)
- {
- pool->head = pool->tail = pjob;
- pthread_cond_broadcast(&(pool->queue_not_empty)); //队列空的时候,有任务来时就通知线程池中的线程:队列非空
- }
- else
- {
- pool->tail->next = pjob;
- pool->tail = pjob;
- }
- pool->queue_cur_num++;
- pthread_mutex_unlock(&(pool->mutex));
- return 0;
- }
- void* threadpool_function(void* arg)
- {
- struct threadpool *pool = (struct threadpool*)arg;
- struct job *pjob = NULL;
- while (1) //死循环
- {
- pthread_mutex_lock(&(pool->mutex));
- while ((pool->queue_cur_num == 0) && !pool->pool_close) //队列为空时,就等待队列非空
- {
- pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
- }
- if (pool->pool_close) //线程池关闭,线程就退出
- {
- pthread_mutex_unlock(&(pool->mutex));
- pthread_exit(NULL);
- }
- pool->queue_cur_num--;
- pjob = pool->head;
- if (pool->queue_cur_num == 0)
- {
- pool->head = pool->tail = NULL;
- }
- else
- {
- pool->head = pjob->next;
- }
- if (pool->queue_cur_num == 0)
- {
- pthread_cond_signal(&(pool->queue_empty)); //队列为空,就可以通知threadpool_destroy函数,销毁线程函数
- }
- if (pool->queue_cur_num == pool->queue_max_num - 1)
- {
- pthread_cond_broadcast(&(pool->queue_not_full)); //队列非满,就可以通知threadpool_add_job函数,添加新任务
- }
- pthread_mutex_unlock(&(pool->mutex));
- (*(pjob->callback_function))(pjob->arg); //线程真正要做的工作,回调函数的调用
- free(pjob);
- pjob = NULL;
- }
- }
- int threadpool_destroy(struct threadpool *pool)
- {
- assert(pool != NULL);
- pthread_mutex_lock(&(pool->mutex));
- if (pool->queue_close || pool->pool_close) //线程池已经退出了,就直接返回
- {
- pthread_mutex_unlock(&(pool->mutex));
- return -1;
- }
- pool->queue_close = 1; //置队列关闭标志
- while (pool->queue_cur_num != 0)
- {
- pthread_cond_wait(&(pool->queue_empty), &(pool->mutex)); //等待队列为空
- }
- pool->pool_close = 1; //置线程池关闭标志
- pthread_mutex_unlock(&(pool->mutex));
- pthread_cond_broadcast(&(pool->queue_not_empty)); //唤醒线程池中正在阻塞的线程
- pthread_cond_broadcast(&(pool->queue_not_full)); //唤醒添加任务的threadpool_add_job函数
- int i;
- for (i = 0; i < pool->thread_num; ++i)
- {
- pthread_join(pool->pthreads[i], NULL); //等待线程池的所有线程执行完毕
- }
- pthread_mutex_destroy(&(pool->mutex)); //清理资源
- pthread_cond_destroy(&(pool->queue_empty));
- pthread_cond_destroy(&(pool->queue_not_empty));
- pthread_cond_destroy(&(pool->queue_not_full));
- free(pool->pthreads);
- struct job *p;
- while (pool->head != NULL)
- {
- p = pool->head;
- pool->head = p->next;
- free(p);
- }
- free(pool);
- return 0;
- }
- #include "threadpool.h"
- void* work(void* arg)
- {
- char *p = (char*) arg;
- printf("threadpool callback fuction : %s.\n", p);
- sleep(1);
- }
- int main(void)
- {
- struct threadpool *pool = threadpool_init(10, 20);
- threadpool_add_job(pool, work, "1");
- threadpool_add_job(pool, work, "2");
- threadpool_add_job(pool, work, "3");
- threadpool_add_job(pool, work, "4");
- threadpool_add_job(pool, work, "5");
- threadpool_add_job(pool, work, "6");
- threadpool_add_job(pool, work, "7");
- threadpool_add_job(pool, work, "8");
- threadpool_add_job(pool, work, "9");
- threadpool_add_job(pool, work, "10");
- threadpool_add_job(pool, work, "11");
- threadpool_add_job(pool, work, "12");
- threadpool_add_job(pool, work, "13");
- threadpool_add_job(pool, work, "14");
- threadpool_add_job(pool, work, "15");
- threadpool_add_job(pool, work, "16");
- threadpool_add_job(pool, work, "17");
- threadpool_add_job(pool, work, "18");
- threadpool_add_job(pool, work, "19");
- threadpool_add_job(pool, work, "20");
- threadpool_add_job(pool, work, "21");
- threadpool_add_job(pool, work, "22");
- threadpool_add_job(pool, work, "23");
- threadpool_add_job(pool, work, "24");
- threadpool_add_job(pool, work, "25");
- threadpool_add_job(pool, work, "26");
- threadpool_add_job(pool, work, "27");
- threadpool_add_job(pool, work, "28");
- threadpool_add_job(pool, work, "29");
- threadpool_add_job(pool, work, "30");
- threadpool_add_job(pool, work, "31");
- threadpool_add_job(pool, work, "32");
- threadpool_add_job(pool, work, "33");
- threadpool_add_job(pool, work, "34");
- threadpool_add_job(pool, work, "35");
- threadpool_add_job(pool, work, "36");
- threadpool_add_job(pool, work, "37");
- threadpool_add_job(pool, work, "38");
- threadpool_add_job(pool, work, "39");
- threadpool_add_job(pool, work, "40");
- sleep(5);
- threadpool_destroy(pool);
- return 0;
- }
上面的文章介绍了线程池的原理及意义。
下面,介绍的这个线程池与上面提到的那个线程池有一部分相似的地方。
主要区别为:
1、线程池中的每个线程都有自己的互斥量和条件变量,而不是线程池共享一个。
2、线程池中的线程在程序结束时,等待线程池中线程停止的机制不同。
该程序主要由两个文件构成,分别为ThreadPool.h和ThreadPool.cpp文件。
ThreadPool.h文件:
- #define MAXT_IN_POOL 200
- #define BUSY_THRESHOlD 0.5
- #define MANAGE_INTREVAL 2
- class ThreadPool;
- typedef void (*dispatch_fn)(void*);
- //线程函数参数
- typedef struct tagThread
- {
- pthread_t thread_id; //线程ID
- pthread_mutex_t thread_mutex; //信号量
- pthread_cond_t thread_cond; //条件变量
- dispatch_fn do_job; //调用的函数,任务
- void* args; //函数参数
- ThreadPool *parent; //线程池指针
- }_thread;
- //线程池
- class ThreadPool
- {
- public:
- //================================================================================================
- //函数名: ThreadPool
- //函数描述: 构造函数
- //输入: [in] max_threads_in_pool 线程池最大线程数
- //输入: [in] min_threads_in_pool 线程池最小问题数
- //输出: 无
- //返回: 无
- //================================================================================================
- ThreadPool(unsigned int max_threads_in_pool, unsigned int min_threads_in_pool = 2);
- ~ThreadPool();
- //================================================================================================
- //函数名: dispatch_threadpool
- //函数描述: 将任务加入线程池,由线程池进行分发
- //输入: [in] dispatch_me 调用的函数地址
- //输入: [in] dispatch_me 函数参数
- //输出: 无
- //返回: 无
- //================================================================================================
- void dispatch_threadpool(dispatch_fn dispatch_me, void* dispatch_me);
- private:
- pthread_mutex_t tp_mutex; //信号量
- pthread_cond_t tp_idle; //线程池中线程有空闲线程的条件变量
- pthread_cond_t tp_full; //线程池中线程为满的条件变量
- pthread_cond_t tp_empty; //线程池中线程为空的条件变量
- int tp_min; //线程池的最小线程数
- int tp_max; //线程池的最大线程数
- int tp_avail; //线程池中空闲的线程数
- int tp_total; //线程池中已创建的线程数
- _thread** tp_list; //指向线程池中所有空闲线程的参数的指针
- bool tp_stop; //线程池是否已停止
- //================================================================================================
- //函数名: add_avail
- //函数描述: 加入空闲线程
- //输入: [in] avail 线程的参数
- //输出: 无
- //返回: 成功:true,失败:false
- //================================================================================================
- bool add_avail(_thread* avail);
- //================================================================================================
- //函数名: work_thread
- //函数描述: 线程函数
- //输入: [in] args 参数
- //输出: 无
- //返回: 无
- //================================================================================================
- static void* work_thread(void* args);
- //================================================================================================
- //函数名: add_thread
- //函数描述: 添加一个线程
- //输入: [in] dispatch_me 函数指针
- //输入: [in] args 函数参数
- //输出: 无
- //返回: 无
- //================================================================================================
- bool add_thread(dispatch_fn dispatch_me, void* args);
- //================================================================================================
- //函数名: syn_all
- //函数描述: 等待线程池中所有线程空闲
- //输入: 无
- //输出: 无
- //返回: 无
- //================================================================================================
- void syn_all();
- };
- ThreadPool::ThreadPool(unsigned int max_threads_in_pool, unsigned int min_threads_in_pool)
- {
- pthread_t manage_id;
- if (min_threads_in_pool <= 0 || max_threads_in_pool < 0 || min_threads_in_pool > max_threads_in_pool || max_threads_in_pool > MAXT_IN_POOL)
- {
- return ;
- }
- tp_avail = 0; //初始化线程池
- tp_total = 0;
- tp_min = min_threads_in_pool;
- tp_max = max_threads_in_pool;
- tp_stop = false;
- tp_list = (_thread * *) malloc(sizeof(void *) * max_threads_in_pool);
- if (NULL == tp_list)
- {
- return;
- }
- memset(tp_list, 0, sizeof(void *) * max_threads_in_pool);
- pthread_mutex_init(&tp_mutex, NULL);
- pthread_cond_init(&tp_idle, NULL);
- pthread_cond_init(&tp_full, NULL);
- pthread_cond_init(&tp_empty, NULL);
- }
- bool ThreadPool::add_avail(_thread* avail)
- {
- bool ret = false;
- pthread_mutex_lock(&tp_mutex);
- if (tp_avail < tp_max)
- {
- tp_list[tp_avail] = avail;
- tp_avail++;
- pthread_cond_signal(&tp_idle); //线程池中有线程为空闲
- if (tp_avail >= tp_total)
- {
- pthread_cond_signal(&tp_full); //线程池中所有线程都为为空闲
- }
- ret = true;
- }
- pthread_mutex_unlock(&tp_mutex);
- return ret;
- }
- void* ThreadPool::work_thread(void* args)
- {
- _thread* thread = (_thread*) args;
- ThreadPool *pool = thread->parent;
- while (pool->tp_stop == false)
- {
- thread->do_job(thread->args);
- pthread_mutex_lock(&thread->thread_mutex); //执行完任务之后,添加到空闲线程队列中
- if (pool->add_avail(thread))
- {
- pthread_cond_wait(&thread->thread_cond, &thread->thread_mutex);
- pthread_mutex_unlock(&thread->thread_mutex);
- }
- else
- {
- pthread_mutex_unlock(&thread->thread_mutex);
- pthread_mutex_destroy(&thread->thread_mutex);
- pthread_cond_destroy(&thread->thread_cond);
- free(thread);
- break;
- }
- }
- pthread_mutex_lock(&pool->tp_mutex);
- pool->tp_total--;
- if (pool->tp_total <= 0)
- {
- pthread_cond_signal(&pool->tp_empty);
- }
- pthread_mutex_unlock(&pool->tp_mutex);
- return NULL;
- }
- bool ThreadPool::add_thread(dispatch_fn dispatch_me, void* args) //添加一个线程
- {
- _thread* thread = NULL;
- pthread_attr_t attr;
- thread = (_thread *) malloc(sizeof(_thread));
- if (NULL == thread)
- {
- return false;
- }
- pthread_mutex_init(&thread->thread_mutex, NULL);
- pthread_cond_init(&thread->thread_cond, NULL);
- thread->do_job = dispatch_me;
- thread->args = args;
- thread->parent = this;
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (pthread_create(&thread->thread_id, &attr, work_thread, (void *) thread) != 0)
- {
- pthread_mutex_destroy(&thread->thread_mutex);
- pthread_cond_destroy(&thread->thread_cond);
- pthread_attr_destroy(&attr);
- free(thread);
- return false;
- }
- tp_total++;
- return true;
- }
- void ThreadPool::dispatch_threadpool(dispatch_fn dispatch_me, void* args)
- {
- _thread* thread = NULL;
- pthread_mutex_lock(&tp_mutex);
- if (tp_avail <= 0 && tp_total >= tp_max) //无可用线程,而且线程数已达最大值,等待空闲线程
- {
- pthread_cond_wait(&tp_idle, &tp_mutex);
- }
- if (tp_avail <= 0) //无可用线程,而且线程数未达最大值,添加线程
- {
- if (!add_thread(dispatch_me, args))
- {
- return;
- }
- }
- else //有可用线程
- {
- tp_avail--;
- thread = tp_list[tp_avail];
- tp_list[tp_avail] = NULL;
- thread->do_job = dispatch_me;
- thread->args = args;
- pthread_mutex_lock(&thread->thread_mutex);
- pthread_cond_signal(&thread->thread_cond);
- pthread_mutex_unlock(&thread->thread_mutex);
- }
- pthread_mutex_unlock(&tp_mutex);
- }
- void ThreadPool::syn_all()
- {
- if (tp_avail < tp_total) //等待线程池中所有线程都为空闲状态
- {
- pthread_cond_wait(&tp_full, &tp_mutex);
- }
- tp_stop = true;
- int i = 0;
- for (i = 0; i < tp_avail; i++) //唤醒线程池中所有线程
- {
- _thread *thread = tp_list[i];
- pthread_mutex_lock(&thread->thread_mutex);
- pthread_cond_signal(&thread->thread_cond);
- pthread_mutex_unlock(&thread->thread_mutex);
- }
- if (tp_total > 0)
- {
- pthread_cond_wait(&tp_empty, &tp_mutex); //等待线程池中所有线程都结束
- }
- }
- ThreadPool::~ThreadPool()
- {
- sleep(MANAGE_INTREVAL);
- pthread_mutex_lock(&tp_mutex);
- syn_all(); //等待线程池为空
- int i = 0;
- for (i = 0; i < tp_total; i++) //资源释放
- {
- free(tp_list[i]);
- tp_list[i] = NULL;
- }
- pthread_mutex_unlock(&tp_mutex);
- pthread_mutex_destroy(&tp_mutex);
- pthread_cond_destroy(&tp_idle);
- pthread_cond_destroy(&tp_full);
- pthread_cond_destroy(&tp_empty);
- free(tp_list);
- }