python下的select模块使用 以及epoll与select、poll的区别
先说epoll与select、poll的区别(总结)
整理http://www.zhihu.com/question/32163005
http://www.cnblogs.com/Anker/p/3265058.html
select, poll, epoll 都是I/O多路复用的具体的实现,之所以有这三个鬼存在,其实是他们出现是有先后顺序的。
I/O多路复用这个概念被提出来以后, select是第一个实现 (1983 左右在BSD里面实现的)。
select
select 被实现以后,很快就暴露出了很多问题。
- select 会修改传入的参数数组,这个对于一个需要调用很多次的函数,是非常不友好的。
- 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
- select 如果任何一个sock(I/O stream)出现了数据,select仅仅会返回,但是并不会告诉你是那个sock上有数据,于是你只能自己一个一个的找,)每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
- select 只能监视1024个链接, 这个跟草榴没啥关系哦,linux 定义在头文件中的,参见FD_SETSIZE。
- select 不是线程安全的,如果你把一个sock加入到select, 然后突然另外一个线程发现,尼玛,这个sock不用,要收回。对不起,这个select 不支持的,如果你丧心病狂的竟然关掉这个sock, select的标准行为是。。呃。。不可预测的,
于是14年以后(1997年)一帮人又实现了poll, poll 修复了select的很多问题
poll
- poll 去掉了1024个链接的限制,于是要多少链接呢, 主人你开心就好。
- poll 从设计上来说,不再修改传入数组,不过这个要看你的平台了,所以行走江湖,还是小心为妙。
其实拖14年那么久也不是效率问题, 而是那个时代的硬件实在太弱,一台服务器处理1千多个链接简直就是神一样的存在了,select很长段时间已经满足需求。
但是poll仍然不是线程安全的, 这就意味着,不管服务器有多强悍,你也只能在一个线程里面处理一组I/O流。你当然可以那多进程来配合了,不过然后你就有了多进程的各种问题。
于是5年以后, 在2002, 大神 Davide Libenzi 实现了epoll.
epoll
epoll 可以说是I/O 多路复用最新的一个实现,epoll 修复了poll 和select绝大部分问题, 比如:
- 对于每次需要将FD从用户态拷贝至内核态,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。
- 同样epoll也没有1024的连接数限制
- epoll 现在是线程安全的。
- epoll 现在不仅告诉你sock组里面数据,还会告诉你具体哪个sock有数据,你不用自己去找了。
- epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。
总结
(1)select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间。这就是回调机制带来的性能提升。
(2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等待队列)。这也能节省不少的开销。
python下的一个select实例
转自http://www.cnblogs.com/coser/archive/2012/01/06/2315216.html
server
import select
import socket
import Queue
#create a socket
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)
#set option reused
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1)
server_address= ('192.168.1.102',10001)
server.bind(server_address)
server.listen(10)
#sockets from which we except to read
inputs = [server]
#sockets from which we expect to write
outputs = []
#Outgoing message queues (socket:Queue)
message_queues = {}
#A optional parameter for select is TIMEOUT
timeout = 20
while inputs:
print "waiting for next event"
readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
# When timeout reached , select return three empty lists
if not (readable or writable or exceptional) :
print "Time out ! "
break;
for s in readable :
if s is server:
# A "readable" socket is ready to accept a connection
connection, client_address = s.accept()
print " connection from ", client_address
connection.setblocking(0)
inputs.append(connection)
message_queues[connection] = Queue.Queue()
else:
data = s.recv(1024)
if data :
print " received " , data , "from ",s.getpeername()
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
else:
#Interpret empty result as closed connection
print " closing", client_address
if s in outputs :
outputs.remove(s)
inputs.remove(s)
s.close()
#remove message queue
del message_queues[s]
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except Queue.Empty:
print " " , s.getpeername() , 'queue empty'
outputs.remove(s)
else:
print " sending " , next_msg , " to ", s.getpeername()
s.send(next_msg)
for s in exceptional:
print " exception condition on ", s.getpeername()
#stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
#Remove message queue
del message_queues[s]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
client
import socket
messages = ["This is the message" ,
"It will be sent" ,
"in parts "]
print "Connect to the server"
server_address = ("192.168.1.102",10001)
#Create a TCP/IP sock
socks = []
for i in range(10):
socks.append(socket.socket(socket.AF_INET,socket.SOCK_STREAM))
for s in socks:
s.connect(server_address)
counter = 0
for message in messages :
#Sending message from different sockets
for s in socks:
counter+=1
print " %s sending %s" % (s.getpeername(),message+" version "+str(counter))
s.send(message+" version "+str(counter))
#Read responses on both sockets
for s in socks:
data = s.recv(1024)
print " %s received %s" % (s.getpeername(),data)
if not data:
print "closing socket ",s.getpeername()
s.close()
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
poll server
import socket
import select
import Queue
# Create a TCP/IP socket, and then bind and listen
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.102", 10001)
print "Starting up on %s port %s" % server_address
server.bind(server_address)
server.listen(5)
message_queues = {}
#The timeout value is represented in milliseconds, instead of seconds.
timeout = 1000
# Create a limit for the event
READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)
# Set up the poller
poller = select.poll()
poller.register(server,READ_ONLY)
#Map file descriptors to socket objects
fd_to_socket = {server.fileno():server,}
while True:
print "Waiting for the next event"
events = poller.poll(timeout)
print "*"*20
print len(events)
print events
print "*"*20
for fd ,flag in events:
s = fd_to_socket[fd]
if flag & (select.POLLIN | select.POLLPRI) :
if s is server :
# A readable socket is ready to accept a connection
connection , client_address = s.accept()
print " Connection " , client_address
connection.setblocking(False)
fd_to_socket[connection.fileno()] = connection
poller.register(connection,READ_ONLY)
#Give the connection a queue to send data
message_queues[connection] = Queue.Queue()
else :
data = s.recv(1024)
if data:
# A readable client socket has data
print " received %s from %s " % (data, s.getpeername())
message_queues[s].put(data)
poller.modify(s,READ_WRITE)
else :
# Close the connection
print " closing" , s.getpeername()
# Stop listening for input on the connection
poller.unregister(s)
s.close()
del message_queues[s]
elif flag & select.POLLHUP :
#A client that "hang up" , to be closed.
print " Closing ", s.getpeername() ,"(HUP)"
poller.unregister(s)
s.close()
elif flag & select.POLLOUT :
#Socket is ready to send data , if there is any to send
try:
next_msg = message_queues[s].get_nowait()
except Queue.Empty:
# No messages waiting so stop checking
print s.getpeername() , " queue empty"
poller.modify(s,READ_ONLY)
else :
print " sending %s to %s" % (next_msg , s.getpeername())
s.send(next_msg)
elif flag & select.POLLERR:
#Any events with POLLERR cause the server to close the socket
print " exception on" , s.getpeername()
poller.unregister(s)
s.close()
del message_queues[s]