Spring Boot 整合物联网(IoT)的 MQTT 协议

准备工作 (下载EMQX服务端,相关客户端工具)

1. 服务端工具:

https://www.emqx.io/downloads?os=Windows

2. 客户端工具:

https://mqttx.app/zh#download

 <!-- Web dependency; the snippets below expose a REST controller -->
 <!-- NOTE(review): no <version> elements are given; presumably versions are managed by the Spring Boot parent/BOM - confirm -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!-- MQTT-related dependencies -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!-- Lombok; the snippets below use @Data and @Slf4j -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

自定义yml配置

server:
  port: 8989
# MQTT settings; bound to MqttConfiguration through the "mqtt" prefix
mqtt:
  # One or more broker URIs, hence a list
  uris:
    - tcp://127.0.0.1:1883
  # Base client id; the inbound adapter appends "_customer" to it
  clientId: mqtt_test1
  # Topics the inbound adapter subscribes to
  topics:
    - demo
    - test
  # NOTE(review): sample credentials in plain text - replace and externalize before any real use
  username: admin
  password: 123456
  # Passed to MqttConnectOptions.setConnectionTimeout (seconds, per the Paho documentation)
  timeout: 30
  # Passed to MqttConnectOptions.setKeepAliveInterval (seconds, per the Paho documentation)
  keepalive: 60
  # QoS passed to the inbound adapter's setQos
  qos: 1

增加config配置读取yml文件 (使用了Lombok 需要自行添加pom依赖)

package com.huawen.mqtt.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @author:xjl
 * @date:2022/5/5 17:27
 * @Description: Holder for the MQTT settings bound from configuration properties under the
 * {@code mqtt} prefix. Accessors are generated by Lombok's {@code @Data}.
 **/
@Component
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfiguration {

    /**
     * Broker server URIs; more than one may be configured.
     */
    private String[] uris;

    /**
     * Base MQTT client id.
     */
    private String clientId;

    /**
     * Topics.
     */
    private String[] topics;

    /**
     * User name.
     */
    private String username;

    /**
     * Password.
     */
    private String password;

    /**
     * Connection timeout.
     */
    private Integer timeout;

    /**
     * Keep-alive interval.
     */
    private Integer keepalive;

    /**
     * QoS level.
     * NOTE(review): the original comment described this as the last-will message QoS, but the only
     * visible consumer passes it to the inbound adapter's {@code setQos}, i.e. it acts as the
     * subscription QoS - confirm the intended meaning.
     */
    private Integer qos;
}

消费者配置

package com.huawen.mqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;

/**
 * @author:xjl
 * @date:2022/5/6 9:06
 * @Description: Consumer-side (inbound) MQTT configuration: declares the input channel, the Paho
 * client factory, the subscribing channel adapter and the handler that logs received messages.
 **/
@Configuration
@Slf4j
public class MqttInBoundConfiguration {
    @Resource
    private MqttConfiguration mqttProperties;

    //==================================== consuming messages ==========================================//

    /**
     * Inbound channel that carries messages from the MQTT adapter to the handler.
     *
     * @return the message channel {@link MessageChannel}
     */
    @Bean("input")
    public MessageChannel mqttInputChannel() {
        // Direct channel: the message is handed straight to the subscriber
        return new DirectChannel();
    }


    /**
     * Creates the {@link MqttPahoClientFactory} holding the broker connection options.
     * SSL settings, if needed, would also be configured here.
     *
     * <p>Only {@code mqtt.uris} is mandatory. Password, timeout and keep-alive are applied only
     * when configured; otherwise Paho's own defaults stay in effect.
     *
     * @return the MQTT client factory {@link MqttPahoClientFactory}
     * @throws IllegalStateException if no broker URI is configured
     */
    @Bean
    public MqttPahoClientFactory inClientFactory() {
        String[] uris = mqttProperties.getUris();
        if (uris == null || uris.length == 0) {
            // Fail with a message that names the property instead of an anonymous NullPointerException
            throw new IllegalStateException("mqtt.uris must contain at least one broker URI");
        }

        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(uris);
        options.setUserName(mqttProperties.getUsername());

        // Brokers that allow anonymous access need no password; avoid toCharArray() on null
        String password = mqttProperties.getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        // The properties are boxed Integers; unboxing an unset value would throw
        Integer timeout = mqttProperties.getTimeout();
        if (timeout != null) {
            options.setConnectionTimeout(timeout);
        }
        Integer keepalive = mqttProperties.getKeepalive();
        if (keepalive != null) {
            options.setKeepAliveInterval(keepalive);
        }

        // cleanSession=false asks the broker for a persistent session so offline messages are kept
        options.setCleanSession(false);
        // Reconnect automatically after the connection is lost
        options.setAutomaticReconnect(true);

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }


    /**
     * Inbound adapter: subscribes to the configured topics and forwards every received message
     * to the input channel.
     *
     * @return the message producer {@link MessageProducer}
     */
    @Bean
    public MessageProducer producer() {
        // The client id gets a suffix so it does not collide with another client using the base id;
        // the literal is kept as-is because with cleanSession=false the id identifies the session.
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getClientId() + "_customer", inClientFactory(), mqttProperties.getTopics());
        adapter.setCompletionTimeout(5000);

        // Paho message converter; payloads are delivered as bytes only if setPayloadAsBytes(true) is enabled
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        // defaultPahoMessageConverter.setPayloadAsBytes(true);
        adapter.setConverter(defaultPahoMessageConverter);

        // Apply the configured QoS only when present; otherwise the adapter's default is used
        Integer qos = mqttProperties.getQos();
        if (qos != null) {
            adapter.setQos(qos);
        }
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handler subscribed to the {@code input} channel; logs the payload, id, received QoS and
     * received topic of each message.
     *
     * <p>NOTE(review): the original comment called this asynchronous, but the channel is a
     * {@link DirectChannel}, which invokes its subscriber on the sending thread. Move slow work
     * off this handler if it must not delay delivery.
     *
     * @return the message handler {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = "input")
    public MessageHandler handler() {
        return message -> {
            log.info("收到的完整消息为--->{}", message);
            log.info("----------------------");
            log.info("message:{}", message.getPayload());
            log.info("Id:{}", message.getHeaders().getId());
            log.info("receivedQos:{}", message.getHeaders().get(MqttHeaders.RECEIVED_QOS));
            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            log.info("topic:{}", topic);
            log.info("----------------------");
        };
    }
}

生产者配置(注意:此处贴出的代码与下文"生产者测试controller"完全相同;MqttGateway 所引用的名为 out 的出站通道及其对应的出站消息处理器配置并未在本文中给出,需要另行补充)

package com.huawen.mqtt.controller;

import com.huawen.mqtt.bean.MyMessage;
import com.huawen.mqtt.inter.MqttGateway;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author:xjl
 * @date:2022/5/6 9:17
 * @Description: REST controller that publishes a message to MQTT through {@link MqttGateway}.
 **/
@RestController
public class MqttPublishController {
    /** QoS level passed to the gateway for every message sent by this controller. */
    private static final int PUBLISH_QOS = 1;

    @Resource
    private MqttGateway mqttGateWay;

    /**
     * Sends the content of the request body to the topic named in the request body.
     *
     * @param myMessage request body carrying the target topic and the content to send
     * @return a text echo of the topic and content that were handed to the gateway
     */
    @PostMapping("/send")
    public String send(@RequestBody MyMessage myMessage) {
        final String topic = myMessage.getTopic();
        // Hand the message to the gateway for the requested topic
        mqttGateWay.sendToMqtt(topic, PUBLISH_QOS, myMessage.getContent());
        return "send topic: " + topic + ", message : " + myMessage.getContent();
    }
}

创建一个通用接口 用于发送数据

package com.huawen.mqtt.inter;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * @author:xjl
 * @date:2022/5/6 9:20
 * @Description: Messaging gateway used to send data to MQTT. Calls are routed to the channel
 * named {@code out}.
 * NOTE(review): no channel named {@code out} is declared in the visible snippets - it must be
 * provided by an outbound configuration elsewhere.
 **/
@MessagingGateway(defaultRequestChannel = "out")
public interface MqttGateway {
    /**
     * Sends a payload without specifying a topic or QoS.
     * NOTE(review): no topic header is set by this overload; presumably the outbound handler's
     * default topic applies - confirm against the outbound configuration.
     *
     * @param payload the message payload
     */
    void sendToMqtt(String payload);

    /**
     * Sends a payload to the given topic.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a string payload to the given topic with the given QoS.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param qos     QoS level, passed as the {@link MqttHeaders#QOS} header
     * @param payload the message payload (string)
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    /**
     * Sends a binary payload to the given topic with the given QoS.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param qos     QoS level, passed as the {@link MqttHeaders#QOS} header
     * @param payload the message payload (byte array)
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

生产者测试controller

package com.huawen.mqtt.controller;

import com.huawen.mqtt.bean.MyMessage;
import com.huawen.mqtt.inter.MqttGateway;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author:xjl
 * @date:2022/5/6 9:17
 * @Description: REST controller that publishes a message to MQTT through {@link MqttGateway}.
 **/
@RestController
public class MqttPublishController {
    /** QoS level passed to the gateway for every message sent by this controller. */
    private static final int PUBLISH_QOS = 1;

    @Resource
    private MqttGateway mqttGateWay;

    /**
     * Sends the content of the request body to the topic named in the request body.
     *
     * @param myMessage request body carrying the target topic and the content to send
     * @return a text echo of the topic and content that were handed to the gateway
     */
    @PostMapping("/send")
    public String send(@RequestBody MyMessage myMessage) {
        final String topic = myMessage.getTopic();
        // Hand the message to the gateway for the requested topic
        mqttGateWay.sendToMqtt(topic, PUBLISH_QOS, myMessage.getContent());
        return "send topic: " + topic + ", message : " + myMessage.getContent();
    }
}

该文章参考 https://blog.****.net/m0_46689235/article/details/124606005

上一篇:Python 启动Appium Service然后执行APP自动化case


下一篇:K8s中的控制器和资源对象是什么关系呢?