libevent 编写问答服务器

之前看libevent后写的一个简单问答服务器。

工作方式比较简单,一个主线程和多个工作线程,主线程只接受连接并通知工作线程接管工作,工作线程接管连接然后接收消息并返回。也可以换成进程的方式。

主线程与工作线程之间通过socket对进行通信,工作线程使用libevent的事件机制监听socket对上的通知。

//libevent server sample on linux

/*socket*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
/**/
#include <iostream>
#include <err.h>    //err()
#include <string.h> //memset()
#include <fcntl.h>  //fcntl()
#include <pthread.h> //
#include <cstdlib>  //calloc()
#include <unistd.h> //close()
#include <sys/queue.h>  //链表
#include <errno.h>  //errno,EINTR
#include <stdio.h>
/*libevent*/
#include <event.h>
using namespace std;

/* TCP port the server listens on. */
#define SERVER_PORT 55555
/* Backlog passed to listen() for the public listening socket. */
#define LISTEN_NUM 32
/* Number of worker threads (and of notification socket pairs). */
#define THREAD_NUM 4
/* Maximum number of bytes taken from a client per read. */
#define BUF_LEN 1024
/* Size of each client's pending-output buffer. Parenthesized so the
 * expansion stays a single operand when used inside a larger expression
 * (e.g. `x / WRITE_BUF_LEN` or `x % WRITE_BUF_LEN`). */
#define WRITE_BUF_LEN (1024*4)

/* Per-client connection state; passed as the callback argument of the
 * client's read and write events. */
struct event_accepted{
    int fd;//socket fd of the client connection
    int thread_id;  //index of the worker thread that owns this client socket
    struct event* ev_read;  //read event registered for fd
    struct event* ev_write; //write event for fd, added only when there is data to send
    char* write_buf;    //buffer holding data waiting to be written back to the client
    int len;    //number of bytes currently stored in write_buf
    int offset; //offset into write_buf up to which data has already been written
};
/* Two connected socket descriptors forming one notification channel.
 * Both start out as -1, meaning "not opened yet". */
struct socket_pair{
    int connecter;  // descriptor obtained from connect()
    int accepter;   // descriptor obtained from accept()
    socket_pair() : connecter(-1), accepter(-1) {}
};
/* Per-worker-thread bookkeeping. */
struct workthread_info{
    int thread_id;                      // worker thread number
    pthread_mutex_t g_clock;            // per-thread mutex
    volatile int count;                 // number of client sockets assigned to this worker
    struct socket_pair socket_pairs;    // socket pair used to tell the worker to register events
    struct event socket_pair_event;     // event the worker uses to watch its end of the socket pair
    struct event_base *base;            // this worker's event_base instance
    // TAILQ_ENTRY(event_accepted) entries; // list of events registered on this worker, kept so their memory can be reclaimed
    workthread_info() : count(0) {}     // a fresh worker owns no connections
};

/* One bookkeeping slot per worker thread, indexed by worker thread number. */
struct workthread_info work_info[THREAD_NUM];

pthread_t pthread_id[THREAD_NUM];    // pthread handles of the worker threads

int setnonblock(int fd);
void on_accept(int fd, short ev, void *arg);
void on_read(int fd, short ev, void *arg);
void on_write(int fd, short ev, void *arg);
void* work_thread(void* arg);
int socketpair_init();    // set up the local socket connections used to tell worker threads to register events
void socketpair_read(int fd, short ev, void *arg);    // worker side: receive an accepted socket and register a read event for it
void destroy();

/*
 * Program entry point.
 *
 * Creates the notification socket pairs, starts THREAD_NUM worker threads,
 * opens the non-blocking listening socket on SERVER_PORT and then runs the
 * main thread's event loop, which only accepts connections (see on_accept).
 * Any setup failure terminates the process through err()/errx().
 */
int main(){
    // Create the notification sockets first: each worker registers an event
    // on its end as soon as it starts.
    if(socketpair_init() <0){
        err(1,"init socketpair_init failed");
    }

    /* Initialize the default event base used by the accept event. */
    event_init();
    struct event ev_accept;
    // Each worker receives a pointer to its own slot. Passing the address of
    // the loop variable instead would let several workers observe the same
    // (later) value and end up with identical thread numbers.
    int thread_id[THREAD_NUM]={0};
    /* Start the worker threads. */
    for(int i=0;i<THREAD_NUM;i++){
        thread_id[i]=i;
        // The mutex must be usable before the thread that shares it exists.
        int rc = pthread_mutex_init(&work_info[i].g_clock,NULL);
        if(rc != 0){
            errno = rc;    // pthread functions return the error number instead of setting errno
            err(1,"mutex init failed");
        }
        rc = pthread_create(&pthread_id[i],NULL,work_thread,&thread_id[i]);
        if(rc != 0){
            errno = rc;
            err(1,"thread create failed");
        }
    }
    /* Set up the listening socket. */
    struct sockaddr_in listen_addr;
    int reuseaddr_on =1;
    int listen_fd =socket(AF_INET,SOCK_STREAM,0);
    if(listen_fd<0){
        err(1,"socket failed");
    }
    if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr_on,sizeof(reuseaddr_on)) <0){
        err(1,"setsockopt failed");
    }
    memset(&listen_addr,0,sizeof(listen_addr));
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = INADDR_ANY;
    listen_addr.sin_port =htons(SERVER_PORT);
    if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr))<0){
        err(1,"bind failed");
    }
    if(listen(listen_fd,LISTEN_NUM)<0){
        err(1,"listen failed");
    }
    if(setnonblock(listen_fd)<0){
        err(1,"set server socket to non-blocking failed");
    }
    /* Register the persistent accept event and run the main loop. */
    event_set(&ev_accept,listen_fd,EV_READ|EV_PERSIST,on_accept,NULL);
    if(event_add(&ev_accept,NULL) <0){
        errx(1,"register accept event failed");
    }
    event_dispatch();
    return 0;
}

/*
 * Switch a descriptor to non-blocking mode.
 *
 * Returns 0 on success. If the current flags cannot be read, the negative
 * result of fcntl(F_GETFL) is returned; if the new flags cannot be applied,
 * -1 is returned.
 */
int setnonblock(int fd)
{
    const int current = fcntl(fd, F_GETFL);
    if (current < 0) {
        return current;
    }

    // Keep every existing flag and only add O_NONBLOCK.
    return (fcntl(fd, F_SETFL, current | O_NONBLOCK) < 0) ? -1 : 0;
}

void on_accept(int fd, short ev, void *arg){
    int client_fd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    client_fd = accept(fd,(struct sockaddr*)&client_addr,&client_len);
    if(client_fd == -1){
        warn("accept failed");
        return;
    }
    if(setnonblock(client_fd)<0){
        warn("failed to set client socket non-blocking");
    }
    //socket连接数最少的一个工作线程
    int min =work_info[0].count;
    int thread_num =0;
    for(int i=0;i<THREAD_NUM;i++){
        if(work_info[i].count < min){
            thread_num = i;
            min = work_info[i].count;
        }
    }
    //将已连接的socket文件描述符通知工作线程
    send(work_info[thread_num].socket_pairs.connecter,&client_fd,sizeof(client_fd),0);
    work_info[thread_num].count++;
}

int socketpair_init(){
    // int connecter[THREAD_NUM]={-1};
    int listener = -1;
    listener = socket(AF_INET,SOCK_STREAM,0);
    if(listener<0){
        err(1,"%d: init socketpair listener for insert event failed",__LINE__);
    }
    
    struct sockaddr_in listen_addr,connect_addr[THREAD_NUM];
    int size;
    memset(&listen_addr,0,sizeof(listen_addr));
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    listen_addr.sin_port = 0; /*kernel chooses port.*/
    if(bind(listener,(struct sockaddr*)&listen_addr,sizeof(listen_addr)) == -1){
        goto fail;
    }
    if(listen(listener,THREAD_NUM) == -1){
        goto fail;
    }
    for(int i=0;i<THREAD_NUM;i++){
        work_info[i].socket_pairs.connecter = socket(AF_INET,SOCK_STREAM,0);
        if(work_info[i].socket_pairs.connecter<0){
            goto fail;
        }
        size = sizeof(connect_addr[i]);
        /* We want to find out the port number to connect to.  */
        if(getsockname(listener,(struct sockaddr*)&connect_addr[i],(socklen_t *)&size) == -1){
            goto fail;
        }
        if(size != sizeof(connect_addr[i])){
            goto fail;
        }
        if(connect(work_info[i].socket_pairs.connecter,(struct sockaddr*)&connect_addr[i],sizeof(connect_addr[i])) == -1){
            printf("i:%d, %s\n",i,strerror(errno));
            goto fail;
        }
        size = sizeof(listen_addr);
        work_info[i].socket_pairs.accepter = accept(listener,(struct sockaddr*)&listen_addr,(socklen_t*)&size);
        if(work_info[i].socket_pairs.accepter<0){
            goto fail;
        }
        /* Now check we are talking to ourself by matching port and host on the
       two sockets.     */
        if(getsockname(work_info[i].socket_pairs.connecter,(struct sockaddr*)&connect_addr[i],(socklen_t*)&size) == -1){
            goto fail;
        }
        if(size != sizeof(connect_addr[i])
            || listen_addr.sin_family != connect_addr[i].sin_family
            || listen_addr.sin_addr.s_addr != connect_addr[i].sin_addr.s_addr
            || listen_addr.sin_port != connect_addr[i].sin_port){
                goto fail;
        }
    }
    /*close listen socket*/
    close(listener);
    return 0;

    fail:
    for(int i=0;i<THREAD_NUM;i++){
        if(work_info[i].socket_pairs.connecter >0){
            close(work_info[i].socket_pairs.connecter);
        }
        if(work_info[i].socket_pairs.accepter >0){
            close(work_info[i].socket_pairs.accepter);
        }
    }
    return -1;
}

void on_read(int fd, short ev, void *arg){
    struct event_accepted *client = (event_accepted*)arg;
    char* buf = new char[BUF_LEN];
    if(buf == NULL){
        err(1,"malloc failed");
    }
    int recv_len = read(fd,buf,BUF_LEN);
    if(recv_len == 0){
        //客户端断开连接
        cout<<"client disconnected"<<endl;
        close(fd);
        event_del(client->ev_read);
        event_del(client->ev_write);
        delete client->ev_write;
        delete client->ev_read;
        delete client->write_buf;
        work_info[client->thread_id].count--;
        free(client);
        delete[] buf;
        return;
    }
    else if(recv_len < 0){
        cout<<"socekt failure,disconnecting client:"<<strerror(errno)<<endl;
        close(fd);
        event_del(client->ev_read);
        event_del(client->ev_write);
        delete client->ev_write;
        delete client->ev_read;
        delete client->write_buf;
        work_info[client->thread_id].count--;
        free(client);
        delete[] buf;
        return;
    }
    //TODO 1.需要处理数组越界问题;2.这里只是一问一答的情况,如果需要主动推送消息需要增加设计;
    //TODO 1.在同一个文件描述符上如果多次注册同一事件会发生什么?这里改用链表记录需要发送的数据可能会好一点,
    memcpy(client->write_buf+client->len,buf,recv_len);
    client->len += recv_len;
    event_add(client->ev_write,NULL);
    delete[] buf;
}

/*
 * Write callback for a client socket.
 *
 * Sends the not-yet-written part of the client's write buffer
 * (bytes [offset, len)). If only part of it could be sent, or the socket is
 * temporarily not writable, the write event is registered again; once
 * everything is sent the buffer is reset. `arg` is the client's
 * `event_accepted`; `ev` is unused.
 *
 * A failed write no longer terminates the whole server: the pending data is
 * dropped and the connection is left to the read callback, which closes the
 * client when read() reports end-of-file or an error.
 */
void on_write(int fd, short ev, void *arg){
    (void)ev;
    struct event_accepted* client = (event_accepted*)arg;
    int pending = client->len - client->offset;
    if(pending <= 0){
        // Nothing left to send.
        client->offset = client->len = 0;
        return;
    }
    // MSG_NOSIGNAL: a peer that already closed the connection must produce
    // an EPIPE error here, not a SIGPIPE that kills the process.
    ssize_t len = send(fd,client->write_buf+client->offset,pending,MSG_NOSIGNAL);
    if(len == -1){
        // Interrupted by a signal or socket not writable yet: try again later.
        if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK){
            event_add(client->ev_write,NULL);
            return;
        }
        warn("write error");
        client->offset = client->len = 0;
        return;
    }
    client->offset += (int)len;
    if(client->offset < client->len){
        // Only part of the data was written; continue on the next write event.
        event_add(client->ev_write,NULL);
        return;
    }
    // Everything was written.
    client->offset = client->len = 0;
}

/*
 * Worker thread entry point.
 *
 * `arg` points to an int holding this worker's thread number. The worker
 * creates its own event base, registers a persistent read event on its end
 * of the notification socket pair (handled by socketpair_read) and runs the
 * event loop. Returns NULL when the loop ends; failure to set up the event
 * base or the event terminates the process.
 */
void* work_thread(void* arg){
    int thread_num = *(int*)arg;    // this worker's thread number
    struct workthread_info *p_work_info = &work_info[thread_num];
    p_work_info->thread_id = thread_num;   // remember the thread number
    struct event_base * base = p_work_info->base = event_base_new();
    if(base == NULL){
        errx(1,"worker %d: event_base_new failed",thread_num);
    }
    event_set(&p_work_info->socket_pair_event,p_work_info->socket_pairs.accepter,EV_READ|EV_PERSIST,socketpair_read,&p_work_info->socket_pair_event);
    event_base_set(base,&p_work_info->socket_pair_event);
    if(event_add(&p_work_info->socket_pair_event,NULL) < 0){
        errx(1,"worker %d: register socketpair event failed",thread_num);
    }
    event_base_dispatch(base);
    // A pthread start routine must return a value; falling off the end of a
    // non-void function is undefined behaviour.
    return NULL;
}

/*
 * Notification callback of a worker thread.
 *
 * Reads one client descriptor (an int) from the worker's end `fd` of the
 * notification socket pair, allocates the client's `event_accepted` state
 * and registers the client's read event on the worker's event base. The
 * write event is prepared but only added when there is data to send.
 * `ev` and `arg` are unused.
 *
 * A corrupted notification or an allocation failure terminates the process;
 * if the read event cannot be registered, only that client is dropped.
 */
void socketpair_read(int fd, short ev, void *arg){
    (void)ev;
    (void)arg;
    // Find the worker that owns this end of the socket pair.
    struct workthread_info *p_work_info = NULL;
    int owner = -1;
    for(int i=0;i<THREAD_NUM;i++){
        if(work_info[i].socket_pairs.accepter == fd){
            p_work_info = &work_info[i];
            owner = i;
            break;
        }
    }
    if(p_work_info == NULL){
        errx(1,"socketpair_read: fd %d does not belong to any worker",fd);
    }

    int client_fd = -1;
    ssize_t recv_len = recv(fd,&client_fd,sizeof(client_fd),0);
    if(recv_len < 0){
        err(1,"socketpair read len not equal to 4");
    }
    if(recv_len != (ssize_t)sizeof(client_fd)){
        // Short read: errno is not meaningful here.
        errx(1,"socketpair read len not equal to 4");
    }
    if(client_fd < 0){
        errx(1,"socketpair recved an error socket fd");
    }

    struct event_accepted *client = (struct event_accepted*)calloc(1,sizeof(event_accepted));
    if(NULL == client){
        err(1,"on_accept malloc event_accepted failed");
    }
    client->thread_id = owner;    // worker thread that owns this client socket
    client->fd = client_fd;
    // operator new reports failure by throwing, so no NULL checks are needed.
    client->ev_write = new event;
    client->ev_read = new event;
    client->write_buf = new char[WRITE_BUF_LEN];
    client->len = 0;
    client->offset = 0;
    // //
    // TAILQ_INSERT_TAIL(p_work_info->entries,client,);
    // Set up both events on this worker's event base.
    event_set(client->ev_read,client_fd,EV_READ|EV_PERSIST,on_read,client);
    event_base_set(p_work_info->base,client->ev_read);
    event_set(client->ev_write,client_fd,EV_WRITE,on_write,client);
    event_base_set(p_work_info->base,client->ev_write);
    // Only the read event is registered now; the write event is added on demand.
    if(event_add(client->ev_read,NULL) < 0){
        warnx("register client read event failed");
        close(client_fd);
        delete client->ev_write;
        delete client->ev_read;
        delete[] client->write_buf;
        free(client);
        // Give back the slot the main thread counted for this connection.
        pthread_mutex_lock(&p_work_info->g_clock);
        p_work_info->count--;
        pthread_mutex_unlock(&p_work_info->g_clock);
    }
}

/* Placeholder for shutdown/cleanup logic; currently does nothing. */
void destroy(){

}

 

上一篇:【阅读】提问的智慧+有效的报告BUG


下一篇:拖放API