- 从IO模型到协程(一) 什么是IO,用户进程与内核
- 从IO模型到协程(二) BIO模型和NIO模型
- 从IO模型到协程(三) 多路复用之select、poll和epoll
- 从IO模型到协程(四) 用python实现一个多路复用程序
- 从IO模型到协程(五) python中的协程(coroutine)
- 从IO模型到协程(六) asyncio和协程实现高并发
什么是IO多路复用
IO多路复用:通过一种机制(多路复用器),可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),内核(的多路复用器)就会将就绪的描述符返回给用户进程,通知用户进程进行相应的读写操作。
Linux内核提供的多路复用器有 select(), poll() 和 epoll()
多路复用器只会通知用户进程哪些文件描述符已经事件就绪和事件的类型,但不会自动将内核缓冲区的数据拷贝到用户进程的内存。用户进程需要自己调recv才能获取内核缓存区的客户端消息。因此多路复用还是一种同步IO模式。
在使用select/poll/epoll多路复用的时候,必须要将IO操作(recv/connect/accept)设置为非阻塞的才行。
如果是监控到客户端状态的同时还把客户端数据返回给用户程序,无需用户程序进行系统调用,那就是异步IO模型。
目前Linux内核支持的都是同步IO模型,而只有window的iocp支持这种异步io模型。
多路复用器之select
内核提供了select系统调用:
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
nfds:传入文件描述符
readfds:文件描述符的读事件
writefds:文件描述符的写事件
exceptfds:文件描述符的异常事件
下面是官方文档对select系统调用的描述
select() allows a program to monitor multiple file descriptors,waiting until one or more of the file descriptors become "ready" for some class of I/O operation
# 它允许程序去监控多个文件描述符,直到一个或多个文件描述符变为“就绪”状态以待进行IO操作。
可以在Linux中执行man 2 select 命令或者上网搜man 2 select查一下内核的select()方法
PS:
什么是文件描述符?简单的理解就是:Linux 中一切皆文件,比如 C++ 源文件、视频文件、Shell脚本、可执行文件等,就连键盘、显示器、鼠标等硬件设备也都是文件。一个 Linux 进程可以打开成百上千个文件,为了表示和区分已经打开的文件,Linux 会给每个文件分配一个编号(一个 ID),这个编号就是一个整数,被称为文件描述符(File Descriptor)。
这样说还远没有理解到文件描述符的本质,只是方便理解才这样说。
在本文的例子中,客户端和服务端socket对象就代表(包含)一个文件描述符(文件描述符是一个整型)。select监视的就是客户端和服务端的socket对象。
下面文件描述符(file description)用fd简称表示。
接下来用select多路复用器实现一下上面的客户端和服务端模型:
# coding=utf-8
import socket
import select
# from select import select
# 服务端代码
# 创建套接字
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定ip和端口
ip = "127.0.0.1"
port = 8000
server.bind((ip, port))
server.listen(3) # 只允许最多3个客户端连接
server.setblocking(False) # 非阻塞socket
rfd = set() # 存放要监控的文件描述符,rfd存放监控读状态的socket
wfd = set() # 存放要监控的文件描述符,wfd存放监控写状态的socket
efd = set() # 存放要监控的文件描述符,efd存放监控出现异常状态的socket
rfd.add(server)
client_no = 1
clients = dict()
while True:
print("开始select监控")
# 获取所有读就绪、写就绪和发生异常的socket
r_sockets, w_sockets, e_sockets = select.select(rfd, wfd, efd) # 还可以传第四参超时时间,如果不传第四参,则select方法是阻塞的,阻塞直到有socket的状态改变;如果设了第4参为1,则执行select会阻塞1秒,1秒内没有socket改变状态就会循环1次再执行select
# 这里我只关注可读的socket
for r_socket in r_sockets:
if r_socket == server: # 如果是服务器socket可读,说明有连接进来了,此时可以去直接接收连接
client, addr = r_socket.accept() # 不阻塞,而且肯定可以接收到连接
rfd.add(client) # 将连接的客户端添加到要监控的集合中
clients[client] = client_no
print("客户端 %s 连接成功" % str(client_no))
client_no += 1
else: # 如果是客户端socket可读,说明客户端发送消息到服务端
msg = r_socket.recv(1024) # 不阻塞
if not msg:
print("客户端 %s 断开连接" % str(clients[r_socket]))
rfd.remove(r_socket)
else:
print("客户端 %s 发送消息:%s" % (str(clients[r_socket]), msg.decode('utf-8')))
上面的代码中,多路复用器select监视服务端socket(只监视它是否可读,不监视它是否可写或异常),由于此时socket是不可读的,所以select会阻塞程序,此时程序会让出cpu进入等待队列。一旦socket的状态变为可读(即有客户端连接进来),此时select会唤醒程序,并返回服务端socket给 r_sockets变量。
然后我得主动accept接收一下客户端的连接,并将刚连接进来的客户端soket也添加到要监视的可读状态的文件描述符集合rfd中。也就是说,我现在监视的就是 服务端socket和客户端1号的socket。
之后,如果有新的客户端连进来,都会被添加到被监视的集合中让select进行监控。
如果没有新客户端连接进来(服务端socket不可读,未达到读就绪状态),客户端也没有发送消息给服务端(客户端socket也没达到可读状态),select就会阻塞,直到有socket达到可读状态才会唤醒程序。
当有某些客户端发送消息进来的时候,select就监控到这几个客户端可读,然后将这些客户端socket返回到 r_sockets集合中,服务端就会对这几个客户端进行recv()。
select多路复用器是如何监控到某个fd的事件已经就绪了的呢?
1. 在执行select()时,会把要监控的fd从用户空间拷贝到内核空间。
2. 用户进程被select()阻塞,用户进程被内核放入等待队列
3. 内核会遍历内核中所有的fd,查看他们的状态
4.如果所有fd的状态没发生变化则select()会保持阻塞;如果fd的状态发生改变的话,内核会拷贝所有事件就绪的fd到用户空间,让用户进程进行读写操作。
所以其实select还是通过遍历所有fd来监控fd是否事件就绪。只不过这个遍历过程是由内核来完成,而不是由用户进程完成(用户进程会在select()时阻塞,不占用CPU)。NIO则是在用户进程中遍历所有客户端和服务端的socket。
select的几大缺点:
1. 每次事件循环调用select(),都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
2. 同时每次调用select都需要在内核遍历传递进来的所有fd来查看他们的事件是否就绪(通过在遍历的方式监控事件状态),这个开销在fd很多时也很大
3. select支持监视的文件描述符数量太小了,默认是1024
多路复用器之epoll
相比与select和poll而言,epoll没有描述符数量限制,而且无需多次的将fd从用户空间拷贝到内核空间。
相比于select只有1个系统调用,epoll提供了3个系统调用接口:
epoll_create(size):该函数在内核空间开辟一块新的空间(用来存放要监视的文件描述符),该函数返回一个epoll的文件描述符。size参数限制这个空间可以存多少个文件描述符。
epoll_ctl(int epfd,int op,int fd, struct epoll_event): 该函数是epoll的事件注册函数。这个函数作用是指定监控或者不再监控某个fd的某一个事件(这里说的事件是指 accept,recv,send这些指令)。
第一参:epfd是epoll_create返回的epoll文件描述符,指代在内核空间中开辟的epoll空间
第二参:op是具体的操作,包括:EPOLL_CTL_ADD 注册新的fd到epfd(把fd拷贝到epoll空间);EPOLL_CTL_DEL 从epfd删除一个fd;EPOLL_CTL_MOD 修改已注册的fd的监听事件。
第三参:fd, 需要监听的文件描述符,一般指socket_fd
第四参:event, 告诉内核要监听或者不监听该fd的哪个事件。
epoll_wait(epfd, epoll_event, maxevents, timeout):等待(监控)注册事件的发生。timeout决定这个方法是阻塞的还是非阻塞的。0表示非阻塞,大于0表示阻塞。比如:timeout设为1,调用epoll_wait时内核会监控所有fd是否触发了注册的事件,阻塞1秒,1秒内还是没有fd触发注册的事件,就会返回-1;如果timeout设为0,则马上返回-1;如果timeout为-1,则会一直阻塞,直到有fd的事件状态发生改变。
epoll_create创建的epoll空间又分为两块,一个是事件注册列表,用于记录epoll_ctl给哪些fd注册了哪些事件。一个是事件就绪列表,用于记录哪些fd的哪些事件已经就绪。
epoll的常用事件如下:
EPOLLIN:读就绪 常量值为 1
EPOLLOUT:写就绪 4
EPOLLERR:服务端出现异常 8
EPOLLHUP: 客户端读写关闭(可以理解为client关闭了连接,但是连接还没有断开) 16
epoll有两种工作模式:LT(水平触发,默认) 和 ET(边缘模式)
LT:事件就绪后,用户可以选择处理或不处理,如果用户本次不处理,则下次调用epoll_wait仍会将未处理的事件打包给你
ET:事件就绪后,用必须处理,因为内核将就绪事件打包给你的时候就把对应的就绪事件从就绪列表中删除。
ET模式很大程度减少了epoll_wait被触发的次数(也减少了fd从内核到用户空间的拷贝次数),效率比LT高。
epoll的工作具体流程和使用的数据结构如下:
1.当执行epoll_create()时,会在内核开辟一块内存空间用于存放要监控的fd,这块空间会包含两种数据结构:红黑树和双向链表(就绪链表),其中红黑树用于保存所有的要监控的fd,双向链表保存事件就绪的fd。
2.当执行epoll_ctl()时,会把要监控的fd从用户空间拷贝到内核空间,并为这个fd的对应事件注册一个回调函数,然后再将这个fd作为一个节点添加到红黑树中。
3.当一个fd的读事件或者写事件就绪时,会自动触发这个回调函数,回调函数要做的就是将这个fd添加到就绪链表中。
4.当调用 epoll_wait()时,会检测就绪链表的节点数是否为空,如果不为空说明有fd的事件已经就绪,此时就会将就绪链表中的fd拷贝回用户空间。让用户程序进行相应的读写操作。如果为空,epoll_wait()就会阻塞用户程序,直到有事件就绪。
epoll为什么使用红黑树存fd?
如果用户空间传入一个重复的fd,那么可以通过红黑树的高效查找(O(logn)的复杂度)找到这个fd节点已存在,就不会再把这个fd添加到树中。还有当某一个fd事件就绪的时候,内核也要找到这个fd将它放到就绪链表中。
还有删除一个fd,修改一个fd的事件类型(从监控读事件变为监控写事件),这些操作都必须要找到我要删或者要修改的那个fd,那么使用红黑树这个结构找起来就会非常的快。(树的分支存的是子节点fd的内存地址)
下面用网上的一张图来简单描述epoll的机制
代码实例:
# coding=utf-8
import select, socket
# 服务端代码,该代码不能再windows中运行,因为windows中没有epoll的系统调用
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("0.0.0.0", 8088))
server.listen()
server.setblocking(False)
print("服务器启动成功")
# 创建epoll对象,相当于执行了 epoll_create 在内核中开辟了一块空间用于记录fd事件
epoll = select.epoll()
# 注册要监听的fd事件,相当于调用 epoll_ctl 将fd的事件写入内核空间中。
epoll.register(server.fileno(), select.EPOLLIN) # 监听server套接字的可读事件,第一参传的是服务端socket的文件描述符,文件描述符是一个整型数字
# 存储要监听的socket
monitored_socket = { server.fileno():server}
# 存储客户端发送过来的消息
client_msg = {}
timeout = 10
while True:
# 等待fd的事件就绪(此时内核会监控所有fd的事件状态),相当于执行epoll_wait(),这个过程是阻塞的,上面的setblocking(False)是控制套接字的操作不阻塞,但是不能控制epoll不阻塞
# 这里可以传入阻塞的超时时间timeout, 不传则一直阻塞; 如果没有事件就绪且超过超时时间则返回null;如果有事件就绪,则返回一个列表,每个列表放着一个元祖,元组放着事件就绪的fd和具体事件类型
events = epoll.poll(timeout)
if not events:
print("无事件就绪")
else:
print("有 %s 个事件就绪" % len(events))
for fd, event in events:
ready_socket = monitored_socket[fd] # 根据fd获取就绪的socket对象
print("event", event)
if ready_socket == server: # 如果就绪fd是server,说明server可读事件就绪,表示有客户端连接
client, addr = server.accept() # 非阻塞,而且一定能接收到连接
client_fd = client.fileno()
print("客户端 %s 建立连接成功" % str(client_fd))
client.setblocking(False)
# 先监听客户端的可读事件
epoll.register(client_fd, select.EPOLLIN)
monitored_socket[client_fd] = client
client_msg[client_fd] = [] # 保存该客户端的所有发送过来的消息
elif event & select.EPOLLHUP: # 如果客户端关闭连接
ready_socket.close()
epoll.unregister(fd) # 不再监听该客户端的事件
del monitored_socket[fd]
del client_msg[fd]
print("客户端 %s 关闭连接" % str(fd))
elif event & select.EPOLLIN: # 如果客户端读事件就绪
msg = ready_socket.recv(1024)
if msg:
print("接收到客户端 %s 的消息 %s" % (str(fd), msg.decode("utf-8")))
client_msg[fd].append(msg)
# 修改监听事件为写事件,因为客户端发送消息过来之后我想将消息马上原样发送回给客户端
epoll.modify(fd, select.EPOLLOUT)
else: # 如果返回空字符说明客户端关闭socket断开连接
ready_socket.close()
epoll.unregister(fd) # 不再监听该客户端的事件
del monitored_socket[fd]
del client_msg[fd]
print("客户端 %s 关闭连接" % str(fd))
elif event & select.EPOLLOUT: # 如果客户端写事件就绪,其实只要客户端连接了服务端且它的输入缓冲区没满应该就满足写事件就绪(所谓的客户端写就绪就是服务端可以执行send发送消息给客户端)
try:
msg = client_msg[fd].pop()
except:
print("客户端 %s 消息列表为空" % str(fd))
epoll.modify(fd, select.EPOLLIN) # 如果将客户端的所有消息都发回去了,就改回监听客户端读事件
else:
ready_socket.send(msg)
客户端代码:
# coding=utf-8
from threading import Thread
import socket
# 创建套接字
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定ip和端口
ip = "127.0.0.1"
port = 8088
client.connect((ip, port))
def getResponse():
while True: # 开一个线程用于接收服务器返回的消息
try:
response = client.recv(1024) # 阻塞
print(response.decode("utf-8"))
except: # 如果主线程关闭客户端socket则recv会报错,此时break跳出循环,结束该线程即可
print("子线程由于client断开连接而退出")
break
thread_for_response = Thread(target=getResponse)
thread_for_response.start()
while True:
msg = input()
if msg:
client.send(msg.encode("utf-8"))
print("msg:" + msg)
else: # 如果直接输入换行则断开连接
client.close() # 关闭套接字断开连接
print("client close")
break
PS:client.shutdown()不会断开连接,只是把读写关闭了而已,如果调用client.shutdown(),在服务端的表现就是客户端发送一个空字符给服务端,会触发客户端socket的EPOLLIN。所以我在EPOLLIN里面也加了一段关闭client并且从epoll对象中删除客户端fd的逻辑。
这个代码我把服务端代码放到我的远程服务器运行,客户端放到我的本机运行,不过要将客户端代码的连接ip改为我的远程服务器真实ip才行。
对比select和epoll,epoll比较好的解决了select的3个缺点:
1.select有fd的数量限制默认是1024,epoll没有限制。
2.select通过在内核中遍历所有fd做到事件监控,epoll则通过注册回调函数的方式,当事件就绪时自动将fd放入就绪链表,检测fd的事件是否就绪只需查看就绪链表的节点是否为空即可。
3.在用户程序进行事件循环时,每次循环调用select()都要所有的fd重复的拷贝到内核空间,而epoll会将每次从用户空间拷贝到内核空间的fd保存到红黑树,所以每个fd只需拷贝1次无需重复拷贝,减少cpu的消耗。
但是并不是说epoll的性能就一定比select高,要看具体使用场景:
在连接并发数高,连接活跃度不高(连接了之后不怎么发消息)的情况下,epoll比select更合适
在连接并发数不高,连接活跃度高的情况下,select比epoll更合适
最后强调一点:epoll,poll和select都是同步IO,因为他们需要在socket的事件就绪后由用户程序进行读写(recv,send)。
本文转载自: 张柏沛IT技术博客 > 从IO模型到协程(三) 多路复用之select、poll和epoll