背景:曾参与过一个mqtt项目,大致项目背景为,设备终端作为客户端通过mqtt平台发布某个主题的消息,用户系统也作为客户端,连接到mqtt平台,且订阅该主题,收到终端消息。研发想测试在现有的终端连接数下mqtt平台的稳定性,并且测试平台的连接上限。需要模拟多终端连接mqtt平台并发布、订阅主题。
大致思路如下:
- 将已经注册过的终端号填写进excel中,读取excel中的终端号
- 使用多线程实现多终端连接mqtt
- 多终端发布主题“devicePublish”
- 发送成功后,订阅主题“设备终端号”
需要用到的包为paho-mqtt、threading
paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库,它在 Python 2.7 或 3.x 上为客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。
client.py
# -*- coding: utf-8 -*-
import json
import paho.mqtt.client as mqtt
from threading import Thread
import time
import xlrd
class MqttManager(Thread):
def __init__(self, thread_name, HOST, PORT, terminal_id):
Thread.__init__(self)
self.thread_name = thread_name
self.HOST = HOST
self.PORT = PORT
self.terminal_id = terminal_id
def on_connect(self, client, userdata, flags, rc):
print(self.terminal_id + ":" + "Connected with result code " + str(rc))
client.subscribe(self.terminal_id) # 订阅消息 def on_message(self, client, userdata, msg):
print(self.terminal_id + ":" + "主题:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8')))
def on_subscribe(self, client, userdata, mid, granted_qos):
print(self.terminal_id + ":" + "On Subscribed: qos = %d" % granted_qos)
def on_disconnect(self, client, userdata, rc):
if rc != 0:
print(self.terminal_id + ":" + "Unexpected disconnection %s" % rc)
def publish_data(self):
# 发布的消息
data = {
"deviceCode": self.terminal_id,
"method": "online",
"timestamp": str(round(time.time() * 1000))
}
param = json.dumps(data)
return param
def run(self):
self.client = mqtt.Client(self.terminal_id)
self.client.connect(HOST, PORT, 60) # 60为keepalive的时间间隔
self.client.username_pw_set("username", "passward")
param = self.publish_data()
self.client.publish("devicePublish", payload=param, qos=0) # 发布消息
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message # 订阅消息
self.client.subscribe(self.terminal_id)
self.client.on_disconnect = self.on_disconnect
self.client.loop_forever()
if __name__ == "__main__":
HOST = "127.0.0.1"
PORT = 50009
excel_ter = 'C:/终端设备编号.xls' # 读取终端设备编号表
excel = xlrd.open_workbook(excel_ter)
table = excel.sheet_by_index(0)
nrows = table.nrows
# 创建线程
for i in range(nrows - 1):
terminal_id = str(table.cell(i + 1, 1).value)
thread = MqttManager("Thread-%s" % (i + 1), HOST, PORT, terminal_id)
thread.start()
print('thread %s is running...' % (i + 1) + str(terminal_id))
time.sleep(2)