【Python】多终端发送、订阅mqtt消息

背景:曾参与过一个mqtt项目,大致项目背景为,设备终端作为客户端通过mqtt平台发布某个主题的消息,用户系统也作为客户端,连接到mqtt平台,且订阅该主题,收到终端消息。研发想测试在现有的终端连接数下mqtt平台的稳定性,并且测试平台的连接上限。需要模拟多终端连接mqtt平台并发布、订阅主题。

大致思路如下:

  1. 将已经注册过的终端号填写进excel中,读取excel中的终端号
  2. 使用多线程实现多终端连接mqtt
  3. 多终端发布主题“devicePublish”
  4. 发送成功后,订阅主题“设备终端号”

需要用到的包为paho-mqtt、threading

paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库,它在 Python 2.7 或 3.x 上为客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。

client.py

# -*- coding: utf-8 -*-
import json
import paho.mqtt.client as mqtt
from threading import Thread
import time
import xlrd

class MqttManager(Thread):
    def __init__(self, thread_name, HOST, PORT, terminal_id):
        Thread.__init__(self)
        self.thread_name = thread_name
        self.HOST = HOST
        self.PORT = PORT
        self.terminal_id = terminal_id

    def on_connect(self, client, userdata, flags, rc):
        print(self.terminal_id + ":" + "Connected with result code " + str(rc))
        client.subscribe(self.terminal_id)  # 订阅消息    def on_message(self, client, userdata, msg):
        print(self.terminal_id + ":" + "主题:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8')))

    def on_subscribe(self, client, userdata, mid, granted_qos):
        print(self.terminal_id + ":" + "On Subscribed: qos = %d" % granted_qos)

    def on_disconnect(self, client, userdata, rc):
        if rc != 0:
            print(self.terminal_id + ":" + "Unexpected disconnection %s" % rc)

    def publish_data(self):
        # 发布的消息
        data = {
                  "deviceCode": self.terminal_id,
                  "method": "online",
                  "timestamp": str(round(time.time() * 1000))
              }

        param = json.dumps(data)
        return param

    def run(self):
        self.client = mqtt.Client(self.terminal_id)
        self.client.connect(HOST, PORT, 60)  # 60为keepalive的时间间隔
        self.client.username_pw_set("username", "passward")
        param = self.publish_data()
        self.client.publish("devicePublish", payload=param, qos=0)  # 发布消息
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message  # 订阅消息
        self.client.subscribe(self.terminal_id)
        self.client.on_disconnect = self.on_disconnect
        self.client.loop_forever()

if __name__ == "__main__":
    HOST = "127.0.0.1"
    PORT = 50009    
    excel_ter = 'C:/终端设备编号.xls' # 读取终端设备编号表
    excel = xlrd.open_workbook(excel_ter)
    table = excel.sheet_by_index(0)
    nrows = table.nrows
    # 创建线程
    for i in range(nrows - 1):
        terminal_id = str(table.cell(i + 1, 1).value)
        thread = MqttManager("Thread-%s" % (i + 1), HOST, PORT, terminal_id)
        thread.start()
        print('thread %s is running...' % (i + 1) + str(terminal_id))
        time.sleep(2)

上一篇:VSCode的C/C++扩展功能


下一篇:多类别目标计数 Dilated-Scale-Aware Category-Attention ConvNet for Multi-Class Object Counting 论文笔记