线程池实现

基于初级的线程池优化,这里最主要的问题是解决线程id的管理问题,以及线程取消的管理
这里采用链表来管理线程id,链表的特性便于新增和删除,引进thread_revoke结构体来标记全局的取消线程信息,先分析一下线程什么时候需要取消:当任务很少,少到tasknum <threadnum* THREAD_WORKER_LOW_RATIO 时,也就是,任务的增加速度远远赶不上线程的处理速度时,会导致一部分线程处于饥饿状态(阻塞)。那么我们需要取消的就是处于饥饿状态的线程。对于poisx多线程编程中线程的取消,一般采用pthread_cancel()而且此函数是无阻塞返回的(即不等线程是否真的取消)而且,这里有一个很大很大的疑惑,在线程取消的时候,线程必须等到在取消执行点处取消,这里我们的取消执行点是pthread_cond_wait()这线程处理阻塞状态,互斥锁在线程插入阻塞等待队列时已经释放掉。但是,线程本身处于阻塞状态下(放弃了时间片)会执行取消动作吗?我试验了一下,没有……
这里维护一个取消队列,在线程取消时,置全局取消标志位为1,pthread_broadcast()唤醒所有线程,让在线程唤醒时会判断是否进入取消状态,如果是直接主动退出。当然这里有一个取消计数。
在我写的线程池中,实现了对线程的自动管理,根据任务的多少,自动增加线程或者删除线程,来保持资源的利用率。
my_thread_pool.h
#ifndef _MY_THREADPOOL_HEAD_
#define _MY_THREADPOOL_HEAD_

#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>
#include<assert.h>
#include<sys/types.h>

#define THREAD_MINNUM 2
#define THREAD_MAXNUM 20
#define THREAD_DEFAULT 2

#define MANAGE_ADJUST_INTERVAL  5
#define WORKER_THREAD_HIGH_RATIO 3
#define WORKER_THREAD_LOW_RATIO 1

#define BUFFSIZE 1024

//task in pthread pool
typedef struct pthread_work           //任务信息
{
	void* (*task)(void *);
	void* args;                       //任务所带参数(就是pthread_work_info)
	struct pthread_work* next;
}pthread_work;

typedef struct pthread_work_info    //用于表示执行任务的信息
{
	pthread_t pid;                 //哪个线程正在执行该任务
	int number;                    //用来表示这是第几个任务
}pthread_work_info;

typedef struct pthread_info         //线程信息(用于线程池中存储阻塞线程)
{
	pthread_t pid;
	struct pthread_info* next;
}pthread_info;

typedef struct thread_pool
{
	int thread_currnum;    //线程池中线程的个数
	int thread_blocknum;   //线程池中阻塞的线程个数
	int thread_work;      // 线程池中阻塞的任务个数
	pthread_mutex_t pool_lock;
	pthread_cond_t pool_cond;

	pthread_work* work_head;     //阻塞任务的起始地址
	pthread_info* pthread_head;  //阻塞线程的头节点
	int shutdown;
	pthread_t manager_thread;    //监控线程
}thread_pool;

typedef struct pthread_invoke
{
	int flag;                   //是否要删除线程
	int invoked;               //已删除线程个数
	int need_invoke;           //需要删除线程个数
}pthread_invoke;

pthread_invoke *invoke=NULL;
thread_pool *pool=NULL;

void init_pool();
void init_invoke();
void destroy_pool();
int add_work_pool(pthread_work* );
int add_pthread_pool();
int delete_thread_pool();
void* pthread_run(void *);
void* manager_pthreadpool(void *);
void* my_process(void *args);

#endif


my_thread_pool.c实现
#include"my_thread_pool.h"
#include<fcntl.h>
#include<string.h>

void init_pool()
{
    //初始化线程池,创建默认个数的阻塞线程
	pool=(thread_pool *)malloc(sizeof(thread_pool));
	pthread_mutex_init(&pool->pool_lock,NULL);
	pthread_cond_init(&pool->pool_cond,NULL);
	pool->shutdown=0;
	pool->work_head=NULL;
	pool->thread_currnum=THREAD_DEFAULT;
	pool->thread_blocknum=THREAD_DEFAULT;
	pool->thread_work=0;
	int i=0;
	pthread_info *p,*q;
	for(i=0;i<THREAD_DEFAULT;i++)
	{
		if(pool->pthread_head==NULL)
		{
			pool->pthread_head=(pthread_info*)malloc(sizeof(pthread_info));
			pool->pthread_head->next=NULL;
			p=pool->pthread_head;
		}
		else if(i==THREAD_DEFAULT-1)
		{
			q=(pthread_info*)malloc(sizeof(pthread_info));
			q->next=NULL;
			p->next=q;
		}
		else
		{
			q=(pthread_info*)malloc(sizeof(pthread_info));
			q->next=NULL;
			p->next=q;
			p=q;
		}
	}
	p=pool->pthread_head;

	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
	for(i=0;i<THREAD_DEFAULT;i++)
	{
		pthread_create(&p->pid,NULL,pthread_run,(void*)0);
		p=p->next;
	}
	//manager thread number
	pthread_create(&pool->manager_thread,NULL,manager_pthreadpool,NULL);
}

void* pthread_run(void *args)
{
	int flag=0;
	//创建线程有2种方式,刚开始创建的都是阻塞线程。当线程数量不够时,再增加的线程是都不是阻塞线程,可直接执行的
	int flag_block=(int)args;   //flag_block=0 represent it is blocked
	pthread_t pthread_id=pthread_self();
	printf("thread %x start \n",pthread_id);
	while(1)
	{
		pthread_mutex_lock(&pool->pool_lock);
		//printf("thread %x start \n",pthread_id);
		while(flag_block==0&&pool->shutdown==0&&pool->work_head==NULL)
		{
			if(flag)
				printf("thread %x will waiting\n",pthread_id);
			pthread_cond_wait(&pool->pool_cond,&pool->pool_lock);
		}
		if(pool->shutdown==1)
		{
			printf("thread %x will exit\n",pthread_id);
			pthread_mutex_unlock(&pool->pool_lock);
			pthread_exit(NULL);
		}
		pthread_info* currnode=pool->pthread_head;
		pthread_info* prexnode=pool->pthread_head;
		// remove the thread from the block queue
		while(currnode!=NULL)
		{
			if(currnode->pid==pthread_id)
				break;
			prexnode=currnode;
			currnode=currnode->next;
		}
		if(currnode!=NULL)
		{
			if(currnode==pool->pthread_head)
			{
				pool->pthread_head=pool->pthread_head->next;
				free(prexnode);
			}
			else if(currnode->next==NULL)
			{
				prexnode->next=NULL;
				free(currnode);
			}
			else
			{
				prexnode->next=currnode->next;
				free(currnode);
			}
		}
		if(invoke->flag==1&&invoke->invoked<invoke->need_invoke)
		{
			printf("the pthread %x is useless ,will exit\n",pthread_self());
			//delete ptthread_id from queue
			invoke->invoked++;
			pool->thread_currnum--;
			pool->thread_blocknum--;
			pthread_mutex_unlock(&pool->pool_lock);
			pthread_exit(NULL);
		}
		assert(pool->work_head!=NULL);
		pthread_work* work=pool->work_head;
		pool->work_head=pool->work_head->next;
		((pthread_work_info*)work->args)->pid=pthread_id;
		pool->thread_work--;
		if(flag_block==0)
			pool->thread_blocknum--;
		pthread_mutex_unlock(&pool->pool_lock);
		(*work->task)(work->args);
		flag=1;
		pthread_mutex_lock(&pool->pool_lock);
		//if there is no work ,then the thread will block ,put it into blockqueue
		if(pool->work_head==NULL)
		{
			flag_block=0;
			pthread_info* p=(pthread_info *)malloc(sizeof(pthread_info));
			p->next=pool->pthread_head;
			p->pid=pthread_self();
			pool->pthread_head=p;
			//input into blocknum ,then should add thread_blocknum
			pool->thread_blocknum++;
		}
		else  //执行任务后,还有任务,故不阻塞线程
			flag_block=1;
		pthread_mutex_unlock(&pool->pool_lock);
	}
	pthread_exit(NULL);
}

void destroy_pool()  //释放资源
{
	if(pool==NULL)
		return ;
	pthread_mutex_lock(&pool->pool_lock);
	pool->shutdown=1;
	pthread_mutex_unlock(&pool->pool_lock);
	pthread_cond_broadcast(&pool->pool_cond);
	//wait all pthread free rescource
	sleep(5);

	//free rescource
	pthread_cond_destroy(&pool->pool_cond);
	pthread_mutex_destroy(&pool->pool_lock);
	pthread_work* p=pool->work_head;
	while(p!=NULL)
	{
		pool->work_head=pool->work_head->next;
		free(p);
		p=pool->work_head;
	}
	pthread_info* info=pool->pthread_head;
	while(info!=NULL)
	{
		pool->pthread_head=pool->pthread_head->next;
		free(info);
		info=pool->pthread_head;
	}
	free(pool);
	pool=NULL;
	return ;
}

void *my_process(void* args)
{
	pthread_work_info *work_info =(pthread_work_info *)args;
	printf("%x thread is working task %d\n",work_info->pid,work_info->number);
	sleep(1);
	return ;
}

int add_work_pool(pthread_work* work)
{
	pthread_mutex_lock(&pool->pool_lock);
	pthread_work* p=pool->work_head;
	if(p==NULL)
		pool->work_head=work;
	else
	{
		while(p->next!=NULL)
			p=p->next;
		p->next=work;
	}
	pool->thread_work++;
	pthread_mutex_unlock(&pool->pool_lock);
	pthread_cond_signal(&pool->pool_cond);
	return 1;

}

int main()
{
	init_pool();
	init_invoke();
	int i;
	pthread_work * p;

	for(i=0;i<40;i++)
	{
		if(i>=20)
			sleep(1);
		p=(pthread_work *)malloc(sizeof(pthread_work));
		p->task=my_process;
		p->next=NULL;
		p->args=malloc(sizeof(pthread_work_info));
		((pthread_work_info *)p->args)->number=i;
		add_work_pool(p);
		//sleep(1);
	}
	sleep(15);  //等待所有任务都执行完,然后在删除线程池
	destroy_pool();
	return 0;
}

//use for manager pthreadpool every 5 seconds
void* manager_pthreadpool(void *args)
{
	char buff[BUFFSIZE];
	while(1)
	{
		sleep(5);
		pthread_mutex_lock(&pool->pool_lock);
		if(pool->shutdown==1)
			pthread_exit(NULL);
		printf("the work num is %d,the thread num: %d , the ratio is %lf,the block is %d\n",pool->thread_work,pool->thread_currnum,(double)pool->thread_work/pool->thread_currnum,pool->thread_blocknum);
		//display all the block pid;
		pthread_info* p=pool->pthread_head;
		while(p!=NULL)
		{
			printf("%x  thread is blocked\n",p->pid);
			p=p->next;
		}

		if(pool->thread_work/pool->thread_currnum>WORKER_THREAD_HIGH_RATIO)
		{
			//you must add thread number to adjust to the work(beacuse work is too many)
			int add_number=(pool->thread_work-pool->thread_currnum*WORKER_THREAD_HIGH_RATIO)/WORKER_THREAD_HIGH_RATIO;
			int i,err;
			pthread_attr_t attr;
			err=pthread_attr_init(&attr);
			if(err!=0)
				printf("attr error\n");
			err=pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
			for(i=1;i<=add_number;i++)
			{
                //这些注释掉的语句是,因为再次创建的线程都不会是阻塞线程,所有不用放入队列
				//pthread_info *p=(pthread_info *)malloc(sizeof(pthread_info));
				//p->next=pool->pthread_head;
				//p->pid=(pthread_t)0;
				pthread_t pid;
				pthread_create(&pid,&attr,pthread_run,(void *)1);
				//pool->pthread_head=p;
				pool->thread_currnum++;
				printf("thread %x add into the pool\n",pid);
			}
			pthread_mutex_unlock(&pool->pool_lock);
			//strcpy(buff,"you must add thread\n");
			//int size=strlen(buff);
			//write_data("a.txt",buff,size);

		}
		else if(pool->thread_work/pool->thread_currnum<WORKER_THREAD_LOW_RATIO)
		{
			//you must decrease thread number to adjust to the work
			if(pool->thread_blocknum!=0&&pool->thread_currnum>THREAD_MINNUM)
			{
				invoke->flag=1;
				invoke->invoked=0;
				invoke->need_invoke=(pool->thread_currnum-pool->thread_blocknum>=THREAD_MINNUM)?pool->thread_blocknum:(pool->thread_currnum-THREAD_MINNUM);
				pthread_mutex_unlock(&pool->pool_lock);
				pthread_cond_broadcast(&pool->pool_cond);
			}
			pthread_mutex_unlock(&pool->pool_lock);

		}
		else
			pthread_mutex_unlock(&pool->pool_lock);

	}
}
//初始化删除线程标志
void init_invoke()
{
	invoke=(pthread_invoke*)malloc(sizeof(pthread_invoke));
	invoke->flag=0;
	invoke->invoked=0;
	invoke->need_invoke=0;
}


线程池实现,布布扣,bubuko.com

线程池实现

上一篇:表达式求值 - Java实现


下一篇:浅析python中的类变量和对象变量