# -*- coding:utf-8 -*-
from multiprocessing import Process
import paho.mqtt.publish as publish
from Data.data import *
import paho.mqtt.subscribe as subscribe
from common.yaml_common_method import *
import time,json,yaml,os
"""基础数据yaml获取"""
data = yaml_method_read("data")
host = data["host"]#IP地址
prot = data["port"]#端口
pub_url = data["pub_url"]+"/register"#发送
sub_url = data["sub_url"]+"/register/response"#订阅
"""主题请求参数yaml获取"""
publish_yaml = yaml_method_read("publish")
publish_msg = publish_yaml["upload_service"]
client_id = time.strftime('mq:test:%Y%m%d%H%M%S',time.localtime(time.time()))
class Mqtt_method_info():
def mqtt_publish(self):
#这里是发送的函数
msg = publish_msg
print("------------------------------Send_sub------------------------------")
print(pub_url)
print("----------------------------Send_message----------------------------")
print(msg)
msg = json.dumps(msg)
publish.single(pub_url, msg, qos=1, hostname=host, port=prot, client_id=client_id)
print("------------------------------End_send------------------------------")
def mqtt_subscribe(self):
#这里是订阅的函数
print("--------------------------Subscribe_topics--------------------------")
print(sub_url)
msg = subscribe.simple(sub_url, qos=1, hostname=host, port=prot, client_id=client_id)
msg_payload = json.loads(msg.payload.decode("utf-8"))
print("---------------------------Return_message----------------------------")
print(msg_payload)
yaml_method_write("mqtt_return_data",msg_payload)
def implement(self):
pub = Process(target=Mqtt_method_info().mqtt_publish)#发送线程
sub = Process(target=Mqtt_method_info().mqtt_subscribe)#订阅线程
sub.start()
time.sleep(1)
pub.start()
sub.join()
print("执行完毕")
if __name__ == '__main__':
Mqtt_method_info().implement()
这里讲一下python-mqtt测试脚本:
我们会使用到python的paho-mqtt库,同样pip安装下就行,失败的话大部分是需要用镜像站的情况
这份代码分为yaml文件读取发送的数据,这里只有操作的方法,大家如果需要使用可以把发送、订阅的函数中修改,只运行方法,然后自己再通过各种文件管理数据