基于初级的线程池优化,这里最主要的问题是解决线程id的管理问题,以及线程取消的管理
这里采用链表来管理线程id,链表的特性便于新增和删除,引进thread_revoke结构体来标记全局的取消线程信息,先分析一下线程什么时候需要取消:当任务很少,少到tasknum <threadnum* THREAD_WORKER_LOW_RATIO 时,也就是,任务的增加速度远远赶不上线程的处理速度时,会导致一部分线程处于饥饿状态(阻塞)。那么我们需要取消的就是处于饥饿状态的线程。对于poisx多线程编程中线程的取消,一般采用pthread_cancel()而且此函数是无阻塞返回的(即不等线程是否真的取消)而且,这里有一个很大很大的疑惑,在线程取消的时候,线程必须等到在取消执行点处取消,这里我们的取消执行点是pthread_cond_wait()这线程处理阻塞状态,互斥锁在线程插入阻塞等待队列时已经释放掉。但是,线程本身处于阻塞状态下(放弃了时间片)会执行取消动作吗?我试验了一下,没有……
这里维护一个取消队列,在线程取消时,置全局取消标志位为1,pthread_broadcast()唤醒所有线程,让在线程唤醒时会判断是否进入取消状态,如果是直接主动退出。当然这里有一个取消计数。
在我写的线程池中,实现了对线程的自动管理,根据任务的多少,自动增加线程或者删除线程,来保持资源的利用率。
my_thread_pool.h
#ifndef _MY_THREADPOOL_HEAD_ #define _MY_THREADPOOL_HEAD_ #include<stdio.h> #include<pthread.h> #include<unistd.h> #include<stdlib.h> #include<assert.h> #include<sys/types.h> #define THREAD_MINNUM 2 #define THREAD_MAXNUM 20 #define THREAD_DEFAULT 2 #define MANAGE_ADJUST_INTERVAL 5 #define WORKER_THREAD_HIGH_RATIO 3 #define WORKER_THREAD_LOW_RATIO 1 #define BUFFSIZE 1024 //task in pthread pool typedef struct pthread_work //任务信息 { void* (*task)(void *); void* args; //任务所带参数(就是pthread_work_info) struct pthread_work* next; }pthread_work; typedef struct pthread_work_info //用于表示执行任务的信息 { pthread_t pid; //哪个线程正在执行该任务 int number; //用来表示这是第几个任务 }pthread_work_info; typedef struct pthread_info //线程信息(用于线程池中存储阻塞线程) { pthread_t pid; struct pthread_info* next; }pthread_info; typedef struct thread_pool { int thread_currnum; //线程池中线程的个数 int thread_blocknum; //线程池中阻塞的线程个数 int thread_work; // 线程池中阻塞的任务个数 pthread_mutex_t pool_lock; pthread_cond_t pool_cond; pthread_work* work_head; //阻塞任务的起始地址 pthread_info* pthread_head; //阻塞线程的头节点 int shutdown; pthread_t manager_thread; //监控线程 }thread_pool; typedef struct pthread_invoke { int flag; //是否要删除线程 int invoked; //已删除线程个数 int need_invoke; //需要删除线程个数 }pthread_invoke; pthread_invoke *invoke=NULL; thread_pool *pool=NULL; void init_pool(); void init_invoke(); void destroy_pool(); int add_work_pool(pthread_work* ); int add_pthread_pool(); int delete_thread_pool(); void* pthread_run(void *); void* manager_pthreadpool(void *); void* my_process(void *args); #endif
my_thread_pool.c实现
#include"my_thread_pool.h" #include<fcntl.h> #include<string.h> void init_pool() { //初始化线程池,创建默认个数的阻塞线程 pool=(thread_pool *)malloc(sizeof(thread_pool)); pthread_mutex_init(&pool->pool_lock,NULL); pthread_cond_init(&pool->pool_cond,NULL); pool->shutdown=0; pool->work_head=NULL; pool->thread_currnum=THREAD_DEFAULT; pool->thread_blocknum=THREAD_DEFAULT; pool->thread_work=0; int i=0; pthread_info *p,*q; for(i=0;i<THREAD_DEFAULT;i++) { if(pool->pthread_head==NULL) { pool->pthread_head=(pthread_info*)malloc(sizeof(pthread_info)); pool->pthread_head->next=NULL; p=pool->pthread_head; } else if(i==THREAD_DEFAULT-1) { q=(pthread_info*)malloc(sizeof(pthread_info)); q->next=NULL; p->next=q; } else { q=(pthread_info*)malloc(sizeof(pthread_info)); q->next=NULL; p->next=q; p=q; } } p=pool->pthread_head; pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); for(i=0;i<THREAD_DEFAULT;i++) { pthread_create(&p->pid,NULL,pthread_run,(void*)0); p=p->next; } //manager thread number pthread_create(&pool->manager_thread,NULL,manager_pthreadpool,NULL); } void* pthread_run(void *args) { int flag=0; //创建线程有2种方式,刚开始创建的都是阻塞线程。当线程数量不够时,再增加的线程是都不是阻塞线程,可直接执行的 int flag_block=(int)args; //flag_block=0 represent it is blocked pthread_t pthread_id=pthread_self(); printf("thread %x start \n",pthread_id); while(1) { pthread_mutex_lock(&pool->pool_lock); //printf("thread %x start \n",pthread_id); while(flag_block==0&&pool->shutdown==0&&pool->work_head==NULL) { if(flag) printf("thread %x will waiting\n",pthread_id); pthread_cond_wait(&pool->pool_cond,&pool->pool_lock); } if(pool->shutdown==1) { printf("thread %x will exit\n",pthread_id); pthread_mutex_unlock(&pool->pool_lock); pthread_exit(NULL); } pthread_info* currnode=pool->pthread_head; pthread_info* prexnode=pool->pthread_head; // remove the thread from the block queue while(currnode!=NULL) { if(currnode->pid==pthread_id) break; prexnode=currnode; currnode=currnode->next; } if(currnode!=NULL) { if(currnode==pool->pthread_head) { pool->pthread_head=pool->pthread_head->next; free(prexnode); } else if(currnode->next==NULL) { prexnode->next=NULL; free(currnode); } else { prexnode->next=currnode->next; free(currnode); } } if(invoke->flag==1&&invoke->invoked<invoke->need_invoke) { printf("the pthread %x is useless ,will exit\n",pthread_self()); //delete ptthread_id from queue invoke->invoked++; pool->thread_currnum--; pool->thread_blocknum--; pthread_mutex_unlock(&pool->pool_lock); pthread_exit(NULL); } assert(pool->work_head!=NULL); pthread_work* work=pool->work_head; pool->work_head=pool->work_head->next; ((pthread_work_info*)work->args)->pid=pthread_id; pool->thread_work--; if(flag_block==0) pool->thread_blocknum--; pthread_mutex_unlock(&pool->pool_lock); (*work->task)(work->args); flag=1; pthread_mutex_lock(&pool->pool_lock); //if there is no work ,then the thread will block ,put it into blockqueue if(pool->work_head==NULL) { flag_block=0; pthread_info* p=(pthread_info *)malloc(sizeof(pthread_info)); p->next=pool->pthread_head; p->pid=pthread_self(); pool->pthread_head=p; //input into blocknum ,then should add thread_blocknum pool->thread_blocknum++; } else //执行任务后,还有任务,故不阻塞线程 flag_block=1; pthread_mutex_unlock(&pool->pool_lock); } pthread_exit(NULL); } void destroy_pool() //释放资源 { if(pool==NULL) return ; pthread_mutex_lock(&pool->pool_lock); pool->shutdown=1; pthread_mutex_unlock(&pool->pool_lock); pthread_cond_broadcast(&pool->pool_cond); //wait all pthread free rescource sleep(5); //free rescource pthread_cond_destroy(&pool->pool_cond); pthread_mutex_destroy(&pool->pool_lock); pthread_work* p=pool->work_head; while(p!=NULL) { pool->work_head=pool->work_head->next; free(p); p=pool->work_head; } pthread_info* info=pool->pthread_head; while(info!=NULL) { pool->pthread_head=pool->pthread_head->next; free(info); info=pool->pthread_head; } free(pool); pool=NULL; return ; } void *my_process(void* args) { pthread_work_info *work_info =(pthread_work_info *)args; printf("%x thread is working task %d\n",work_info->pid,work_info->number); sleep(1); return ; } int add_work_pool(pthread_work* work) { pthread_mutex_lock(&pool->pool_lock); pthread_work* p=pool->work_head; if(p==NULL) pool->work_head=work; else { while(p->next!=NULL) p=p->next; p->next=work; } pool->thread_work++; pthread_mutex_unlock(&pool->pool_lock); pthread_cond_signal(&pool->pool_cond); return 1; } int main() { init_pool(); init_invoke(); int i; pthread_work * p; for(i=0;i<40;i++) { if(i>=20) sleep(1); p=(pthread_work *)malloc(sizeof(pthread_work)); p->task=my_process; p->next=NULL; p->args=malloc(sizeof(pthread_work_info)); ((pthread_work_info *)p->args)->number=i; add_work_pool(p); //sleep(1); } sleep(15); //等待所有任务都执行完,然后在删除线程池 destroy_pool(); return 0; } //use for manager pthreadpool every 5 seconds void* manager_pthreadpool(void *args) { char buff[BUFFSIZE]; while(1) { sleep(5); pthread_mutex_lock(&pool->pool_lock); if(pool->shutdown==1) pthread_exit(NULL); printf("the work num is %d,the thread num: %d , the ratio is %lf,the block is %d\n",pool->thread_work,pool->thread_currnum,(double)pool->thread_work/pool->thread_currnum,pool->thread_blocknum); //display all the block pid; pthread_info* p=pool->pthread_head; while(p!=NULL) { printf("%x thread is blocked\n",p->pid); p=p->next; } if(pool->thread_work/pool->thread_currnum>WORKER_THREAD_HIGH_RATIO) { //you must add thread number to adjust to the work(beacuse work is too many) int add_number=(pool->thread_work-pool->thread_currnum*WORKER_THREAD_HIGH_RATIO)/WORKER_THREAD_HIGH_RATIO; int i,err; pthread_attr_t attr; err=pthread_attr_init(&attr); if(err!=0) printf("attr error\n"); err=pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); for(i=1;i<=add_number;i++) { //这些注释掉的语句是,因为再次创建的线程都不会是阻塞线程,所有不用放入队列 //pthread_info *p=(pthread_info *)malloc(sizeof(pthread_info)); //p->next=pool->pthread_head; //p->pid=(pthread_t)0; pthread_t pid; pthread_create(&pid,&attr,pthread_run,(void *)1); //pool->pthread_head=p; pool->thread_currnum++; printf("thread %x add into the pool\n",pid); } pthread_mutex_unlock(&pool->pool_lock); //strcpy(buff,"you must add thread\n"); //int size=strlen(buff); //write_data("a.txt",buff,size); } else if(pool->thread_work/pool->thread_currnum<WORKER_THREAD_LOW_RATIO) { //you must decrease thread number to adjust to the work if(pool->thread_blocknum!=0&&pool->thread_currnum>THREAD_MINNUM) { invoke->flag=1; invoke->invoked=0; invoke->need_invoke=(pool->thread_currnum-pool->thread_blocknum>=THREAD_MINNUM)?pool->thread_blocknum:(pool->thread_currnum-THREAD_MINNUM); pthread_mutex_unlock(&pool->pool_lock); pthread_cond_broadcast(&pool->pool_cond); } pthread_mutex_unlock(&pool->pool_lock); } else pthread_mutex_unlock(&pool->pool_lock); } } //初始化删除线程标志 void init_invoke() { invoke=(pthread_invoke*)malloc(sizeof(pthread_invoke)); invoke->flag=0; invoke->invoked=0; invoke->need_invoke=0; }