本文主要是记录阅读 MQTT3.1.1协议中文版 时的心得感悟。
环境信息
- 使用
Docker
运行emqx
,作为MQTT的服务端 - 使用
mqtt-spy.jar
作为MQTT的客户端 - 使用
Paho
写一个简单的Java-MQTT客户端 - 使用
WireShark
进行协议抓包
MQTT 简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于 TCP/IP 协议之上,由IBM在1999年发布。。
一个 MQTT 控制报文包含三个部分:
组成部分 | 长度 | |
---|---|---|
固定报头 | 2-5个字节 | 存在于所有MQTT控制包 |
可变报头 | 存在于某些MQTT控制包 | |
载荷 | 存在于某些MQTT控制包 |
- 我们借助 MQTT 协议发送的消息内容保存在载荷中
1.固定报头
固定报头由两部分组成:控制包类型和剩余长度
- 控制包类型目前有 14 种;
- 剩余长度表示的是“可变报头+载荷”的总长度;
如上图所示,这是一条控制包类型为 CONNECT 的 MQTT 报文,固定报头的中 剩余长度用16进制表示为 0x1e,用10进制表示为 30。
从 1e 的后一个字节 00 到末尾刚好是 30 个字节。
1.1 剩余长度与控制包最大长度256M
剩余长度使用了一种可变长度的结构来编码,这种结构使用单一字节表示0-127的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。剩余长度最多可以用四个字节来表示。
用n个字节表示剩余长度 | 剩余长度范围起始值 | 剩余长度范围结束值 |
---|---|---|
1 | 0 (0x00) | 127 (0x7F) |
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
这将允许应用发送可变报头和载荷总长度为255M大小的控制包。这个数字用16进制表示为:0xFF,0xFF,0xFF,0x7F。
换句话说,这将允许应用发送最多256M大小的控制包。
2.可变报头
以 CONNECT 报文的可变报头为例,主要包含协议名称(MQTT)和协议版本号(v3.1.1对应4);
2.1 MSB 和 LSB
至于 Length MSB(Most Significant Bit,最高有效位) 和 Length LSB (Last/Least Significant Bit,译作最低有效位),
把 MSB 和 LSB 用大端字节/网络字节序来读取,读取的值可以表示协议名称的长度。
以下是 Java 写成的 Demo:
import java.io.*;
import java.util.Arrays;
public class Utf8Characters {
public static void main(String[] args) throws IOException {
// 模拟写入
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(baos);
dataOut.writeUTF("MQTT");
byte[] bytes = baos.toByteArray();
System.out.println(Arrays.toString(bytes)); // 打印 [0, 4, 77, 81, 84, 84]
// 模拟读取
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
DataInputStream dataIn = new DataInputStream(bais);
int len = dataIn.readUnsignedShort(); // 2 bytes
byte[] decodedString = new byte[len]; // 4 bytes
dataIn.read(decodedString);
String target = new String(decodedString, "UTF-8");
System.out.println(target); // 打印 MQTT
// 重置一下,重新读取
dataIn.reset();
// 等同于
String result = dataIn.readUTF();
System.out.println(result); // 打印 MQTT
}
}
3. MQTT的特别之处
我们在学习TCP/IP协议的时候,就知道 ACK 这个概念,其实许多构建在TCP/IP协议之上的应用层协议也都会使用 XXXACK 包来表示已经成功接收 XXX 信息。
MQTT也不能“免俗”:
- 连接报文 CONNECT 对应连接确认报文 CONNACK;
- 订阅报文 SUBSCRIBE 对应订阅确认报文 SUBACK;
- 取消订阅报文 UNSUBSCRIBE 对应取消订阅确认报文 UNSUBACK;
- 发布报文 PUBLISH 对应发布确认报文 PUBACK。
还有就是名字中没有使用 ACK,但是实际上也是“一问一答”式的 PINGREQ 和 PINGRESP。但是 MQTT 的控制类型中还是有两处“怪异之处”:
- 唯独 DISCONNECT 没有对应的确认报文;
- PUBLISH 除了有 PUBACK 之外,还有 PUBREC,PUBREL,PUBCOMP
3.1 遗言/遗嘱Will
对于一般IoT设备而言,就是一个大循环while不断接收消息,不存在正常退出的逻辑,一般都是断电断网导致的异常退出。DISCONNECT 并不常用,也不用确认。
但是,如果客户端正常发出了 DISCONNECT 报文,那么服务端收到 DISCONNECT 后必须丢弃所有和当前连接有关的Will Message,不发布。
我们通常都有判断IoT设备是否在线的需求,使用遗言机制就很好实现。
- 遗言/遗嘱是CONNECT类型报文中,伴随客户端连接服务端的请求一并发出的;
- 可变报头中包含 Will Flag,Will QoS,Will Retain;其中 QoS 和 Retain 效果同 publish 报文中的 QoS 和 Retain;
- 如果可变报头连接标识位 Will Flag 等于1,那么载荷中将包含 Will Topic 和 Will Message 字段;
※ 模拟遗嘱发送和接收:
由于 mqtt-spy.jar 的无论是点击x
关闭,还是杀死进程,都是正常的Disconnect退出,所以只好写一个 Java 客户端来模拟异常退出的场景。
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
public class Main {
public static void main(String[] args) throws MqttException {
String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
MqttClientPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);
MqttConnectOptions options = new MqttConnectOptions();
// 2 表示 EXACTLY_ONCE
options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
2, false);
client.connect(options);
}
}
运行这个程序,然后再用 mqtt-spy.jar
模拟一个WEB服务器上的MQTT客户端:
Connections -> New Connection 打开如下图所示的页面,输入Client ID为web,其他都默认,然后 点击Open Connection:
在 Subscriptions and received messages 这一栏点击 New,弹出如下图所示对话框,输入主题DeviceStatus,然后点击 Subscribe:
然后,我们就可以去关闭 Java 的 MQTT客户端了。接着,就收到了遗言:
然后,我还找到了 WireShark 抓取的Java的MQTT客户端发出的Connect报文:
3.1.1 WILL MESSAGE长度限制65535个字节
从理论上来说,MQTT 中的字符串符合以下形式:
用两个字节表示内容长度,因此内容长度可以是0到65535个字节。
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.stream.IntStream;
public class Main {
public static void main(String[] args) throws MqttException, IOException {
String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
MqttClientPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);
MqttConnectOptions options = new MqttConnectOptions();
StringBuilder sb = new StringBuilder();
IntStream.range(0, 65536).forEach(i -> {
int result = i % 10;
sb.append(result);
});
// 2 表示 EXACTLY_ONCE
byte[] payload = sb.toString().getBytes(StandardCharsets.UTF_8);
options.setWill("DeviceStatus", payload,
2, false);
client.connect(options);
}
}
如上面这段代码模拟了 Will Message 为 65536 个字节,服务器直接断开了客户端的连接:
从图中可以看出,Will Message 超长了,导致长度为0。具体可以看这段代码:
org.eclipse.paho.client.mqttv3.internal.wire.MqttConnect#getPayload
if (willMessage != null) {
encodeUTF8(dos, willDestination);
dos.writeShort(willMessage.getPayload().length); // 这段代码再跟进去看,(v >>> 8) & 0xFF 计算等于 0,(v >>> 0) & 0xFF 计算也等于 0
dos.write(willMessage.getPayload());
}
3.1.2 Will Retain只保持最新一条
RETAIN(保持)
1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。
※ 实验如下:
修改 3.1 中 Main 的代码:
// retain 由 false 改为 true
options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
2, true);
然后,在 Run Configuration 中拷贝三份 Main,并分别命名为 iot_1,iot_2,iot_3,并且 Program arguments 也分别为 iot_1,iot_2,iot_3:
分别运行 iot_1,iot_2,iot_3,然后再依次结束他们。
然后,再启动 mqtt-spy.jar,并订阅主题 DeviceStatus:
如图所示,我们观察到新订阅者只获取到主题中的最新一条消息!
3.2 QoS
这个又有很多内容,还是另开一篇。
参考文档
MQTT协议笔记之头部信息 阅读
这篇文章主要解答了我对 Length MSB 和 Length LSB 的疑惑
Java MQTT 客户端之 Paho 阅读
如果你对 Java 实现 MQTT 客户端感兴趣,可以读一下这篇