手写微信群聊服务器

需求分析

1. 能够连接多个客户端;

2. 客户端长时间未响应能够断开节省资源;

3. 能够群发,一个用户发来一条消息,所有客户端都可以接收;

 

思路

直接使用socket.socket类就可以实现,但是这样的话我们需要自己手动写监听,写accept,把accept单独放入一个线程,每连入一个线程就新开辟一个线程。

 

这里我们选用socketserver这个库来实现,这样我们只需要把核心步骤写出来即可,其他的都能自动化帮我们完成了。

 1 mport socketserver
 2 import datetime
 3 import threading
 4 
 5 
 6 class MyHandle(socketserver.BaseRequestHandler):  # 每一个线程就一个实例
 7     def self_server_init(self):
 8         if not hasattr(self.server, "clients"):
 9             setattr(self.server, "clients", {})  # 如果没有这个属性就给他增加一个
10         if not hasattr(self.server, "_lock_clients"):
11             setattr(self.server, "lock_clients", threading.Lock())
12         self.server.__dict__["hb_interval"] = 10
13         self.server.clients[self.key] = datetime.datetime.now().timestamp()  # 记录时间
14 
15     def setup(self):
16         # self init
17         self.event = threading.Event()
18         self.key = self.request, self.event
19         # self.server init
20         self.self_server_init()
21 
22     def handle(self):
23         no_hb = set()
24         while not self.event.is_set():
25             data = self.request.recv(1024)
26             print(data.decode())
27             if data == b"^hb^":   # 收到了心跳包
28                 with self.server.lock_clients:
29                     self.server.clients[self.key] = datetime.datetime.now().timestamp()
30             if data == b"" or data.strip() == b"quit":
31                 with self.server.lock_clients:
32                     self.server.clients.pop(self.key, None)  # 如果退出就弹出了
33                 break  # 因为一个线程是一个连接
34 
35             # 如果没有退出我们就刷新时间,也有可能是新的连接
36             self.server.clients[self.key] = datetime.datetime.now().timestamp()
37 
38             # 发送消息的时候检查是否超时了
39             with self.server.lock_clients:
40                 for key, t in self.server.clients.items():
41                     if datetime.datetime.now().timestamp() - t > self.server.hb_interval:  # 如果超时了记录一下这个key
42                         no_hb.add(key)  # s和e都可哈希
43                         continue
44                     key[0].send("from {}: {}".format(self.client_address, data.decode()).encode())
45                 for key in no_hb:
46                     self.server.clients.pop(key)  # 移除没有心跳的客户端
47                 no_hb.clear()
48 
49     def finish(self):
50         super().finish()
51         self.event.set()
52 
53 
54 server = socketserver.ThreadingTCPServer(("127.0.0.1", 9999), MyHandle)
55 server.serve_forever()   # 启动

 

上一篇:【Maven实战技巧】「插件使用专题」Maven-Assembly插件实现自定义打包


下一篇:flowable-流程中心设计之子流程(九)