python之IO多路复用

  在python的网络编程里,socetserver是个重要的内置模块,其在内部其实就是利用了I/O多路复用、多线程和多进程技术,实现了并发通信。与多进程和多线程相比,I/O多路复用的系统开销小,系统不必额外再创建进程或线程,也就不需要维护这些进程或线程,从而大大减小了系统的开销。当然,这三者不是孤立的,可以联合使用,效果可能更好。

IO多路复用的原理

   首先,必须说明的是IO多路复用不是python专有的概念,它是系统层面的;

  其次,所谓的多路复用只是一种如何在“人很多但是路只有一条”的情况下快速通行的方法,类似于通信中的“时分复用”;

  最后,在谈及网络编程的IO多路复用时,我们一般是针对socket而言的。

  在类似大型网站高并发的环境中,同时进行的socket通信数量及其庞大,每一个socket通信链路就是一个IO流,而主机的IO吞吐能力又是有限的,为了尽量的提高处理效率,先后开发出了select,poll,epoll三种IO多路复用的机制。他们可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。其本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的。

  1983 BSD里面实现了select机制,但很快就暴露出了很多问题。
  • select 会修改传入的参数数组,这个对于一个需要调用很多次的函数,是非常不友好的;
  • select会将每一个有变化的socket加入它维护的列表中,但是并不会明确是哪一个socket。它内部其实是通过一个for循环遍历整个列表。当socket的数量不多的时候可能还好,但是几万、几十万、几千万个socket的时候,这个for循环就比较坑了;

  • select 同时只能监视1024个链接,它是由linux 在头文件中定义的,参见FD_SETSIZE,这个可以修改,问题不算大;

  • select 不是线程安全的,如果你把一个socket加入到select, 然后突然另外一个线程关闭了这个socket, 那么接下来select的行为是随机的....

  14年以后的1997年poll机制被发明, 它修复了select的很多问题:

  • poll 去掉了1024个链接的限制,想要多少就多少;
  • poll 不再修改传入数组,但是与平台有关。

  但是poll仍然不是线程安全的!

  5年以后的2002,牛人Davide Libenzi 实现了epoll机制。它修复了poll 和select的绝大部分问题, 比如:

  • epoll 是线程安全的;
  • epoll 不仅告诉你sock组里面数据,还会告诉你具体哪个socket有数据,你不用自己去找了。

  看到这里,貌似有了epoll,select之流可以退休了。但实际情况不是这样的。

  • windows暂时只支持seclet,它没有epoll....
  • select的各平台支持度比较好,API也比较通用,通俗点就是“皮实耐操通用性好舒适度差”;
  • epoll是linux内核原生支持的机制,虽然强大,但是各平台支持度不一样,API也差别较大,就是那种“高大上但局限性高”的东西。不过epoll显然是未来的大趋势。

python中的IO多路复用

  了解了IO多路复用,我们就来看看python中是如何使用的。Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。(下面以select方法为例)

Windows Python:
    提供: select
Mac Python:
    提供: select
Linux Python:
    提供: select、poll、epoll

  注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持其他IO操作,但是无法检测 普通文件操作 自动上次读取是否已经变化。

  对于select方法:

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间) 
参数: 可接受四个参数(前三个必须)
返回值:三个列表

select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。
1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中
2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中
3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中
4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化
   当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import select
import threading
import sys

while True:
    readable, writeable, error = select.select([sys.stdin,],[],[],1)
    if sys.stdin in readable:
        print( 'select get stdin',sys.stdin.readline())

利用select监听终端操作实例

利用select实现伪同时处理多个Socket客户端请求:服务端

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

ip_port = ('127.0.0.1',8002)
sk = socket.socket()
sk.connect(ip_port)

while True:
    inp = input('please input:').encode()
    sk.sendall(inp)
sk.close()

利用select实现伪同时处理多个Socket客户端请求:客户端

  此处的Socket服务端相比与原生的Socket,他支持当某一个请求不再发送数据时,服务器端不会等待而是可以去处理其他请求的数据。但是,如果每个请求的耗时比较长时,select版本的服务器端也无法完成同时操作。

#!/usr/bin/env python
#coding:utf8

'''
 服务器的实现 采用select的方式
'''

import select
import socket
import sys
import Queue

#创建套接字并设置该套接字为非阻塞模式

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(0)

#绑定套接字
server_address = ('localhost',10000)

server.bind(server_address)

#将该socket变成服务模式
#backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5
#这个值不能无限大,因为要在内核中维护连接队列

server.listen(5)

#初始化读取数据的监听列表,最开始时希望从server这个套接字上读取数据
inputs = [server]

#初始化写入数据的监听列表,最开始并没有客户端连接进来,所以列表为空

outputs = []

#要发往客户端的数据
message_queues = {}
while inputs:
    print('waiting for the next event')
    #调用select监听所有监听列表中的套接字,并将准备好的套接字加入到对应的列表中
    readable,writable,exceptional = select.select(inputs,outputs,inputs)#列表中的socket 套接字  如果是文件呢?
    #监控文件句柄有某一处发生了变化 可写 可读  异常属于Linux中的网络编程
    #属于同步I/O操作,属于I/O复用模型的一种
    #rlist--等待到准备好读
    #wlist--等待到准备好写
    #xlist--等待到一种异常
    #处理可读取的套接字

    '''
        如果server这个套接字可读,则说明有新链接到来
        此时在server套接字上调用accept,生成一个与客户端通讯的套接字
        并将与客户端通讯的套接字加入inputs列表,下一次可以通过select检查连接是否可读
        然后在发往客户端的缓冲中加入一项,键名为:与客户端通讯的套接字,键值为空队列
        select系统调用是用来让我们的程序监视多个文件句柄(file descrīptor)的状态变化的。程序会停在select这里等待,
        直到被监视的文件句柄有某一个或多个发生了状态改变
        '''

    '''
        若可读的套接字不是server套接字,有两种情况:一种是有数据到来,另一种是链接断开
        如果有数据到来,先接收数据,然后将收到的数据填入往客户端的缓存区中的对应位置,最后
        将于客户端通讯的套接字加入到写数据的监听列表:
        如果套接字可读.但没有接收到数据,则说明客户端已经断开。这时需要关闭与客户端连接的套接字
        进行资源清理
        '''

    for s in readable:
        if s is server:
            connection,client_address = s.accept()
            print('connection from',client_address)
            connection.setblocking(0)#设置非阻塞
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024).decode()
            if data:
                print('received "%s" from %s'% \
                (data,s.getpeername()))
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print('closing',client_address)
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]

    #处理可写的套接字
    '''
        在发送缓冲区中取出响应的数据,发往客户端。
        如果没有数据需要写,则将套接字从发送队列中移除,select中不再监视
        '''

    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()

        except Queue.Empty:
            print('  ',s,getpeername(),'queue empty')
            outputs.remove(s)
        else:
            print('sending "%s" to %s'% \
            (next_msg,s.getpeername()))
            s.send(next_msg)

    #处理异常情况

    for s in exceptional:
        for s in exceptional:
            print('exception condition on',s.getpeername())
            inputs.remove(s)
            if s in outputs:
                outputs.remove(s)
            s.close()
            del message_queues[s]

基于select实现socket服务端

上一篇:MVC自定义AuthorizeAttribute实现权限管理


下一篇:09 Python之IO多路复用