自主封装线程池(threadpool)

针对TCP服务器的线程池 ​

基于生产者和消费者模型 ​

生产者:负责连接客户端的主线程,当有客户端连接成功后,把连接上的sockfd存入仓库 ​

消费者:由主线程预先开启出来若干个子线程,如果仓库中有sockfd,通过线程同步由某个子线程获取到sockfd,从而与对应的客户端通信,如果客户端断开连接后,那么重新去仓库获取新的sockfd,如仓库空虚,则休眠等待 ​

仓库:使用队列结构,消费者从队头获取数据,因此队头要加互斥量,生产者从队尾放入数据

顺序队列:

头文件(queue.h)

#ifndef QUEUE_H
#define QUEUE_H


#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

//顺序队列结构
typedef struct ArrayQueue
{
	void** ptr;//存储元素的内存首地址
	size_t cap;//容量
	size_t cnt;//个数计数器
	int front;//队头下标
	int rear;//队尾下标
}ArrayQueue;

//创建结构
ArrayQueue* creat_array_queue(size_t cap);

//销毁
void destory_array_queue(ArrayQueue* queue);
//队满
bool full_array_queue(ArrayQueue* queue);

//队空
bool empty_array_queue(ArrayQueue *queue);

//入队
bool push_array_queue(ArrayQueue* queue,void* data);

//出队
bool pop_array_queue(ArrayQueue* queue);

//队头
void* front_array_queue(ArrayQueue* queue);


//队尾
void* rear_array_queue(ArrayQueue* queue);


//数量
size_t size_array_queue(ArrayQueue* queue);

#endif//QUEUE_H

源文件(queue.c)

#include "queue.h"


//创建结构
ArrayQueue* creat_array_queue(size_t cap)
{
	ArrayQueue* queue=malloc(sizeof(ArrayQueue));
	queue->ptr=malloc(sizeof(void*)*cap);
	queue->cap=cap;
	queue->cnt=0;
	queue->front=0;
	queue->rear=-1;//rear指向队尾元素
	return queue;
}

//销毁
void destory_array_queue(ArrayQueue* queue)
{
	free(queue->ptr);
	free(queue);
}
//队满
bool full_array_queue(ArrayQueue* queue)
{
	return queue->cnt==queue->cap;
}
//队空
bool empty_array_queue(ArrayQueue *queue)
{
	return queue->cnt==0;
}
//入队
bool push_array_queue(ArrayQueue* queue,void* data)
{
	if(full_array_queue(queue))
		return false;
	//先rear往后移位
	queue->rear=(queue->rear+1)%queue->cap;
	//把数据入队到rear的位置
	queue->ptr[queue->rear]=data;
	queue->cnt++;
	return true;
}
//出队
bool pop_array_queue(ArrayQueue* queue)
{
	if(empty_array_queue(queue))
		return false;
	//往后移动front  就相当于出队员工元素
	queue->front=(queue->front+1)%queue->cap;
	queue->cnt--;
	return true;
}
//队头
void* front_array_queue(ArrayQueue* queue)
{
	return queue->ptr[queue->front];
}

//队尾
void* rear_array_queue(ArrayQueue* queue)
{
	return queue->ptr[queue->rear];
}

//数量
size_t size_array_queue(ArrayQueue* queue)
{
	return queue->cnt;
}
线程池:

头文件(threadpool.h)

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <pthread.h>

#include "queue.h"

//消费者线程真正的业务逻辑函数	由调用者提供
typedef void (*con_WorkFP)(void*);
//生产者线程真正的业务逻辑函数 
typedef void* (*pro_WorkFP)(void);

typedef struct ThreadPool
{
	int con_thread_cnt;		//消费者线程数量
	pthread_t *con_tids;	//消费者线程id
	int pro_thread_cnt;		//生产者线程数量
	pthread_t *pro_tids;	//生产者线程id

	ArrayQueue* store;		//队列仓库
	con_WorkFP con_work;	//消费者业务逻辑函数
	pro_WorkFP pro_work;	//生产者业务逻辑函数
	pthread_mutex_t hlock;	//队头锁
	pthread_mutex_t rlock;	//队尾锁
	pthread_cond_t empty;	//空仓条件变量
	pthread_cond_t full;	//满仓条件变量
}ThreadPool;

//创建线程池并初始化
ThreadPool* creat_threadpool(int con_cnt,int pro_cnt,int store_cal,con_WorkFP con_fp,pro_WorkFP pro_fp);

//启动线程池(开启cnt个子线程)
void start_treadpool(ThreadPool* thread);

//生产数据
void push_threadpool(ThreadPool* thread,void* data);

//消费数据
void* pop_threadpool(ThreadPool* thread);

//销毁线程池
void destory_threadpool(ThreadPool* thread);


#endif//THREADPOOL_H

源文件(threadpool.c)

#include "threadpool.h"

//生产者线程的入口函数
static void* pro_run(void* arg)
{
	ThreadPool* thread=(ThreadPool*) arg;
	for(;;)
	{
		//调用生产者业务逻辑函数 生产数据
		void* data=thread->pro_work();
		
		if(NULL==data)
			continue;
		//放入数据到线程池
		push_threadpool(thread,data);
	}
}

//消费者线程的入口函数
static void* con_run(void* arg)
{
	ThreadPool* thread=(ThreadPool*) arg;
	for(;;)
	{
		//从仓库拿出数据
		void *data=pop_threadpool(thread);
		//调用消费者业务逻辑函数 消费数据
		thread->con_work(data);
	}

}

//创建线程池并初始化
ThreadPool* creat_threadpool(int con_cnt,int pro_cnt,int store_cal,con_WorkFP con_fp,pro_WorkFP pro_fp)
{

	//给线程池申请内存
	ThreadPool* pool=malloc(sizeof(ThreadPool));	
	
	//初始化成员
	pool->con_thread_cnt=con_cnt;
	pool->pro_thread_cnt=pro_cnt;

	pool->con_tids=malloc(sizeof(pthread_t)*con_cnt);
	pool->pro_tids=malloc(sizeof(pthread_t)*pro_cnt);
	pool->con_work=con_fp;
	pool->pro_work=pro_fp;

	//创建仓库
	pool->store=creat_array_queue(store_cal);

	//初始化互斥量,条件变量
	pthread_mutex_init(&pool->hlock,NULL);
	pthread_mutex_init(&pool->rlock,NULL);

	pthread_cond_init(&pool->empty,NULL);
	pthread_cond_init(&pool->full,NULL);

	return pool;
}

//启动线程池(开启cnt个子线程)
void start_treadpool(ThreadPool* thread)
{
	//启动生产者线程
	for(int i=0;i<thread->pro_thread_cnt;i++)
	{
		pthread_create(thread->pro_tids+i,NULL,pro_run,thread);
	}
	
	//启动消费者线程
	for(int i=0;i<thread->con_thread_cnt;i++)
	{
		pthread_create(thread->con_tids+i,NULL,con_run,thread);
	}
}

//生产数据
void push_threadpool(ThreadPool* thread,void* data)
{
	//队尾入队 所以队尾上锁(抢资源)
	pthread_mutex_lock(&thread->rlock);
	
	//如果一直队满,就休眠等待
	while(full_array_queue(thread->store))
	{
		//唤醒消费者线程
		pthread_cond_signal(&thread->empty);
		//睡入满仓条件变量,并解锁队尾
		pthread_cond_wait(&thread->full,&thread->rlock);
	}

	//数据存入仓库中
	push_array_queue(thread->store,data);

	//此时仓库里至少有一个数据
	//唤醒一个消费者线程,可以进行消费了
	pthread_cond_signal(&thread->empty);
	//解锁队尾
	pthread_mutex_unlock(&thread->rlock);
}

//消费数据
void* pop_threadpool(ThreadPool* thread)
{
	//队头入队 所以队头上锁(抢资源)
	pthread_mutex_lock(&thread->hlock);
	
	//如果一直队空,就休眠等待
	while(empty_array_queue(thread->store))
	{
		//唤醒消费者线程
		pthread_cond_signal(&thread->full);
		//睡入满仓条件变量,并解锁队尾
		pthread_cond_wait(&thread->empty,&thread->hlock);
	}

	//数据从仓库中取出
	void *data=front_array_queue(thread->store);
	pop_array_queue(thread->store);

	//此时仓库里至少有一个数据没有了
	//唤醒一个生产者线程,可以进行生产了
	pthread_cond_signal(&thread->full);
	//解锁队头
	pthread_mutex_unlock(&thread->hlock);
	return data;
}

//销毁线程池
void destory_threadpool(ThreadPool* thread)
{
	//杀死生产者,消费者线程
	for(int i=0;i<thread->pro_thread_cnt;i++)
	{
		pthread_cancel(thread->pro_tids[i]);
		pthread_join(thread->pro_tids[i],NULL);
	}
	for(int i=0;i<thread->con_thread_cnt;i++)
	{
		pthread_cancel(thread->con_tids[i]);
		pthread_join(thread->con_tids[i],NULL);
	}

	//释放申请的内存
	free(thread->pro_tids);
	free(thread->con_tids);
	destory_array_queue(thread->store);

	pthread_mutex_destroy(&thread->hlock);
	pthread_mutex_destroy(&thread->rlock);
	pthread_cond_destroy(&thread->full);
	pthread_cond_destroy(&thread->empty);

	free(thread);
}
 TCP客户端代码:
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "threadpool.h"

int sockfd; //服务连接客户端的sockfd

//消费者线程业务逻辑函数
void con_work(void* arg)
{

	int *cli_fd=(int*)arg;

	char buf[BUFSIZ];
	for(;;)
	{
		// 接收数据
		int ret = recv(*cli_fd,buf,BUFSIZ,0);
		if(0 >= ret || !strcmp("quit",buf))
		{
			printf("客户端%d退出\n",*cli_fd);
			close(*cli_fd);
			free(cli_fd);
			return;
		}

		printf("cli_fd:%d recv:%s byte:%d\n",*cli_fd,buf,ret);

		strcat(buf,",return!");

		// 返回数据
		ret = send(*cli_fd,buf,strlen(buf)+1,0);
		if(0 >= ret)
		{
			printf("客户端%d退出\n",*cli_fd);
			close(*cli_fd);
			free(cli_fd);
			return;
		}
	}
}

//生产者线程业务逻辑函数
void* pro_work(void)
{
	int *cli_fd=malloc(sizeof(int));
	*cli_fd=accept(sockfd,NULL,NULL);
	if(*cli_fd<0)
	{
		perror("accept");
		return NULL;
	}
	return cli_fd;
}



int main(int argc,const char* argv[])
{
	// 创建socket对象
	sockfd = socket(AF_INET,SOCK_STREAM,0);
	if(0 > sockfd)
	{
		perror("socket");
		return -1;
	}

	// 准备本机地址
	struct sockaddr_in addr;
	addr.sin_family = AF_INET;
	addr.sin_port = htons(1025);
	addr.sin_addr.s_addr = inet_addr("127.0.0.1");
	socklen_t addrlen = sizeof(addr);

	// 绑定socket对象和地址	
	if(bind(sockfd,(struct sockaddr*)&addr,addrlen))
	{
		perror("bind");
		return -2;
	}

	// 开启监听
	if(listen(sockfd,6))
	{
		perror("listen");
		return -3;
	}


	//创建线程池
	ThreadPool* thread=creat_threadpool(3,1,10,con_work,pro_work);

	//启动线程池(一共6个线程再加当前主线程一共7个)
	start_treadpool(thread);
	for(;;);
	


}

上一篇:《python语言程序设计》2018版第8章19题几何Rectangle2D类(上)--原来我可以直接调用


下一篇:C语言 | 第八章 | 控制结构 循环-2