Windows下完成端口移植Linux下的epoll(续)

在上一篇中,我们主要讨论了Windows下关于完成端口的一些知识。对应于完成端口,Linux下面在2.5.44内核中有了epoll,这个是为处理大批量句柄而引进的。
 
先来看看为什么要引进epoll以及它带来的好处。
Linux内核中,原有的select所用到的FD_SET是有限的,在内核中的参数_FD_SETSIZE来设置的。如果想要同时检测1025个句柄的可读(或可写)状态,则select无法满足。而且,而且select是采用轮询方法进行检测的,也就是说每次检测都要遍历所有FD_SET中的句柄。显然,当随着FD_SET中的句柄数的增多,select的效率会不断的下降。如今的服务器,都是要满足上万甚至更多的连接的,显然想要更高效的实现这一要求,必须采用新的方法。于是,不断的修改后,终于形成了稳定的epoll
epoll优点:(1)支持大数量的socket描述符(FD)。举个例子来说,在1GB内存的机器上大约可以打开10万个左右的socket,这个数字足以满足一般的服务器需求。(2epollIO效率不会随着FD数量的增加而线性下降(多少肯定会下降的)。至于原理,可以查阅epoll的实现原理;(3)使用mmap加速内核与用户空间的消息传递,无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。当然epoll还有其他一些优点,这里就不一一列举了。
 
下面来重点说说epoll的使用,这也是大家最关心的部分。在2.6内核中epoll变的简洁而强大。
先来弄清楚一个概念,即epoll2种工作方式:LTET
LT(level triggered)是缺省的工作方式,同时支持blocknon-block。其实这个有点像电路里面的电平触发方式。在这种模式下,内核会告诉你一个文件描述符fd就绪了,然后你就可以对这个fd进行IO操作。如果你不做任何操作,内核会继续通知你。所以,假如你读取数据没有读取完时,内核会继续通知你。其实传统的select/poll就是这种模式。
ET(edge-triggered)是告诉工作方式,只支持non-block。其实这个有点像电路里面的边沿触发方式。这个是高效服务器必选的方式。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll通知你。然后对这个fd只通知你一次,因为之后一直为就绪态,没有了状态的变化,直到你做了某些操作导致了那个fd不再为就绪态。但是注意,如果一直不对这个fd进行IO操作,内核不会发送更多的通知。
在弄清楚了上述两种模式之后,接下来就可以使用epoll了。主要用到三个函数epoll_create,epoll_ctl,epoll_wait
 
int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。当创建好epoll句柄后,它就是会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
 
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
  EPOLL_CTL_ADD:注册新的fd到epfd中; 
  EPOLL_CTL_MOD:修改已经注册的fd的监听事件; 
  EPOLL_CTL_DEL:从epfd中删除一个fd;
 
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
  struct epoll_event { 
     __uint32_t events;    /* Epoll events */ 
     epoll_data_t data;    /* User data variable */ 
  };
 
events可以是以下几个宏的集合:
  EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); 
  EPOLLOUT:表示对应的文件描述符可以写; 
  EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带
                  外数据到来); 
  EPOLLERR:表示对应的文件描述符发生错误; 
  EPOLLHUP:表示对应的文件描述符被挂断; 
  EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水
                平触发(Level Triggered)来说的。 
  EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要
                         继续监听这个socket的话,需要再次把这个socket加入
                         到EPOLL队列里
 
int epoll_wait(int epfd, struct epoll_event * events,
               int maxevents, int timeout);
等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
 
 
下面给出epoll编写服务器的模型:
#include <stdio.h> 
#include <sys/socket.h> 
#include <sys/epoll.h> 
#include <fcntl.h> 
#include <string.h> 
#include <errno.h> 
#include <netinet/in.h> 
#include <arpa/inet.h> 

#define MAXSIZE         64000 
#define MAXEPS            256 
//#define EVENTS            100 
#define LISTENQ         32 
#define SERV_PORT     8000 

int setnonblock(int sock) 

     int flags = fcntl(sock, F_GETFL, 0); 

     if(-1 == flags) { 
            perror("fcntl(sock, F_GETFL)"); 
            return -1; 
     } 

     flags |= O_NONBLOCK; 

     if(-1 == fcntl(sock, F_SETFL, flags)) { 
            perror("fcntl(sock, F_SETFT, flags)"); 
            return -2; 
     } 

     return 0; 


int main(int argc, char *argv[]) 

     int i, maxi, listenfd, connfd, sockfd, epfd, nfds; 
     ssize_t n; 
    
     char buf[MAXSIZE];        

     socklen_t clilen; 
        
     struct epoll_event ev, events[20]; 

     epfd = epoll_create(MAXEPS); 

     struct sockaddr_in clientaddr; 
     struct sockaddr_in serveraddr; 

     listenfd = socket(AF_INET, SOCK_STREAM, 0); 

     setnonblock(listenfd); 

     ev.data.fd = listenfd; 

     ev.events = EPOLLIN | EPOLLET; 

     epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); 
     
     memset(&serveraddr, 0, sizeof(serveraddr)); 
     serveraddr.sin_family = AF_INET; 

/* 
     char *local_addr = "10.0.2.15"; 
     inet_aton(local_addr, &(serveraddr.sin_addr)); 
*/ 
     serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); 
     serveraddr.sin_port = htons(SERV_PORT); 

     if(bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) != 0) { 
            perror("bind failed"); 
            return -1; 
     } 

     if(listen(listenfd, LISTENQ) != 0) { 
            perror("listen failed"); 
            return -2; 
     } 

     maxi = 0; 

     printf("began to accept...\n"); 

     for(;;) { 
            nfds = epoll_wait(epfd, events, 32, 10000); 

            if(-1 == m_nfds)    { 
                 if(EINTR == errno) { 
            continue; 
    } 
    return -1; 
            } 

         for(i=0; i<nfds; ++i) { 
                 if(events[i].data.fd == listenfd) { 
                        connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clilen); 

                        if(connfd < 0) { 
                             perror("accept failed"); 
                             return -3; 
                        } 

         printf("accepted..\n"); 

                        setnonblock(connfd); 

                        ev.data.fd = connfd; 
                        ev.events = EPOLLIN | EPOLLET; 

                        epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); 
                 } 
                 else if(events[i].events & EPOLLIN) { 
                        if((sockfd = events[i].data.fd) < 0) 
                             continue; 

                        if((n = read(sockfd, buf, MAXSIZE)) < 0) { 
                             if(errno == ECONNRESET) {    
                                    close(sockfd); 
                                    events[i].data.fd = -1; 
                             } else { 
                                    perror("read failed"); 
                             } 
                        } 
                        else if(0 == n) { 
                             close(sockfd); 
                             events[i].data.fd = -1; 
                        } 

         printf("Read the buf: %s\n", buf); 

                        ev.data.fd = sockfd; 
                        ev.events = EPOLLOUT | EPOLLET; 
                         
                        epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); 
                 } 
                 else if(events[i].events & EPOLLOUT) { 
                        sockfd = events[i].data.fd; 
                        char *sndbuf = "I get your message!"; 
                        write(sockfd,sndbuf, 10); 

                        ev.data.fd = sockfd; 
                        ev.events = EPOLLIN | EPOLLET; 

                        epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); 
                 } 
            } 
     } 
        
     return 0; 
}
 
 
个人总结:
一般epoll服务器模型基本就是上面那个。有人你加入线程池来进行处理,即在epoll-wait和收发处理分开到两个线程中,这样在处理收发数据时,仍然不影响epoll进行监听。也有人建议不使用线程池,因为线程切换等带来的开销不亚于数据处理。这里本人没有进行测试,不便下结论。
主要先想说的,很多在上述模型之外,还需要注意的就是如何读写的问题。因为上述模型只是小量的数据收发,比较简单。
由于epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。只有当read(2)或者write(2)返回EAGAIN时才需要挂起,等待。那么如何保证读取数据或者是发送数据都已经结束了呢?(即全部读完或全部发送)。
读数据的时候需要考虑的是当read()返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取,直到返回的大小小于请求的大小。也可以在一个whiletrue)循环当中不断的read,直到返回EAGAIN位置。建议使用前一种方法。因为很长时间没有给对方回复后,对方可能会认为此次数据包丢失,接着再次发送同样的数据包,服务器此时可以控制退出此次读取,重新读取。
同理,如果发送端流量大于接收端的流量(意思是epoll所在的程序读比转发的socket要快),由于是非阻塞的socket,那么send()函数虽然返回,但实际缓冲区的数据并未真正发给接收端,这样不断的读和发,当缓冲区满后会产生EAGAIN错误,同时不理会这次请求发送的数据。这时就需要你对send进行一些改动,等待片刻后继续发送,并检查看发送的数据量是否正确。


本文转自jazka 51CTO博客,原文链接:http://blog.51cto.com/jazka/252620,如需转载请自行联系原作者
上一篇:服务器防火墙打开失败,如何开放端口


下一篇:PL/SQL之--触发器