一、简单介绍
1.MQTT-即时通讯协议
mqtt broker即服务端
mqtt client即客户端
2.主要特点
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
- 对负载内容屏蔽的消息传输
- 使用 TCP/IP 提供网络连接
- 有三种消息发布服务质量:
“至多一次”:适用消息频繁发且丢失一两条不影响的场景,如:环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”:确保消息到达,但消息重复可能会发生
“只有一次”:确保消息到达一次。优点是确保消息送达且有且仅有一次,缺点是系统开销大. - 小型传输,开销很小
- 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制
3.下载安装(省略)
二、MQTT+Spring Boot
1.MQTT链接相关信息配置
server:
mqtt:
url: tcp://xxx.xx.xx:1883
topics: scrm_user/#
clientId: xxxx
maxInflight: 20
username: '***'
password: '***'
2.MQTT Bean Configuration
@Configuration
public class MqttConfig {
@Value("${mding.mqtt.url}")
private String mqttUrl;
@Value("${mding.mqtt.topics}")
private String mqttTopics;
@Value("${mding.mqtt.username}")
private String mqttUserName;
@Value("${mding.mqtt.password}")
private String mqttPassword;
@Value("${mding.mqtt.maxInflight}")
private Integer mqttMaxInflight;
@Value("${mding.mqtt.clientId}")
private String mqttClientId;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setServerURIs(mqttUrl.split(","));
connectOptions.setConnectionTimeout(5000);
if (StringUtils.isNotEmpty(mqttUserName)) {
connectOptions.setPassword(mqttPassword.toCharArray());
connectOptions.setUserName(mqttUserName);
}
connectOptions.setAutomaticReconnect(true);
// 客户端心跳消息的最大并发数
connectOptions.setMaxInflight(mqttMaxInflight);
connectOptions.setKeepAliveInterval(120);
factory.setConnectionOptions(connectOptions);
return factory;
}
@Bean
public MessageProducer inbound() {
String clientId = mqttClientId.concat("_").concat(String.valueOf(System.currentTimeMillis()));
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttUrl, clientId);
// 设置连接超时时长
adapter.setCompletionTimeout(30000);
adapter.setConverter(new DefaultPahoMessageConverter());
// 设置服务质量
// 0 最多一次,数据可能丢失;
// 1 至少一次,数据可能重复;
// 2 只有一次,有且只有一次;最耗性能
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInputChannel());
adapter.addTopic(mqttTopics.split(","));
return adapter;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MdingMqttInBoundHandler();
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
String clientId = mqttClientId.concat("_").concat(String.valueOf(System.currentTimeMillis()));
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
// 设置发送消息时不阻塞
messageHandler.setAsync(true);
messageHandler.setDefaultQos(0);
return messageHandler;
}
}
3.MQTT订阅消息入口
public class MdingMqttInBoundHandler implements MessageHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MdingMqttInBoundHandler.class);
private static final String PRE_SUFFER = "scrm_user-";
/**
* 备注:低版本使用 mqtt_topic
*
* @param message 消息体
* @throws MessagingException
*/
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
String payload = message.getPayload().toString();
LOGGER.info("MdingMqttInBoundHandler.handleMessage->{} {}", topic, payload);
// 正则匹配具体的topic
Map<Pattern, String> map = TopicPattern.M_DING_PATTERN;
Pattern pattern = map.keySet()
.stream().filter(ptn -> ptn.matcher(topic).find()).findFirst().orElse(null);
if (null != pattern) {
String action = map.get(pattern);
//TODO 执行业务分发逻辑
}
}
}
4.MQTT发布消息
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MdingMqttMsgGateWay {
/**
* 发送消息到MQTT
*
* @param data Data
* @param topic Topic
*/
void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
三、客户端调试工具
可以实现用MQTTX客户端进行发布消息和订阅消息的调试。
最后,需要的可以直接拿过去干上。