以前写过一篇关于如何使用多线程推升推送速度(http://www.cnblogs.com/bai-jimmy/p/5177433.html),能够到达5000qps,其实已经可以满足现在的业务,不过在看nginx的说明文档时,又提到nginx支持线程池来提升响应速度, 一直对如何实现线程池很感兴趣,利用周末的时间参考别人的代码,自己写了一个初级版,并且调通了,还没在实际开发中应用,不知道效果如何
代码如下:
pd_log.h
#ifndef __pd_log_
#define __pd_log_ #define LOG_DEBUG_PATH "debug.log"
#define LOG_ERROR_PATH "error.log" /**
* define log level
*/
enum log_level {
DEBUG = ,
ERROR =
}; #define error(...) \
logger(ERROR, __LINE__, __VA_ARGS__) #define debug(...) \
logger(DEBUG, __LINE__, __VA_ARGS__) #define assert(expr, rc) \
if(!(expr)){ \
error(#expr"is null or 0"); \
return rc; \
}
#endif
pd_log.c
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <time.h> #include "pd_log.h" /**
* get now timestr
*/
static void get_time(char *time_str, size_t len) {
time_t tt;
struct tm local_time;
time(&tt);
localtime_r(&tt, &local_time);
strftime(time_str, len, "%m-%d %H:%M:%S", &local_time);
} /**
* log
*/
static void logger(int flag, int line, const char *fmt, ...) {
FILE *fp = NULL;
char time_str[ + ];
va_list args;
get_time(time_str, sizeof(time_str)); switch (flag) {
case DEBUG:
fp = fopen(LOG_DEBUG_PATH, "a");
if (!fp) {
return;
}
fprintf(fp, "%s DEBUG (%d:%d) ", time_str, getpid(), line);
break;
case ERROR:
fp = fopen(LOG_ERROR_PATH, "a");
if (!fp) {
return;
}
fprintf(fp, "%s ERROR (%d:%d) ", time_str, getpid(), line);
break;
default:
return;
} va_start(args, fmt);
vfprintf(fp, fmt, args);
va_end(args);
fprintf(fp, "\n"); fclose(fp);
return;
}
pd_pool.h
/**
* 线程池头文件
* @author jimmy
* @date 2016-5-14
*/
#ifndef __PD_POOL_
#define __PD_POOL_ /*任务链表*/
typedef struct task_s{
void (*routine)(void *);
void *argv;
struct task_s *next;
} pd_task_t; /*任务队列*/
typedef struct queue_s{
pd_task_t *head;
pd_task_t **tail;
size_t max_task_num;
size_t cur_task_num;
}pd_queue_t; /*线程池*/
typedef struct pool_s{
pthread_mutex_t mutex;
pthread_cond_t cond;
pd_queue_t queue;
size_t thread_num;
//size_t thread_stack_size;
}pd_pool_t; /*初始化线程池*/
//pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_stack_size, size_t thread_max_num);
#endif
pd_poo.c
/**
* 线程池
* @author jimmy
* @date 2016-5-14
*/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h> #include <pthread.h> #include "pd_log.h"
#include "pd_log.c"
#include "pd_pool.h" /*tsd*/
pthread_key_t key; void *pd_worker_dispatch(void *argv){
ushort exit_flag = ;
pd_task_t *a_task;
pd_pool_t *a_pool = (pd_pool_t *)argv;
if(pthread_setspecific(key, (void *)&exit_flag) != ){
return NULL;
}
/*动态从任务列表中获取任务执行*/
while(!exit_flag){
pthread_mutex_lock(&a_pool->mutex);
/*如果此时任务链表为空,则需要等待条件变量为真*/
while(a_pool->queue.head == NULL){
pthread_cond_wait(&a_pool->cond, &a_pool->mutex);
}
/*从任务链表中任务开支执行*/
a_task = a_pool->queue.head;
a_pool->queue.head = a_task->next;
a_pool->queue.cur_task_num--;
if(a_pool->queue.head == NULL){
a_pool->queue.tail = &a_pool->queue.head;
}
/*解锁*/
pthread_mutex_unlock(&a_pool->mutex);
/*执行任务*/
a_task->routine(a_task->argv);
//core
free(a_task);
a_task = NULL;
}
pthread_exit();
} /**
* 根据线程数创建所有的线程
*/
static int pd_pool_create(pd_pool_t *a_pool){
int i;
pthread_t tid;
for(i = ; i < a_pool->thread_num; i++){
pthread_create(&tid, NULL, pd_worker_dispatch, a_pool);
}
return ;
} /**
* 线程退出函数
*/
void pd_pool_exit_cb(void *argv){
unsigned int *lock = argv;
ushort *exit_flag_ptr = pthread_getspecific(key);
*exit_flag_ptr = ;
pthread_setspecific(key, (void *)exit_flag_ptr);
*lock = ;
} /**
* 线程池初始化
*/
pd_pool_t *pd_pool_init(size_t thread_num, size_t thread_max_num){
pd_pool_t *a_pool = NULL;
a_pool = calloc(, sizeof(pd_pool_t));
if(!a_pool){
error("pool_init calloc fail: %s", strerror(errno));
return NULL;
}
a_pool->thread_num = thread_num;
//初始化队列参数
a_pool->queue.max_task_num = thread_max_num;
a_pool->queue.cur_task_num = ;
a_pool->queue.head = NULL;
a_pool->queue.tail = &a_pool->queue.head;
//初始化tsd
if(pthread_key_create(&key, NULL) != ){
error("pthread_key_create fail: %s", strerror(errno));
goto err;
}
//初始化互斥锁
if(pthread_mutex_init(&a_pool->mutex, NULL) != ){
error("pthread_mutex_init fail: %s", strerror(errno));
pthread_key_delete(key);
goto err;
}
//初始化条件变量
if(pthread_cond_init(&a_pool->cond, NULL) != ){
error("pthread_cond_init fail: %s", strerror(errno));
pthread_mutex_destroy(&a_pool->mutex);
goto err;
}
//创建线程池
if(pd_pool_create(a_pool) != ){
error("pd_pool_create fail: %s", strerror(errno));
pthread_mutex_unlock(&a_pool->mutex);
pthread_cond_destroy(&a_pool->cond);
goto err;
}
return a_pool;
err:
free(a_pool);
return NULL;
} /**
* 向线程池中添加任务..
*/
int pd_pool_add_task(pd_pool_t *a_pool, void (*routine)(void *), void *argv){
pd_task_t *a_task = NULL;
a_task = (pd_task_t *)calloc(, sizeof(pd_task_t));
if(!a_task){
error("add task calloc faile: %s", strerror(errno));
return -;
}
a_task->routine = routine;
a_task->argv = argv;
a_task->next = NULL;
/*加锁*/
pthread_mutex_lock(&a_pool->mutex);
if(a_pool->queue.cur_task_num >= a_pool->queue.max_task_num){
error("cur_task_num >= max_task_num");
goto err;
}
/*将任务放到末尾*/
*(a_pool->queue.tail) = a_task;
a_pool->queue.tail = &a_task->next;
a_pool->queue.cur_task_num++;
/*通知堵塞的线程*/
pthread_cond_signal(&a_pool->cond);
/*解锁*/
pthread_mutex_unlock(&a_pool->mutex);
return ;
err:
pthread_mutex_unlock(&a_pool->mutex);
free(a_task);
return -;
} void pd_pool_destroy(pd_pool_t *a_pool){
unsigned int n;
unsigned int lock; for(n = ; n < a_pool->thread_num; n++){
lock = ;
if(pd_pool_add_task(a_pool, pd_pool_exit_cb, &lock) != ){
error("pd_pool_destroy fail: add_task fail");
return;
}
while(lock){
usleep();
}
}
pthread_mutex_destroy(&a_pool->mutex);
pthread_cond_destroy(&a_pool->cond);
pthread_key_delete(key);
free(a_pool);
}
/******************************************************************************************/ void testfun(void *argv){
printf("testfun\n");
sleep();
} int main(){
pd_pool_t *a_pool = pd_pool_init(, ); pd_pool_add_task(a_pool, testfun, NULL);
pd_pool_add_task(a_pool, testfun, NULL);
pd_pool_add_task(a_pool, testfun, NULL); pd_pool_destroy(a_pool);
}