+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
socketserver有四个同步类: TCPServer UDPServer UnixStreamServer UnixDatagramServer
2个Mixin类ForkingMixin和ThreadingMixin类来支持异步
class ForkingUDPServer(ForkingMixIn,UDPServer): pass
class ForkingTCPServer(ForkingMixIn,TCPServer): pass
class ThreadingUDPServer(ThreadingMixIn,UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn,TCPServer): pass
fork是创建多进程,thread是创建多线程
编程接口:
socketserver.BaseServer(server_address,RequestHandlerClass)
需要提供server的绑定地址,和用于处理请求的RequestHandlerClass类,RequestHandlerClass类必须是BaseRequestHandler的子类
BaseServer
def finish_request(self,request,client_address): """finish one request by instantiating RequestHandlerClass""" self.RequestHandlerClass(request,client_address,self)
BaseRequestHandler类
和用户连接的request请求处理类,Server实例接受用户请求后,最后会instantiating这个类
它被初始化时,送入三个构造参数, request client_address server
以后可以在BaseRequestHandler类的实例上通过
self.request是和client连接的socket对象,相当于accept后创建的recv线程
self.server时TCPServer本身
self.client_address是client地址
它会一次调用三个函数,子类可以覆盖
# BaseRequestHandler 需要子类覆盖的方法 def setup(self): # 每一个连接初始化 pass def handler(self): # 每一次处理请求 pass def finish(self): # 每一个连接清理 pass
import socketserver,threading addr=('127.0.0.1',9999) class MyHandler(socketserver.BaseRequestHandler): def setup(self): super().finish() print('setup') self.event=threading.Event() def handle(self): super().handle() print(self.server,self.client_address,self.request) print('{} handler'.format(self.__class__)) print(self.__dict__) print(type(self).__dict__) print(self.__class__.__bases__[0].__dict__) print(threading.enumerate(),threading.current_thread()) while not self.event.is_set(): data=self.request.recv(1024) dope='pdu {}'.format(data.decode()).encode() print(dope) self.request.send(dope) def finish(self): super().finish() self.event.set() server=socketserver.ThreadingTCPServer(addr,MyHandler) server.serve_forever()
server.shutdown() server.server_close()
测试结果说明,handle方法和socket的accept对应,用户连接请求过来后,建立连接并生成一个socket对象保存在self.request中,客户端地址保存在self.client_address中
创建server步骤:
- 必须通过生成BaseRequestHandler的子类并覆盖其handle()方法来创建请求处理程序类,此方法将处理传入请求
- 必须实例化一个socketserver类,传入server地址 & BaseRequestHandler子类
- 调用实例的handle_request() (相当于accept没有套loop) or serve_forever() 方法
- 调用server_close() 关闭套接字,shutdown() 方法等待停止serve_forever()
EchoServer:
import socketserver,threading class EchoHandler(socketserver.BaseRequestHandler): def __init__(self,request,client_address,server): super().__init__(request,client_address,server) def setup(self): super().setup() self.event=threading.Event() print('setup') def handle(self): super().handle() print('handle commence') while not self.event.is_set(): try: dope=self.request.recv(1024) except ConnectionAbortedError as e: print(e) break self.request.send(b'ack '+dope) print('handle end') def finish(self): super().finish() self.event.set() print('finish') s=socketserver.ThreadingTCPServer(('',9999),EchoHandler) s_thread=threading.Thread(target=s.serve_forever,daemon=True) s_thread.start() try: while True: dope=input('>>').strip() if dope == 'quit': break except Exception as e: print(e) except KeyboardInterrupt as e: print(e) finally: s.shutdown() s.server_close()
TcpServer:
import threading,socketserver,logging logging.basicConfig(level=logging.ERROR,format='[%(thread)d %(threadName)s] %(message)s') class MyHandler(socketserver.BaseRequestHandler): # preserve clients clients={} def setup(self): self.event=threading.Event() def handle(self): conn=self.request while not self.event.is_set(): try: data=conn.recv(1024).decode().strip() except Exception as e: logging.error(e) data='quit' if data == 'quit': self.clients.pop(self.client_address) logging.critical('{} leaving'.format(self.client_address)) break logging.info(data) self.clients[self.client_address]=conn for c in self.clients.values(): c.send('ack {}'.format(data).encode()) def finish(self): self.event.set() s=socketserver.ThreadingTCPServer(('',9999),MyHandler) threading.Thread(target=s.serve_forever,name='socketserver').start() from emit1.relevance1 import * threading.Thread(target=showThreads,name='showThreads',daemon=True).start() print(threading.enumerate()) while True: dope=input('>>').strip() if dope == 'quit': logging.critical('leaving !') s.shutdown() s.server_close() break
import threading def showThreads(interval=3,event=threading.Event()): while not event.wait(interval): print(threading.enumerate())
import threading,socketserver class ChatHandler(socketserver.BaseRequestHandler): clients={} def finish(self): super().finish() self.clients.pop(self.client_address) self.event.set() def setup(self): super().setup() self.event=threading.Event() print(self.client_address,self.clients,threading.current_thread()) def handle(self): while not self.event.is_set(): data=self.request.recv(1024),strip().decode() if data == 'quit': break dope='{} {}'format(self.client_address,data).encode() for c in self.clients.values(): c.send(dope) print('End') s=socketserver.ThreadingTCPServer(('',9999),ChatHandler) s_thread=threading.Thread(target=s.serve_forever,daemon=True) s_thread.start() try: while True: dope=input('>>').strip() if dope == 'quit': break except Exception as e: print(e) except KeyboardInterrupt as e: print('Exit') finally: s.shutdown() s.server_close()
为每一个连接提供RequestHandlerClass类实例,依次调用setup handle finish方法,且使用了try finally保证finish一定被调用,这些方法依次执行,如果想维持这个连接和客户端通信,就需要在handle中使用loop
socketserver提供不同的类,但api是一样的