I/O多路转接——select、poll、epoll
一、概述
I/O复用指的是程序能够同时监听多个文件描述符,这对提高程序的性能至关重要。Linux下实现I/O复用的系统调用主要有select、poll和epoll。我们会依次对这三个系统调用进行全面的讲解,并给出实例,方便大家理解。
二、I/O多路转接——select
select系统调用的用途:在一段指定的时间内,监听用户感兴趣的文件描述符上的可读,可写和异常等事件。
1.select函数的基本介绍
1.函数原型
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *execptfds,struct timeval *timeout);
2.参数解读
-
nfds参数指定被监听的文件描述符的总数。他通常被设置为select监听所有文件描述符中最大值加1,因为文件描述符是从0开始计数的。
-
readfds、writefds和exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合。应用程序调用select函数时,通过这3个参数传入自己感兴趣的文件描述符。select调用返回时,内核将修改他们来通知应用程序哪些文件描述符已经就绪。这2个参数都是fd_set类型,他是一种位图结构。
图一(以readfds为例):我们这三个参数都是这样的位图结构(其实也就是数组),下标位置就是所对应的文件描述符,我们将自己想要关心的文件描述符设置到集合中(关心0、1、2),当select函数调用返回时,会将该集合中被关心的文件描述符重新设置(当该描述符读事件就绪时);如图二所示,我们可以看到,只有0和1这两个文件描述符的读事件就绪了,所以2号文件描述符又被置为-1了。
简单的来讲比特位的内容:- 输入时:用户告诉内核,你要帮我关心这个集合中的哪些文件描述符。
- 输出时:内核告诉用户,你关心的哪些文件描述符上的读事件、有哪些已经就绪了。
-
timeout参数是用来设置select函数的超时时间。它是一个timeval结构类型的指针,其定义如下:
struct timeval{ long tv_sec;/*秒数*/ long tv_usec;/*微秒数*/ };
- 当timeout变量的tv_sec成员和tv_usec成员都传递0时,表明不设置超时时间,只要事件不就绪,select就会立即返回。
- 当timeout变量设置为NULL时,只要事件不就绪,select就会一直阻塞。
- 特定的时间值:select调用后在指定的时间内进行阻塞等待,如果被监视的文件描述符上一直没有时间就绪,则在该时间后select进行超时返回。
注:当我们给timeout设定了特定的时间后,假设是10s,如果select在10s内调用返回(3s就返回了),那么timeval的成员tv_sec和tv_usec的值将会被替换成剩余的秒数。
3.返回值
select成功时返回就绪(可读、可写和异常)文件描述符的总数。如果在超时时间内没有任何文件描述符就绪,select将返回0。select失败时将返回-1并设置errno。如果在select等待期间,程序接收到信号,则select立即返回-1,并设置errno为EINTR。
2.select的基本工作流程
我们要实现一个简单的select服务器,该服务器要做的就是读取客户端发来的数据并进行打印,那么这个select服务器的工作流程应该是这样的:
- 首先完成基本的套接字的创建、绑定和监听
- 定义一个fd_array数组用于保存监听套接字和已经与客户端建立连接的套接字,刚开始时就将套接字添加到fd_array数组当中。
- 然后服务器开始循环调用select函数,检测读事件是否就绪,如果就绪则执行对应的操作。
- 每次调用select函数之前,都需要定义一个读文件描述符集readfds,并将fd_array当中的文件描述符依次设置进readfds当中,表示让select帮我们监听这些文件描述符的读事件是否就绪。
- 当select检测到数据就绪时会将读事件就绪的文件描述符设置进readfds当中,此时我们就能够得知哪些文件描述符的读事件就绪了,并对这些文件描述符进行对应的操作。
- 如果读事件就绪的是监听套接字,则调用accept函数从底层全连接队列获取已经建立好的连接,并将该连接对应的套接字添加到fd_array数组当中。
- 如果读事件就绪的是与客户端建立连接的套接字,则调用read函数读取客户端发来的数据并进行打印输出。
- 当然,服务器与客户端建立连接的套接字读事件就绪,也可能是因为客户端将连接关闭了,此时服务器应该调用close关闭该套接字,并将该套接字从fd_array数组当中清除,因为下一次不需要再监视该文件描述符的读事件了。
一些注意事项:
- 因为传入select函数的readfds、writefds和execpfds都是输入输出类型参数,当select函数返回时这些参数当中的值已经被修改了,因此每次调用select函数时都需要对其进行重新设置,timeout也是类似的道理。
- 因为每次调用select函数之前都需要对readfds进行重新设置,所以需要定义一个fd_array数组保存与客户端已经建立的若干连接和监听套接字,实际fd_array数组当中的文件描述符就是需要让select监视读事件的文件描述符。
- 我们的select服务器只是读取客户端发来的数据,因此只需要让select帮我们监视特定文件描述符的读事件,如果要同时让select帮我们监视特定文件描述符的读事件和写事件,则需要分别定义readfds和writefds,并定义两个数组分别保存需要被监视读事件和写事件的文件描述符,便于每次调用select函数前对readfds和writefds进行重新设置。
- 服务器刚开始运行时,fd_array数组当中只有监听套接字,因此select第一次调用时只需要监视监听套接字的读事件是否就绪,但每次调用accept获取到新连接后,都会将新连接对应的套接字添加到fd_array当中,因此后续select调用时就需要监听套接字和若干连接套接字的读事件是否就绪。
- 由于调用select时还需要传入被监视的文件描述符中最大文件描述符值+1,因此每次在便利fd_array对readfds进行重新设置时,还需要记录最大文件描述符值。
3.文件描述符的就绪条件
下列情况下socket可读:
- socket内核接收缓冲区的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞的读该socket,并且读操作返回的字节数大于0。
- socket通信的对方关闭连接。此时对该socket的读操作将返回0。
- 监听socket上有新的连接请求。
- socket上有未处理的错误。
下列情况下socket可写:
- socket内核发送缓冲区中的可用字节数大于等于低水位标记SO_SNDLOWAT,此时可用无阻塞的写,并且返回值大于0。
- socket的写操作被关闭(close),对写操作被关闭的socket进行写操作,会触发SIGPIPE信号。
- socket使用非阻塞connect连接成功或失败之后
- socket上有未处理的错误
异常情况只有一种:
- socket上接收到了带外数据。
4.基于select函数设计的服务器
1.基于套接字的编写
sock.hpp
#include<iostream>
#include<cstdlib>
#include<string>
#include<cstring>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<unistd.h>
using namespace std;
class Sock{
public:
//1.创建套接字
static int Socket(){
int sock = socket(AF_INET,SOCK_STREAM,0);
if(sock < 0){
cerr << "socket error" << endl;
exit(2);
}
return sock;
}
//2.绑定地址信息
static void Bind(int sock,uint16_t port){
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(sock,(struct sockaddr*)&addr,sizeof(addr)) < 0){
cerr << "bind error" << endl;
exit(3);
}
}
//3.监听
static void Listen(int sock){
if(listen(sock,5) < 0){
cerr << "listen error" << endl;
exit(4);
}
}
//4.接受连接
static int Accept(int sock){
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
int fd = accept(sock,(struct sockaddr*)&addr,&len);
if(fd < 0){
cerr << "accept error" << endl;
exit(5);
}
return fd;
}
static void Connect(int sock,string ip,uint16_t port){
struct sockaddr_in addr;
memset(&addr,0,sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if(connect(sock,(struct sockaddr*)&addr,sizeof(addr)) < 0){
cerr << "connect error" << endl;
exit(6);
}
else{
cout << "connect success" << endl;
}
}
static void Close(int sock){
close(sock);
}
};
2.select服务器的编写
1.利用封装好的sock类,对select服务器进行基本的套接字编写
#include<iostream>
#include<string.h>
#include<sys/select.h>
#include"sock.hpp"
using namespace std;
static void Usage(string proc){
cout << "Usage:" << proc << "Server_IP Server_Port" << endl;
}
// ./select_server 8080 -- 表示如何运行这个程序(采用命令行参数的形式)
int main(int argc, char *argv[]){
if(argc != 2){
Usage(argv[0]);
exit(1);
}
//将字符形式的("8080")端口号,转为整数形式
uint16_t port = (uint16_t)atoi(argv[1]);
//创建listen_sock套接字,用于监听客户端的新的连接套接字
int listen_sock=Sock::Socket();
//将listen_sock套接字与指定的IP地址和端口号绑定
Sock::Bind(listen_sock,port);
//将listen_sock套接字设置为监听套接字,用于客户端连接服务器
Sock::Listen(listen_sock);
return 0;
}
2.紧接着在使用select系统调用前,我们还需要利用到一个额外的数组。
- 从参数解读来看,三个事件都是fd_set类型的,本质上都是位图结构,首先由用户将需要关心的文件描述符添加到readfds、writefds和execptfds中。
- 由于select调用返回时会对原来的位图结构重新调整,只返回已有事件就绪的文件描述符,未就绪的事件的文件描述符会被清除(例:我让select关心1/2/3号文件描述符,它只返回1/2号文件描述符),但是3号文件描述符本次select调用没有发生就绪事件,并不意味着下一次3号文件描述符不会有事件发生。
- 但是select在刚刚返回时就已经将3号文件描述符清除了,下次就不可能再关心3号文件描述符了。
- 所以我们就需要利用额外的数组,将需要关心的文件描述符先行保存起来。
- 利用数组的好处,作为一个服务器肯定会有多个连接,每个连接都对应着一个文件描述符,我们知道文件描述符是递增的,我们后续会将accept上来的连接再次添加的数组中,每次循环调用select前都去遍历这个数组,找到最大的文件描述符,作为select系统调用的第一个参数。
#include<iostream>
#include<string.h>
#include<sys/select.h>
#include"sock.hpp"
using namespace std;
#define NUM (sizeof(fd_set)*8)
//内容>=0,认为是合法的fd,如果是-1,表示该位置没有fd
int fd_array[NUM];
static void Usage(string proc){
cout << "Usage:" << proc << "Server_IP Server_Port" << endl;
}
// ./select_server 8080 -- 表示如何运行这个程序(采用命令行参数的形式)
int main(int argc, char *argv[]){
if(argc != 2){
Usage(argv[0]);
exit(1);
}
//将字符形式的("8080")端口号,转为整数形式
uint16_t port = (uint16_t)atoi(argv[1]);
//创建listen_sock套接字,用于监听客户端的新的连接套接字
int listen_sock=Sock::Socket();
//将listen_sock套接字与指定的IP地址和端口号绑定
Sock::Bind(listen_sock,port);
//将listen_sock套接字设置为监听套接字,用于客户端连接服务器
Sock::Listen(listen_sock);
for(int i=0;i<NUM;i++){
fd_array[i]=-1;
}
//创建读事件集合
fd_set readfds;
//将listen_sock保存到fd_array数组中
fd_array[0]=listen_sock;
while(1){
//每次循环都重新初始化fd_set集合
FD_ZERO(&readfds);
//将listen_sock设置为最大的文件描述符
int max_fd=fd_array[0];
//该循环用来判断fd_array数组中的最大文件描述符
//用于select函数的第一个参数
for(int i=0;i<NUM;i++){
if(fd_array[i]==-1) continue;
//下面都是合法的fd
//所有要关心的读事件fd,添加到readfds集合中
FD_SET(fd_array[i],&readfds);
if(max_fd<fd_array[i]){
//更新最大的fd
max_fd=fd_array[i];
}
}
//设置超时时间(也可用不设置)
struct timeval timeout={5,0};
int n=select(max_fd+1,&readfds,nullptr,nullptr,&timeout);
}
return 0;
}
3.对select的返回值进行判断,做出相应的操作
struct timeval timeout = {5, 0};
int n = select(max_fd + 1, &readfds, nullptr, nullptr, &timeout);
switch (n){
case -1:
//-1:表明select函数失败
cout << "select error" << endl;
break;
case 0:
// 0:表明超时,没有任何文件描述符可读
cout << "timeout" << endl;
break;
default:
// 只要大于0,就是返回的已有读事件就绪的文件描述符总数
cout << "有文件描述符可读" << endl;
break;
}
4.当select成功返回时,肯定有多个文件描述符上的读事件已经就绪了,但是我们并不知道是哪一个文件描述符读事件就绪,幸好我们刚刚的fd_array数组保存了我们要关系的套接字(当然到这一步,数组中只有一个需要关心的文件描述符,就是listen_sock)。此时,我们就可以遍历这个数组的文件描述符,对其加以判断,数组中的文件描述符有没有被设置到我们的rfds集合中:
- 如果设置了;就会存在两种情况:
如果是监听套接字:
对于监听套接字而言,它的读事件就是新连接到来,此时我们应该立即accept获取新连接,并将获取到的新连接保存到fd_array数组中,用于下一次select调用时,能够关心这些文件描述符上的读事件。
在这里需要提一点,当我们在获取到新连接后,一定不能立即执行read/recv等操作。因为连接到来并不意味着这些连接上的数据就绪了,如果此时你立即执行读取操作,会存在严重的问题,例如:有人攻击你这个服务器,它给你的服务器发送大量的连接,但是从来都不传输数据,无脑的连接你。你的服务器,不断的accept获取大量的连接,然后进程不断的读取操作,但是就是没有数据,导致进程被挂起,可想而知这种危害很大。
如果是普通套接字:
此时我们就可以直接执行read/recv等操作。由于我们采用的是TCP协议,读取操作会存在数据粘包问题,但是我们此次代码注重理解select的工作流程,对于粘包问题,需要特定的场合。这里我们可以先忽略粘包问题。- 如果未设置,让其再次循环select,直到文件描述符被设置(我们的代码中不做任何操作)
#include <iostream>
#include <string.h>
#include <sys/select.h>
#include "sock.hpp"
using namespace std;
#define NUM (sizeof(fd_set) * 8)
// 内容>=0,认为是合法的fd,如果是-1,表示该位置没有fd
int fd_array[NUM];
static void Usage(string proc)
{
cout << "Usage:" << proc << "Server_IP Server_Port" << endl;
}
// ./select_server 8080 -- 表示如何运行这个程序(采用命令行参数的形式)
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
exit(1);
}
// 将字符形式的("8080")端口号,转为整数形式
uint16_t port = (uint16_t)atoi(argv[1]);
// 创建listen_sock套接字,用于监听客户端的新的连接套接字
int listen_sock = Sock::Socket();
// 将listen_sock套接字与指定的IP地址和端口号绑定
Sock::Bind(listen_sock, port);
// 将listen_sock套接字设置为监听套接字,用于客户端连接服务器
Sock::Listen(listen_sock);
for (int i = 0; i < NUM; i++)
{
// 将所有位置设置为-1,暂时没有任何的fd
fd_array[i] = -1;
}
// 创建读事件集合
fd_set readfds;
// 将listen_sock保存到fd_array数组中
fd_array[0] = listen_sock;
while (1)
{
// 每次循环都重新初始化fd_set集合
FD_ZERO(&readfds);
// 将listen_sock设置为最大的文件描述符
int max_fd = fd_array[0];
// 该循环用来判断fd_array数组中的最大文件描述符
// 用于select函数的第一个参数
for (int i = 0; i < NUM; i++)
{
if (fd_array[i] == -1)
continue;
// 下面都是合法的fd
// 所有要关心的读事件fd,添加到readfds集合中
FD_SET(fd_array[i], &readfds);
if (max_fd < fd_array[i])
{
// 更新最大的fd
max_fd = fd_array[i];
}
}
// 设置超时时间(也可用不设置)
struct timeval timeout = {5, 0};
int n = select(max_fd + 1, &readfds, nullptr, nullptr, &timeout);
switch (n)
{
case -1:
//-1:表明select函数失败
cout << "select error" << endl;
break;
case 0:
// 0:表明超时,没有任何文件描述符可读
cout << "timeout" << endl;
break;
default:
// 只要大于0,就是返回的已有读事件就绪的文件描述符总数
cout << "有文件描述符可读" << endl;
for (int i = 0; i < NUM; i++)
{
//-1表示没有文件描述符,不合法
if (fd_array[i] == -1)
continue;
// 判断fd_array数组中的文件描述符是否有读事件就绪
if (FD_ISSET(fd_array[i], &readfds))
{
cout << "sock: " << fd_array[i] << "有读事件就绪" << endl;
// 判断fd_array数组中的文件描述符是否是listen_sock
if (fd_array[i] == listen_sock)
{
cout << "listen_sock: " << listen_sock << "有新的客户端连接服务器" << endl;
// 是listen_sock,表示有新的客户端连接服务器
int new_fd = Sock::Accept(listen_sock);
if (new_fd >= 0)
{
cout << "listen_sock: " << listen_sock << "获取新连接成功" << endl;
int pos = -1;
for (; pos < NUM; pos++)
{
if (fd_array[pos] == -1)
{
break;
}
}
// 1.找打了一个位置没有被使用
if (pos < NUM)
{
cout << "新连接: " << new_fd << "已经被添加到数组[" << pos << "]的位置" << endl;
fd_array[pos] = new_fd;
}
else
{
cout << "服务器满了,无法添加新连接" << endl;
close(new_fd);
}
}
}
else
{
cout << "sock: " << fd_array[i] << " 上面有普通数据的读取" << endl;
char recv_buffer[1024] = {0};
ssize_t s = recv(fd_array[i], recv_buffer, sizeof(recv_buffer) - 1, 0);
if (s > 0)
{
recv_buffer[s] = 0;
cout << "client[" << fd_array[i] << "]#" << recv_buffer << endl;
}
else if (s == 0)
{
cout << "sock: " << fd_array[i] << "关闭了,client退出了!" << endl;
// 表明关闭了链接
close(fd_array[i]);
cout << "已经在数组下标fd_array[" << i << "中,去掉了sock:" << fd_array[i] << endl;
fd_array[i] = -1;
}
else
{
cout << "读取失败" << endl;
}
}
}
}
break;
}
}
return 0;
}
以上也是select服务器的全部代码。
5.select的优缺点
优点:
- 可以同时等待多个文件描述符,并且只负责等待,实际的IO操作由accept、read、write等接口来完成,这些接口在进行IO操作时不会被阻塞。
- select同时等待多个文件描述符,因此可以将”等“的时间重叠,提高了IO的效率。
缺点:
- 每次调用select,都需要手动设置fd集合,从接口使用角度来说也非常不便
- 每次调用select,都需要把fd集合从用户拷贝进内核,这个开销在fd很多时会很大。
- 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
- select可监控的文件描述符数量太少。
三、I/O多路转接——poll
poll系统调用和select类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪事件。
1.poll函数的基本介绍
1.函数原型:
int poll(struct pollfd *fds,nfds_t nfds,int timeout)
2.参数解读
-
fds参数是一个pollfd结构体类型的数组,它指定所有我们感兴趣的文件描述符上发送的可读、可写和异常等事件。pollfd结构体定义如下:
struct pollfd{ int fd; //文件描述符 short events //注册的事件 short revents;//实际发生的事件,由内核填充 }
其中,fd成员指定文件描述符;events成员告诉poll监听fd上的哪些事件(可读、可写和异常),它是一系列的按位或(不需要像select那样,分别传递可读、可写和异常);revents成员则是由内核进行填充,以通知用户所关心的众多fd上实际发生了哪些事件。poll支持的事件类型如下:
我们主要关注表格中标红的事件即可;注:
- events成员就是用户告诉内核,需要关心哪些文件描述符上的哪些事件。
- revents成员就是由内核告诉用户,你让我关心的这些文件描述上的事件,有哪些文件描述符已经就绪了。
-
nfds参数指定被监听事件集合fds的大小(即:数组下标)
-
timeout参数指定poll的超时时间,单位是毫秒。
- 当timeout=-1时,poll调用将永远阻塞,直到某个事件发生。
- 当timeout=0时,poll调用立即返回。
- 当timeout=1000时,poll调用在1秒内阻塞的,1秒后超时。
3.返回值
- 当发生错误时,返回-1。
- 若设置了timeout,在规定时间内没有任何文件描述符就绪,返回0,否则返回就绪文件描述符的个数
2.poll的基本工作流程
我们要实现一个简单的poll服务器,该服务器要做的就是读取客户端发来的数据并进行打印,那么这个poll服务器的工作流程应该是这样的:
- 首先完成基本的套接字创建、绑定和监听。
- poll的第一个参数是一个数组结构,里面包含了文件描述符、需要关心的事件和实际发送的事件。poll实现的服务器就不需要再利用额外的数组,只需要定义出struct pollfd fd_array数组,每个数组元素都对应着一个文件描述和所关心的事件,只需要内核检测到对应的文件描述符上的事件是否就绪并给予填充。
- 调用poll函数之前,我们需要将监听套接字设置到这个struct pollfd fd_array数组。因为监听套接字的读事件就绪就是有新的连接到来,所以是我们要关心的文件描述符。
- 紧接着不断的事件循环,与select不同的是,poll不需要每次重新设置文件描述符和相应的事件,只要在最初设置好后,以后就会一直帮我们关心相应的文件描述符和事件。因为注册事件和实际事件是分开的。
- 当调用poll函数时,poll检测到某些文件描述符有读事件就绪后,会将其设置到对应文件描述的结构体中的第三个成员中。已告知用户,就可以执行相应的读操作了。
- 如果读事件就绪是监听套接字,则调用accept函数从底层全连接队列中获取已经建立连接好的连接,并将这些连接设置到struct pollfd fd_array数组中,设置好你所需要关心的事件,偏于再次调用poll函数时,关心这些连接上的对应事件。
- 如果读事件就绪是普通的套接字(即建立好连接的哪些套接字),则调用read函数读取客户端发来的数据并进行打印输出。
- 当然读事件就绪也可能是因为客户端关闭了连接,此时服务器应该调用close关闭该套接字,并将该套接字从struct pollfd fd_array数组中清除,因为下一次不需要再关心该文件描述符了。
3.基于poll函数设计的服务器
poll的服务器设计上来讲,和select大致上差不多,只是不需要额外的数组。同样封装sock,这里不多赘述了。
#include <iostream>
#include <string.h>
#include <poll.h>
#include "sock.hpp"
using namespace std;
#define NUM 128
struct pollfd fd_array[NUM];//创建一个pollfd数组,相当于是有个集合
static void Usage(string proc) {
cout << "Usage: " << proc << "port" << endl;
}
//./select_server 8080
int main(int argc, char* argv[]) {
if(argc != 2) {
Usage(argv[0]);
exit(1);
}
uint16_t port = (uint16_t)atoi(argv[1]);
int listen_sock = Sock::Socket(); //创建套接字
Sock::Bind(listen_sock, port); //绑定
Sock::Listen(listen_sock); //设置监听
for(int i = 0; i < NUM; i++) { //将数组全部初始化
fd_array[i].fd = -1; //-1代表文件描述符不合法
fd_array[i].events = 0;
fd_array[i].revents = 0;
}
fd_array[0].fd = listen_sock;//将listen_sock设置到该数组中,表明我们关心这个listen_sock
fd_array[0].events = POLLIN; //关心的事件就是读事件 如果你想同时关心写事件可以POLLIN | POLLOUT 这样一来就不像select那样麻烦了。
fd_array[0].revents = 0; //这里是有内核进行填充的,设置为0即可
//事件循环
while(1){
int timeout = -1;
/*
-1:阻塞式等待
0:立即返回(轮询检测)
1000:1000毫秒内时阻塞式等待,1000毫秒以外超时返回。
*/
int n = poll(fd_array, NUM, timeout);
switch(n) {
case -1:
cerr << "select error" << endl;
break;
case 0:
cout << "select timeout" << endl;
break;
default:
cout << "有fd对应的事件就绪了!" << endl;
for(int i = 0; i < NUM; i++) { //遍历数组,检测文件描述上的事件
if(fd_array[i].revents & POLLIN){
//如果该文件描述符存在读事件
cout << "sock: " << fd_array[i].fd << " 上面有了读事件,就可以读取了" << endl;
if(fd_array[i].fd == listen_sock) {
//如果是监听套接字,需要accept
cout << "listen_sock: " << listen_sock << " 有了新的连接到来" << endl;
int sock = Sock::Accept(listen_sock);
if(sock >= 0){
cout << "listen_sock: " << listen_sock << " 获取新链接成功" << endl;
int pos = 1;
for(; pos < NUM; pos++){
if(fd_array[pos].fd == -1)
break;
}
if(pos < NUM) {
cout << "新链接:" << sock << " 已经被添加到数组[" << pos << "]的位置" << endl;
//将获取上的连接设置到数组中,填好需要关心的事件,这里关心读事件
fd_array[pos].fd = sock;
fd_array[pos].events = POLLIN;
fd_array[pos].revents = 0;
}
else {
cout << "服务器满载了,关闭新的连接" << endl;
close(sock);
}
}
}
else {
cout << "sock: " << fd_array[i].fd << " 上面有普通数据的读取" << endl;
char recv_buffer[1024] = {0};
ssize_t s = recv(fd_array[i].fd, recv_buffer, sizeof(recv_buffer) - 1, 0);
if(s > 0) {
recv_buffer[s] = 0;
cout << "client[" << fd_array[i].fd << "]#" << recv_buffer << endl;
}
else if(s == 0) {
cout << "sock: " << fd_array[i].fd << "关闭了,client退出了!" << endl;
// 表明关闭了链接
close(fd_array[i].fd);
cout << "已经在数组下标fd_array[" << i << "中,去掉了sock:" << fd_array[i].fd << endl;
fd_array[i].fd = -1;
}
else {
cout << "读取失败" << endl;
}
}
}
}
break;
}
}
return 0;
}
4.poll的优缺点
优点:
不同于select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。
- pollfd结构包含了要监视的event和发生的event,不再使用select”参数-值“传递的方式。接口使用比select更方便。
- poll并没有最大数量限制(但是数量过大后性能也是会下降)
缺点:
poll中监听的文件描述符数目增多时
- 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
- 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中。
- 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。
四、I/O多路转接——epoll
我们可以发现从select到poll是一个提升,那么epoll就是对poll的一个更大的提升。epoll是Linux特有的I/O复用函数。它在实现和使用上与select和poll有很大的差距。首先,epoll不再是单独使用一个函数来完成任务,而是使用一组函数。其次epoll把用户关心的文件描述符上的事件放在了内核里的一个事件表中。但epoll需要使用一个额外的文件描述符来唯一标识内核中的这个事件表。(这部分内容会在epoll的底层原理中介绍,大概了解即可)
1.epoll函数的基本介绍
epoll有三个相关的系统调用,分别是epoll_create、epoll_ctl和epoll_wait。
1.epoll_create函数
-
函数原型
//创建一个epoll模型 int epoll_create(int size);
-
参数介绍
size参数自从Linux2.6.8之后,size参数是被忽略的,但size的值必须设置为大于0的值。size只是给内核一个提示,告诉它事件需要多大。仅仅只是给内核一个建议,具体多大还是操作系统说了算。 -
返回值
- epoll模型创建成功返回其对应的文件描述符,否则返回-1,同时错误码会被设置。
- 该函数返回的文件描述符将作为其他epoll系统调用的第一个参数,以指定要访问的内核事件表。
- 当不再使用时,必须调用close函数关闭epoll模型对应的文件描述符,当所有引用epoll实例的文件描述符都已关闭时,内核将销毁该实例并释放相关资源。
接下来的函数用来操控内核事件表
2.epoll_ctl函数
- 函数原型
//向指定的epoll模型中注册事件
int epoll_ctl(int epfd,int op,struct epoll_event *event);
-
参数介绍
-
epfd参数就是刚刚epoll_create创建出来的文件描述符(对应着一个内核事件表)
-
fd参数是要操作的文件描述符,op参数则是指定操作类型。操作类型有如下三种:
- EPOLL_CTL_ADD,往事件表中注册fd上的事件
- EPOLL_CTL_MOD,修改fd上的注册事件
- EPOLL_CTL_DEL,删除fd上的注册事件。
- event参数是指定需要关心的事件,它是epoll_event结构指针类型。epoll_event的定义如下:
```c++
struct epoll_event{
__uint32_t events; //epoll事件
epoll_data_t data; //用户数据
};
```
其中events成员描述的是事件类型。epoll支持的事件类型和poll基本相同。就是多了一个”E“:
- EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)。
- EPOLLOUT:表示对应的文件描述符可以写。
- EPOLLRPT:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)。
- EPOLLERR:表示对应的文件描述符发送错误。
- EPOLLHUP:表示对应的文件描述符被挂断,即对端将文件描述符关闭了。
- EPOLLET:将epoll的工作方式设置为边缘触发(Edge Triggered)模式。
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听该文件描述,需要再次把这个socket加入到EPOLL队列里。
其中data成员用于存储用户数据,其定义如下:
struct union epoll_data{
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
epoll_data_t是一个联合体,其4个成员使用最多的是fd,它指定事件所从属的目标文件描述符。ptr成员可以用来指定与fd相关的用户数据。但是由于epoll_data_t是一个联合体,我们不同使用ptr和fd。如果要将文件描述符和用户数据关联起来,已实现数据快速访问,只能使用其他字段,比如放弃使用epoll_data_t和fd成员,而在ptr指向的用户数据中包含fd。(这里只要大致知道fd和ptr具体的含义即可,后续代码也只是涉及使用fd)
- 返回值
epoll_ctl成功时返回0,失败则返回-1并设置errno。
3.epoll_wait函数
- 函数原型
//用于收益监视的事件中已经就绪的事件
int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout)
-
参数介绍
-
epfd参数就是刚刚epoll_create创建出来的文件描述符(对应着一个内核事件表)
-
timeout参数的含义和poll接口的timeout相同
- -1:epoll_wait调用后进行阻塞等待,直到被监视的某个文件描述符上的某个事件就绪。
- 0:epoll_wait调用后进行非阻塞等待,epoll_wait检测后都会立即返回。
- 特地的时间值:epoll_wait调用后在直到的时间内进行阻塞等待,如果被监视的文件描述符上一直没有事件就绪,则在该时间后epoll_wait进行超时返回。 -
maxevents参数指定最多监听多少个事件,它必须大于0。
-
events参数:它其实是一个数组。epoll_wait函数如果检测到事件,就讲所有就绪的事件从内核事件表(就是epfd所指向的时间表)中复制到它的第二个参数events指向的数组中。这个数组指向用于输出epoll_wait检测到的就绪事件,而不像select和poll的数组那样既作用于传入用户注册事件,又用于输出内核检测到的就绪事件。这样极大的提高了应用程序索引就绪文件描述符的效率
-
返回值
该函数成功时返回就绪的文件描述符个数,失败则返回-1并设置errno。
2.epoll的底层原理
首先,我们在创建好监听套接字后,开始调用epoll_create函数,它的返回值也是一个文件描述符(epoll_fd),该文件描述符(epoll_fd)就会对应内核中的事件表,本质就是在内核中创建出一棵红黑树和一个就绪队列。
- 红黑树就是epoll用来存储用户所关心的文件描述符(key值)和所关心的事件(value值)。
- 就绪队列就是用来存储已经发生事件就绪的文件描述符和相应的就绪事件。
紧接着我们继续调用epoll_ctl函数,这个函数是用来讲用户所关心的文件描述符添加到刚刚内核所创建出来的红黑树节点当中(图中就是将listen_fd添加到红黑树中,当然所关心的事件也是设置好的,假设就是读事件),这个函数除了进行添加操作,还做了一件重要的事情,那就是添加到红黑树节点中的文件描述符都设置了相应的回调函数。
- 回调函数:首先,数据会来自不同的设备,每个设备都会与相应的文件描述符关联起来,也就是通过回调函数参数关联。回调函数的作用在于,当红黑树节点中的文件描述符有事件就绪时,就会通过回调函数,将就绪事件(及相应的id)添加到就绪队列当中。
然后,读取数据的时候就是从就绪队列当中把数据经由内核缓冲区拷贝到用户缓冲区。对于监听套接字而言,它accept上来的新连接还需要再次添加到红黑树当中,由epoll关心,用户就通过这样的内核处理方法,就能够获取到每个连接上的数据。
注:底层原理只是示意图,不一定很详细,但是能够说明问题。
3.epoll的基本工作流程
epoll的工作流程,在看完底层原理后,也差不多就是epoll的工作流程。这里还是再详细说一下其在编码上的流程。
- 首先,和select、poll一样,都是先完成基本的创建套接字、绑定和监听。
- 然后就是调用epoll_create函数,创建出epoll所对应的内核事件表(或叫做epoll模型)。
- 在进行epoll_ctl函数时,就是将所关心的文件描述符添加到内核事件表中,我们需要先创建epoll_event结构,将该结构中的events成员注册你所关心的事件(我以EPOLLIN为例),和你所需要关心的文件描述符(data.fd),这一步就是为调用epoll_ctl做准备。
- 此时就是事件循环,不断的就检测文件描述符上是否发送就绪事件。也就是循环调用epoll_wait函数,但是在此之前,我们还需要创建出一个epoll_event revs[]数组结构,用来存储epoll_wait检测到的就绪事件(由内核自动填充的)。
- 如果成功,epoll_wait的返回值一定是已经就绪的文件描述符的数量,必然是有序的,我们就可以通过遍历的方式,来判断其上是否有读事件就绪。
- 如果是监听套接字,我们不能立即读取(其原因在之前说过了),而是需要进行accept获取新连接,然后填充好你想要关心的事件并将其添加到刚才创建好的epoll模型当中。
- 如果是普通的套接字,就可以正常读取,对于TCP的粘包问题,暂时不考虑
4.基于epoll函数设计的服务器
基本的套接字和之前一样,这里就不多说了。直接来看epoll的代码。
#include <iostream>
#include <string>
#include <cstdlib>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include "sock.hpp"
#define SIZE 128
#define NUM 64
using namespace std;
static void Usage(string proc)
{
cout << "Usage: " << proc << " port" << endl;
}
// ./epoll_server port
int main(int argc, char* argv[])
{
if(argc != 2)
{
Usage(argv[0]);
exit(1);
}
// 1.建立TCP 监听socket
uint16_t port = (uint16_t)atoi(argv[1]);
int listen_sock = Sock::Socket();
Sock::Bind(listen_sock, port);
Sock::Listen(listen_sock);
// 2.创建epoll模型,获得epfd(文件描述符)
int epfd = epoll_create(SIZE);
// 3.添加listen_sock和它所关心的事件添加到内核
struct epoll_event ev;
ev.events = EPOLLIN;//默认LT模式
//ev.events = EPOLLIN | EPOLLET;//ET模式
ev.data.fd = listen_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &ev);
// 4.事件循环
volatile bool quit = false;
struct epoll_event revs[NUM];//这个数组,会由内核进行填充,存储就绪事件
while(!quit)
{
int timeout = -1;
// 这里传入的数组,仅仅是尝试从内核中拿回来已经就绪的事件
int n = epoll_wait(epfd, revs, NUM, timeout);
switch(n)
{
case 0:
cout << "time out ..." << endl;
break;
case -1:
cerr << "epoll error ..." << endl;
break;
default:
cout << "有事件就绪了!" << endl;
// 5.处理就绪事件
for(int i = 0; i < n; i++)
{
int sock = revs[i].data.fd;
cout << "文件描述符:[" << sock << "] 上面有事件就绪了" << endl;
if(revs[i].events & EPOLLIN)
{
cout << "文件描述符:[" << sock << "] 有读事件就绪" << endl;
if(sock == listen_sock)
{
cout << "文件描述符:[" << sock << "] 链接事件就绪" << endl;
// 5.1处理链接事件
int fd = Sock::Accept(listen_sock);
if(fd >= 0)
{
cout << "获取新链接成功啦:" << fd << endl;
// 能不能立即读取呢??不能
struct epoll_event _ev;
_ev.events = EPOLLIN;
_ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &_ev);//将新的fd交给epoll管理
cout << "已经将" << fd<< "托管给epoll啦" << endl;
}
else
{
cout << "accept failed!" << endl;
}
}
else
{
// 5.2正常的读取处理
cout << "文件描述符:[" << sock << "] 正常事件就绪" << endl;
char buffer[1024];
ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);
if(s > 0)
{
buffer[s] = 0;
cout << "client [" << sock << "]#" << buffer << endl;
}
else if(s == 0)
{
// 对端关闭链接
cout << "client quit " << sock << endl;
close(sock);
epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);
cout << "sock: " << "delete from epoll success" << endl;
}
else
{
// 读取失败
cout << "read error! " << endl;
close(sock);
epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);
cout << "sock: " << "delete from epoll success" << endl;
}
}
}
else if(revs[i].events & EPOLLOUT)
{
// 处理写事件
// 写事件相当复杂一下,这部分代码并不会影响整个程序的运行
}
else
{
// 其他
}
}
break;
}
}
close(epfd); //结束后,切记不要忘了close
close(listen_sock);
return 0;
}
5.epoll的优点
优点:
- 接口使用方面:虽然拆分成了三个函数,但是反而使用起来更方便高效
- 数据拷贝轻量:只在新增监听事件的时候调用epoll_ctl将数据从用户拷贝到内核,而select和poll每次都需要重新将需要监视的事件从用户拷贝到内核。此外,调用epoll_wait获取就绪事件时,只会拷贝就绪的事件,不会进行不必要的拷贝操作。
- 事件回调机制:避免操作系统主动轮询检测事件就绪,而是采用回调函数的方式,将就绪的文件描述符结构加入到就绪队列中。调用epoll_wait时直接访问就绪队列就直到哪些文件描述符已经就绪,检测是否有文件描述符就绪的事件复杂度是O(1),因为本质只需要判断就绪队列是否为空即可。
- 没有数量限制:监视的文件描述符数目无上限,只要内存允许,就可以一直向红黑树当中新增节点。
五、LT 和 ET 模式
首先,select、poll和epoll的工作模式默认情况下都是LT模式。但是epoll的工作模式有两种,分别为LT模式(水平触发模式)和 ET 模式(边缘触发模式)。
- LT模式:举一个例子,张三买了很多快递,快递站的小王每隔一段时间通知张三取快递(张三没有去取快递或没取完的情况下,就会一直通知,这不就是轮询嘛)
- ET模型:举一个例子,张三买了很多快递,快递站的小李会通知张三取快递,但是只通知一次(张三没有去取快递或没取完的情况下,小李在也不会通知了)
LT模式和ET模式相比:
ET模式是epoll的高效工作模式,或许你会觉得一直通知的方式,不是更好吗?但是你考虑一个问题,以刚刚的例子来说,小王的方式就是LT模式,他作为快递员,今天什么事都没干,基本上所有的时间都是在通知张三拿快递,他只是通知一个人,但是快递站有很多人的快递,但是小李就是以ET模式工作的,他虽然只通知了一次,但是他一天就可以通知很多人,相比之下,ET的效率更高。
代码中如何体现ET或LT呢?
首先epoll默认是LT模式,我们要将其改为ET模式,只需要将其时间或上一个EPOLLET,在刚才的代码中有体现,可以尝试运行此服务器,测试其效果(我们测试时,只要关心监听套接字就足够了,其将accept函数部分注释掉。然后LT模式下,就会发现,当套接字有读事件就绪后,会不断的触发提示,告诉你有事件就绪;ET模式只会通知一次)
使用ET模式的注意点:ET模式下的文件描述符必须设置为非阻塞
首先,epoll在ET模式下,只会通知一次,这样就会倒逼着程序员必须一次性将数据读取完毕。
假设有这样一种场景,假设有320个字节的数据要读取,现在是ET模式,如何才能保证数据被全部读取完呢?智能循环读取。假设你设置读取的字节数是一次性读取100字节,当进程读第一次时还剩220,第二次还剩120,第三次还剩20,第四次读取只读到了20字节,我们可以发现,前面在读取的时候,都能够最大限度的满足读取要求,因此就能继续读取,当要读取100个字节的数据时,只读到了20个字节,就表明没有数据了,因此不会继续读取,这样数据就能够全部读取完了。
但是如果只300个字节的数据呢?读完第三次后,依然会继续读取,因为上一次读取能读到100字节,进程认为还有数据会继续recv,但实际上已经没有数据了,文件描述符不设置为非阻塞,recv调用就会阻塞,进而导致进程被挂起。如果有多个这样的进程挂起,后果可想而知。
为了解决这样的问题,就必须将文件描述符设置为非阻塞。
六、select、poll和epoll的比较
select、poll和epoll三种I/O多路转接的系统调用,都能够同时监听多个文件描述符。他们都由timeout参数指定超时时间,直到有一个或多个文件描述符上有时间发生时返回,返回值就是文件描述符的数量。返回0表示没有时间发生。
相同点:这3组函数都是通过某种结构体变量来告诉内核关心哪些文件描述符上的哪些事件,并使用该结构体类型的参数来获取内核的处理结果。
不同点:
- select的参数fd_set没有将文件描述符和事件进行绑定,它仅仅是一个文件描述符的集合。因此select需要提供3个这种类型的参数来分别传入和输出可读、可写和异常等事件。这一方面使得select不能处理更多类型的事件,另一方面select每次调用前都需要这三个参数
- poll将文件描述符和事件都定义到了pollfd中,任何事件都能被统一处理。从而使得编程接口简洁很多。并且内核每次修改的都是revents成员,events成员保持不变,因此下一次调用poll时无需重置events参数。
- select和poll调用都会返回整个用户注册的事件集合(其中包括就绪的和未就绪的),所以应用程序在索引文件描述符时的时间复杂度为O(n)
- epoll不再使用单独的接口,通过epoll_create在内核中维护一个事件表,并提供了一个独立的系统调用epoll_ctl来控制往其中添加、删除和修改操作。通过epoll_wait拿到的就是就绪的事件集合,未就绪的只存与内核事件表中,这样索引就绪文件描述符的时间复杂度达到O(1)。
- select和poll只能工作在LT模式下,epoll在LT模式和ET模型下都可以。