之前参与了一个横向项目,对方要求和他们的服务端对接时,我们开发的客户端必须一直保持连接,即维护一个长连接,这样服务端可以随时对我们下发控制命令。
简介
本文主要介绍如何实现TCP的长连接维护,主要通过Python的socket模块来实现,采用的实现方式为心跳保活策略,即定期发送约定好的心跳包以维持连接不断开。
原理简介
短连接指的是开启一个socket连接,收发完数据后,立刻关闭连接。我们通常使用的TCP就是这种连接方式,其示意图和工作流程如下(Client表示客户端,Server表示服务端)。
- Client对Server发起连接请求;
- Server收到请求,双方建立连接;
- Client向Server发送数据;
- Server回应Client;
- 一次读写完成,此时双方任何一个都可以发起关闭连接操作;
- 另一方收到关闭连接后,断开本次连接。
长连接指的是开启一个socket连接,多次收发数据包直到需要的时候再关闭连接。因此,需要定期发送一个不占用数据传输的心跳包来告知服务端自己的状态以维持连接。其示意图和工作流程如下(Client表示客户端,Server表示服务端)。
- Client向Server发起连接请求;
- Server收到请求,双方建立连接;
- Client多次向Server发送数据(包括心跳包);
- Server对每个收到的数据进行回应;
- 长时间操作之后Client发起关闭请求;
- Server断开和Client的连接。
代码实现
我们约定传输的数据采用JSON格式,且报文结尾都紧跟一个换行符以方便服务端进行解析,下面时具体的代码,为这里去掉了一些具体的业务线程,只保留了简单的心跳发送和接受服务端消息的线程,以方便理解。
- 登录设备,若连接建立则登录成功
- 每隔固定时间向服务端发送一次心跳以保活连接
- 每隔固定时间接受一次服务端发送过来的数据,按照约定的间隔符解析出报文以方便业务处理
import os
import socket
import time
import threading
import json
from loguru import logger
class TCPSocket(object):
def __init__(self, size, ip, port):
"""
@param size: 报文上限大小
@param ip: ip地址
@param port: 端口
"""
self.sk = None
self.size = size
self.format = "utf8"
self.ip_port = (ip, port)
self.logger = logger
self.msg_type = ['LOGIN', 'HEART']
self.login_dict = {
"code": "LOGIN",
}
# 心跳频率
self.heart_interval = 5
self.status_interval = 5
self.adapt_time = False
# 建立socket连接
def connect(self):
self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sk.connect(self.ip_port)
except Exception as e:
self.logger.error("connect to server failed,prepare to reconnect", e)
self.reconnect()
# 重新连接 5s/次
def reconnect(self):
self.logger.info("try to reconnect")
while True:
try:
self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sk.connect(self.ip_port)
self.login_send()
self.logger.info('client start connect to host/port:{}'.format(self.ip_port))
break
except ConnectionRefusedError:
self.logger.error('socket server refused or not started, reconnect to server in 5s .... host/port:{}'.format(self.ip_port))
time.sleep(5)
except Exception as e:
self.logger.error('do connect error:{}'.format(str(e)))
time.sleep(5)
self.logger.info("reconnect successfully!!!")
# 发送登录验证
def login_send(self):
try:
login_msg = self.build_request_json("LOGIN", **self.login_dict)
self.sk.send(login_msg.encode(self.format))
self.logger.info("[tcp client] send logon message:{}".format(login_msg.replace(os.linesep, "")))
except socket.error:
self.logger.info('socket error,do reconnect')
time.sleep(5)
except Exception as e:
self.logger.error(e)
time.sleep(5)
def rec(self):
while True:
try:
message = self.sk.recv(self.size)
messages = self.parse_response_json(message)
if messages:
for msg in messages:
if msg['code'] == "LOGIN":
# 登录成功
self.logger.info("[tcp client] receive logon response: {}".format(msg))
elif msg['code'] == "HEART":
# 收到心跳反馈
self.logger.info("[tcp client] receive heart response: {}".format(msg))
else:
self.logger.warning("message queue is not supported!!!")
else:
self.logger.info("no message from server or messages are not valid:{}".format(messages))
except socket.error as e:
self.logger.error(e)
time.sleep(5)
self.reconnect()
except Exception as e:
self.logger.error(e)
time.sleep(5)
# 间隔固定时间发送心跳
def heartbeats(self):
while True:
try:
msg = self.build_request_json("HEART")
self.sk.send(msg.encode(self.format))
self.logger.info("[tcp client] send heart message:{}".format(msg.replace(os.linesep, "")))
except socket.error:
self.logger.error('socket error,do reconnect')
self.reconnect()
except Exception as e:
self.logger.error('other error occur', e)
time.sleep(5)
self.reconnect()
time.sleep(self.heart_interval)
@staticmethod
def build_request_json(method: str, **args) -> str:
"""
:param method: 该请求的方法类型
:return: 构建好的用于和服务端通信的Json数据
"""
if method == "LOGIN":
json_data = {
"code": "LOGIN",
}
elif method == "HEART":
json_data = {
"code": "HEART",
}
else:
print("this method {} is not supported now!!!".format(method))
json_data = None
return json.dumps(json_data) + os.linesep if json_data else None
def parse_response_json(self, data: bytes):
msgs = []
try:
data_list = data.decode(self.format).split(os.linesep)
data_list = list(filter(lambda x: x.strip().startswith("{"), data_list))
for msg in data_list:
msg = json.loads(msg)
if msg['code'] in self.msg_type:
msgs.append(msg)
return msgs
except Exception as e:
self.logger.error(e)
return None
if __name__ == '__main__':
socket1 = TCPSocket(1024, "127.0.0.1", 5433)
socket1.connect()
socket1.login_send()
t1 = threading.Thread(target=socket1.rec)
t2 = threading.Thread(target=socket1.heartbeats)
t1.start()
t2.start()
服务端的代码正常实现即可,这里就不贴了。
总结
本文简单介绍了如何使用Python实现基于心跳保活的TCP长连接。