文章目录
前言
最近在总结之前做的恋爱交由平台的项目。在优化服务器时,将一开始使用的同步阻塞+多线程,替换为现在的epoll+线程池模型。提高了并发的能力,可以实现C10k的目标。
因此,特写此文,用来记录epoll+线程池模型。为相同需求的同学提供优化思路。
epoll 部分
- 首先要做的还是socket的创建,绑定,监听。并且创建epoll的句柄,同时将监听socket挂载到红黑树上,方便以后客户端有连接请求时可以建立连接。
int TcpNet::InitNetWork()
{
pool_t *pool = NULL;
m_pool = new thread_pool;
bzero(&serveraddr,sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
if(inet_pton(AF_INET,_DEF_SERVERIP,&serveraddr.sin_addr.s_addr) == -1)
{
perror("Init Ip Error:");
return FALSE;
}
serveraddr.sin_port = htons(_DEF_PORT);
//创建Socket
if((sockfd = socket(AF_INET,SOCK_STREAM,0)) == -1)
{
perror("Create Socket Error:");
return FALSE;
}
int mw_optval;
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,(char*)&mw_optval,sizeof(mw_optval));
//绑定端口号
if(bind(sockfd,(struct sockaddr*)&serveraddr,sizeof(serveraddr)) == -1)
{
perror("Bind Socket Error:");
return FALSE;
}
//监听socket
if(listen(sockfd,_DEF_LISTEN) == -1)
{
perror("Listen Error:");
return FALSE;
}
epfd = epoll_create(_DEF_EPOLLSIZE);//创建epoll的句柄,可以监听的文件描述符为_DEF_EPOLLSIZE
Addfd(sockfd,TRUE);
//创建拥有10个线程的线程池 最大线程数200 环形队列最大值50
if((pool = (m_pool->Pool_create(200,10,50))) == NULL)
err_str("Create Thread_Pool Failed:",-1);
m_pool->Producer_add(pool, EPOLL_Jobs, pool);
return TRUE;
}
// 这个函数是将要监控的socket挂载到红黑树上
void TcpNet::Addfd(int fd,int enable_et/*是否为边缘触发*/)
{
struct epoll_event eptemp;
eptemp.events = EPOLLIN; // 对应文件描述符可读
eptemp.data.fd = fd; // 文件描述符
if(enable_et)
eptemp.events |= EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&eptemp);
}
- 将epoll_work函数加入到线程池的任务队列中,然后用一个线程负责调用epoll_wait(),当有读事件发生时,epoll_wait返回事件就绪的个数,并且从内核态将就绪事件拷贝到了用户态。在这里调用epoll_deal函数遍历返回的epoll事件集合。首先判断是否是监听socket,如果是则将 与客户端建立连接的 函数加入到线程池的任务队列中。如果是读事件就绪,则将读取数据函数加入到线程池的任务队列中。
// 这个是将 epoll_jobs函数加入到线程池任务队列中
m_pool->Producer_add(pool, EPOLL_Jobs, pool);
// epoll工作内容 epoll_wait
void * TcpNet::EPOLL_Jobs(void * arg)
{
pool_t *pool = (pool_t*)arg;
int ready;
int i = 0;
while(1)
{
//阻塞-1监听socket
printf("%d\n",i++);
//epoll_wait返回值是就绪事件的个数
if((ready = epoll_wait(m_pThis->epfd,m_pThis->epollarr,_DEF_EPOLLSIZE,-1)) == -1)
err_str("Epoll Call Failed:",-1);//出错
//调用epoll_deal 处理就绪的fd
m_pThis->Epoll_Deal(ready,pool);
bzero(m_pThis->epollarr,sizeof(epollarr));
}
}
// 处理就绪事件 根据继续个数,处理就绪事件
// 在这里判断是 客户端连接还是
// 客户端连接的话,就交个线程,执行Accept_Deal建立连接的任务
// 是读事件就绪,则分配一个线程执行读取内核缓冲区的内容,并且
void TcpNet::Epoll_Deal(int ready,pool_t *pool)
{
int i = 0;
for(i=0; i<ready; i++)
{
int fd = epollarr[i].data.fd;//从epollarr中读出就绪文件描述符
if(sockfd == fd) //客户端建立链接
m_pool->Producer_add(pool,Accept_Deal,NULL);
else if(epollarr[i].events & EPOLLIN)//如果是IO事件,则删除fd,将读线程加入队列
{
Deletefd(fd);
m_pool->Producer_add(pool,Info_Recv,(void*)fd);
}
}
}
// 创建客户端连接
void *TcpNet::Accept_Deal(void *arg)
{
struct sockaddr_in clientaddr;
int clientsize = sizeof(clientaddr);
int clientfd;
char ipstr[_DEF_IPSIZE];
pthread_mutex_lock(&m_pThis->alock);
if((clientfd = accept(m_pThis->sockfd,(struct sockaddr*)&clientaddr,(socklen_t*)&clientsize)) == -1)
{
err_str("Custom Thread Accept Error",-1);
}
pthread_mutex_unlock(&m_pThis->alock);
m_pThis->Addfd(clientfd,TRUE);
printf("Custom Thread TID:0x%x\tClient IP:%s\tClient PORT:%d\t\n",(unsigned int)pthread_self()
,inet_ntop(AF_INET,&clientaddr.sin_addr.s_addr,ipstr,sizeof(ipstr)),ntohs(clientaddr.sin_port));
return 0;
}
void *TcpNet::Info_Recv(void *arg)
{
int clientfd = (long)arg;
int nRelReadNum = 0;
int nPackSize = 0;
char *pSzBuf = NULL;
nRelReadNum = recv(clientfd,&nPackSize,sizeof(nPackSize),0);
if(nRelReadNum <= 0)
{
// m_pThis->Deletefd(clientfd);
close(clientfd);
return NULL;
}
pSzBuf = (char*)malloc(sizeof(char)*nPackSize);//先收到的是包大小
int nOffSet = 0;
nRelReadNum = 0;
//接收包的数据
while(nPackSize)
{
nRelReadNum = recv(clientfd,pSzBuf+nOffSet,nPackSize,0);
if(nRelReadNum > 0)
{
nOffSet += nRelReadNum;
nPackSize -= nRelReadNum;
}
}
m_pThis->m_kernel->DealData(clientfd,pSzBuf,nOffSet);
m_pThis->Addfd(clientfd,TRUE );
printf("pszbuf = %p \n",pSzBuf);
if(pSzBuf != NULL)
{
free(pSzBuf);
pSzBuf = NULL;
}
return 0;
}
线程池部分
整体设计:
分为三部分:管理线程(负责线程数的扩容与缩减)、任务队列(负责向任务队列填装任务)、工作线程(负责从任务队列取任务并处理)。
其中任务队列、工作线程采用生产者-消费者模型.
采用互斥量+条件变量实现线程同步。
/*
* 函数任务: 创建线程
* max:最大线程数
* min:最少线程数
* que_max:队列最大长度
*/
pool_t *thread_pool::Pool_create(int max,int min,int que_max)
{
pool_t *p;
if((p = (pool_t*)malloc(sizeof(pool_t))) == NULL)
{
err_str("malloc pool error:",-1);
}
p->thread_max = max;
p->thread_min = min;
p->thread_alive = 0;
p->thread_busy = 0;
p->thread_shutdown = TRUE;
p->thread_wait = 0;
p->queue_max = que_max;
p->queue_cur = 0;
p->queue_front = 0;
p->queue_rear = 0;
if(pthread_cond_init(&p->not_full,NULL)!=0 ||
pthread_cond_init(&p->not_empty,NULL)!=0 ||
pthread_mutex_init(&p->lock,NULL)!=0)
{
err_str("init cond or mutex error:",-1);
}
if((p->tids = (pthread_t*)malloc(sizeof(pthread_t)*max)) == NULL)
{
err_str("malloc tids error:",-1);
}
bzero(p->tids,sizeof(pthread_t)*max);
if((p->queue_task = (task_t*)malloc(sizeof(task_t)*que_max))==NULL)
{
err_str("malloc task queue error:",-1);
}
int err;
for(int i=0; i<min; i++)
{
if((err = pthread_create(&p->tids[i],NULL, Custom,(void*)p))>0)
{
printf("create custom error:%s\n",strerror(err));
return NULL;
}
++(p->thread_alive);
}
if((err = pthread_create(&(p->manager_tid),NULL, Manager,(void*)p))>0)
{
printf("create Manager error:%s\n",strerror(err));
return NULL;
}
return p;
}
/*
* 根据传进来的任务函数,和任务参数
* 将任务投递到队列中 (生产者)
*/
int thread_pool::Producer_add(pool_t * p,void *(task)(void *arg),void *arg)
{
pthread_mutex_lock(&p->lock);
while(p->queue_cur == p->queue_max && p->thread_shutdown )
{
// 当线程个数等于最大线程个数时 ,挂起线程,解锁 条件变量
pthread_cond_wait(&p->not_full,&p->lock);
}
if(!p->thread_shutdown )
{
pthread_mutex_unlock(&p->lock);
return -1;
}
p->queue_task[p->queue_front].task = task; // 将任务投递到队列中
p->queue_task[p->queue_front].arg = arg; // 任务函数的参数
p->queue_front = (p->queue_front + 1) % p->queue_max; // 将front后移
++(p->queue_cur); //更新当前节点
pthread_cond_signal(&p->not_empty);
pthread_mutex_unlock(&p->lock);
return 0;
}
// 工作线程--负责从任务队列中取出任务并执行
// 消费者
void * thread_pool::Custom(void * arg)
{
pool_t * p = (pool_t*)arg;
task_t task;
while(p->thread_shutdown)
{
pthread_mutex_lock(&p->lock);
while(p->queue_cur == 0 && p->thread_shutdown )
{
pthread_cond_wait(&p->not_empty,&p->lock);
}
if(!p->thread_shutdown )
{
pthread_mutex_unlock(&p->lock);
pthread_exit(NULL);
}
if(p->thread_wait > 0 && p->thread_alive > p->thread_min)
{
--(p->thread_wait);
--(p->thread_alive);
pthread_mutex_unlock(&p->lock);
pthread_exit(NULL);
}
task.task = p->queue_task[p->queue_rear].task;
task.arg = p->queue_task[p->queue_rear].arg;
p->queue_rear = (p->queue_rear + 1) % p->queue_max;
--(p->queue_cur);
pthread_cond_signal(&p->not_full);
++(p->thread_busy);
pthread_mutex_unlock(&p->lock);
//执行核心工作
(*task.task)(task.arg); // 调用任务函数
pthread_mutex_lock(&p->lock);
--(p->thread_busy);
pthread_mutex_unlock(&p->lock);
}
return 0;
}
// 管理线程 用于扩容和缩减线程个数
void *thread_pool::Manager(void *arg)
{
pool_t * p = (pool_t *)arg;
int alive;
int cur;
int busy;
int add = 0;
while(p->thread_shutdown )
{
pthread_mutex_lock(&p->lock);
alive = p->thread_alive;
busy = p->thread_busy;
cur = p->queue_cur;
pthread_mutex_unlock(&p->lock);
if((cur > alive - busy || (float)busy / alive*100 >= (float)80 ) ||
p->thread_max > alive)
{
for(int i=0; i<(p->thread_max)&&add<_DEF_COUNT; i++,add++)
{
pthread_mutex_lock(&p->lock);
if(p->tids[i] == 0 || !if_thread_alive(p->tids[i]))
{
pthread_create(&p->tids[i],NULL,Custom,(void*)p);
++(p->thread_alive);
}
pthread_mutex_unlock(&p->lock);
}
}
if(busy *2 < alive - busy && alive > p->thread_min)
{
pthread_mutex_lock(&p->lock);
p->thread_wait = _DEF_COUNT;
pthread_mutex_unlock(&p->lock);
for(int i=0; i<_DEF_COUNT; i++)
{
pthread_cond_signal(&p->not_empty);
}
}
sleep(_DEF_TIMEOUT);
}
return 0;
}