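Using server-side RPC to control a simulated temperature sensor
The result looks like this:
My settings and code are recorded below:
Add the device
If you use server-side RPC commands, you do not need to add a rule chain.
Add the control widget
Pay attention to the method names configured here.
getValue and setValue must match, including case, the method names that the receiving code checks for.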
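To make the pairing concrete: ThingsBoard delivers each server-side RPC request as a JSON object with a method field and a params field, and the device picks an action based on method. The sketch below is a minimal illustration, assuming the widget is configured with getValue and setValue; the names state, HANDLERS, and dispatch are hypothetical and not part of ThingsBoard or of the code in this post.

```python
import json

# Simulated device state (hypothetical stand-in for the real sensor)
state = {"temperature": 25}


def get_value(params):
    """Return the current temperature; params is ignored."""
    return state["temperature"]


def set_value(params):
    """Store the requested temperature."""
    state["temperature"] = params
    return None


# The keys must equal the method names configured in the widget, case included
HANDLERS = {"getValue": get_value, "setValue": set_value}


def dispatch(payload):
    """Decode one RPC request payload and run the matching handler."""
    request = json.loads(payload)
    method = request["method"]
    handler = HANDLERS.get(method)
    if handler is None:
        raise KeyError("no handler for RPC method: " + method)
    return handler(request.get("params"))
```

A widget configured with getvalue (lowercase v) would not match any key here, which is the kind of mismatch the note above warns about.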
RPC code (C# with MQTTnet)
The main code is as follows:
public static void Main(string[] args)
{
    var factory = new MqttFactory();
    mqttClient = factory.CreateMqttClient();
    // Build the TCP options used to connect to the MQTT broker
    var options = new MqttClientOptionsBuilder()
        // Client ID
        .WithClientId("client3")
        // IP address of the MQTT broker (the ThingsBoard server); 1883 is the default port
        .WithTcpServer("192.168.137.131", 1883)
        // ThingsBoard takes the device access token as the user name; the password stays empty
        .WithCredentials("FOvUIh5e7sHD4BWKceb2", "")
        .WithCleanSession()
        .Build();
    // Register both handlers before connecting so that no event is missed
    mqttClient.UseConnectedHandler(e =>
    {
        Console.WriteLine("Connected");
        Console.WriteLine("Connection thread ID: " + Thread.CurrentThread.ManagedThreadId.ToString());
        // Subscribe to all server-side RPC requests for this device
        mqttClient.SubscribeAsync("v1/devices/me/rpc/request/+");
    });
    mqttClient.UseApplicationMessageReceivedHandler(e =>
    {
        string topic = e.ApplicationMessage.Topic;
        Console.WriteLine("Receive thread ID: " + Thread.CurrentThread.ManagedThreadId.ToString());
        if (topic.StartsWith("v1/devices/me/rpc/request/"))
        {
            // The request ID is the last topic segment; the response topic must echo it
            string requestId = topic.Substring("v1/devices/me/rpc/request/".Length);
            JObject data = JObject.Parse(Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
            if (data.GetValue("method").ToString() == "getValue")
            {
                var temp = new JObject { { "temperature", temperature } };
                // ThingsBoard supports QoS 0 and 1 only, and an RPC response should not be retained
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic("v1/devices/me/rpc/response/" + requestId)
                    .WithPayload(temp.ToString())
                    .WithAtLeastOnceQoS()
                    .Build();
                // Send the response to ThingsBoard
                Console.WriteLine(DateTime.Now + " sending data to tb");
                mqttClient.PublishAsync(message, CancellationToken.None);
            }
            else if (data.GetValue("method").ToString() == "setValue")
            {
                string param = data.GetValue("params").ToString();
                setValue(param);
            }
        }
    });
    mqttClient.ConnectAsync(options, CancellationToken.None).Wait();
    Console.ReadLine();
}
The part that parses the RPC command is this:
    if (topic.StartsWith("v1/devices/me/rpc/request/"))
    {
        // The request ID is the last topic segment; the response topic must echo it
        string requestId = topic.Substring("v1/devices/me/rpc/request/".Length);
        JObject data = JObject.Parse(Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
        if (data.GetValue("method").ToString() == "getValue")
        {
            var temp = new JObject { { "temperature", temperature } };
            // ThingsBoard supports QoS 0 and 1 only, and an RPC response should not be retained
            var message = new MqttApplicationMessageBuilder()
                .WithTopic("v1/devices/me/rpc/response/" + requestId)
                .WithPayload(temp.ToString())
                .WithAtLeastOnceQoS()
                .Build();
            // Send the response to ThingsBoard
            Console.WriteLine(DateTime.Now + " sending data to tb");
            mqttClient.PublishAsync(message, CancellationToken.None);
        }
        else if (data.GetValue("method").ToString() == "setValue")
        {
            string param = data.GetValue("params").ToString();
            setValue(param);
        }
    }
});
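The topic handling above can be isolated into a small pure function, which makes it easier to test without a broker. This is a sketch in Python rather than C#, assuming the same topic layout as the code above; build_rpc_response and the two prefix constants are hypothetical names introduced for illustration.

```python
import json

REQUEST_PREFIX = "v1/devices/me/rpc/request/"
RESPONSE_PREFIX = "v1/devices/me/rpc/response/"


def build_rpc_response(topic, payload, temperature):
    """Return (response_topic, response_payload) for a getValue request.

    Returns None when the topic is not an RPC request or the method
    is not getValue.
    """
    if not topic.startswith(REQUEST_PREFIX):
        return None
    # Everything after the prefix is the request ID chosen by the server
    request_id = topic[len(REQUEST_PREFIX):]
    request = json.loads(payload)
    if request.get("method") != "getValue":
        return None
    # The response goes to the matching response topic with the same ID
    return RESPONSE_PREFIX + request_id, json.dumps({"temperature": temperature})
```

The returned pair can then be handed to whatever publish call the MQTT client provides.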
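PS:
This code does not report the temperature set on the controller back to tb. I suspect a threading problem: when I tested adding a separate thread to handle the report, the controller's data could no longer be retrieved, because the two pieces of work were not running on the same thread.
I will leave this problem for later study.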
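One possible way around a second thread, shown here in Python and not tested against the C# client above, is to publish the new value as telemetry from the same handler that processes setValue. The function handle_set_value and its publish parameter are hypothetical names; publish stands for any callable that takes a topic and a payload.

```python
import json

TELEMETRY_TOPIC = "v1/devices/me/telemetry"


def handle_set_value(params, state, publish):
    """Store the new setpoint and report it as telemetry right away."""
    state["temperature"] = params
    # Report from the handler itself, so no extra thread is involved
    publish(TELEMETRY_TOPIC, json.dumps({"temperature": params}))
```

With paho-mqtt, client.publish could be passed as the publish argument. Whether the same approach resolves the behavior seen with MQTTnet is an open question.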
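Python version of the MQTT client
The Python version does handle communication between threads and real-time monitoring, so I am sharing it here as well.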
# -*- coding: utf-8 -*-
"""
Created on Sat Feb 6 14:10:01 2021
"""
# Temperature_Controller_Server_Side_RPC.py
# This program illustrates server-side RPC on the ThingsBoard IoT platform
# using a simulated temperature controller.
# Fill in your ThingsBoard host IP and device access token below.
import json
import time
from threading import Thread

import paho.mqtt.client as mqtt

# ThingsBoard platform credentials
THINGSBOARD_HOST = '192.168.137.131'  # Change to your server's IP address
ACCESS_TOKEN = 'FOvUIh5e7sHD4BWKceb2'
# Shared state: written by the RPC handler, read by the publishing thread
sensor_data = {'temperature': 25}
def publishValue(client):
    INTERVAL = 2
    print("Thread Started")
    next_reading = time.time()
    while True:
        client.publish('v1/devices/me/telemetry', json.dumps(sensor_data), 1)
        next_reading += INTERVAL
        sleep_time = next_reading - time.time()
        if sleep_time > 0:
            time.sleep(sleep_time)


def read_temperature():
    temp = sensor_data['temperature']
    return temp


# Set the temperature value in the device
def setValue(params):
    sensor_data['temperature'] = params
    #print("Rx setValue is : ", sensor_data)
    print("Temperature Set : ", params, "C")
# MQTT on_connect callback
def on_connect(client, userdata, flags, rc):
    # rc == 0 means the broker accepted the connection
    if rc == 0:
        print("Connected")
    else:
        print("Connection failed, rc code:", rc)
    # Subscribe to all server-side RPC requests for this device
    client.subscribe('v1/devices/me/rpc/request/+')


# MQTT on_message callback
def on_message(client, userdata, msg):
    print('Topic: ' + msg.topic + '\nMessage: ' + str(msg.payload))
    if msg.topic.startswith('v1/devices/me/rpc/request/'):
        # The request ID is the last topic segment; the response must echo it
        requestId = msg.topic[len('v1/devices/me/rpc/request/'):]
        #print("requestId : ", requestId)
        data = json.loads(msg.payload)
        if data['method'] == 'getValue':
            #print("sent getValue : ", sensor_data)
            client.publish('v1/devices/me/rpc/response/' + requestId, json.dumps(sensor_data['temperature']), 1)
        elif data['method'] == 'setValue':
            params = data['params']
            setValue(params)
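# Create a client instance (paho-mqtt 1.x API; paho-mqtt 2.x also requires
# a mqtt.CallbackAPIVersion value as the first argument)
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(ACCESS_TOKEN)
client.connect(THINGSBOARD_HOST, 1883, 60)
# Daemon thread, so it does not keep the process alive after Ctrl+C
t = Thread(target=publishValue, args=(client,), daemon=True)
try:
    client.loop_start()
    t.start()
    while True:
        # Sleep instead of busy-waiting so the main thread does not burn CPU
        time.sleep(1)
except KeyboardInterrupt:
    client.disconnect()
    client.loop_stop()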
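The script shares sensor_data between the MQTT callback thread and the publishing thread without explicit synchronization. That works in practice for a single value, but if the shared state grows, a lock makes the hand-off explicit. The following is a minimal sketch of that idea; the SensorState class is a hypothetical name and is not part of the script above.

```python
import threading


class SensorState:
    """Temperature value shared between the RPC handler and the publisher."""

    def __init__(self, temperature):
        self._lock = threading.Lock()
        self._temperature = temperature

    def set(self, value):
        # Called from the MQTT callback thread when setValue arrives
        with self._lock:
            self._temperature = value

    def snapshot(self):
        # Called from the publishing thread; returns a fresh dict each time
        with self._lock:
            return {"temperature": self._temperature}
```

In the script, setValue would call set, and publishValue would serialize snapshot() instead of the global dict.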