EMQX构建简易的云服务-基本流程

在这里插入图片描述

DashBoard 定义认证用户

在这里插入图片描述

定义Mqtt协议主题

// 设备激活
public final static String ACTIVATE = "mqtt/0/1";
// 设备重置
public final static String RESET = "mqtt/0/0";
// 上线
public final static String ONLINE = "mqtt/1/1";
// 下线
public final static String OFFLINE = "mqtt/1/0";
// 上行-设备上报数据到平台
public final static String REPORT = "mqtt/2/1";
// 下行-平台下发数据给设备
public final static String ISSUED = "%s/2/0";

设备认证流程

首先在云平台创建产品,生成PK/PS,用于Mqtt Broker的连接认证
将PK/PS烧录到设备中
设备开机启动,首次连接平台携带PK/PS/DK,mqtt连接成功后,云服务端会下发DS给到设备,并标识设备已激活
设备再次连接云服务,mqtt连接成功后,会校验DK/DS是否合法,不合法将设备踢下线。
设备订阅${clientId}/2/0主题

@PostConstruct
 public void init() throws MqttException {
     client.setCallback(new MqttCallbackHandler());
     client.subscribe(String.format(MqttTopicConstant.ISSUED, client.getClientId()));
 }

mqtt-receive-server服务

使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_receive_server
订阅ACTIVATE 、RESET 、ONLINE 、OFFLINE 、REPORT 等主题
将接收的数据简单处理,转发到KafKa

mqtt:
    broker-url: tcp://42.194.132.44:1883
    client-id: mqtt_receive_server
    username: mqtt_server
    password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
@PostConstruct
public void init() throws MqttException {
    client.setCallback(new MqttCallbackHandler(kafkaService));
    subscribe(MqttTopicConstant.ACTIVATE);
    subscribe(MqttTopicConstant.RESET);
    subscribe(MqttTopicConstant.ONLINE);
    subscribe(MqttTopicConstant.OFFLINE);
    subscribe(MqttTopicConstant.REPORT);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    String data = new String(message.getPayload());
    log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);
    UpData upData = JSONObject.parseObject(data, UpData.class);
    UpKafKaData upKafKaData = new UpKafKaData(topic, data);
    log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));
    kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
}

mqtt-sender-service服务

使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_sender_server
不订阅主题,只下发数据,下发数据主题为${clientId}/2/0
提供API给给业务子系统使用,用于下发数据给设备

mqtt:
    broker-url: tcp://42.194.132.44:1883
    client-id: mqtt_sender_server
    username: mqtt_server
    password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
package com.angel.ocean.listener;

import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.domain.client.ActivateData;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;

@Slf4j
@Component
public class UpDataConsumerListener {

    @Resource
    private MqttService mqttService;

    /**
     * 批量消费
     */
    @KafkaListener(topics = UP_DATA_TOPIC, containerFactory = "batchFactory")
    public void batchListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        try {
            log.info("UpDataConsumerListener.batchListen(), records.size: {}", records.size());
            for (ConsumerRecord<String, String> record : records) {
                UpKafKaData data = JSONObject.parseObject(record.value(), UpKafKaData.class);
                log.info("{}", record.value());
                handler(data.getTopic(), data.getData());
            }
        } catch (Exception e) {
            log.error("UpDataConsumerListener.batchListen() Exception:{}", e.getMessage(), e);
        } finally {
            // 手动确认
            ack.acknowledge();
        }
    }

    private void handler(String topic, String data) {
        switch (topic) {
            case MqttTopicConstant.ACTIVATE:
                activateHandler(data);
                break;
            case MqttTopicConstant.RESET:
                otherHandler(data);
                break;
            case MqttTopicConstant.OFFLINE:
                otherHandler(data);
                break;
            case MqttTopicConstant.ONLINE:
                otherHandler(data);
                break;
            case MqttTopicConstant.REPORT:
                otherHandler(data);
                break;
            default:
                otherHandler(data);
        }
    }

    private void activateHandler(String data) {
        ActivateData activateData = JSONObject.parseObject(data, ActivateData.class);
        String clientId = activateData.getClientId();
        mqttService.publish(String.format(MqttTopicConstant.ISSUED, clientId), "200");
    }

    private void otherHandler(String data) {
        log.info("{}", data);
    }

}
package com.angel.ocean.controller;

import com.angel.ocean.common.ApiResult;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@Slf4j
@RestController
@RequestMapping("/mqtt/server")
public class MqttController {

    @Resource
    private MqttClient server;

    @Resource
    private MqttService mqttService;

    /**
     * 数据下发接口
     * @param clientId
     * @param data
     * @return
     */
    @RequestMapping("/sender")
    public ApiResult<?> publish(String clientId, String data) {

        String topic = String.format(MqttTopicConstant.ISSUED, clientId);

        mqttService.publish(topic, data);

        if(server.isConnected()) {
            MqttMessage message = new MqttMessage(data.getBytes());
            message.setQos(0);
            try {
                server.publish(topic, message);
                log.info("Message published, topic:{}, data:{}", topic, data);
            } catch (MqttException e) {
                log.error("Message publish failed, topic:{}", topic, e);
                return ApiResult.error();
            }
            return ApiResult.success();
        }

        log.info("Message publish failed, not online.");

        return ApiResult.error();
    }
}
上一篇:前端在WebSocket中加入Token