Linux 网络编程的5种IO模型:多路复用(select/poll/epoll)

Linux 网络编程的5种IO模型:多路复用(select/poll/epoll)

背景

我们在上一讲 Linux 网络编程的5种IO模型:阻塞IO与非阻塞IO中,对于其中的 阻塞/非阻塞IO 进行了说明。

这一讲我们来看 多路复用机制。

IO复用模型 ( I/O multiplexing )

所谓I/O多路复用机制,就是说通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。这种机制的使用需要额外的功能来配合: select、poll、epoll

select、poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的。

select时间复杂度O(n)

它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长。

poll时间复杂度O(n)

poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态, 但是它没有最大连接数的限制,原因是它是基于链表来存储的.

epoll时间复杂度O(1)

epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll会把哪个流发生了怎样的I/O事件通知我们。所以我们说epoll实际上是事件驱动(每个事件关联上fd)的,此时我们对这些流的操作都是有意义的。(复杂度降低到了O(1))

在多路复用IO模型中,会有一个内核线程不断去轮询多个socket的状态,只有当真正读写事件发生时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有在真正有读写事件进行时,才会使用IO资源,所以它大大减少了资源占用。

select

使用select来监视文件描述符时,要向内核传递的信息包括:

1、我们要监视的文件描述符个数

2、每个文件描述符,我们可以监视它的一种或多种状态,包括:可读,可写,发生异常三种。

3、要等待的时间,监视是一个过程,我们希望内核监视多长时间,然后返回给我们监视结果呢?(可以永远等待,等待一段时间,或者不等待直接返回)

4、监视结果包括:准备好了的文件描述符个数,对于读,写,异常,分别是哪儿个文件描述符准备好了。

fd_set 模型的原理:(理解select模型的关键在于理解fd_set,假设取fd_set长度为1字节):

fd_set中的每一位可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd。

执行FD_ZERO(&set);则set用位表示是0000,0000。

fd=5,执行FD_SET(fd,&set);后set变为0001,0000(第5位置为1)

若再加入fd=2,fd=1,则set变为0001,0011

执行select(6,&set,0,0,0)阻塞等待...

fd=1,fd=2上都发生可读事件,则select返回,此时set变为0000,0011。注意:没有事件发生的fd=5被清空。

基于上面的讨论,可以轻松得出select模型的特点:

(1)可监控的文件描述符个数取决与sizeof(fd_set)的值。我这边服务器上sizeof(fd_set)=512,每bit表示一个文件描述符,则我服务器上支持的最大文件描述符是512*8=4096。据说可调,另有说虽然可调,但调整上限受于编译内核时的变量值。

(2)将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd,一是用于在select返回后,array作为源数据和fd_set进行FD_ISSET判断。二是select返回后会把以前加入的但并无事件发生的fd清空,则每次开始 select前都要重新从array取得fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数。

(3)可见select模型必须在select前循环array(加fd,取maxfd),select返回后循环array(FD_ISSET判断是否有事件发生)。

select 实现原理

1、使用copy_from_user从用户空间拷贝fd_set到内核空间

2、注册回调函数__pollwait

3、遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_pollsock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll

4、以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。

5、__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意:把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。

6、poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。

7、如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。

8、把fd_set从内核空间拷贝到用户空间。

select的几大缺点

1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大

2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

3)select支持的文件描述符数量太小了,默认是1024

%% 时序图
sequenceDiagram
title : 多路复用
participant application
participant kernel

Note right of application: 应用程序调用系统调用

application ->> kernel: select
kernel ->> kernel: 数据未就绪
kernel ->> kernel: 等待数据准备好
kernel ->> application: 返回可读条件
application ->> kernel: recvfrom
kernel ->> kernel: 准备好数据,拷贝到用户空间
kernel ->> application: 拷贝完成,返回成功

/* According to POSIX.1-2001, POSIX.1-2008 */
#include <sys/select.h> /* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout); void FD_CLR(int fd, fd_set *set); //清除某一个被监视的文件描述符。
int FD_ISSET(int fd, fd_set *set); //测试一个文件描述符是否是集合中的一员
void FD_SET(int fd, fd_set *set); //添加一个文件描述符,将set中的某一位设置成1;
void FD_ZERO(fd_set *set); //清空集合中的文件描述符,将每一位都设置为0; #include <sys/select.h> int pselect(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, const struct timespec *timeout,
const sigset_t *sigmask); struct timeval{
long tv_sec; //秒
long tv_usec;//微秒
} struct timespec{
time_t tv_sec;//秒
long tv_nsec;//纳秒
}

select和pselect有三个主要的区别:

1、select超时使用的是struct timeval,用秒和微秒计时,而pselect使用struct timespec ,用秒和纳秒。

2、select会更新超时参数timeout 以指示还剩下多少时间,pselect不会。

3、select没有sigmask参数.

  • sigmask:这个参数保存了一组内核应该打开的信号(即:从调用线程的信号掩码中删除)

  • 当pselect的sigmask 为 NULL时pselect和select一样;当sigmask!=NULL时,等效于以下原子操作:

sigset_t origmask;
sigprocmask(SIG_SETMASK, &sigmask, &origmask);
ready = select(nfds, &readfds, &writefds, &exceptfds, timeout);
sigprocmask(SIG_SETMASK, &origmask, NULL);

接收信号的程序通常只使用信号处理程序来引发全局标志。全局标志将指示事件必须被处理。在程序的主循环中。一个信号将导致select和pselect返回-1 并将erron=EINTR。

我们经常要在主循环中处理信号,主循环的某个位置将会检查全局标志,那么我们会问:如果信号在条件之后,select之前到达怎么办。答案是select会无限期阻塞。

这种情况很少见,但是这就是为什么出现了pselect。因为他是类似原子操作的。

描述: 允许程序监视多个文件描述符,等待所监视的一个或者多个文件描述符变为“准备好”的状态。所谓的”准备好“状态是指:文件描述符不再是阻塞状态,可以用于某类IO操作了,包括可读,可写,发生异常三种 。

参数说明

nfds: 一个整数值, 表示集合中所有文件描述符的范围,即所有文件描述符的最大值+1。

注意, 待测试的描述集总是从0, 1, 2, …开始的。 所以, 假如你要检测的描述符为8, 9, 10, 那么系统实际也要监测0, 1, 2, 3, 4, 5, 6, 7, 此时真正待测试的描述符的个数为11个, 也就是max(8, 9, 10) + 1

readfds/writefds/exceptfds:这些都是fd_set类型的,代表文件描述符集合; 可以认为一个fd_set变量是由很多个二进制构成的数组,每一位表示一个文件描述符是否需要监视。

  • readfds:监视文件描述符的一个集合,我们监视其中的文件描述符是不是可读,或者更准确的说,读取是不是不阻塞了。

  • writefds:监视文件描述符的一个集合,我们监视其中的文件描述符是不是可写,或者更准确的说,写入是不是不阻塞了。

  • exceptfds:用来监视发生错误异常文件

timeout: 表示select返回之前的时间上限。 设为0(NULL),代表无期限等待下去。 这个等待可以被一个信号中断,只有当一个描述符准备好,或者捕获到一个信号时函数才会返回。如果是捕获到信号,select返回-1,并将变量errno设置成EINTR。

如果timeout ->tv_sec 为0 且 timeout->tv_sec 为0 ,不等待直接返回,加入的描述符都会被测试,并且返回满足要求的描述符个数,这种方法通过轮询,无阻塞地获得了多个文件描述符状态。

如果timeout->tv_sec!=0 || timeout->tv_sec!=0 ,等待指定的时间。当有描述符复合条件或者超过超时时间的话,函数返回。等待总是会被信号中断。

返回值

成功时:返回三种描述符集合中”准备好了“的文件描述符数量。

超时:返回0

错误:返回-1,并设置 errno

  • EBADF:集合中包含无效的文件描述符。(文件描述符已经关闭了,或者文件描述符上已经有错误了)。
  • EINTR:捕获到一个信号。
  • EINVAL:nfds是负的或者timeout中包含的值无效。
  • ENOMEM:无法为内部表分配内存。

例程:基于 select的 TCP 服务器

server.c

/*
# Copyright By Schips, All Rights Reserved
# https://gitee.com/schips/
#
# File Name: server.c
# Created : Sat 25 Mar 2020 14:43:39 PM CST
*/ #include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h> typedef struct _info {
char name[10];
char text[54];
}info; int main(int argc, char *argv[])
{
int my_socket;
unsigned int len;
int ret, i, j; // 创建套接字
my_socket = socket(AF_INET, SOCK_STREAM, 0); // IPV4, TCP socket
if(my_socket == -1) { perror("Socket"); }
printf("Creat a socket :[%d]\n", my_socket); // 用于接收消息
info buf ={0}; // 指定地址
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET; // 地址协议族
addr.sin_addr.s_addr = inet_addr("127.0.0.1"); //指定 IP地址
addr.sin_port = htons(12345); //指定端口号 int set = 1;
int get = 0;
int getlen = 0; // 服务器 绑定
bind(my_socket, (struct sockaddr *)&addr, sizeof(addr)); ret = listen(my_socket, 10);
if(-1 == ret) { perror("listen"); }
printf("Listening\n"); int connect_sockets[100] = {0}; // 我们规定,为 0 的成员为无效socket
int connected_cnt = 0; //struct sockaddr_in new = {0};
//int new_addr_size = {0}; fd_set read_sets; int max_fd = my_socket; // 一开始时,只有 一个新的 文件描述:my_socket ,所以它是最大的 while(1) // 在循环中等待连接请求
{
FD_ZERO(&read_sets); // 每次都需要初始化
FD_SET(my_socket, &read_sets); // 添加 要监听的 socket // 添加 之后经过 connect 过来的 套接字数组(一般在第一次循环时是空的)
for( i = 0; i < connected_cnt; i++)
{
if(connect_sockets[i])
{
FD_SET(connect_sockets[i], &read_sets); // 添加经过accept保存下来,需要进行读响应的套接字到集合中
}
} // 设置监听超时时间
// timeout.tv_sec = 2;
// timeout.tv_usec = 0; ret = select(max_fd + 1, &read_sets, NULL, NULL, NULL);
// 判断返回值
switch (ret) {
case 0 :
printf("Time out.\n"); // 监听超时
break;
case -1 :
printf("Err occurs.\n"); // 监听错误
break;
default :
if(FD_ISSET(my_socket, &read_sets)) //这个是原的被动socket,如果是它,则 意味着有新的连接进来了
{
connect_sockets[connected_cnt] = accept(my_socket, NULL, NULL);
max_fd = connect_sockets[connected_cnt];
printf("New socket is %d\n", connect_sockets[connected_cnt]);
connected_cnt ++;
printf("Now we has [%d] connecter\n", connected_cnt);
}else{ // 如果不是 被动socket,那么就意味着是 现有的连接 有消息发来(我们有数据可读)
printf("New message came in.\n"); // 求出是那个文件描述符可读
for(i = 0; i < connected_cnt; i++)
{
if(FD_ISSET(connect_sockets[i], &read_sets) == 1) break;
}
if( i >= connected_cnt) { continue; } printf("Socket [%d] send to server.\n", connect_sockets[i]); // 接收消息
ret = recv(connect_sockets[i], &buf, sizeof(buf), 0);
if( ret <= 0 )
{
// 远程客户端断开处理(如果不处理,会导致服务器也断开)
printf("[%d]/[%d] Client [%d] disconnected.\n", i+1, connected_cnt, connect_sockets[i]);
close(connect_sockets[i]);
// 我们需要将对应的客户端从数组中移除 且 连接数 -1 (移除的方法: 数组成员前移覆盖)
for (j = i; j < connected_cnt - 1; ++j)
{
connect_sockets[j] = connect_sockets[ j + 1];
}
connected_cnt --;
} // 打印消息
printf("[%s] : %s\n", buf.name, buf.text);
// 回复消息
sprintf(buf.name, "Server");
sprintf(buf.text, "Had recvied your[%d] message", connect_sockets[i]);
//sendto(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
send(connect_sockets[i], &buf, sizeof(buf), 0);
}
break;
}
printf("while loop\n");
} // 关闭连接
//shutdown(my_socket, SHUT_RDWR); perror("shutdown");
for(i = 0; i < connected_cnt; i++)
{
close(connect_sockets[i]); perror("close");
}
return close(my_socket);
}

client.c

/*
# Copyright By Schips, All Rights Reserved
# https://gitee.com/schips/
#
# File Name: client.c
# Created : Sat 25 Mar 2020 14:44:19 PM CST
*/ #include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h> typedef struct _info {
char name[10];
char text[54];
}info; int main(int argc, char *argv[])
{
int my_socket;
unsigned int len;
int ret, i = 0; // 创建套接字
my_socket = socket(AF_INET, SOCK_STREAM, 0); // IPV4, TCP socket
if(my_socket == -1) { perror("Socket"); }
printf("Creat a socket :[%d]\n", my_socket); // 用于接收消息
info buf ={0}; // 指定地址
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET; // 地址协议族
addr.sin_addr.s_addr = inet_addr("127.0.0.1"); //指定 IP地址
addr.sin_port = htons(12345); //指定端口号 int new_socket;
struct sockaddr_in new = {0};
int new_addr_size; connect(my_socket, (struct sockaddr *)(&addr), sizeof(struct sockaddr_in));
if(-1 == ret) { perror("connect"); }
printf("connected\n"); // 回复消息
sprintf(buf.name, "Client");
sprintf(buf.text, "Hello tcp text.");
//sendto(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
send(my_socket, &buf, sizeof(buf), 0);
perror("sendto"); // 接收并打印消息
//recvfrom(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
recv(my_socket, &buf, sizeof(buf), 0);
perror("recvfrom"); printf("[%s] : %s\n", buf.name, buf.text); for (i = 0; i < 5; ++i)
{
sleep(2); // 回复消息
sprintf(buf.name, "Client");
sprintf(buf.text, "Hello tcp text [%d].", i++);
//sendto(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
send(my_socket, &buf, sizeof(buf), 0);
perror("sendto"); // 接收并打印消息
//recvfrom(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
recv(my_socket, &buf, sizeof(buf), 0);
printf("[%s] : %s\n", buf.name, buf.text);
perror("recvfrom");
} // 关闭连接
//shutdown(my_socket, SHUT_RDWR); perror("shutdown");
//printf("%d\n", errno);
return close(my_socket); perror("close");
printf("%d\n", errno);
return errno;
}

poll

select() 和 poll() 系统调用的本质一样,poll() 的机制与 select() 类似,与 select() 在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是 poll() 没有最大文件描述符数量的限制( 基于链表来存储的,但是数量过大后性能也是会下降)。poll() 和 select() 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

#define _GNU_SOURCE         /* See feature_test_macros(7) */
#include <signal.h>
#include <poll.h> int ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *tmo_p, const sigset_t *sigmask); struct pollfd{
int fd; //文件描述符
short events; //等待的事件
short revents; //实际发生的事件
};

描述: 监视并等待多个文件描述符的属性变化

参数解析:

fds : 指向一个struct pollfd数组的指针,用于指定测试某个给定的fd的条件

  • fd :每一个 pollfd 结构体指定了一个被监视的文件描述符,可以传递多个结构体,指示 poll() 监视多个文件描述符。
  • events: 指定监测fd的事件(输入、输出、错误),每一个事件有多个取值
  • revents:revents 域是文件描述符的操作结果事件,内核在调用返回时设置这个域。events 域中请求的任何事件都可能在 revents 域中返回。

注意:每个结构体的 events 域是由用户来设置,告诉内核我们关注的是什么,而 revents 域是返回时内核设置的,以说明对该描述符发生了什么事件

nfds : 指定 fds 数组元素个数 ,相当于要检查多少个文件描述符

timeout:指定等待的毫秒数,无论 I/O 是否准备好,poll() 都会返回。

  • -1 : 永远等待,直到事件发生

  • 0:立即返回

  • > 0:等待指定的毫秒数

返回值:

成功时,poll() 返回结构体中 revents 域不为 0 的文件描述符个数;如果在超时前没有任何事件发生,poll()返回 0;

失败时,poll() 返回 -1,并设置 errno 为下列值之一:

  • EBADF:一个或多个结构体中指定的文件描述符无效。
  • EFAULT:fds 指针指向的地址超出进程的地址空间。
  • EINTR:请求的事件之前产生一个信号,调用可以重新发起。
  • EINVAL:nfds 参数超出 PLIMIT_NOFILE 值。
  • ENOMEM:可用内存不足,无法完成请求。

events & revents的取值如下:

事件 描述 是否可作为输入(events) 是否可作为输出(revents)
POLLIN 数据可读(包括普通数据&优先数据)
POLLOUT 数据可写(普通数据&优先数据)
POLLRDNORM 普通数据可读
POLLRDBAND 优先级带数据可读(linux不支持)
POLLPRI 高优先级数据可读,比如TCP带外数据
POLLWRNORM 普通数据可写
POLLWRBAND 优先级带数据可写
POLLRDHUP TCP连接被对端关闭,或者关闭了写操作,由GNU引入
POPPHUP 挂起
POLLERR 错误
POLLNVAL 文件描述符没有打开

https://blog.csdn.net/coolgw2015/article/details/79719328

poll例程

使用poll函数监控标准输入

#include <stdio.h>
#include <unistd.h>
#include <poll.h> int main()
{
struct pollfd poll_fd;
char buf[1024];
poll_fd.fd = 0;
poll_fd.events=POLLIN; for(;;)
{
int ret = poll(&poll_fd,1,2000);
if(ret<0)
{
perror("poll");
continue;
}
if(ret==0)
{
printf("poll timeout!\n");
continue;
}
if(poll_fd.revents==POLLIN)
{ read(0,buf,sizeof(buf)-1);
printf("sdin:%s",buf);
}
}
}

poll 处理 tcp通信

#include <stdio.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/poll.h>
#define MAX_SOCKET 5 typedef struct _info {
char name[10];
char text[54];
}info; int main(int argc, char *argv[])
{
info send_buf;
info recv_buf;
int listen_socket,newsk;
int connected_sockets[MAX_SOCKET];
int connected_cnt = 0;
int i; // 1 创建一个套接字,用于网络通信
listen_socket = socket(PF_INET, SOCK_STREAM, 0);
if (listen_socket == -1)
{
perror("socket");
return -1;
} // 2 绑定服务的IP与端口
struct sockaddr_in ser_addr;
ser_addr.sin_family = PF_INET;
ser_addr.sin_port = htons (12345) ;
ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
int ret = bind(listen_socket, (struct sockaddr *)&ser_addr,sizeof(ser_addr)); if (ret == -1)
{
perror("bind");
return -1;
} // 3 监听端口的数据
ret = listen(listen_socket,MAX_SOCKET);
if (ret == -1)
{
perror("listen");
return -1;
} // 监听的总套接字数组
struct pollfd pollfds[MAX_SOCKET+1];
int poll_ret; // 主套接字信息
pollfds[0].fd = listen_socket;
pollfds[0].events = POLLIN|POLLPRI; //设置为 任意优先级可读 事件监听 while(1)
{
printf("Poll all fds\n");
poll_ret = poll(pollfds, connected_cnt + 1, -1); //-1 阻塞模式进行监听
printf("ret = %d\n", poll_ret); //返回值为1 : 正确监听到变化
printf("Need to handld %d fd(s)\n", poll_ret); //返回值为1 : 正确监听到变化
if(poll_ret == 0)
{
printf("timeout!\n"); //监听超时
} else if( poll_ret == -1 )
{
perror("err!"); //监听出错
} //正确监听到变化
if(pollfds[0].revents & POLLIN || pollfds[0].revents & POLLPRI) //如果新客户端连接
{
// 响应用户连接
connected_sockets[connected_cnt] = accept(listen_socket,NULL,NULL); //返回 新的连接响应的套接字
if(connected_sockets[connected_cnt] == -1)
{
perror("accept");
return -1;
}
printf("new accept from %d!\n", connected_sockets[connected_cnt]); //更新客户端套接字集合
pollfds[connected_cnt+1].fd = connected_sockets[connected_cnt];
pollfds[connected_cnt+1].events = POLLIN|POLLPRI; //任意优先级可读
connected_cnt++;
} else
{
printf("new read/write !\n");
for(i = 0; !(pollfds[i+1].revents & POLLIN) ;i++ ) ;//如果是 客户端发生了数据可读) //4 接收与发送数据
newsk = connected_sockets[i];
memset(&recv_buf, 0, sizeof(recv_buf));
ret = recv(newsk, &recv_buf, sizeof(recv_buf), 0);
if (ret == -1)
{
perror("recv");
return -1;
} if(errno == EINTR) continue;
if(ret == 0 ) //客户端断开
{
printf("%d disconnectded\n", connected_sockets[i]);
close(connected_sockets[i]); connected_sockets[i] = -1;
memset(&pollfds[i+1] , 0, sizeof(struct pollfd )); //清空断开套接字监听信息
continue;
}
printf("[%d],[%s] : %s\n", connected_sockets[i], recv_buf.name, recv_buf.text);
sprintf(send_buf.name, "Server");
sprintf(send_buf.text, "Had recvied your[%d] message", connected_sockets[i]);
//sendto(my_socket, &buf, sizeof(buf), 0, NULL, NULL);
send(connected_sockets[i], &send_buf, sizeof(send_buf), 0);
}
sleep(2);
} // 5 关闭套接字
for(i = 0; i < connected_cnt; i++)
{
close(connected_sockets[i]);
} return 0;
}

poll函数的优缺点

通过poll函数的结构以及小测试程序的编写,我们不难发现poll函数的一些特点:

1、优点

(1)poll() 不要求开发者计算最大文件描述符加一的大小。

(2)poll() 在应付大数目的文件描述符的时候速度更快,相比于select。

(3)它没有最大连接数的限制,原因是它是基于链表来存储的。

(4)在调用函数时,只需要对参数进行一次设置就好了

2、缺点

(1)大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。

(2)与select一样,poll返回后,需要轮询pollfd来获取就绪的描述符,这样会使性能下降

(3)同时连接的大量客户端在一时刻可能只有很少的就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降

epoll

epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它不会复用文件描述符集合来传递结果而迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll 那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

情景导入:

有100万用户同时与一个进程保持着TCP连接,而每一时刻只有几十个或几百个TCP连接是活跃的(接收TCP包),也就是说在每一时刻进程只需要处理这100万连接中的一小部分连接。那么,如何才能高效的处理这种场景呢?进程是否在每次询问操作系统收集有事件发生的TCP连接时,把这100万个连接告诉操作系统,然后由操作系统找出其中有事件发生的几百个连接呢?实际上,在Linux2.4版本以前,那时的select或者poll事件驱动方式是这样做的。

这里有个非常明显的问题,即在某一时刻,进程收集有事件的连接时,其实这100万连接中的大部分都是没有事件发生的。因此如果每次收集事件时,都把100万连接的套接字传给操作系统(这首先是用户态内存到内核态内存的大量复制),而由操作系统内核寻找这些连接上有没有未处理的事件,将会是巨大的资源浪费,然后select和poll就是这样做的,因此它们最多只能处理几千个并发连接。而epoll不这样做,它在Linux内核中申请了一个简易的文件系统,把原先的一个select或poll调用分成了3部分

  1. 调用epoll_create建立一个epoll对象(在epoll文件系统中给这个句柄分配资源);

  2. 调用epoll_ctl向epoll对象中添加这100万个连接的套接字;

  3. 调用epoll_wait收集发生事件的连接。

这样只需要在进程启动时建立1个epoll对象,并在需要的时候向它添加或删除连接就可以了,因此,在实际收集事件时,epoll_wait的效率就会非常高,因为调用epoll_wait时并没有向它传递这100万个连接,内核也不需要去遍历全部的连接。

epoll_create

#include <sys/epoll.h>
int epoll_create(int size);

描述:创建一个epoll对象(实际上它也是一个文件描述符),用于添加或删除指定的连接。

参数解析:

size:想关注的文件描述符数量(用于在内核申请一片空间,用来存放你想关注的socket fd上是否发生以及发生了什么事件。)

返回值:成功返回一个epoll文件描述符。失败返回-1。

注意: epoll会占用一个fd值**,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

epoll_ctl

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); /* 有关的结构体 */
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t; struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

描述: 控制epoll事件,可以是注册、修改或删除一个fd

参数解析:

epfd:epoll_create 返回的对象

op:操作类型

  • EPOLL_CTL_ADD:注册新的fd到epfd中
  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件
  • EPOLL_CTL_DEL:从epfd中删除一个fd

fd:需要监听的fd

event:监听的事件

  • events:下面值的相或结果
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断; EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。缺省是水平触发(Level Triggered)。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

epoll工作模式epoll有2种工作方式:LT和ET。

  • LT(level-triggered)是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
  • ET (edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。
  • data:用户数据,在TCP中一般传递我们需要监听的fd

返回值:成功返回0,失败返回-1

epoll_wait

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout); /* 有关的结构体 */
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t; struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

描述: 等待事件的产生,类似于select()调用。

epoll_wait运行的原理是:等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。并且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。

参数解析:

epfd:epoll_create 返回的对象

events:用来从内核得到所有的读写事件(从内核返回给用户),

maxevents:告诉内核需要监听的所有的socket的句柄数(从用户传给内核),值不能大于创建epoll_create()时的size。

timeout:超时时间(毫秒,0会立即返回,-1永久等待)。

返回值:成功返回需要处理的事件数目,若已超时则返回0;失败返回-1。

epoll 使用流程

通过在包含一个头文件#include <sys/epoll.h> 以及几个简单的API将可以大大的提高你的网络服务器的支持人数。

首先通过create_epoll(int maxfds)来创建一个epoll的句柄,其中maxfds为你epoll所支持的最大句柄数。这个函数会返回一个新的epoll句柄,之后的所有操作将通过这个句柄来进行操作。在用完之后,记得用close()来关闭这个创建出来的epoll句柄。

之后在你的网络主循环里面,每一帧的调用epoll_wait(int epfd, strcuct epoll_event* events, int maxevents, int timeout)来查询所有的网络接口,看哪一个可以读,哪一个可以写了。基本的语法为:

nfds = epoll_wait(kdpfd, events, maxevents, -1);

其中kdpfd为用epoll_create创建之后的句柄,events是一个epoll_event*的指针,当epoll_wait这个函数操作成功之后,epoll_events里面将储存所有的读写事件。maxevents是当前需要监听的所有socket句柄数。

epoll_wait范围之后应该是一个循环,遍利所有的事件。

	//  几乎所有的epoll程序都使用下面的框架
for( ; ; )
{
nfds = epoll_wait(epfd,events,20,500);
for(i=0;i<nfds;++i)
{
if(events[i].data.fd==listenfd) //有新的连接
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个连接
ev.data.fd=connfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中
}
else if( events[i].events&EPOLLIN ) //接收到数据,读socket
{
n = read(sockfd, line, MAXLINE)) < 0 //读
ev.data.ptr = md; //md为自定义类型,添加数据
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
}
else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
{
struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取数据
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
}
else
{
//其他的处理
}
}
}

例程

使用epoll来读取设备文件

#include<stdio.h>
#include<stdlib.h>
#include<fcntl.h>
#include<sys/stat.h>
#include<sys/types.h>
#include<sys/epoll.h> int main(int argc, char *argv[])
{
int epollfd = epoll_create(512); int fd_key = open("/dev/input/event1", O_RDONLY|O_NONBLOCK);
int fd_mice = open("/dev/input/mice", O_RDONLY|O_NONBLOCK); struct epoll_event ev;
ev.events = EPOLLIN; // 监控可读 ev.data.fd = fd_key;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd_key, &ev); ev.data.fd = fd_mice;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd_mice, &ev); struct epoll_event evout[2];
char buf[1024];
while(1)
{
int ret = epoll_wait(epollfd, evout, 2, 5000);
if(ret == 0) continue; // 超时
if(ret < 0 && errno == EINTR) continue; // 被信号打断
if(ret < 0) break; // 错误发生了 // ret > 0情况
int i;
for(i=0; i<ret; ++i)
{
int fd = evout[i].data.fd;
if(read(fd, buf, sizeof(buf)) < 0)
{
// close自动将它从epoll中移除
close(fd);
} if(fd == fd_key) printf("键盘有消息\n");
else if(fd == fd_mice) printf("鼠标有消息\n");
}
}
}
使用 epoll来实现 tcp-server
// http://www.manongjc.com/article/54633.html
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h> #include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h> #define IPADDRESS "127.0.0.1"
#define PORT 12345
#define MAXSIZE 1024
#define LISTENQ 5
#define FDSIZE 1000
#define EPOLLEVENTS 100 //函数声明
//创建套接字并进行绑定
static int socket_bind(const char* ip,int port);
//IO多路复用epoll
static void do_epoll(int listenfd);
//事件处理函数
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf);
//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd);
//读处理
static void do_read(int epollfd,int fd,char *buf);
//写处理
static void do_write(int epollfd,int fd,char *buf);
//添加事件
static void add_event(int epollfd,int fd,int state);
//修改事件
static void modify_event(int epollfd,int fd,int state);
//删除事件
static void delete_event(int epollfd,int fd,int state); int main(int argc,char *argv[])
{
int listenfd;
listenfd = socket_bind(IPADDRESS,PORT); //创建套接字并进行绑定
listen(listenfd,LISTENQ);
do_epoll(listenfd);
return 0;
} static int socket_bind(const char* ip,int port)
{
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if (listenfd == -1)
{
perror("socket error:");
exit(1);
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); //指定 IP地址
//inet_pton(AF_INET,ip,&servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
{
perror("bind error: ");
exit(1);
}
return listenfd;
} static void do_epoll(int listenfd)
{
int epollfd;
struct epoll_event events[EPOLLEVENTS];
int ret;
char buf[MAXSIZE];
memset(buf,0,MAXSIZE);
//创建一个描述符
epollfd = epoll_create(FDSIZE);
//添加监听描述符事件
add_event(epollfd,listenfd,EPOLLIN);
for ( ; ; )
{
//获取已经准备好的描述符事件
ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
handle_events(epollfd,events,ret,listenfd,buf);
}
close(epollfd);
} static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
int i;
int fd;
//进行选好遍历
for (i = 0;i < num;i++)
{
fd = events[i].data.fd;
//根据描述符的类型和事件类型进行处理
if ((fd == listenfd) &&(events[i].events & EPOLLIN))
handle_accpet(epollfd,listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd,fd,buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd,fd,buf);
}
}
static void handle_accpet(int epollfd,int listenfd)
{
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
if (clifd == -1)
perror("accpet error:");
else
{
printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
//添加一个客户描述符和事件
add_event(epollfd,clifd,EPOLLIN);
}
} static void do_read(int epollfd,int fd,char *buf)
{
int nread;
nread = read(fd,buf,MAXSIZE);
if (nread == -1)
{
perror("read error:");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else if (nread == 0)
{
fprintf(stderr,"client close.\n");
close(fd);
delete_event(epollfd,fd,EPOLLIN);
}
else
{
printf("read message is : %s",buf);
//修改描述符对应的事件,由读改为写
modify_event(epollfd,fd,EPOLLOUT);
}
} static void do_write(int epollfd,int fd,char *buf)
{
int nwrite;
nwrite = write(fd,buf,strlen(buf));
if (nwrite == -1)
{
perror("write error:");
close(fd);
delete_event(epollfd,fd,EPOLLOUT);
}
else
modify_event(epollfd,fd,EPOLLIN);
memset(buf,0,MAXSIZE);
} static void add_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
} static void delete_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
} static void modify_event(int epollfd,int fd,int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}

4.信号驱动IO模型

在信号驱动IO模型中,当用户线程发起一个IO请求操作,会给对应的socket注册一个信号函数,然后用户线程会继续执行,当内核数据就绪时会发送一个信号给用户线程,用户线程接收到信号之后,便在信号函数中调用IO读写操作来进行实际的IO请求操作。这个一般用于UDP中,对TCP套接口几乎是没用的,原因是该信号产生得过于频繁,并且该信号的出现并没有告诉我们发生了什么事情

Linux 网络编程的5种IO模型:多路复用(select/poll/epoll)

5.异步IO模型

  异步IO模型才是最理想的IO模型,在异步IO模型中,当用户线程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从内核的角度,当它受到一个asynchronous read之后,它会立刻返回,说明read请求已经成功发起了,因此不会对用户线程产生任何block。然后,内核会等待数据准备完成,然后将数据拷贝到用户线程,当这一切都完成之后,内核会给用户线程发送一个信号,告诉它read操作完成了。也就说用户线程完全不需要关心实际的整个IO操作是如何进行的,只需要先发起一个请求,当接收内核返回的成功信号时表示IO操作已经完成,可以直接去使用数据了。

  也就说在异步IO模型中,IO操作的两个阶段都不会阻塞用户线程,这两个阶段都是由内核自动完成,然后发送一个信号告知用户线程操作已完成。用户线程中不需要再次调用IO函数进行具体的读写。这点是和信号驱动模型有所不同的,在信号驱动模型中,当用户线程接收到信号表示数据已经就绪,然后需要用户线程调用IO函数进行实际的读写操作;而在异步IO模型中,收到信号表示IO操作已经完成,不需要再在用户线程中调用iO函数进行实际的读写操作。

  注意,异步IO是需要操作系统的底层支持,在Java 7中,提供了Asynchronous IO。简称AIO

前面四种IO模型实际上都属于同步IO,只有最后一种是真正的异步IO,因为无论是多路复用IO还是信号驱动模型,IO操作的第2个阶段都会引起用户线程阻塞,也就是内核进行数据拷贝的过程都会让用户线程阻塞。

Linux 网络编程的5种IO模型:多路复用(select/poll/epoll)

两种高性能IO设计模式

在传统的网络服务设计模式中,有两种比较经典的模式:

  一种是多线程,一种是线程池。

  对于多线程模式,也就说来了client,服务器就会新建一个线程来处理该client的读写事件,如下图所示:

Linux 网络编程的5种IO模型:多路复用(select/poll/epoll)

这种模式虽然处理起来简单方便,但是由于服务器为每个client的连接都采用一个线程去处理,使得资源占用非常大。因此,当连接数量达到上限时,再有用户请求连接,直接会导致资源瓶颈,严重的可能会直接导致服务器崩溃。

  因此,为了解决这种一个线程对应一个客户端模式带来的问题,提出了采用线程池的方式,也就说创建一个固定大小的线程池,来一个客户端,就从线程池取一个空闲线程来处理,当客户端处理完读写操作之后,就交出对线程的占用。因此这样就避免为每一个客户端都要创建线程带来的资源浪费,使得线程可以重用。

  但是线程池也有它的弊端,如果连接大多是长连接,因此可能会导致在一段时间内,线程池中的线程都被占用,那么当再有用户请求连接时,由于没有可用的空闲线程来处理,就会导致客户端连接失败,从而影响用户体验。因此,线程池比较适合大量的短连接应用。

  因此便出现了下面的两种高性能IO设计模式:Reactor和Proactor。

在Reactor模式中,会先对每个client注册感兴趣的事件,然后有一个线程专门去轮询每个client是否有事件发生,当有事件发生时,便顺序处理每个事件,当所有事件处理完之后,便再转去继续轮询,如下图所示:

Linux 网络编程的5种IO模型:多路复用(select/poll/epoll)

从这里可以看出,上面的五种IO模型中的多路复用IO就是采用Reactor模式。注意,上面的图中展示的 是顺序处理每个事件,当然为了提高事件处理速度,可以通过多线程或者线程池的方式来处理事件。Java NIO使用的就是这种

  在Proactor模式中,当检测到有事件发生时,会新起一个异步操作,然后交由内核线程去处理,当内核线程完成IO操作之后,发送一个通知告知操作已完成,可以得知,异步IO模型采用的就是Proactor模式。Java AIO使用的这种。

异步IO

#include <stdio.h>
#include <stdlib.h>
#include <string.h> #include <aio.h>
#include <unistd.h>
#include <signal.h>
#include <sys/stat.h>
#include <fcntl.h> static char *memBuffer;
static int sFileDesc;
static struct sigaction sOldSigAction; static void MySigQuitHandler(int sig)
{
printf("Signal Quit! The number is: %d\n", sig);
} static void MyFileReadCompleteProcedure(int sig, siginfo_t *si, void *ucontext)
{
printf("The file length is: %zu, and the content is: %s\n", strlen(memBuffer), memBuffer);
int status = close(sFileDesc);
if(status == 0)
上一篇:Flume直接对接SaprkStreaming的两种方式


下一篇:RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析