select系统调用
#include <sys/select.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
nfds:是指集合中所有文件描述符的范围,即所有文件描述符的最大值加1
readfds:对应可读的文件符集合,是我们关心的,是否可以从这些文件中读取数据的集合,
若有大于等于一个可读文件,则select会返回大于0的值。若无,则根据timeout判断。
writefds: 对应可写的文件符集合。
exceptfds:对应异常的文件符集合。
fd_set结构如下:可以看出容量是有限的,最大1024,一般通过以下来操作
- FD_CLR(int fd,fd_set* set):用来清除文件描述符集合set中相关fd的位
- FD_ISSET(int fd,fd_set *set):用来测试文件描述符集合set中相关fd的位是否为真
- FD_SET(int fd,fd_set*set):用来设置文件描述符集合set中相关fd的位
- FD_ZERO(fd_set *set):用来清除文件描述符集合set的全部位
#define __FD_SETSIZE 1024 /* fd_set for select and pselect. */ typedef struct { /* XPG4.2 requires this member name. Otherwise avoid the name from the global namespace. */ #ifdef __USE_XOPEN __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)->fds_bits) #else __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)->__fds_bits) #endif
timeout:超时时间,主要分为三种
- NULL:永远等待下去,仅在有一个描述字准备好I/O时才返回;
- 0:立即返回,仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生;
- 特定的时间值: 如果在指定的时间段里没有事件发生,select将超时返回;
struct timeval { __time_t tv_sec; /* Seconds. */ __suseconds_t tv_usec; /* Microseconds. */ };
return: 有三种情况
- 返回0表示超时了;
- 返回-1,表示出错了;
- 返回一个大于0的数,表示文件描述符状态改变的个数;
demo:
/*
 * select() demo: listen on the given IPv4 address/port, accept exactly one
 * TCP client, then block in select() on that connection and print every
 * chunk of normal or out-of-band data that arrives.
 *
 * usage: <program> ip_address port_number
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <libgen.h>   /* basename() */

/*
 * Returns 0 when the peer closes the connection or a receive fails,
 * 1 on a usage or setup error.
 */
int main( int argc, char* argv[] )
{
    if ( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi( argv[2] );
    printf( "ip is %s and port is %d\n", ip, port );

    struct sockaddr_in address;
    memset( &address, 0, sizeof( address ) );
    address.sin_family = AF_INET;
    address.sin_port = htons( (unsigned short)port );
    if ( inet_pton( AF_INET, ip, &address.sin_addr ) != 1 )
    {
        fprintf( stderr, "invalid IPv4 address: %s\n", ip );
        return 1;
    }

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    if ( listenfd < 0 )
    {
        perror( "socket" );
        return 1;
    }

    if ( bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ) == -1 )
    {
        perror( "bind" );
        close( listenfd );
        return 1;
    }

    if ( listen( listenfd, 5 ) == -1 )
    {
        perror( "listen" );
        close( listenfd );
        return 1;
    }

    struct sockaddr_in client_address;
    socklen_t client_addrlength = sizeof( client_address );
    int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
    if ( connfd < 0 )
    {
        /* Without a connection there is nothing to select() on: stop here
         * instead of continuing with connfd == -1. */
        printf( "errno is: %d\n", errno );
        close( listenfd );
        return 1;
    }

    char remote_addr[INET_ADDRSTRLEN];
    printf( "connected with ip: %s and port: %d\n",
            inet_ntop( AF_INET, &client_address.sin_addr, remote_addr, INET_ADDRSTRLEN ),
            ntohs( client_address.sin_port ) );

    /* Ask the kernel to deliver urgent data inline with the normal stream.
     * NOTE(review): with SO_OOBINLINE enabled, recv(..., MSG_OOB) in the
     * exception branch below is expected to fail -- confirm whether the
     * option or the branch is the intended behavior. */
    int oob_inline = 1;
    if ( setsockopt( connfd, SOL_SOCKET, SO_OOBINLINE, &oob_inline, sizeof( oob_inline ) ) == -1 )
    {
        perror( "setsockopt(SO_OOBINLINE)" );
    }

    char buf[1024];
    fd_set read_fds;
    fd_set exception_fds;

    while ( 1 )
    {
        /* select() overwrites both sets with the ready descriptors, so they
         * must be rebuilt before every call. */
        FD_ZERO( &read_fds );
        FD_ZERO( &exception_fds );
        FD_SET( connfd, &read_fds );
        FD_SET( connfd, &exception_fds );

        int ready = select( connfd + 1, &read_fds, NULL, &exception_fds, NULL );
        printf( "select one\n" );
        if ( ready < 0 )
        {
            if ( errno == EINTR )
            {
                continue;   /* interrupted by a signal: just retry */
            }
            printf( "selection failure\n" );
            break;
        }

        if ( FD_ISSET( connfd, &read_fds ) )
        {
            ssize_t n = recv( connfd, buf, sizeof( buf ) - 1, 0 );
            if ( n <= 0 )
            {
                break;      /* peer closed the connection or recv failed */
            }
            buf[n] = '\0';
            printf( "get %d bytes of normal data: %s\n", (int)n, buf );
        }
        else if ( FD_ISSET( connfd, &exception_fds ) )
        {
            ssize_t n = recv( connfd, buf, sizeof( buf ) - 1, MSG_OOB );
            if ( n <= 0 )
            {
                break;
            }
            buf[n] = '\0';
            printf( "get %d bytes of oob data: %s\n", (int)n, buf );
        }
    }

    close( connfd );
    close( listenfd );
    return 0;
}
out:
ip is 127.0.0.1 and port is 1233 connected with ip: 127.0.0.1 and port: 33524 select one get 13 bytes of normal data: 12345678901
poll系统调用
# include <poll.h> int poll ( struct pollfd * fds, unsigned int nfds, int timeout);
fds:需要被监视的文件描述符集合;
nfds:被监视的文件描述符数量;
timeout:超时时间,有三种取值:
- 负数:无限超时,一直等到一个指定事件发生;
- 0:立即返回,并列出准备好的文件描述符;
- 正数:等待指定的时间,单位为毫秒;
poll函数与select函数的最大不同之处在于:select函数有最大文件描述符的限制,一般1024个,而poll函数对文件描述符的数量没有限制。但select和poll函数都是通过轮询的方式来查询某个文件描述符状态是否发生了变化,并且需要将整个文件描述符集合在用户空间和内核空间之间来回拷贝,这样随着文件描述符的数量增加,相应的开销也随之增加。
struct pollfd :
struct pollfd { int fd; /* 文件描述符 */ short events; /* 等待的事件 */ short revents; /* 实际发生了的事件 */ } ;
fd:文件描述符
events:告诉poll监听fd上的哪些事件,它是事件的按位或。
- POLLIN:有数据可读。
- POLLRDNORM:有普通数据可读。
- POLLRDBAND:有优先数据可读。
- POLLPRI:有紧迫数据可读。
- POLLOUT:写数据不会导致阻塞。
- POLLWRNORM:写普通数据不会导致阻塞。
- POLLWRBAND:写优先数据不会导致阻塞。
- POLLMSG:SIGPOLL消息可用。
revents:由内核修改,通知应用程序fd上实际发生了哪些事件。除了events中的这些事件,还包括以下:
- POLLERR:指定的文件描述符发生错误。
- POLLHUP:指定的文件描述符挂起事件。
- POLLNVAL:指定的文件描述符非法。
epoll系统调用
epoll操作是包含有三个接口的:
epoll_create函数:
#include <sys/epoll.h> int epoll_create(int size);
size:表示监听的数目,并不起作用,只是给内核一个提示。
return:返回一个epoll句柄。
epoll_ctl函数:
#include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epfd:epoll_create()的返回值;
op:动作,有三种取值:
- EPOLL_CTL_ADD:注册新的fd到epfd中;
- EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
- EPOLL_CTL_DEL:从epfd中删除一个fd;
fd:需要监听的fd;
event: 告诉内核需要监听什么事件,取值有:
- EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
- EPOLLOUT:表示对应的文件描述符可以写;
- EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
- EPOLLERR:表示对应的文件描述符发生错误;
- EPOLLHUP:表示对应的文件描述符被挂断;
- EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列;
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;//保存触发事件的某个文件描述符相关的数据 struct epoll_event { __uint32_t events; /* epoll event */ epoll_data_t data; /* User data variable */ };
enum EPOLL_EVENTS { EPOLLIN = 0x001, #define EPOLLIN EPOLLIN EPOLLPRI = 0x002, #define EPOLLPRI EPOLLPRI EPOLLOUT = 0x004, #define EPOLLOUT EPOLLOUT EPOLLRDNORM = 0x040, #define EPOLLRDNORM EPOLLRDNORM EPOLLRDBAND = 0x080, #define EPOLLRDBAND EPOLLRDBAND EPOLLWRNORM = 0x100, #define EPOLLWRNORM EPOLLWRNORM EPOLLWRBAND = 0x200, #define EPOLLWRBAND EPOLLWRBAND EPOLLMSG = 0x400, #define EPOLLMSG EPOLLMSG EPOLLERR = 0x008, #define EPOLLERR EPOLLERR EPOLLHUP = 0x010, #define EPOLLHUP EPOLLHUP EPOLLRDHUP = 0x2000, #define EPOLLRDHUP EPOLLRDHUP EPOLLWAKEUP = 1u << 29, #define EPOLLWAKEUP EPOLLWAKEUP EPOLLONESHOT = 1u << 30, #define EPOLLONESHOT EPOLLONESHOT EPOLLET = 1u << 31 #define EPOLLET EPOLLET };
epoll_wait函数:
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
events:从内核得到事件的集合;
maxevents:事件集合的大小;
timeout:超时时间,0会立即返回,-1表示永久阻塞,正数表示一个指定的值;
return:成功时返回就绪的文件描述符的个数,失败时返回-1并设置errno;
LT和ET模式
epoll对文件描述符的操作有两种模式:水平触发LT(level trigger)和边沿触发ET(edge trigger)。默认的情况下为LT模式。LT模式与ET模式的区别在于:
- LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
- ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。
ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
我们来看看一个demo看一下区别:
/*
 * epoll demo contrasting level-triggered (LT) and edge-triggered (ET)
 * handling of client sockets. BUFFER_SIZE is deliberately tiny so that one
 * client message needs several recv() calls, which makes the difference
 * between the two modes visible in the output.
 *
 * usage: <program> ip_address port_number
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <stdbool.h>  /* bool */
#include <libgen.h>   /* basename() */

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

/* Switch fd to non-blocking mode and return its previous file status flags. */
int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

/*
 * Register fd on epollfd for EPOLLIN (plus EPOLLET when enable_et is true)
 * and make fd non-blocking.
 */
void addfd( int epollfd, int fd, bool enable_et )
{
    struct epoll_event event;
    memset( &event, 0, sizeof( event ) );
    event.data.fd = fd;
    event.events = EPOLLIN;
    if ( enable_et )
    {
        event.events |= EPOLLET;
    }
    if ( epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ) == -1 )
    {
        perror( "epoll_ctl(EPOLL_CTL_ADD)" );
    }
    setnonblocking( fd );
}

/*
 * Accept one pending connection on listenfd and register it on epollfd.
 * Returns the new descriptor, or -1 when nothing could be accepted.
 */
static int accept_client( int epollfd, int listenfd, bool enable_et )
{
    struct sockaddr_in client_address;
    socklen_t client_addrlength = sizeof( client_address );
    int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
    if ( connfd < 0 )
    {
        /* listenfd is non-blocking, so "no connection pending" is not an error. */
        if ( errno != EAGAIN && errno != EWOULDBLOCK )
        {
            perror( "accept" );
        }
        return -1;
    }
    addfd( epollfd, connfd, enable_et );
    return connfd;
}

/*
 * Level-triggered handler: performs a single recv() per readable event.
 * Unread data stays in the socket and epoll_wait() reports the descriptor
 * again on the next call.
 */
void lt( struct epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        printf( "lt events:%u fd:%d\n", (unsigned)events[i].events, events[i].data.fd );
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            int connfd = accept_client( epollfd, listenfd, false );
            if ( connfd >= 0 )
            {
                printf( "accept new client:%d\n", connfd );
            }
        }
        else if ( events[i].events & EPOLLIN )
        {
            ssize_t n = recv( sockfd, buf, BUFFER_SIZE - 1, 0 );
            if ( n < 0 && ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) )
            {
                continue;   /* nothing to read right now; keep the connection */
            }
            if ( n <= 0 )
            {
                close( sockfd );
                continue;
            }
            buf[n] = '\0';
            printf( "get %d bytes of content: %s\n", (int)n, buf );
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

/*
 * Edge-triggered handler: a readable event is reported only once, so the
 * socket is drained with repeated recv() calls until it reports EAGAIN.
 */
void et( struct epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        printf( "et events:%u fd:%d\n", (unsigned)events[i].events, events[i].data.fd );
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            accept_client( epollfd, listenfd, true );
        }
        else if ( events[i].events & EPOLLIN )
        {
            while ( 1 )
            {
                ssize_t n = recv( sockfd, buf, BUFFER_SIZE - 1, 0 );
                if ( n < 0 )
                {
                    if ( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                    {
                        printf( "read later\n" );
                        break;
                    }
                    if ( errno == EINTR )
                    {
                        continue;
                    }
                    close( sockfd );
                    break;
                }
                if ( n == 0 )
                {
                    /* Peer closed: stop reading. Looping again would call
                     * recv() on a closed descriptor and then close it twice. */
                    close( sockfd );
                    break;
                }
                buf[n] = '\0';
                printf( "get %d bytes of content: %s\n", (int)n, buf );
            }
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

int main( int argc, char* argv[] )
{
    if ( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi( argv[2] );

    struct sockaddr_in address;
    memset( &address, 0, sizeof( address ) );
    address.sin_family = AF_INET;
    address.sin_port = htons( (unsigned short)port );
    if ( inet_pton( AF_INET, ip, &address.sin_addr ) != 1 )
    {
        fprintf( stderr, "invalid IPv4 address: %s\n", ip );
        return 1;
    }

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    if ( listenfd < 0 )
    {
        perror( "socket" );
        return 1;
    }

    if ( bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ) == -1 )
    {
        perror( "bind" );
        close( listenfd );
        return 1;
    }

    if ( listen( listenfd, 5 ) == -1 )
    {
        perror( "listen" );
        close( listenfd );
        return 1;
    }

    struct epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    if ( epollfd == -1 )
    {
        perror( "epoll_create" );
        close( listenfd );
        return 1;
    }

    /* The listening socket stays level-triggered: the handlers accept only
     * one connection per event, which under EPOLLET could leave further
     * pending connections unreported until another client arrives. */
    addfd( epollfd, listenfd, false );

    while ( 1 )
    {
        int ready = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ready < 0 )
        {
            if ( errno == EINTR )
            {
                continue;   /* interrupted by a signal: just retry */
            }
            printf( "epoll failure\n" );
            break;
        }

        /* Call lt( events, ready, epollfd, listenfd ) here instead to observe
         * level-triggered behavior. */
        et( events, ready, epollfd, listenfd );
    }

    close( epollfd );
    close( listenfd );
    return 0;
}
lt out:
lt events:1 fd:3 accept new client:5 lt events:1 fd:5 get 9 bytes of content: 123456789 lt events:1 fd:5 get 4 bytes of content: 01
et out:
et events:1 fd:3 et events:1 fd:5 get 9 bytes of content: 123456789 get 4 bytes of content: 01
可以看到et的效率更高。
EPOLLONESHOT事件
这个可以保证事件只触发一次,主要用来多线程中防止对同一socket同时操作。比如:
/*
 * EPOLLONESHOT demo: every readable client socket is handed to a freshly
 * created worker thread. Because client sockets are registered with
 * EPOLLONESHOT, epoll reports no further events for a socket until the
 * worker re-arms it, so at most one thread handles a given socket at a time.
 *
 * usage: <program> ip_address port_number
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <stdbool.h>  /* bool */
#include <libgen.h>   /* basename() */

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

/* Arguments handed to a worker thread. */
struct fds
{
    int epollfd;   /* epoll instance the socket is registered on */
    int sockfd;    /* client socket the worker reads from */
};

/* Switch fd to non-blocking mode and return its previous file status flags. */
int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

/*
 * Register fd on epollfd for edge-triggered EPOLLIN (plus EPOLLONESHOT when
 * oneshot is true) and make fd non-blocking.
 */
void addfd( int epollfd, int fd, bool oneshot )
{
    struct epoll_event event;
    memset( &event, 0, sizeof( event ) );
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if ( oneshot )
    {
        event.events |= EPOLLONESHOT;
    }
    if ( epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ) == -1 )
    {
        perror( "epoll_ctl(EPOLL_CTL_ADD)" );
    }
    setnonblocking( fd );
}

/* Re-arm a one-shot registration so the next EPOLLIN on fd is reported again. */
void reset_oneshot( int epollfd, int fd )
{
    struct epoll_event event;
    memset( &event, 0, sizeof( event ) );
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    if ( epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event ) == -1 )
    {
        perror( "epoll_ctl(EPOLL_CTL_MOD)" );
    }
}

/*
 * Thread entry point. arg must point to a heap-allocated struct fds; the
 * worker takes ownership and frees it. Reads from the socket until it would
 * block (then re-arms the one-shot registration), the peer closes, or an
 * error occurs. Always returns NULL.
 */
void* worker( void* arg )
{
    struct fds *ctx = ( struct fds* )arg;
    int sockfd = ctx->sockfd;
    int epollfd = ctx->epollfd;
    free( ctx );

    printf( "start new thread to receive data on fd: %d\n", sockfd );
    char buf[ BUFFER_SIZE ];
    while ( 1 )
    {
        ssize_t n = recv( sockfd, buf, BUFFER_SIZE - 1, 0 );
        if ( n == 0 )
        {
            close( sockfd );
            printf( "foreiner closed the connection\n" );
            break;
        }
        if ( n < 0 )
        {
            if ( errno == EAGAIN || errno == EWOULDBLOCK )
            {
                reset_oneshot( epollfd, sockfd );
                printf( "read later\n" );
                break;
            }
            if ( errno == EINTR )
            {
                continue;
            }
            /* Any other error is fatal for this connection; without this
             * exit the loop would spin on the failing recv() forever. */
            close( sockfd );
            break;
        }
        /* Terminate at the received length so bytes left over from an
         * earlier, longer message are not printed. */
        buf[n] = '\0';
        printf( "get content: %s\n", buf );
        sleep( 5 );   /* simulate slow processing of the request */
    }
    printf( "end thread receiving data on fd: %d\n", sockfd );
    return NULL;
}

int main( int argc, char* argv[] )
{
    if ( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi( argv[2] );

    struct sockaddr_in address;
    memset( &address, 0, sizeof( address ) );
    address.sin_family = AF_INET;
    address.sin_port = htons( (unsigned short)port );
    if ( inet_pton( AF_INET, ip, &address.sin_addr ) != 1 )
    {
        fprintf( stderr, "invalid IPv4 address: %s\n", ip );
        return 1;
    }

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    if ( listenfd < 0 )
    {
        perror( "socket" );
        return 1;
    }

    if ( bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ) == -1 )
    {
        perror( "bind" );
        close( listenfd );
        return 1;
    }

    if ( listen( listenfd, 5 ) == -1 )
    {
        perror( "listen" );
        close( listenfd );
        return 1;
    }

    struct epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    if ( epollfd == -1 )
    {
        perror( "epoll_create" );
        close( listenfd );
        return 1;
    }
    /* The listening socket must not be one-shot: it has to keep reporting
     * new connections. */
    addfd( epollfd, listenfd, false );

    while ( 1 )
    {
        int ready = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ready < 0 )
        {
            if ( errno == EINTR )
            {
                continue;   /* interrupted by a signal: just retry */
            }
            printf( "epoll failure\n" );
            break;
        }

        for ( int i = 0; i < ready; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                /* addfd() registers every descriptor edge-triggered, so all
                 * pending connections are accepted now; the non-blocking
                 * accept() fails with EAGAIN once the queue is empty. */
                while ( 1 )
                {
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof( client_address );
                    int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                    if ( connfd < 0 )
                    {
                        if ( errno == EINTR )
                        {
                            continue;
                        }
                        if ( errno != EAGAIN && errno != EWOULDBLOCK )
                        {
                            perror( "accept" );
                        }
                        break;
                    }
                    addfd( epollfd, connfd, true );
                }
            }
            else if ( events[i].events & EPOLLIN )
            {
                /* Each worker gets its own heap copy of the arguments: a
                 * loop-local struct could go out of scope or be overwritten
                 * before the new thread reads it. */
                struct fds *ctx = malloc( sizeof *ctx );
                if ( ctx == NULL )
                {
                    perror( "malloc" );
                    reset_oneshot( epollfd, sockfd );
                    continue;
                }
                ctx->epollfd = epollfd;
                ctx->sockfd = sockfd;

                pthread_t thread;
                int err = pthread_create( &thread, NULL, worker, ctx );
                if ( err != 0 )
                {
                    fprintf( stderr, "pthread_create: %s\n", strerror( err ) );
                    free( ctx );
                    reset_oneshot( epollfd, sockfd );
                    continue;
                }
                /* Workers are never joined; detach them so their resources
                 * are released when they finish. */
                pthread_detach( thread );
            }
            else
            {
                printf( "something else happened \n" );
            }
        }
    }

    close( epollfd );
    close( listenfd );
    return 0;
}
out:
start new thread to receive data on fd: 5 get content: 12345678901 read later end thread receiving data on fd: 5
三组IO复用区别:
区别:
1、select和poll只能在LT模式下;
2、epoll可以在工作高效的ET模式下。epoll支持EPOLLONESHOT事件,该事件可以进一步减少可读、可写和异常事件被触发的次数。
实现原理:
select和poll,采用轮询的方式。每次调用都要扫描整个注册文件描述符集合,并将就绪的文件描述符返回给用户程序,故检测就绪事件O(n);
epoll_wait,采用回调函数的方法。内核检测到就绪文件描述符时将触发回调函数,回调函数将该文件描述符上对应的事件插入就绪事件队列,内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。因此epoll_wait无需轮询整个文件描述符的集合来检测哪些事件已经就绪,检测就绪事件的时间复杂度为O(1)。
适用情况:
当活动连接比较多的时候(回调函数被触发的过于频繁),epoll_wait的效率未必有select和poll高。
epoll_wait适用于连接数量较多,但活动连接较少的情况