MQTT3.1.1协议阅读笔记1

本文主要是记录阅读 MQTT3.1.1协议中文版 时的心得感悟。

环境信息

  1. 使用Docker运行emqx,作为MQTT的服务端
  2. 使用mqtt-spy.jar作为MQTT的客户端
  3. 使用Paho写一个简单的Java-MQTT客户端
  4. 使用WireShark进行协议抓包

MQTT 简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于 TCP/IP 协议之上,由IBM在1999年发布。。

一个 MQTT 控制报文包含三个部分:

组成部分 长度
固定报头 2-5个字节 存在于所有MQTT控制包
可变报头 存在于某些MQTT控制包
载荷 存在于某些MQTT控制包
  • 我们借助 MQTT 协议发送的消息内容保存在载荷中

1.固定报头

固定报头由两部分组成:控制包类型和剩余长度

  • 控制包类型目前有 14 种;
  • 剩余长度表示的是“可变报头+载荷”的总长度

MQTT3.1.1协议阅读笔记1

如上图所示,这是一条控制包类型为 CONNECT 的 MQTT 报文,固定报头的中 剩余长度用16进制表示为 0x1e,用10进制表示为 30。

从 1e 的后一个字节 00 到末尾刚好是 30 个字节。

1.1 剩余长度与控制包最大长度256M

剩余长度使用了一种可变长度的结构来编码,这种结构使用单一字节表示0-127的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。剩余长度最多可以用四个字节来表示。

用n个字节表示剩余长度 剩余长度范围起始值 剩余长度范围结束值
1 0 (0x00) 127 (0x7F)
2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

这将允许应用发送可变报头和载荷总长度为255M大小的控制包。这个数字用16进制表示为:0xFF,0xFF,0xFF,0x7F。

换句话说,这将允许应用发送最多256M大小的控制包。

2.可变报头

以 CONNECT 报文的可变报头为例,主要包含协议名称(MQTT)和协议版本号(v3.1.1对应4);

2.1 MSB 和 LSB

至于 Length MSB(Most Significant Bit,最高有效位) 和 Length LSB (Last/Least Significant Bit,译作最低有效位),

把 MSB 和 LSB 用大端字节/网络字节序来读取,读取的值可以表示协议名称的长度。

以下是 Java 写成的 Demo:

import java.io.*;
import java.util.Arrays;

public class Utf8Characters {

    public static void main(String[] args) throws IOException {
        // 模拟写入
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(baos);

        dataOut.writeUTF("MQTT");

        byte[] bytes = baos.toByteArray();
        System.out.println(Arrays.toString(bytes)); // 打印 [0, 4, 77, 81, 84, 84]

        // 模拟读取
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        DataInputStream dataIn = new DataInputStream(bais);

        int len = dataIn.readUnsignedShort(); // 2 bytes
        byte[] decodedString = new byte[len]; // 4 bytes
        dataIn.read(decodedString);
        String target = new String(decodedString, "UTF-8");
        System.out.println(target); // 打印 MQTT

        // 重置一下,重新读取
        dataIn.reset();
        // 等同于
        String result = dataIn.readUTF();
        System.out.println(result); // 打印 MQTT

    }
}

3. MQTT的特别之处

我们在学习TCP/IP协议的时候,就知道 ACK 这个概念,其实许多构建在TCP/IP协议之上的应用层协议也都会使用 XXXACK 包来表示已经成功接收 XXX 信息。
MQTT也不能“免俗”:

  • 连接报文 CONNECT 对应连接确认报文 CONNACK;
  • 订阅报文 SUBSCRIBE 对应订阅确认报文 SUBACK;
  • 取消订阅报文 UNSUBSCRIBE 对应取消订阅确认报文 UNSUBACK;
  • 发布报文 PUBLISH 对应发布确认报文 PUBACK。

还有就是名字中没有使用 ACK,但是实际上也是“一问一答”式的 PINGREQ 和 PINGRESP。但是 MQTT 的控制类型中还是有两处“怪异之处”

  • 唯独 DISCONNECT 没有对应的确认报文;
  • PUBLISH 除了有 PUBACK 之外,还有 PUBREC,PUBREL,PUBCOMP

3.1 遗言/遗嘱Will

对于一般IoT设备而言,就是一个大循环while不断接收消息,不存在正常退出的逻辑,一般都是断电断网导致的异常退出。DISCONNECT 并不常用,也不用确认。

但是,如果客户端正常发出了 DISCONNECT 报文,那么服务端收到 DISCONNECT 后必须丢弃所有和当前连接有关的Will Message,不发布。

我们通常都有判断IoT设备是否在线的需求,使用遗言机制就很好实现。

  • 遗言/遗嘱是CONNECT类型报文中,伴随客户端连接服务端的请求一并发出的;
  • 可变报头中包含 Will Flag,Will QoS,Will Retain;其中 QoS 和 Retain 效果同 publish 报文中的 QoS 和 Retain;
  • 如果可变报头连接标识位 Will Flag 等于1,那么载荷中将包含 Will Topic 和 Will Message 字段;

模拟遗嘱发送和接收

由于 mqtt-spy.jar 的无论是点击x关闭,还是杀死进程,都是正常的Disconnect退出,所以只好写一个 Java 客户端来模拟异常退出的场景。

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

public class Main {

    public static void main(String[] args) throws MqttException {
        String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
        MqttClientPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);

        MqttConnectOptions options = new MqttConnectOptions();
        // 2 表示 EXACTLY_ONCE
        options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
                2, false);
        client.connect(options);
    }
}

运行这个程序,然后再用 mqtt-spy.jar 模拟一个WEB服务器上的MQTT客户端:

Connections -> New Connection 打开如下图所示的页面,输入Client ID为web,其他都默认,然后 点击Open Connection

MQTT3.1.1协议阅读笔记1

在 Subscriptions and received messages 这一栏点击 New,弹出如下图所示对话框,输入主题DeviceStatus,然后点击 Subscribe

MQTT3.1.1协议阅读笔记1

然后,我们就可以去关闭 Java 的 MQTT客户端了。接着,就收到了遗言:

MQTT3.1.1协议阅读笔记1

然后,我还找到了 WireShark 抓取的Java的MQTT客户端发出的Connect报文:

MQTT3.1.1协议阅读笔记1

3.1.1 WILL MESSAGE长度限制65535个字节

从理论上来说,MQTT 中的字符串符合以下形式:

MQTT3.1.1协议阅读笔记1

用两个字节表示内容长度,因此内容长度可以是0到65535个字节。

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) throws MqttException, IOException {
        String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
        MqttClientPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);

        MqttConnectOptions options = new MqttConnectOptions();
        StringBuilder sb = new StringBuilder();
        IntStream.range(0, 65536).forEach(i -> {
                int result = i % 10;
                sb.append(result);
        });
        // 2 表示 EXACTLY_ONCE
        byte[] payload = sb.toString().getBytes(StandardCharsets.UTF_8);
        options.setWill("DeviceStatus", payload,
                2, false);
        client.connect(options);
    }
}

如上面这段代码模拟了 Will Message 为 65536 个字节,服务器直接断开了客户端的连接:

MQTT3.1.1协议阅读笔记1

从图中可以看出,Will Message 超长了,导致长度为0。具体可以看这段代码:org.eclipse.paho.client.mqttv3.internal.wire.MqttConnect#getPayload

if (willMessage != null) {
  encodeUTF8(dos, willDestination);
  dos.writeShort(willMessage.getPayload().length); // 这段代码再跟进去看,(v >>> 8) & 0xFF 计算等于 0,(v >>> 0) & 0xFF 计算也等于 0
  dos.write(willMessage.getPayload());
}

3.1.2 Will Retain只保持最新一条

RETAIN(保持)
1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。

※ 实验如下:
修改 3.1 中 Main 的代码:

// retain 由 false 改为 true
options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
                2, true);

然后,在 Run Configuration 中拷贝三份 Main,并分别命名为 iot_1,iot_2,iot_3,并且 Program arguments 也分别为 iot_1,iot_2,iot_3:

MQTT3.1.1协议阅读笔记1

分别运行 iot_1,iot_2,iot_3,然后再依次结束他们。

然后,再启动 mqtt-spy.jar,并订阅主题 DeviceStatus:

MQTT3.1.1协议阅读笔记1

如图所示,我们观察到新订阅者只获取到主题中的最新一条消息!

3.2 QoS

这个又有很多内容,还是另开一篇。

参考文档

MQTT协议笔记之头部信息 阅读

这篇文章主要解答了我对 Length MSB 和 Length LSB 的疑惑

Java MQTT 客户端之 Paho 阅读

如果你对 Java 实现 MQTT 客户端感兴趣,可以读一下这篇

上一篇:【5月23日培训】广州Modbus-MQTT网关培训来啦,就在本周日!


下一篇:MQTT 客户端工具介绍