需求分析
1. 能够连接多个客户端;
2. 客户端长时间未响应能够断开节省资源;
3. 能够群发,一个用户发来一条消息,所有客户端都可以接收;
思路
直接使用socket.socket类就可以实现,但是这样的话我们需要自己手动写监听,写accept,把accept单独放入一个线程,每连入一个线程就新开辟一个线程。
这里我们选用socketserver这个库来实现,这样我们只需要把核心步骤写出来即可,其他的都能自动化帮我们完成了。
1 mport socketserver 2 import datetime 3 import threading 4 5 6 class MyHandle(socketserver.BaseRequestHandler): # 每一个线程就一个实例 7 def self_server_init(self): 8 if not hasattr(self.server, "clients"): 9 setattr(self.server, "clients", {}) # 如果没有这个属性就给他增加一个 10 if not hasattr(self.server, "_lock_clients"): 11 setattr(self.server, "lock_clients", threading.Lock()) 12 self.server.__dict__["hb_interval"] = 10 13 self.server.clients[self.key] = datetime.datetime.now().timestamp() # 记录时间 14 15 def setup(self): 16 # self init 17 self.event = threading.Event() 18 self.key = self.request, self.event 19 # self.server init 20 self.self_server_init() 21 22 def handle(self): 23 no_hb = set() 24 while not self.event.is_set(): 25 data = self.request.recv(1024) 26 print(data.decode()) 27 if data == b"^hb^": # 收到了心跳包 28 with self.server.lock_clients: 29 self.server.clients[self.key] = datetime.datetime.now().timestamp() 30 if data == b"" or data.strip() == b"quit": 31 with self.server.lock_clients: 32 self.server.clients.pop(self.key, None) # 如果退出就弹出了 33 break # 因为一个线程是一个连接 34 35 # 如果没有退出我们就刷新时间,也有可能是新的连接 36 self.server.clients[self.key] = datetime.datetime.now().timestamp() 37 38 # 发送消息的时候检查是否超时了 39 with self.server.lock_clients: 40 for key, t in self.server.clients.items(): 41 if datetime.datetime.now().timestamp() - t > self.server.hb_interval: # 如果超时了记录一下这个key 42 no_hb.add(key) # s和e都可哈希 43 continue 44 key[0].send("from {}: {}".format(self.client_address, data.decode()).encode()) 45 for key in no_hb: 46 self.server.clients.pop(key) # 移除没有心跳的客户端 47 no_hb.clear() 48 49 def finish(self): 50 super().finish() 51 self.event.set() 52 53 54 server = socketserver.ThreadingTCPServer(("127.0.0.1", 9999), MyHandle) 55 server.serve_forever() # 启动