针对TCP服务器的线程池
基于生产者和消费者模型
生产者:负责连接客户端的主线程,当有客户端连接成功后,把连接上的sockfd存入仓库
消费者:由主线程预先开启出来若干个子线程,如果仓库中有sockfd,通过线程同步由某个子线程获取到sockfd,从而与对应的客户端通信,如果客户端断开连接后,那么重新去仓库获取新的sockfd,如仓库空虚,则休眠等待
仓库:使用队列结构,消费者从队头获取数据,因此队头要加互斥量,生产者从队尾放入数据
顺序队列:
头文件(queue.h)
#ifndef QUEUE_H
#define QUEUE_H
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
//顺序队列结构
typedef struct ArrayQueue
{
void** ptr;//存储元素的内存首地址
size_t cap;//容量
size_t cnt;//个数计数器
int front;//队头下标
int rear;//队尾下标
}ArrayQueue;
//创建结构
ArrayQueue* creat_array_queue(size_t cap);
//销毁
void destory_array_queue(ArrayQueue* queue);
//队满
bool full_array_queue(ArrayQueue* queue);
//队空
bool empty_array_queue(ArrayQueue *queue);
//入队
bool push_array_queue(ArrayQueue* queue,void* data);
//出队
bool pop_array_queue(ArrayQueue* queue);
//队头
void* front_array_queue(ArrayQueue* queue);
//队尾
void* rear_array_queue(ArrayQueue* queue);
//数量
size_t size_array_queue(ArrayQueue* queue);
#endif//QUEUE_H
源文件(queue.c)
#include "queue.h"
//创建结构
ArrayQueue* creat_array_queue(size_t cap)
{
ArrayQueue* queue=malloc(sizeof(ArrayQueue));
queue->ptr=malloc(sizeof(void*)*cap);
queue->cap=cap;
queue->cnt=0;
queue->front=0;
queue->rear=-1;//rear指向队尾元素
return queue;
}
//销毁
void destory_array_queue(ArrayQueue* queue)
{
free(queue->ptr);
free(queue);
}
//队满
bool full_array_queue(ArrayQueue* queue)
{
return queue->cnt==queue->cap;
}
//队空
bool empty_array_queue(ArrayQueue *queue)
{
return queue->cnt==0;
}
//入队
bool push_array_queue(ArrayQueue* queue,void* data)
{
if(full_array_queue(queue))
return false;
//先rear往后移位
queue->rear=(queue->rear+1)%queue->cap;
//把数据入队到rear的位置
queue->ptr[queue->rear]=data;
queue->cnt++;
return true;
}
//出队
bool pop_array_queue(ArrayQueue* queue)
{
if(empty_array_queue(queue))
return false;
//往后移动front 就相当于出队员工元素
queue->front=(queue->front+1)%queue->cap;
queue->cnt--;
return true;
}
//队头
void* front_array_queue(ArrayQueue* queue)
{
return queue->ptr[queue->front];
}
//队尾
void* rear_array_queue(ArrayQueue* queue)
{
return queue->ptr[queue->rear];
}
//数量
size_t size_array_queue(ArrayQueue* queue)
{
return queue->cnt;
}
线程池:
头文件(threadpool.h)
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <pthread.h>
#include "queue.h"
//消费者线程真正的业务逻辑函数 由调用者提供
typedef void (*con_WorkFP)(void*);
//生产者线程真正的业务逻辑函数
typedef void* (*pro_WorkFP)(void);
typedef struct ThreadPool
{
int con_thread_cnt; //消费者线程数量
pthread_t *con_tids; //消费者线程id
int pro_thread_cnt; //生产者线程数量
pthread_t *pro_tids; //生产者线程id
ArrayQueue* store; //队列仓库
con_WorkFP con_work; //消费者业务逻辑函数
pro_WorkFP pro_work; //生产者业务逻辑函数
pthread_mutex_t hlock; //队头锁
pthread_mutex_t rlock; //队尾锁
pthread_cond_t empty; //空仓条件变量
pthread_cond_t full; //满仓条件变量
}ThreadPool;
//创建线程池并初始化
ThreadPool* creat_threadpool(int con_cnt,int pro_cnt,int store_cal,con_WorkFP con_fp,pro_WorkFP pro_fp);
//启动线程池(开启cnt个子线程)
void start_treadpool(ThreadPool* thread);
//生产数据
void push_threadpool(ThreadPool* thread,void* data);
//消费数据
void* pop_threadpool(ThreadPool* thread);
//销毁线程池
void destory_threadpool(ThreadPool* thread);
#endif//THREADPOOL_H
源文件(threadpool.c)
#include "threadpool.h"
//生产者线程的入口函数
static void* pro_run(void* arg)
{
ThreadPool* thread=(ThreadPool*) arg;
for(;;)
{
//调用生产者业务逻辑函数 生产数据
void* data=thread->pro_work();
if(NULL==data)
continue;
//放入数据到线程池
push_threadpool(thread,data);
}
}
//消费者线程的入口函数
static void* con_run(void* arg)
{
ThreadPool* thread=(ThreadPool*) arg;
for(;;)
{
//从仓库拿出数据
void *data=pop_threadpool(thread);
//调用消费者业务逻辑函数 消费数据
thread->con_work(data);
}
}
//创建线程池并初始化
ThreadPool* creat_threadpool(int con_cnt,int pro_cnt,int store_cal,con_WorkFP con_fp,pro_WorkFP pro_fp)
{
//给线程池申请内存
ThreadPool* pool=malloc(sizeof(ThreadPool));
//初始化成员
pool->con_thread_cnt=con_cnt;
pool->pro_thread_cnt=pro_cnt;
pool->con_tids=malloc(sizeof(pthread_t)*con_cnt);
pool->pro_tids=malloc(sizeof(pthread_t)*pro_cnt);
pool->con_work=con_fp;
pool->pro_work=pro_fp;
//创建仓库
pool->store=creat_array_queue(store_cal);
//初始化互斥量,条件变量
pthread_mutex_init(&pool->hlock,NULL);
pthread_mutex_init(&pool->rlock,NULL);
pthread_cond_init(&pool->empty,NULL);
pthread_cond_init(&pool->full,NULL);
return pool;
}
//启动线程池(开启cnt个子线程)
void start_treadpool(ThreadPool* thread)
{
//启动生产者线程
for(int i=0;i<thread->pro_thread_cnt;i++)
{
pthread_create(thread->pro_tids+i,NULL,pro_run,thread);
}
//启动消费者线程
for(int i=0;i<thread->con_thread_cnt;i++)
{
pthread_create(thread->con_tids+i,NULL,con_run,thread);
}
}
//生产数据
void push_threadpool(ThreadPool* thread,void* data)
{
//队尾入队 所以队尾上锁(抢资源)
pthread_mutex_lock(&thread->rlock);
//如果一直队满,就休眠等待
while(full_array_queue(thread->store))
{
//唤醒消费者线程
pthread_cond_signal(&thread->empty);
//睡入满仓条件变量,并解锁队尾
pthread_cond_wait(&thread->full,&thread->rlock);
}
//数据存入仓库中
push_array_queue(thread->store,data);
//此时仓库里至少有一个数据
//唤醒一个消费者线程,可以进行消费了
pthread_cond_signal(&thread->empty);
//解锁队尾
pthread_mutex_unlock(&thread->rlock);
}
//消费数据
void* pop_threadpool(ThreadPool* thread)
{
//队头入队 所以队头上锁(抢资源)
pthread_mutex_lock(&thread->hlock);
//如果一直队空,就休眠等待
while(empty_array_queue(thread->store))
{
//唤醒消费者线程
pthread_cond_signal(&thread->full);
//睡入满仓条件变量,并解锁队尾
pthread_cond_wait(&thread->empty,&thread->hlock);
}
//数据从仓库中取出
void *data=front_array_queue(thread->store);
pop_array_queue(thread->store);
//此时仓库里至少有一个数据没有了
//唤醒一个生产者线程,可以进行生产了
pthread_cond_signal(&thread->full);
//解锁队头
pthread_mutex_unlock(&thread->hlock);
return data;
}
//销毁线程池
void destory_threadpool(ThreadPool* thread)
{
//杀死生产者,消费者线程
for(int i=0;i<thread->pro_thread_cnt;i++)
{
pthread_cancel(thread->pro_tids[i]);
pthread_join(thread->pro_tids[i],NULL);
}
for(int i=0;i<thread->con_thread_cnt;i++)
{
pthread_cancel(thread->con_tids[i]);
pthread_join(thread->con_tids[i],NULL);
}
//释放申请的内存
free(thread->pro_tids);
free(thread->con_tids);
destory_array_queue(thread->store);
pthread_mutex_destroy(&thread->hlock);
pthread_mutex_destroy(&thread->rlock);
pthread_cond_destroy(&thread->full);
pthread_cond_destroy(&thread->empty);
free(thread);
}
TCP客户端代码:
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "threadpool.h"
int sockfd; //服务连接客户端的sockfd
//消费者线程业务逻辑函数
void con_work(void* arg)
{
int *cli_fd=(int*)arg;
char buf[BUFSIZ];
for(;;)
{
// 接收数据
int ret = recv(*cli_fd,buf,BUFSIZ,0);
if(0 >= ret || !strcmp("quit",buf))
{
printf("客户端%d退出\n",*cli_fd);
close(*cli_fd);
free(cli_fd);
return;
}
printf("cli_fd:%d recv:%s byte:%d\n",*cli_fd,buf,ret);
strcat(buf,",return!");
// 返回数据
ret = send(*cli_fd,buf,strlen(buf)+1,0);
if(0 >= ret)
{
printf("客户端%d退出\n",*cli_fd);
close(*cli_fd);
free(cli_fd);
return;
}
}
}
//生产者线程业务逻辑函数
void* pro_work(void)
{
int *cli_fd=malloc(sizeof(int));
*cli_fd=accept(sockfd,NULL,NULL);
if(*cli_fd<0)
{
perror("accept");
return NULL;
}
return cli_fd;
}
int main(int argc,const char* argv[])
{
// 创建socket对象
sockfd = socket(AF_INET,SOCK_STREAM,0);
if(0 > sockfd)
{
perror("socket");
return -1;
}
// 准备本机地址
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(1025);
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
socklen_t addrlen = sizeof(addr);
// 绑定socket对象和地址
if(bind(sockfd,(struct sockaddr*)&addr,addrlen))
{
perror("bind");
return -2;
}
// 开启监听
if(listen(sockfd,6))
{
perror("listen");
return -3;
}
//创建线程池
ThreadPool* thread=creat_threadpool(3,1,10,con_work,pro_work);
//启动线程池(一共6个线程再加当前主线程一共7个)
start_treadpool(thread);
for(;;);
}