MQTT SpringBoot入门跑起来

一、简单介绍
1.MQTT-即时通讯协议
MQTT SpringBoot入门跑起来
mqtt broker即服务端
mqtt client即客户端

2.主要特点

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
  • 对负载内容屏蔽的消息传输
  • 使用 TCP/IP 提供网络连接
  • 有三种消息发布服务质量:
    “至多一次”:适用消息频繁发且丢失一两条不影响的场景,如:环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    “至少一次”:确保消息到达,但消息重复可能会发生
    “只有一次”:确保消息到达一次。优点是确保消息送达且有且仅有一次,缺点是系统开销大.
  • 小型传输,开销很小
  • 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制

3.下载安装(省略)

二、MQTT+Spring Boot
1.MQTT链接相关信息配置

server:
  mqtt:
    url: tcp://xxx.xx.xx:1883 
    topics: scrm_user/#
    clientId: xxxx
    maxInflight: 20
    username: '***'
    password: '***'

2.MQTT Bean Configuration

@Configuration
public class MqttConfig {

    @Value("${mding.mqtt.url}")
    private String mqttUrl;

    @Value("${mding.mqtt.topics}")
    private String mqttTopics;

    @Value("${mding.mqtt.username}")
    private String mqttUserName;

    @Value("${mding.mqtt.password}")
    private String mqttPassword;

    @Value("${mding.mqtt.maxInflight}")
    private Integer mqttMaxInflight;

    @Value("${mding.mqtt.clientId}")
    private String mqttClientId;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(mqttUrl.split(","));
        connectOptions.setConnectionTimeout(5000);
        if (StringUtils.isNotEmpty(mqttUserName)) {
            connectOptions.setPassword(mqttPassword.toCharArray());
            connectOptions.setUserName(mqttUserName);
        }
        connectOptions.setAutomaticReconnect(true);
        // 客户端心跳消息的最大并发数
        connectOptions.setMaxInflight(mqttMaxInflight);
        connectOptions.setKeepAliveInterval(120);
        factory.setConnectionOptions(connectOptions);
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        String clientId = mqttClientId.concat("_").concat(String.valueOf(System.currentTimeMillis()));
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttUrl, clientId);
        // 设置连接超时时长
        adapter.setCompletionTimeout(30000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        // 设置服务质量
        // 0 最多一次,数据可能丢失;
        // 1 至少一次,数据可能重复;
        // 2 只有一次,有且只有一次;最耗性能
        adapter.setQos(1);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInputChannel());
        adapter.addTopic(mqttTopics.split(","));
        return adapter;
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MdingMqttInBoundHandler();
    }


    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        String clientId = mqttClientId.concat("_").concat(String.valueOf(System.currentTimeMillis()));
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        // 设置发送消息时不阻塞
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(0);
        return messageHandler;
    }

}

3.MQTT订阅消息入口

public class MdingMqttInBoundHandler implements MessageHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(MdingMqttInBoundHandler.class);

    private static final String PRE_SUFFER = "scrm_user-";

    /**
     * 备注:低版本使用 mqtt_topic
     *
     * @param message 消息体
     * @throws MessagingException
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
        String payload = message.getPayload().toString();
        LOGGER.info("MdingMqttInBoundHandler.handleMessage->{} {}", topic, payload);
        // 正则匹配具体的topic
        Map<Pattern, String> map = TopicPattern.M_DING_PATTERN;
        Pattern pattern = map.keySet()
                .stream().filter(ptn -> ptn.matcher(topic).find()).findFirst().orElse(null);
        if (null != pattern) {
            String action = map.get(pattern);
            //TODO 执行业务分发逻辑
        }
    }
}

4.MQTT发布消息

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MdingMqttMsgGateWay {

    /**
     * 发送消息到MQTT
     *
     * @param data  Data
     * @param topic Topic
     */
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

}

三、客户端调试工具
可以实现用MQTTX客户端进行发布消息和订阅消息的调试。

MQTT SpringBoot入门跑起来

最后,需要的可以直接拿过去干上。

上一篇:2021-10-13


下一篇:我如何用 CircuitPython 和开源工具监控温室