epoll等IO多路复用工具本质上是对IO进行管理;引入reactor模式之后,对IO的管理就转变为对事件的管理。
struct sockitem //per-socket state; intermediate progress (e.g. a half-received packet) can be kept here
{
int sockfd;
int (*callback)(int fd, int events, void *arg); //event handler for this fd; dispatching through a callback avoids switching on read/write/accept at the event loop
char recvbuffer[1024]; //staging buffer for partially received data
char sendbuffer[1024]; //staging buffer for pending outgoing data
};
struct reactor //globally shared reactor state
{
int epfd; //the single epoll instance all threads wait on
struct epoll_event events[512]; //scratch array filled by epoll_wait on each iteration
};
struct reactor *eventloop = NULL; //allocated in main(); shared by all threads
int recv_cb(int fd, int events, void *arg); //forward declaration: send_cb re-arms fds with recv_cb
/*
 * Write-ready callback: sends a fixed reply, then re-arms the fd for reading.
 *
 * @param fd     connected client socket (writable)
 * @param events epoll event mask (unused here)
 * @param arg    the fd's struct sockitem (ownership stays with the reactor)
 * @return 0 on success, -1 if the connection was torn down
 *
 * NOTE(review): partial sends are not handled; a production server would
 * drain si->sendbuffer and track how many bytes were actually written.
 */
int send_cb(int fd, int events, void *arg)
{
	struct sockitem *si = (struct sockitem *)arg;
	struct epoll_event ev;

	if (send(fd, "hello\n", 6, 0) < 0) {
		/* treat a failed send as fatal for this connection */
		epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, NULL);
		close(fd);
		free(si);
		return -1;
	}

	/* reply sent: switch back to waiting for the next request */
	ev.events = EPOLLIN | EPOLLET;
	si->sockfd = fd;
	si->callback = recv_cb;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
	return 0;
}
/*
 * Read-ready callback: receives one chunk of data and re-arms the fd for
 * writing so send_cb can reply.
 *
 * @param fd     connected client socket (readable)
 * @param events epoll event mask (unused here)
 * @param arg    the fd's struct sockitem; freed here on error/disconnect
 * @return bytes received on success, 0 on orderly disconnect, -1 on
 *         EAGAIN or error
 */
int recv_cb(int fd, int events, void *arg)
{
	struct sockitem *si = (struct sockitem *)arg;
	struct epoll_event ev;
	char buffer[1024] = {0};

	/* reserve one byte for the NUL terminator so buffer is always a valid
	 * C string for the printf("%s") below (the original recv'd the full
	 * 1024 bytes, which could leave the buffer unterminated) */
	int ret = recv(fd, buffer, sizeof(buffer) - 1, 0);
	if (ret < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK) {
			/* no data right now (normal with edge-triggered epoll) */
			return -1;
		}
		/* hard error: remove from epoll and drop the connection */
		ev.events = EPOLLIN;
		epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
		close(fd);
		free(si);
		return -1;
	} else if (ret == 0) {
		/* peer performed an orderly shutdown */
		printf("disconnect %d\n", fd);
		ev.events = EPOLLIN;
		epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
		close(fd);
		free(si);
		return 0;
	}

	printf("Recv: %s, %d Bytes\n", buffer, ret);

	/* data received: wait for writability so send_cb can answer */
	ev.events = EPOLLOUT | EPOLLET;
	si->sockfd = fd;
	si->callback = send_cb;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
	return ret;
}
/*
 * Accept callback for the listening socket: accepts one pending connection,
 * allocates its per-socket state, and registers it with epoll for reading.
 *
 * @param fd     the listening socket
 * @param events epoll event mask (unused here)
 * @param arg    the listening socket's sockitem (unused here)
 * @return the new client fd on success, -1 on failure
 */
int accept_cb(int fd, int events, void *arg)
{
	struct sockaddr_in client_addr;
	memset(&client_addr, 0, sizeof(struct sockaddr_in));
	socklen_t client_len = sizeof(client_addr);

	int clientfd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
	if (clientfd <= 0) return -1;

	char str[INET_ADDRSTRLEN] = {0};
	printf("recv from %s at port %d\n",
	       inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
	       ntohs(client_addr.sin_port));

	/* the original never checked malloc: a NULL here was dereferenced below */
	struct sockitem *si = malloc(sizeof *si);
	if (si == NULL) {
		close(clientfd); /* refuse the connection rather than crash */
		return -1;
	}
	si->sockfd = clientfd;
	si->callback = recv_cb;

	struct epoll_event ev;
	ev.events = EPOLLIN | EPOLLET;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, clientfd, &ev);
	return clientfd;
}
/*
 * Entry point: sets up the listening socket and the epoll instance, spawns a
 * worker thread, then runs the event loop, dispatching each ready fd to its
 * sockitem callback.
 *
 * Usage: prog <port>
 */
int main(int argc, char *argv[])
{
	/* worker_thread is defined later in this file; declare it here so the
	 * pthread_create below is not an implicit declaration */
	void *worker_thread(void *arg);

	if (argc < 2) {
		return -1;
	}
	int port = atoi(argv[1]);

	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
	if (sockfd < 0) {
		return -1;
	}

	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(struct sockaddr_in));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = INADDR_ANY;

	if (bind(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) < 0) {
		return -2;
	}
	if (listen(sockfd, 5) < 0) {
		return -3;
	}

	eventloop = malloc(sizeof *eventloop);
	if (eventloop == NULL) return -4;
	eventloop->epfd = epoll_create(1);
	if (eventloop->epfd < 0) return -4;

	/* register the listening socket with accept_cb as its handler */
	struct sockitem *si = malloc(sizeof *si);
	if (si == NULL) return -4;
	si->sockfd = sockfd;
	si->callback = accept_cb;

	struct epoll_event ev;
	ev.events = EPOLLIN;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev);

	pthread_t id;
	pthread_create(&id, NULL, worker_thread, NULL);

	while (1) {
		int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1);
		if (nready < 0) { /* was `nready < -1`, which can never be true */
			break;
		}
		for (int i = 0; i < nready; i++) {
			/* Dispatch once per fd. The original called the callback twice
			 * when EPOLLIN and EPOLLOUT fired together; the read path can
			 * free the sockitem, making the second call a use-after-free. */
			struct sockitem *item =
				(struct sockitem *)eventloop->events[i].data.ptr;
			item->callback(item->sockfd, eventloop->events[i].events, item);
		}
	}
	return 0;
}
这里有一个问题:listenfd和其他fd一样,被加入到epoll中,在同一个循环里监听读事件。这样每次循环只能accept一次,只能产生一个新连接,其余都是与listenfd无关的操作。对于接入量较大的场景,响应速度较慢。可以用一个线程专门处理listenfd,另一个线程处理其他clientfd。
/*
 * Worker thread: drains events from the shared epoll instance and dispatches
 * them to the per-socket callbacks (same loop as main()).
 *
 * NOTE(review): this thread and main() both call epoll_wait() on the same
 * epfd AND share the single eventloop->events[] array, so concurrent wakeups
 * race on that buffer — confirm this is intended, or give each waiter its own
 * events array / dedicate the epfd to one consumer.
 */
void *worker_thread(void *arg)
{
	(void)arg; /* unused */
	while (1) {
		int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1);
		if (nready < 0) { /* was `nready < -1`, which can never be true */
			break;
		}
		for (int i = 0; i < nready; i++) {
			/* dispatch once per fd: the read path may free si, so calling
			 * the callback a second time for EPOLLOUT would be UAF */
			struct sockitem *si =
				(struct sockitem *)eventloop->events[i].data.ptr;
			si->callback(si->sockfd, eventloop->events[i].events, si);
		}
	}
	return NULL; /* the original fell off the end of a non-void function */
}
/*
 * Dedicated listener thread: waits on the listening fd with poll() (a single
 * fd does not justify epoll), accepts new connections, and registers each
 * client fd with the shared epoll instance.
 *
 * Relies on lfd (listening fd), re (reactor) and si being defined at file
 * scope, as the original comment states.
 */
void *thread_cb(void *arg)
{
	(void)arg; /* unused */
	struct pollfd pfd = {0};
	pfd.fd = lfd;
	pfd.events = POLLIN;

	while (1) {
		/* BUG FIX: poll()'s second argument is the NUMBER of pollfd entries
		 * (here 1), not max-fd+1 as in select(); the original passed lfd+1 */
		int ret = poll(&pfd, 1, -1);
		if (ret == -1) {
			if (errno == EINTR) continue; /* interrupted by a signal: retry */
			printf("poll error,errno=%d\n", errno);
			return NULL;
		}
		if (pfd.revents & POLLIN) {
			struct sockaddr_in cli_addr = {0};
			socklen_t cli_len = sizeof(struct sockaddr_in);
			int cfd = accept(lfd, (struct sockaddr *)&cli_addr, &cli_len);
			if (cfd < 0) {
				/* one failed accept should not kill the listener thread
				 * (the original returned NULL here) */
				printf("accept error,errno=%d\n", errno);
				continue;
			}
			char ip[64] = {0};
			/* BUG FIX: the original formatted `client_addr`, which is not
			 * declared in this function — the local is `cli_addr` */
			printf("recv from %s at port %d\n",
			       inet_ntop(AF_INET, &cli_addr.sin_addr, ip, sizeof(ip)),
			       ntohs(cli_addr.sin_port));
			si = (struct sockitem *)malloc(sizeof(struct sockitem));
			if (si == NULL) { close(cfd); continue; }
			si->sockfd = cfd;
			/* BUG FIX: struct sockitem's member is `callback`, not `cb` */
			si->callback = recv_cb;
			struct epoll_event ev = {0};
			ev.events = EPOLLIN;
			ev.data.ptr = si;
			epoll_ctl(re->epfd, EPOLL_CTL_ADD, cfd, &ev);
		}
	}
	return NULL;
}
如果接入速度还不符合要求,我们还可以设置多个listenfd开多个线程,每个线程监听一个listenfd,同时监听clientfd的线程也开多个,这样就是“多对多”。这样会产生一个问题:如果客户端前后session的数据有关联,会出现A线程访问B线程中的数据,导致不同线程之间数据共享(竞争),这样的话就需要加锁,但是效率会下降。解决方法是,做到把数据和业务逻辑相分离,线程运行时信息不存到服务器本身的进程中,而是存到单独的缓存中,这样就可以隔离客户端前后请求之间的依赖关系。这样一来,就相当于服务器不存储数据,只处理业务。客户端的请求由不同的服务线程响应也没关系,直接根据客户端id去缓存中取数据即可。
对于多进程的结构,方法也是一样,不过还有几个问题。一是,多进程监听同一端口,需要listen()之后直接fork(),listenfd就会被继承到子进程中,与父进程一致。这里再提一下,epoll_create()在fork()之前或之后都可以,虽然效果不同,但是没有太大影响。二是惊群的问题,一个连接来了由哪个进程响应,通过共享内存的一把锁实现,保证任一时刻epoll中只有一个fd加入。
最后,再提一个问题,为什么UDP很少用epoll,而TCP却用的多。UDP对于服务器而言,只有一个fd,没必要使用多路复用。TCP是一个连接就有一个fd,所以需要epoll去管理。