下面这篇,原理理解了,
再结合 这一周来的心得体会,整个框架就差不多了。。。
http://www.haiyun.me/archives/1056.html
有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
#!/usr/bin/python # -*- coding: utf-8 -*- import select
import socket
import Queue
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking( False )
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1 )
server_address = ( '192.168.1.5' , 8080 )
server.bind(server_address) server.listen( 10 )
#select轮询等待读socket集合 inputs = [server]
#select轮询等待写socket集合 outputs = []
message_queues = {}
#select超时时间 timeout = 20
while True :
print "等待活动连接......"
readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
if not (readable or writable or exceptional) :
print "select超时无活动连接,重新select...... "
continue ;
#循环可读事件
for s in readable :
#如果是server监听的socket
if s is server:
#同意连接
connection, client_address = s.accept()
print "新连接: " , client_address
connection.setblocking( 0 )
#将连接加入到select可读事件队列
inputs.append(connection)
#新建连接为key的字典,写回读取到的消息
message_queues[connection] = Queue.Queue()
else :
#不是本机监听就是客户端发来的消息
data = s.recv( 1024 )
if data :
print "收到数据:" , data , "客户端:" ,s.getpeername()
message_queues[s].put(data)
if s not in outputs:
#将读取到的socket加入到可写事件队列
outputs.append(s)
else :
#空白消息,关闭连接
print "关闭连接:" , client_address
if s in outputs :
outputs.remove(s)
inputs.remove(s)
s.close()
del message_queues[s]
for s in writable:
try :
msg = message_queues[s].get_nowait()
except Queue.Empty:
print "连接:" , s.getpeername() , '消息队列为空'
outputs.remove(s)
else :
print "发送数据:" , msg , "到" , s.getpeername()
s.send(msg)
for s in exceptional:
print "异常连接:" , s.getpeername()
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
del message_queues[s]
|
Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
#!/usr/bin/python # -*- coding: utf-8 -*- import socket
import select
import Queue
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking( False )
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
server_address = ( "192.168.1.5" , 8080 )
server.bind(server_address) server.listen( 5 )
print "服务器启动成功,监听IP:" , server_address
message_queues = {}
#超时,毫秒 timeout = 5000
#监听哪些事件 READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)
#新建轮询事件对象 poller = select.poll()
#注册本机监听socket到等待可读事件事件集合 poller.register(server,READ_ONLY) #文件描述符到socket映射 fd_to_socket = {server.fileno():server,}
while True :
print "等待活动连接......"
#轮询注册的事件集合
events = poller.poll(timeout)
if not events:
print "poll超时,无活动连接,重新poll......"
continue
print "有" , len (events), "个新事件,开始处理......"
for fd ,flag in events:
s = fd_to_socket[fd]
#可读事件
if flag & (select.POLLIN | select.POLLPRI) :
if s is server :
#如果socket是监听的server代表有新连接
connection , client_address = s.accept()
print "新连接:" , client_address
connection.setblocking( False )
fd_to_socket[connection.fileno()] = connection
#加入到等待读事件集合
poller.register(connection,READ_ONLY)
message_queues[connection] = Queue.Queue()
else :
#接收客户端发送的数据
data = s.recv( 1024 )
if data:
print "收到数据:" , data , "客户端:" , s.getpeername()
message_queues[s].put(data)
#修改读取到消息的连接到等待写事件集合
poller.modify(s,READ_WRITE)
else :
# Close the connection
print " closing" , s.getpeername()
# Stop listening for input on the connection
poller.unregister(s)
s.close()
del message_queues[s]
#连接关闭事件
elif flag & select.POLLHUP :
print " Closing " , s.getpeername() , "(HUP)"
poller.unregister(s)
s.close()
#可写事件
elif flag & select.POLLOUT :
try :
msg = message_queues[s].get_nowait()
except Queue.Empty:
print s.getpeername() , " queue empty"
poller.modify(s,READ_ONLY)
else :
print "发送数据:" , data , "客户端:" , s.getpeername()
s.send(msg)
#异常事件
elif flag & select.POLLERR:
print " exception on" , s.getpeername()
poller.unregister(s)
s.close()
del message_queues[s]
|
Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
#!/usr/bin/python # -*- coding: utf-8 -*- import socket, select
import Queue
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
server_address = ( "192.168.1.5" , 8080 )
serversocket.bind(server_address) serversocket.listen( 1 )
print "服务器启动成功,监听IP:" , server_address
serversocket.setblocking( 0 )
timeout = 10
#新建epoll事件对象,后续要监控的事件添加到其中 epoll = select.epoll()
#添加服务器监听fd到等待读事件集合 epoll.register(serversocket.fileno(), select.EPOLLIN) message_queues = {}
fd_to_socket = {serversocket.fileno():serversocket,}
while True :
print "等待活动连接......"
#轮询注册的事件集合
events = epoll.poll(timeout)
if not events:
print "epoll超时无活动连接,重新轮询......"
continue
print "有" , len (events), "个新事件,开始处理......"
for fd, event in events:
socket = fd_to_socket[fd]
#可读事件
if event & select.EPOLLIN:
#如果活动socket为服务器所监听,有新连接
if socket = = serversocket:
connection, address = serversocket.accept()
print "新连接:" , address
connection.setblocking( 0 )
#注册新连接fd到待读事件集合
epoll.register(connection.fileno(), select.EPOLLIN)
fd_to_socket[connection.fileno()] = connection
message_queues[connection] = Queue.Queue()
#否则为客户端发送的数据
else :
data = socket.recv( 1024 )
if data:
print "收到数据:" , data , "客户端:" , socket.getpeername()
message_queues[socket].put(data)
#修改读取到消息的连接到等待写事件集合
epoll.modify(fd, select.EPOLLOUT)
#可写事件
elif event & select.EPOLLOUT:
try :
msg = message_queues[socket].get_nowait()
except Queue.Empty:
print socket.getpeername() , " queue empty"
epoll.modify(fd, select.EPOLLIN)
else :
print "发送数据:" , data , "客户端:" , socket.getpeername()
socket.send(msg)
#关闭事件
elif event & select.EPOLLHUP:
epoll.unregister(fd)
fd_to_socket[fd].close()
del fd_to_socket[fd]
epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close() |