C++高性能服务器开发 epoll+线程池模型

文章目录

前言

最近在总结之前做的恋爱交由平台的项目。在优化服务器时,将一开始使用的同步阻塞+多线程,替换为现在的epoll+线程池模型。提高了并发的能力,可以实现C10k的目标。

因此,特写此文,用来记录epoll+线程池模型。为相同需求的同学提供优化思路。

epoll 部分

  1. 首先要做的还是socket的创建,绑定,监听。并且创建epoll的句柄,同时将监听socket挂载到红黑树上,方便以后客户端有连接请求时可以建立连接。
int TcpNet::InitNetWork()
{
    pool_t *pool = NULL;
    m_pool = new thread_pool;
    bzero(&serveraddr,sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    if(inet_pton(AF_INET,_DEF_SERVERIP,&serveraddr.sin_addr.s_addr) == -1)
    {
        perror("Init Ip Error:");
        return FALSE;
    }
    serveraddr.sin_port = htons(_DEF_PORT); 
    //创建Socket
    if((sockfd = socket(AF_INET,SOCK_STREAM,0)) == -1)
    {
        perror("Create Socket Error:");
        return FALSE;
    }
    int mw_optval;
    setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,(char*)&mw_optval,sizeof(mw_optval));
    //绑定端口号
    if(bind(sockfd,(struct sockaddr*)&serveraddr,sizeof(serveraddr)) == -1)
    {
        perror("Bind Socket Error:");
        return FALSE;
    }
    //监听socket
    if(listen(sockfd,_DEF_LISTEN) == -1)
    {
        perror("Listen Error:");
        return FALSE;
    }
    epfd = epoll_create(_DEF_EPOLLSIZE);//创建epoll的句柄,可以监听的文件描述符为_DEF_EPOLLSIZE

    Addfd(sockfd,TRUE);
    //创建拥有10个线程的线程池 最大线程数200 环形队列最大值50
    if((pool = (m_pool->Pool_create(200,10,50))) == NULL)
        err_str("Create Thread_Pool Failed:",-1);

    m_pool->Producer_add(pool, EPOLL_Jobs, pool);
    return TRUE;
}
// 这个函数是将要监控的socket挂载到红黑树上 
void TcpNet::Addfd(int fd,int enable_et/*是否为边缘触发*/)
{
    struct epoll_event eptemp;
    eptemp.events = EPOLLIN;  // 对应文件描述符可读
    eptemp.data.fd = fd;	// 文件描述符
    if(enable_et)
        eptemp.events |= EPOLLET;
    epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&eptemp);
}
  1. 将epoll_work函数加入到线程池的任务队列中,然后用一个线程负责调用epoll_wait(),当有读事件发生时,epoll_wait返回事件就绪的个数,并且从内核态将就绪事件拷贝到了用户态。在这里调用epoll_deal函数遍历返回的epoll事件集合。首先判断是否是监听socket,如果是则将 与客户端建立连接的 函数加入到线程池的任务队列中。如果是读事件就绪,则将读取数据函数加入到线程池的任务队列中。
// 这个是将 epoll_jobs函数加入到线程池任务队列中
m_pool->Producer_add(pool, EPOLL_Jobs, pool);
// epoll工作内容  epoll_wait
void * TcpNet::EPOLL_Jobs(void * arg)
{
    pool_t *pool = (pool_t*)arg;
    int ready;
    int i = 0;
    while(1)
    {
        //阻塞-1监听socket
        printf("%d\n",i++);
		//epoll_wait返回值是就绪事件的个数
        if((ready = epoll_wait(m_pThis->epfd,m_pThis->epollarr,_DEF_EPOLLSIZE,-1)) == -1)
            err_str("Epoll Call Failed:",-1);//出错
        //调用epoll_deal 处理就绪的fd
        m_pThis->Epoll_Deal(ready,pool);
        bzero(m_pThis->epollarr,sizeof(epollarr));
    }
}
// 处理就绪事件  根据继续个数,处理就绪事件
// 在这里判断是 客户端连接还是
// 客户端连接的话,就交个线程,执行Accept_Deal建立连接的任务
// 是读事件就绪,则分配一个线程执行读取内核缓冲区的内容,并且
void TcpNet::Epoll_Deal(int ready,pool_t *pool)
{
    int i = 0;
    for(i=0; i<ready; i++)
    {
        int fd = epollarr[i].data.fd;//从epollarr中读出就绪文件描述符
        if(sockfd == fd)   //客户端建立链接
            m_pool->Producer_add(pool,Accept_Deal,NULL);
        else if(epollarr[i].events & EPOLLIN)//如果是IO事件,则删除fd,将读线程加入队列
        {
            Deletefd(fd);
            m_pool->Producer_add(pool,Info_Recv,(void*)fd);
        }
    }
}
// 创建客户端连接
void *TcpNet::Accept_Deal(void *arg)
{
    struct sockaddr_in clientaddr;
    int clientsize = sizeof(clientaddr);
    int clientfd;
    char ipstr[_DEF_IPSIZE];
    pthread_mutex_lock(&m_pThis->alock);
    if((clientfd = accept(m_pThis->sockfd,(struct sockaddr*)&clientaddr,(socklen_t*)&clientsize)) == -1)
    {
        err_str("Custom Thread Accept Error",-1);
    }
    pthread_mutex_unlock(&m_pThis->alock);
    m_pThis->Addfd(clientfd,TRUE);
    printf("Custom Thread TID:0x%x\tClient IP:%s\tClient PORT:%d\t\n",(unsigned int)pthread_self()
           ,inet_ntop(AF_INET,&clientaddr.sin_addr.s_addr,ipstr,sizeof(ipstr)),ntohs(clientaddr.sin_port));
    return 0;
}
void *TcpNet::Info_Recv(void *arg)
{
    int clientfd = (long)arg;
    int nRelReadNum = 0;
    int nPackSize = 0;
    char *pSzBuf = NULL;
    nRelReadNum = recv(clientfd,&nPackSize,sizeof(nPackSize),0);
    if(nRelReadNum <= 0)
    {
       // m_pThis->Deletefd(clientfd);
        close(clientfd);
        return NULL;
    }
    pSzBuf = (char*)malloc(sizeof(char)*nPackSize);//先收到的是包大小
    int nOffSet = 0;
    nRelReadNum = 0;
    //接收包的数据
    while(nPackSize)
    {
        nRelReadNum = recv(clientfd,pSzBuf+nOffSet,nPackSize,0);
        if(nRelReadNum > 0)
        {
            nOffSet += nRelReadNum;
            nPackSize -= nRelReadNum;
        }
    }
    m_pThis->m_kernel->DealData(clientfd,pSzBuf,nOffSet);
    m_pThis->Addfd(clientfd,TRUE );
    printf("pszbuf = %p \n",pSzBuf);
    if(pSzBuf != NULL)
    {
        free(pSzBuf);
        pSzBuf = NULL;
    }
    return 0;
}

线程池部分

整体设计:

C++高性能服务器开发 epoll+线程池模型
分为三部分:管理线程(负责线程数的扩容与缩减)、任务队列(负责向任务队列填装任务)、工作线程(负责从任务队列取任务并处理)。

其中任务队列、工作线程采用生产者-消费者模型.
采用互斥量+条件变量实现线程同步。

/* 
 *  函数任务: 创建线程
 *	max:最大线程数
 * 	min:最少线程数
 * 	que_max:队列最大长度
 */
pool_t *thread_pool::Pool_create(int max,int min,int que_max)
{
    pool_t *p;
    if((p = (pool_t*)malloc(sizeof(pool_t))) == NULL)
    {
        err_str("malloc pool error:",-1);
    }
    p->thread_max = max;
    p->thread_min = min;
    p->thread_alive = 0;
    p->thread_busy = 0;
    p->thread_shutdown = TRUE;
    p->thread_wait = 0;
    p->queue_max = que_max;
    p->queue_cur = 0;
    p->queue_front = 0;
    p->queue_rear = 0;
    if(pthread_cond_init(&p->not_full,NULL)!=0 ||
            pthread_cond_init(&p->not_empty,NULL)!=0 ||
            pthread_mutex_init(&p->lock,NULL)!=0)
    {
        err_str("init cond or mutex error:",-1);
    }
    if((p->tids = (pthread_t*)malloc(sizeof(pthread_t)*max)) == NULL)
    {
        err_str("malloc tids error:",-1);
    }
    bzero(p->tids,sizeof(pthread_t)*max);
    if((p->queue_task = (task_t*)malloc(sizeof(task_t)*que_max))==NULL)
    {
        err_str("malloc task queue error:",-1);
    }
    int err;
    for(int i=0; i<min; i++)
    {
        if((err = pthread_create(&p->tids[i],NULL,  Custom,(void*)p))>0)
        {
            printf("create custom error:%s\n",strerror(err));
            return NULL;
        }
        ++(p->thread_alive);
    }
    if((err = pthread_create(&(p->manager_tid),NULL, Manager,(void*)p))>0)
    {
        printf("create Manager error:%s\n",strerror(err));
        return NULL;
    }
    return p;
}


/* 
 *  根据传进来的任务函数,和任务参数
 *	将任务投递到队列中   (生产者)
 */
int thread_pool::Producer_add(pool_t * p,void *(task)(void *arg),void *arg)
{
    pthread_mutex_lock(&p->lock);
    while(p->queue_cur == p->queue_max && p->thread_shutdown  )
    {
		// 当线程个数等于最大线程个数时 ,挂起线程,解锁   条件变量
        pthread_cond_wait(&p->not_full,&p->lock);
    }
    if(!p->thread_shutdown  )
    {
        pthread_mutex_unlock(&p->lock);
        return -1;
    }
    p->queue_task[p->queue_front].task = task;	// 将任务投递到队列中
    p->queue_task[p->queue_front].arg = arg;	// 任务函数的参数
    p->queue_front = (p->queue_front + 1) % p->queue_max; // 将front后移
    ++(p->queue_cur);	//更新当前节点
    pthread_cond_signal(&p->not_empty);
    pthread_mutex_unlock(&p->lock);
    return 0;
}

// 工作线程--负责从任务队列中取出任务并执行
//  消费者
void * thread_pool::Custom(void * arg)
{
    pool_t * p = (pool_t*)arg;
    task_t task;
    while(p->thread_shutdown)
    {
        pthread_mutex_lock(&p->lock);
        while(p->queue_cur == 0 && p->thread_shutdown  )
        {
            pthread_cond_wait(&p->not_empty,&p->lock);
        }
        if(!p->thread_shutdown  )
        {
            pthread_mutex_unlock(&p->lock);
            pthread_exit(NULL);
        }
        if(p->thread_wait > 0 && p->thread_alive > p->thread_min)
        {
            --(p->thread_wait);
            --(p->thread_alive);
            pthread_mutex_unlock(&p->lock);
            pthread_exit(NULL);
        }
        task.task = p->queue_task[p->queue_rear].task;
        task.arg = p->queue_task[p->queue_rear].arg;
        p->queue_rear = (p->queue_rear + 1) % p->queue_max;
        --(p->queue_cur);
        pthread_cond_signal(&p->not_full);
        ++(p->thread_busy);
        pthread_mutex_unlock(&p->lock);
        //执行核心工作
        (*task.task)(task.arg);  // 调用任务函数 
        pthread_mutex_lock(&p->lock);
        --(p->thread_busy);
        pthread_mutex_unlock(&p->lock);
    }
    return 0;
}

// 管理线程 用于扩容和缩减线程个数
void *thread_pool::Manager(void *arg)
{
    pool_t * p = (pool_t *)arg;
    int alive;
    int cur;
    int busy;
    int add = 0;
    while(p->thread_shutdown )
    {
        pthread_mutex_lock(&p->lock);
        alive = p->thread_alive;
        busy = p->thread_busy;
        cur = p->queue_cur;
        pthread_mutex_unlock(&p->lock);
        if((cur > alive - busy || (float)busy / alive*100 >= (float)80 ) ||
                p->thread_max > alive)
        {
            for(int i=0; i<(p->thread_max)&&add<_DEF_COUNT; i++,add++)
            {
                pthread_mutex_lock(&p->lock);
                if(p->tids[i] == 0 || !if_thread_alive(p->tids[i]))
                {
                    pthread_create(&p->tids[i],NULL,Custom,(void*)p);
                    ++(p->thread_alive);
                }
                pthread_mutex_unlock(&p->lock);
            }
        }
        if(busy *2 < alive - busy && alive > p->thread_min)
        {
            pthread_mutex_lock(&p->lock);
            p->thread_wait = _DEF_COUNT;
            pthread_mutex_unlock(&p->lock);
            for(int i=0; i<_DEF_COUNT; i++)
            {
                pthread_cond_signal(&p->not_empty);
            }
        }
        sleep(_DEF_TIMEOUT);
    }
    return 0;
}
上一篇:实验九 线程


下一篇:flutter添加到android原生