缘由
之前写socket的CS模型代码,都是利用最原始的多线程方式。服务端是主线程,接到客户端的连接请求就从线程池中获取一个线程去处理整个socket连接的所有操作,虽然在连接数较短的情况下没有什么影响,但是当连接数很大的情况下,线程的切换和线程池的大小问题就明显起来了。
问题
应该存在一种方式可以让一个线程去处理多个连接,在连接有事情做的时候才过去处理,不然的话线程就挂起,让线程的利用率更高,于是后来学习了select以及epoll。在这里我重点总结一下select
select的原理
select是监听触发机制,监听可读和可写队列。当有事件发生时,进行遍历轮询事件发生的对象然后返回
比如在服务端启动之后把服务端添加到select的可读监听队列中,当有客户端请求连接服务端时,select函数会返回一个可读事件再让服务端接受客户端的连接。
select的返回方式可以是阻塞或者是非阻塞,非阻塞式的select处理方式是轮询的,会不断询问占用Cpu太多的资源和时间,所以建议使用阻塞等待返回的方式去使用select
优点:
没有了多线程的创建、销毁、切换带来的效率和内存上的消耗
缺点
select存在一个最大可监听文件描述符数量,所以会收到最大连接监听的限制
select在事件发生以后也是需要遍历监听对象,并不能直接定位到哪个对象,这个操作在对象数量庞大的情况下是个效率的瓶颈。所以后来有了epoll
程序流程描述:
1. 在创建了server的socket以后,通过无限循环监听select事件执行相应的操作
2. 客户端请求连接服务端,inputs非空,outputs为空,为queue增添客户端对象 ,readable非空,writeable为空
3. 服务端接收客户端发送的数据,为outputs增添消息数据,readable不为空,writeable为空,如果接收失败说明客户端断开了连接,则终止监听此客户端的消息,回到第1步等待客户端连接
4. outputs非空触发select事件,readable为空,writeable不为空,将queue中的数据发送到客户端,如果queue为空触发清除outputs操作
5. 等待客户端发送数据,回到第2步
服务端代码:
#!/usr/bin/env python
#*-* coding:utf-8 *-*
import select
import socket
import Queue
import time
import os
class Server():
#创建socket 套接字
def __init__(self):
self.server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.server.setblocking(False)
#配置参数
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_address= ('127.0.0.1',8008)
self.server.bind(self.server_address)
self.server.listen(10)
self.inputs = [self.server]
self.outputs = []
self.message_queues = {}
#timeout = 20
def run(self):
while self.inputs:
print "=================================="
print "waiting for next event"
print "inputs",self.inputs
print "outputs",self.outputs
print "queue", self.message_queues
#readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout) 最后一个是超时,当前连接要是超过这个时间的话,就会kill
readable , writable , exceptional = select.select(self.inputs, self.outputs, self.inputs)
print "readable , writable , exceptional",readable , writable , exceptional
# When timeout reached , select return three empty lists
if not (readable or writable or exceptional) :
print "Time out ! "
break;
for s in readable :
if s is self.server:
#通过self.inputs查看是否有客户端来
connection, client_address = s.accept()
print " connection from ", client_address
connection.setblocking(0)
self.inputs.append(connection)
self.message_queues[connection] = Queue.Queue()
else:
try:
data = s.recv(1024)
except:
print " closing", client_address
if s in self.outputs :
self.outputs.remove(s)
self.inputs.remove(s)
s.close()
#清除队列信息
del self.message_queues[s]
else:
if data :
print " received " , data , "from ",s.getpeername()
self.message_queues[s].put(data)
# Add output channel for response
if s not in self.outputs:
self.outputs.append(s)
for s in writable:
try:
next_msg = self.message_queues[s].get_nowait()
except Queue.Empty:
print " " , s.getpeername() , 'queue empty'
self.outputs.remove(s)
else:
print " sending " , next_msg , " to ", s.getpeername()
os.popen('sleep 5').read()
s.send(next_msg)
for s in exceptional:
print " exception condition on ", s.getpeername()
#stop listening for input on the connection
self.inputs.remove(s)
if s in self.outputs:
self.outputs.remove(s)
s.close()
#清除队列信息
del self.message_queues[s]
if __name__ == "__main__":
s = Server()
s.run()
客户端代码:
# -*- coding:UTF-8 -*-
import socket
import os
import signal
import time, threading
def SendDataToServer(sock):
while True:
put = str(raw_input())
sock.send(put)
def ReceiveData(sock):
while True:
try:
data = sock.recv(1024)
time.sleep(1)
except:
print "Server Down!"
os._exit(0)
else:
if data:
print "RECEIVE:",data
if __name__ == "__main__":
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 8008))
send_tick = threading.Thread(target=SendDataToServer, args=(s,))
rec_tick = threading.Thread(target=ReceiveData, args=(s,))
send_tick.start()
rec_tick.start()