Linux中的IO复用接口简介(文件监视?)

I/O复用是Linux中的I/O模型之一。所谓I/O复用,指的是进程预先告诉内核,使得内核一旦发现进程指定的一个或多个I/O条件就绪,就通知进程进行处理,从而不会在单个I/O上导致阻塞。

在Linux中,提供了select、poll、epoll三类接口来实现I/O复用。

select函数接口

select中主要就是一个select函数,用于监听指定事件的发生,原型如下:

1
2
3
4
5
#include<sys/select.h>
#include<sys/time.h>
int select(int maxfd, fd_set *rset, fd_set *wset, fd_set *eset,
const struct timeval *timeout);
//返回:若有就绪描述符则返回其数目,超时返回0,出错返回-1

其中各参数的含义如下:
maxfd:最大文件描述符加1,比它小的从0开始的描述符都将被监视,它的值不能超过系统中定义的FD_SETSIZE(通常是1024)。

rset,wset,eset:分别表示监视的读、写、错误的描述符位数组,通常是一个整数数组,每一个整数可以表示32个描述符是否被监视。需要注意的是这几个参数都是值-结果参数,在调用select后这几个参数将表示哪些描述符就绪了。通过以下几个宏可以很方便的操作fset数组:

1
2
3
4
5
6
7
8
void FD_ZERO(fd_set *fdset);
//将一个fdset清空
void FD_SET(int fd, fd_set *fdset);
//将某个fd对应在该fd_set里的那一位打开
void FD_CLR(int fd, fd_set *fdset);
//将某个fd对应在该fd_set里的那一位关闭
void FD_ISSET(int fd, fd_set *fdset);
//检测某个fd_set里对应fd的那一位是否打开

timeout:超时时间,即select最长等待多久就返回,为NULL时表示等到有操作符准备就绪后才返回。该时间可以精确到微秒,其结构如下:

1
2
3
4
struct timeval{
long tv_sec;//秒数
long tv_usec;//微妙数
}

描述符就绪条件
对于普通数据的读写,描述符就绪显而易见,但仍有一些特殊情况时描述符会读写就绪,UNP中对描述符的读写就绪条件进行了说明。

1)满足以下4个条件时,描述符准备好读
a)套接字接收缓冲区中的数据字节数大于套接字接收缓冲区低水位标记的当前大小(默认为1),读将会返回大于0的数。
b)该连接的读半部关闭,读将会返回0。
c)套接字上有一个错误待处理,读将返回-1。
d)该套接字是一个监听套接字并且已完成连接数不为0。

2)满足以下4个条件时,描述符准备好写
a)套接字发送缓冲区中的可用空间字节数大于等于套接字发送缓冲区低水位标记的当前大小(默认2048),写将会返回大于0的数。
b)该连接的写半部关闭,写将会返回EPIPE。
c)套接字上有一个错误待处理,写将返回-1。
d)使用非阻塞式connect的套接字建立有结果返回。

poll函数接口

poll中的主要函数也只有一个poll,与select作用类似,但参数有所不同,函数原型如下:

1
2
3
#include<poll.h>
int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);
//返回:若有就绪描述符则返回其数目,超时返回0,出错返回-1

其中各参数的含义如下:
fdarray:是一个指向pollfd结构数组的指针,维护着描述符以及事件信息,该结构体是poll里比较核心的结构体,结构如下:

1
2
3
4
5
struct pollfd{
int fd; //描述符
short events; //关注的事件
short revents; //发生的事件
}

该结构体通过两个变量区分关注的事件和发生的事件,从而避免了使用值-结果参数。events和revents可选的标志位如下:

1
2
3
4
5
6
7
8
9
10
11
12
POLLIN			//普通或优先级带数据可读
POLLRDNORM //普通数据可读
POLLRDBAND //优先级带数据可读
POLLPRI //高优先级数据可读
POLLOUT //普通数据可写
POLLWRNORM //普通数据可写
POLLWRBAND //优先级带数据可写
POLLERR //发生错误
POLLHUP //发生挂起
POLLINVAL //描述符不是一个打开的文件
//其中POLLERR,POLLHUP,POLLINVAL仅作为reventes的标志位
//优先级带数据主要是指TCP的带外数据,其它大部分数据都是普通数据。

nfds:指定结构体数组中元素的个数。
timeout:每次调用poll最大等待的毫秒数,负值代表等待到直到有事件触发。

epoll函数接口

epoll主要有三个函数,函数原型如下:

1
2
3
4
5
6
7
#include <sys/epoll.h>
int epoll_create(int size);
//创建一个epoll句柄
int epoll_ctl(int efd, int op, int fd, struct epoll_event *event);
//注册一个epoll事件
int epoll_wait(int efd, struct epoll_event *events, int maxevents, int timeout);
//等待事件发生

epoll_create(int size)

size:能监听多少个描述符,返回一个epoll描述符。注意使用完epoll后要关闭该描述符。

epoll_ctl(int efd, int op, int fd, struct epoll_event *event)

efd:epoll_create返回的epoll描述符
op:表示动作,可以在以下三个宏里选择一个

1
2
3
EPOLL_CTL_ADD	//注册新的fd到epoll中  
EPOLL_CTL_MOD //修改已经注册的fd的监听事件
EPOLL_CTL_DEL //从epoll中删除一个fd

fd:要监听的fd
event:告诉内核要监听什么事件,其结构如下:

1
2
3
4
5
6
7
8
9
10
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; //epoll事件
epoll_data_t data; //用于储存epoll描述符信息
};

其中events表示epoll事件,可选的标志位如下:

1
2
3
4
5
6
7
8
EPOLLIN		//描述符可以读
EPOLLOUT //描述符可以写
EPOLLPRI //描述符有优先数据可读(带外数据)
EPOLLERR //文件描述符发生错误;
EPOLLHUP //文件描述符被挂断;
EPOLLET //将EPOLL设为边缘触发(Edge Triggered)模式,默认是水平触发(Level Triggered)。
EPOLLRDHUP //对端关闭连接
EPOLLONESHOT//只监听一次事件,事件发生后该描述符的其他信息将不被提示。

而epoll_data_t使用了union来存储数据,用户可以使用data来存放一些关于该fd的额外内容。

标志位中比较特殊的是EPOLLET这个选项,这个选项将EPOLL设置为边缘触发模式,EPOLL有EPOLLET和EPOLLLT两种工作模式。
EPOLLLT(Level Triggered,水平触发模式):默认工作模式,支持block和no-block socket,内核通知你描述符事件后,如果不进行操作,会一直通知。
EPOLLET(Edge Triggered,边缘触发模式):高速工作模式,只支持no-block socket,只会在描述符状态由未就绪转为就绪时会通知一次,使用该模式时,如果程序编写的不够健全,是很容易出现问题的。

epoll_wait(int efd, struct epoll_event *events, int maxevents, int timeout);

该函数与select和poll函数的功能类似,监视指定事件的发生并返回给用户。
efd:epoll_create返回的opoll描述符。
events:用来从内核得到事件的集合。
maxevents:用来告知内核events数组的大小。
timeout:超时时间,-1将阻塞直到有事件发生,否则表示最多等待多少毫秒后函数就返回。

select,poll,epoll比较

select

  • select能监控的描述符个数由内核中的FD_SETSIZE限制,仅为1024,这也是select最大的缺点,因为现在的服务器并发量远远不止1024。即使能重新编译内核改变FD_SETSIZE的值,但这并不能提高select的性能。
  • 每次调用select都会线性扫描所有描述符的状态,在select结束后,用户也要线性扫描fd_set数组才知道哪些描述符准备就绪,等于说每次调用复杂度都是O(n)的,在并发量大的情况下,每次扫描都是相当耗时的,很有可能有未处理的连接等待超时。
  • 每次调用select都要在用户空间和内核空间里进行内存复制fd描述符等信息。

poll

  • poll使用pollfd结构来存储fd,突破了select中描述符数目的限制。
  • 与select的后两点类似,poll仍然需要将pollfd数组拷贝到内核空间,之后依次扫描fd的状态,整体复杂度依然是O(n)的,在并发量大的情况下服务器性能会快速下降。

epoll

  • epoll维护的描述符数目不受到限制,而且性能不会随着描述符数目的增加而下降。
  • 服务器的特点是经常维护着大量连接,但其中某一时刻读写的操作符数量却不多。epoll先通过epoll_ctl注册一个描述符到内核中,并一直维护着而不像poll每次操作都将所有要监控的描述符传递给内核;在描述符读写就绪时,通过回掉函数将自己加入就绪队列中,之后epoll_wait返回该就绪队列。也就是说,epoll基本不做无用的操作,时间复杂度仅与活跃的客户端数有关,而不会随着描述符数目的增加而下降。
  • epoll在传递内核与用户空间的消息时使用了内存共享,而不是内存拷贝,这也使得epoll的效率比poll和select更高。

程序示例

分别使用select,poll和epoll实现了简单的回显服务器程序,客户端使用select来实现。其中select和poll程序主要参考unp的实现,只是Demo程序,对一些异常情况没有进行处理。

客户端程序

使用select来监听终端输入和连接服务器的流输入,这样可以保证客户端不在某一个输入流上死等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <stdio.h>
#include <string.h>
#define MAXLINE 4096 in_port_t SERV_PORT = 8888;
//char *addr = "192.168.0.231";
char *addr = "127.0.0.1";
void str_cli(FILE *fp, int sockfd);
int main(int argc ,char *argv[]) {
int sockfd;
struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof servaddr);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET, addr, &servaddr.sin_addr); sockfd = socket(AF_INET, SOCK_STREAM, 0);
connect(sockfd, (const struct sockaddr *)&servaddr, sizeof servaddr); str_cli(stdin, sockfd);
}
void str_cli(FILE *fp, int sockfd) {
int maxfd, stdineof, n;
fd_set rset;
char buf[MAXLINE];
FD_ZERO(&rset);
stdineof = 0;
for (;;) {
//如果不是已经输入结束,就继续监听终端输入
if (stdineof == 0) FD_SET(fileno(fp), &rset);
//监听来自服务器的信息
FD_SET(sockfd, &rset);
//maxfd设置为sockfd和stdin中较大的一个加1
maxfd = (fileno(fp) > sockfd ? fileno(fp) : sockfd) + 1;
//只关心是否有描述符读就绪,其他几个直接传NULL即可
select(maxfd, &rset, NULL, NULL, NULL); //如果有来自服务器的信息可读
if (FD_ISSET(sockfd, &rset)) {
if ((n = read(sockfd, buf, MAXLINE)) == 0) {
//如果这边输入了EOF之后服务器close掉连接说明正常结束,否则为异常结束
if (stdineof == 1)
return;
else
perror("terminated error\n");
}
//输出到终端
write(fileno(stdout), buf, n);
}
//如果有来自终端的输入
if (FD_ISSET(fileno(fp), &rset)) {
//终端这边输入了结束符
if ((n = read(fileno(fp), buf, MAXLINE)) == 0) {
//标记已经输入完毕,并只单端关闭写,因为可能还有消息在来客户端的路上尚未处理
stdineof = 1;
shutdown(sockfd, SHUT_WR);
//不再监听终端输入
FD_CLR(fileno(fp), &rset);
continue;
}
//将输入信息发送给服务器
write(sockfd, buf, n);
}
}
}

select服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <stdio.h>
#include <string.h>
#define MAXLINE 4096 in_port_t SERV_PORT = 8888; int main(int argc ,char *argv[]){
int i;
int listenfd, connfd, sockfd;
int maxfd, maxi, nready, client[FD_SETSIZE];
char buf[MAXLINE];
struct sockaddr_in cliaddr, servaddr;
socklen_t clilen;
ssize_t n;
fd_set rset, allset; listenfd = socket(AF_INET, SOCK_STREAM, 0); memset(&servaddr, 0, sizeof servaddr);
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT); if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof servaddr))
printf("bind error\n"); listen(listenfd, 1024); //客户端描述符存储在client中,maxi表示该数组最大的存有客户端描述符的数组下标
maxfd = listenfd;
maxi = -1;
memset(client, -1, sizeof client);
//初始化读就绪的fd_set数组,并监听listen描述符
FD_ZERO(&allset);
FD_SET(listenfd, &allset); for (;;) {
//allset是监控的描述符列表,rset是可读描述符列表
rset = allset;
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
//如果listen描述符可读,说明有客户端连接
if (FD_ISSET(listenfd, &rset)) {
clilen = sizeof cliaddr;
connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
if (connfd == -1) perror("accept error\n");
else printf("%d accepted!\n", connfd); //扫描client数组,找到下标最小的未用的来存客户端描述符
for (i = 0; i < FD_SETSIZE; i++) if (client[i] < 0) {
client[i] = connfd;
break;
}
if (i == FD_SETSIZE) perror("too many clients\n");
//将客户端描述符放到监视的fd_set中,并更新maxfd和maxi
FD_SET(connfd, &allset);
if (connfd > maxfd) maxfd = connfd;
if (i > maxi) maxi = i;
if (--nready <= 0) continue;
}
//扫描所有的客户端,查看是否有描述符读就绪
for (i = 0; i <= maxi; i++) {
if ((sockfd = client[i]) < 0) continue;
if (FD_ISSET(sockfd, &rset)) {
//读到EOF或错误,清除该描述符
if ((n = read(sockfd, buf, MAXLINE)) <= 0) {
close(sockfd);
FD_CLR(sockfd, &allset);
client[i] = -1;
if (n < 0) perror("read error\n");
//回显给客户端
} else {
write(sockfd, buf, n);
}
if (--nready <= 0) break;
}
}
}
return 0;
}

poll服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>
#include <poll.h>
#include <errno.h>
#define MAXLINE 4096 #ifndef OPEN_MAX
#define OPEN_MAX 1024
#endif in_port_t SERV_PORT = 8888; int main(int argc ,char *argv[]){
int i, maxi;
int listenfd, connfd, sockfd;
int nready;
char buf[MAXLINE];
struct pollfd client[OPEN_MAX];
struct sockaddr_in cliaddr, servaddr;
socklen_t clilen;
ssize_t n; listenfd = socket(AF_INET, SOCK_STREAM, 0); memset(&servaddr, 0, sizeof servaddr);
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT); if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof servaddr))
perror("bind error\n"); listen(listenfd, 1024);
//client保存了pull监听的描述符,其中client[0]是给listen描述符的
client[0].fd = listenfd;
client[0].events = POLLRDNORM;
for (i = 1; i < OPEN_MAX; i++)
client[i].fd = -1;
maxi = 0; for (;;) {
nready = poll(client, maxi+1, -1);
//如果是监听描述符可读,说明有客户端连入
if (client[0].revents & POLLRDNORM) {
clilen = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
if (connfd == -1) perror("accept error\n");
else printf("%d accepted!\n", connfd); //扫描clien数组,找到下标最小的未用的来存客户端描述符
for (i = 1; i < OPEN_MAX; i++) {
if (client[i].fd < 0) {
client[i].fd = connfd;
break;
}
}
if (i == OPEN_MAX) perror("too many clients.\n");
client[i].events = POLLRDNORM;
if (i > maxi) maxi = i;
if (--nready <= 0) continue;
}
//扫描所有的客户端描述符
for (i = 1; i <= maxi; i++) {
if ((sockfd = client[i].fd) < 0) continue;
//POLLERR不需要监听,如果有错误的话poll返回时会自动加上
if (client[i].revents & (POLLRDNORM | POLLERR)) {
//读到EOF或错误关闭描述符
if ((n = read(sockfd, buf, MAXLINE)) <= 0) {
close(sockfd);
client[i].fd = -1;
if (n < 0) perror("read error\n");
//回显给客户端
} else {
write(sockfd, buf, n);
}
if (--nready <= 0) break;
}
}
}
return 0;
}

epoll服务器

回显服务器使用了ET高速模式。在该模式下,最好所有的操作都是非阻塞的,程序中套接字都设置为了non-socket,并且使用了缓冲区,在读到数据时先将数据存到缓冲区中,下次可写时才将数据从缓冲区写回客户端。
另外,在ET模式下,accept、read、write时都要使用循环直到读到EAGAIN才能说明没有数据了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h> #ifndef OPEN_MAX
#define OPEN_MAX 1024
#endif #define SERV_PORT 8888
#define MAXLINE 4096
#define EVENT_MAX 20 struct event_data{
int fd;
int offset;
char buf[MAXLINE];
}event_d[OPEN_MAX]; void set_event_d(int fd, struct epoll_event *evt, struct event_data *met){
met->fd = fd;
met->offset = 0;
memset(&met->buf, 0, sizeof met->buf);
evt->data.ptr = met;
} int main(int argc, char *argv[]){
int listenfd, connfd, epfd;
char buf[MAXLINE]; struct sockaddr_in servaddr, cliaddr;
int i, j, nready;
socklen_t clilen;
ssize_t n, wpos;
struct epoll_event evt, evts[EVENT_MAX]; listenfd = socket(AF_INET, SOCK_STREAM, 0);
//设置lisenfd非阻塞
fcntl(listenfd, F_SETFL, O_NONBLOCK);
memset(&servaddr, 0, sizeof servaddr);
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
bind(listenfd, (struct sockaddr*)&servaddr, sizeof servaddr);
listen(listenfd, 1024); //创建epoll描述符
epfd = epoll_create(OPEN_MAX); //将listen描述符加入到epoll中
set_event_d(listenfd, &evt, &event_d[0]);
evt.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &evt); //event_d中的fd为-1表示没有使用,0用来存listenfd,其它用来存客户端fd
for (i = 1; i < OPEN_MAX; i++) event_d[i].fd = -1; for (;;) {
nready = epoll_wait(epfd, evts, EVENT_MAX, -1);
for (i = 0; i < nready; i++) {
//读取存储描述符信息的指针
struct event_data *ed = (struct event_data*)evts[i].data.ptr; //accept
if (ed->fd == listenfd) {
//ET模式下存在多个client connect只通知一次的情况,需要循环accept直到读到EAGAIN
for(;;) {
clilen = sizeof cliaddr;
connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen);
if (connfd == -1) {
if (errno == EAGAIN) break;
else perror("accept error");
} else {
printf("a client connected! fd: %d\n", connfd);
}
//找到可用的event_d来存放event.data
for (j = 1; j < OPEN_MAX; j++) {
if (event_d[j].fd == -1) break;
}
if (j == OPEN_MAX) {
perror("too many clients");
break;
}
//设置客户端fd非阻塞
fcntl(connfd, F_SETFL, O_NONBLOCK);
set_event_d(connfd, &evt, &event_d[j]);
evt.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &evt);
}
//read
} else if(evts[i].events & EPOLLIN){
//ET模式,重复读直到EAGAIN说明无数据可读或者读到错误及EOF
for (;ed->offset < MAXLINE;) {
n = read(ed->fd, ed->buf + ed->offset, MAXLINE - ed->offset); if (n <= 0) {
if (errno == EAGAIN) break;
if (errno == EINTR) continue;
close(ed->fd);
ed->fd = -1;
break;
}
ed->offset += n;
}
//修改为监听描述符写就绪
evt.events = EPOLLOUT | EPOLLET;
evt.data.ptr = ed;
epoll_ctl(epfd, EPOLL_CTL_MOD, ed->fd, &evt);
//write
} else if(evts[i].events & EPOLLOUT){
wpos = 0;
//ET模式下,重复写直到无数据可发或者EAGAIN
for (;wpos < ed->offset;) {
n = write(ed->fd, ed->buf + wpos, ed->offset - wpos);
if (n < 0) {
if (errno == EAGAIN) break;
if (errno == EINTR) continue;
close(ed->fd);
ed->fd = -1;
break;
}
wpos += n;
}
ed->offset = 0;
//修改为监听描述符读就绪
evt.events = EPOLLIN | EPOLLET;
evt.data.ptr = ed;
epoll_ctl(epfd, EPOLL_CTL_MOD, ed->fd, &evt);
}
}
}
return 0;
}

本文标题:Linux中的IO复用接口简介

文章作者:Vimer Su

发布时间:2013年11月19日 - 19时56分

最后更新:2016年06月04日 - 22时57分

原始链接:http://vimersu.win/blog/2013/11/19/linux-io-reuse-interface/

许可协议: "署名-非商用-相同方式共享 3.0" 转载请保留原文链接及作者。

上一篇:[转]MySQL去除查询结果重复值


下一篇:【Linux命令】id,usermod用户管理命令(包括/etc/passwd、shadow、group、gshadow文件)