之前看libevent后写的一个简单问答服务器。
工作方式比较简单,一个主线程和多个工作线程,主线程只接受连接并通知工作线程接管工作,工作线程接管连接然后接收消息并返回。也可以换成进程的方式。
主线程与工作线程之间通过socket对通信:主线程把新连接的文件描述符写入socket对的一端,工作线程用libevent事件监听socket对的另一端。
//libevent server sample on linux /*socket*/ #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> /**/ #include <iostream> #include <err.h> //err() #include <string.h> //memset() #include <fcntl.h> //fcntl() #include <pthread.h> // #include <cstdlib> //calloc() #include <unistd.h> //close() #include <sys/queue.h> //链表 #include <errno.h> //errno,ENTIR #include <stdio.h> /*libevent*/ #include <event.h> using namespace std; #define SERVER_PORT 55555 #define LISTEN_NUM 32 #define THREAD_NUM 4 #define BUF_LEN 1024 #define WRITE_BUF_LEN 1024*4 struct event_accepted{ int fd;//socket fd int thread_id; //记录所连客户端socket所属的工作线程编号 struct event* ev_read; struct event* ev_write; char* write_buf; int len; //write_buf的长度 int offset; //write_buf已写入后的偏移 }; struct socket_pair{ int connecter; int accepter; socket_pair(){ connecter =-1; accepter =-1; } }; /*工作线程结构*/ struct workthread_info{ int thread_id; //线程编号 pthread_mutex_t g_clock; //线程锁 volatile int count; //各个工作线程socket连接数 struct socket_pair socket_pairs; //通知工作线程注册事件的socket对 struct event socket_pair_event; //用于工作线程监听注册事件 struct event_base *base; //工作线程的event_base实例 // TAILQ_ENTRY(event_accepted) entries;//工作线程上注册的event的链表,用于回收event分配的内存 workthread_info(){ count = 0; } }; struct workthread_info work_info[THREAD_NUM]; pthread_t pthread_id[THREAD_NUM]; // int setnonblock(int fd); void on_accept(int fd, short ev, void *arg); void on_read(int fd, short ev, void *arg); void on_write(int fd, short ev, void *arg); void* work_thread(void* arg); int socketpair_init(); //初始化本地socket连接用于通知子线程注册事件 void socketpair_read(int fd, short ev, void *arg); //工作线程接收已连接的socket,并为socket注册读事件 void destroy(); int main(){ //初始化工作线程注册通知socket if(socketpair_init() <0){ err(1,"init socketpair_init failed"); } /*初始化监听事件*/ event_init(); struct event ev_accept; int thread_id[THREAD_NUM]={0}; /*初始化工作线程*/ for(int i=0;i<THREAD_NUM;i++){ thread_id[i]=i; //将工作线程编号传到回调函数中,i在这里是一个临时变量,将i的地址传到线程中去的话i的值什么时候使用是不确定的,可能导致一些工作线程编号相同。 if(0 
!=pthread_create(&pthread_id[i],NULL,work_thread,&thread_id[i])){ err(1,"thread create failed"); } pthread_mutex_init(&work_info[i].g_clock,NULL); } /*初始化监听socket*/ struct sockaddr_in listen_addr; int reuseaddr_on =1; int listen_fd =socket(AF_INET,SOCK_STREAM,0); if(listen_fd<0){ err(1,"listen failed"); } if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr_on,sizeof(reuseaddr_on)) <0){ err(1,"setsockopt failed"); } memset(&listen_addr,0,sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = INADDR_ANY; listen_addr.sin_port =htons(SERVER_PORT); if(bind(listen_fd,(struct sockaddr*)&listen_addr,sizeof(listen_addr))<0){ err(1,"bind failed"); } if(listen(listen_fd,LISTEN_NUM)<0){ err(1,"listen failed"); } if(setnonblock(listen_fd)<0){ err(1,"set server socket to non-blocking failed"); } /**/ event_set(&ev_accept,listen_fd,EV_READ|EV_PERSIST,on_accept,NULL); event_add(&ev_accept,NULL); event_dispatch(); return 0; } int setnonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } void on_accept(int fd, short ev, void *arg){ int client_fd; struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); client_fd = accept(fd,(struct sockaddr*)&client_addr,&client_len); if(client_fd == -1){ warn("accept failed"); return; } if(setnonblock(client_fd)<0){ warn("failed to set client socket non-blocking"); } //socket连接数最少的一个工作线程 int min =work_info[0].count; int thread_num =0; for(int i=0;i<THREAD_NUM;i++){ if(work_info[i].count < min){ thread_num = i; min = work_info[i].count; } } //将已连接的socket文件描述符通知工作线程 send(work_info[thread_num].socket_pairs.connecter,&client_fd,sizeof(client_fd),0); work_info[thread_num].count++; } int socketpair_init(){ // int connecter[THREAD_NUM]={-1}; int listener = -1; listener = socket(AF_INET,SOCK_STREAM,0); if(listener<0){ err(1,"%d: init socketpair listener for insert event failed",__LINE__); } 
struct sockaddr_in listen_addr,connect_addr[THREAD_NUM]; int size; memset(&listen_addr,0,sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); listen_addr.sin_port = 0; /*kernel chooses port.*/ if(bind(listener,(struct sockaddr*)&listen_addr,sizeof(listen_addr)) == -1){ goto fail; } if(listen(listener,THREAD_NUM) == -1){ goto fail; } for(int i=0;i<THREAD_NUM;i++){ work_info[i].socket_pairs.connecter = socket(AF_INET,SOCK_STREAM,0); if(work_info[i].socket_pairs.connecter<0){ goto fail; } size = sizeof(connect_addr[i]); /* We want to find out the port number to connect to. */ if(getsockname(listener,(struct sockaddr*)&connect_addr[i],(socklen_t *)&size) == -1){ goto fail; } if(size != sizeof(connect_addr[i])){ goto fail; } if(connect(work_info[i].socket_pairs.connecter,(struct sockaddr*)&connect_addr[i],sizeof(connect_addr[i])) == -1){ printf("i:%d, %s\n",i,strerror(errno)); goto fail; } size = sizeof(listen_addr); work_info[i].socket_pairs.accepter = accept(listener,(struct sockaddr*)&listen_addr,(socklen_t*)&size); if(work_info[i].socket_pairs.accepter<0){ goto fail; } /* Now check we are talking to ourself by matching port and host on the two sockets. 
*/ if(getsockname(work_info[i].socket_pairs.connecter,(struct sockaddr*)&connect_addr[i],(socklen_t*)&size) == -1){ goto fail; } if(size != sizeof(connect_addr[i]) || listen_addr.sin_family != connect_addr[i].sin_family || listen_addr.sin_addr.s_addr != connect_addr[i].sin_addr.s_addr || listen_addr.sin_port != connect_addr[i].sin_port){ goto fail; } } /*close listen socket*/ close(listener); return 0; fail: for(int i=0;i<THREAD_NUM;i++){ if(work_info[i].socket_pairs.connecter >0){ close(work_info[i].socket_pairs.connecter); } if(work_info[i].socket_pairs.accepter >0){ close(work_info[i].socket_pairs.accepter); } } return -1; } void on_read(int fd, short ev, void *arg){ struct event_accepted *client = (event_accepted*)arg; char* buf = new char[BUF_LEN]; if(buf == NULL){ err(1,"malloc failed"); } int recv_len = read(fd,buf,BUF_LEN); if(recv_len == 0){ //客户端断开连接 cout<<"client disconnected"<<endl; close(fd); event_del(client->ev_read); event_del(client->ev_write); delete client->ev_write; delete client->ev_read; delete client->write_buf; work_info[client->thread_id].count--; free(client); delete[] buf; return; } else if(recv_len < 0){ cout<<"socekt failure,disconnecting client:"<<strerror(errno)<<endl; close(fd); event_del(client->ev_read); event_del(client->ev_write); delete client->ev_write; delete client->ev_read; delete client->write_buf; work_info[client->thread_id].count--; free(client); delete[] buf; return; } //TODO 1.需要处理数组越界问题;2.这里只是一问一答的情况,如果需要主动推送消息需要增加设计; //TODO 1.在同一个文件描述符上如果多次注册同一事件会发生什么?这里改用链表记录需要发送的数据可能会好一点, memcpy(client->write_buf+client->len,buf,recv_len); client->len += recv_len; event_add(client->ev_write,NULL); delete[] buf; } void on_write(int fd, short ev, void *arg){ struct event_accepted* client = (event_accepted*)arg; int len =0; len = write(fd,client->write_buf+client->offset,client->len-client->offset); if(len == -1){ //写操作被信号打断或不能写入,重新注册写事件 if(errno == EINTR || errno == EAGAIN){ event_add(client->ev_write,NULL); return; } else{ 
err(1,"write error"); } } else if((client->offset + len) <client->len){ //数据没有完全写入 client->offset += len; event_add(client->ev_write,NULL); return; } else{ //写入完成 client->offset = client->len = 0; } } void* work_thread(void* arg){ int thread_num = *(int*)arg; //获取线程编号 struct workthread_info *p_work_info = &work_info[thread_num]; p_work_info->thread_id = thread_num; //记录线程编号 struct event_base * base = p_work_info->base = event_base_new(); event_set(&p_work_info->socket_pair_event,p_work_info->socket_pairs.accepter,EV_READ|EV_PERSIST,socketpair_read,&p_work_info->socket_pair_event); event_base_set(base,&p_work_info->socket_pair_event); event_add(&p_work_info->socket_pair_event,NULL); event_base_dispatch(base); } void socketpair_read(int fd, short ev, void *arg){ struct event_accepted *client = (struct event_accepted*)calloc(1,sizeof(event_accepted)); if(NULL == client){ err(1,"on_accept malloc event_accepted failed"); } struct workthread_info *p_work_info; for(int i=0;i<THREAD_NUM;i++){ //找到所属工作线程编号 if(work_info[i].socket_pairs.accepter == fd){ p_work_info = &work_info[i]; client->thread_id = i; //记录所连客户端socket所属的线程编号 break; } } int client_fd = -1; int recv_len = recv(fd,&client_fd,4,0); if(recv_len != 4){ err(1,"socketpair read len not equal to 4"); } if(client_fd <= 0){ err(1,"socketpair recved an error socket fd"); } client->fd = client_fd; client->ev_write = new event; client->ev_read = new event; client->write_buf = new char[WRITE_BUF_LEN]; client->len = 0; client->offset = 0; if(client->ev_read == NULL){ err(1,"alloc read event failed"); } // // // TAILQ_INSERT_TAIL(p_work_info->entries,client,); //初始化事件 event_set(client->ev_read,client_fd,EV_READ|EV_PERSIST,on_read,client); event_base_set(p_work_info->base,client->ev_read); event_set(client->ev_write,client_fd,EV_WRITE,on_write,client); event_base_set(p_work_info->base,client->ev_write); //工作线程注册读事件,写事件在需要时再注册 event_add(client->ev_read,NULL); } void destroy(){ }