我们已经把相关的连接报文搞定了。笔者想来想去还是决定先讲解一下订阅报文(SUBSCRIBE )。如果传统的通信方式是客户端和服务端之间一般就直接传输信息。但是MQTT的通信方式是通过发布/订阅的方式进行的。笔者不知道他是否跟设计模式中的发布订阅模式有没有关系。可是他们思想却有一点相似之处。
客户端知道服务上有很多个主题。就好比如说有很多消息的分类一样子。有社会新闻、体育讲坛等。那么客户端只要找到自己感兴趣的进行订阅就可以了。一个客户端可以向服务器订阅多个主题。而所谓的发布就是客户端对不同的主题进行发布信息。即好比如新闻的发布者一样子。这个时候只要订阅这个主题的客户端就可以接收到来自服务端的新闻。我们的手机常常会接收到一些推送的信息。事实上有很多App应用都是用MQTT协议来进行的。所以不难看出服务端主要是负责客户端和客户端的之间信息的传输和信息管理。大至如图下
注意:发布者也是客户端。订阅者也是客户端
主题(Topic )
如果主题只是一个字符串值的话,那么显然会比较单调。这样子功能也显得比较无力。所以在主题上面就了所谓的分隔符和通配符的说法(个人想法)。分隔符的意思就是让主题可以分层次。就好如说主题“体育讲坛/篮球/NBA”。看到这样子的主题,请问一下你还有什么不明白的话。是不是感觉很有层次感。剩下只有一个问题?如果我们订阅了主题“体育讲坛/篮球/NBA”,并向主题“体育讲坛/篮球”发布一个信息。那么已经订阅主题“体育讲坛/篮球/NBA”的客户端们是不是可以接受到信息呢?反过来讲如果我们订阅了主题“体育讲坛/篮球”,向主题“体育讲坛/篮球/NBA”发信息,客户端们是否又能接受信息呢?
笔者就以HiveMQ作服务器来做一下上面的小实验。如下
客验结果显然是失败的——订阅主题“体育讲坛/篮球/NBA”的客户端根本收不到来自主题“体育讲坛/篮球“的发布信息。说明分隔符就是用于主题名的分层次。没有别的意思。
通过上面的实验我们知道如果想要收到NBA就是必须订阅主题“体育讲坛/篮球/NBA”。可是总是有一些人只要是篮球的新闻有喜欢。怎么办。通配符的功能就出来了。通配符有俩种——"+"和“#”。+为单层的通配符。表示当前这一层的全都合非。这样子以上面的说到的例子来做实验。我们订阅一个主题为“体育讲坛/篮球/+”。按照理解的意思就是只要是在“体育讲坛/篮球”的信息都是我们想要的。结果如下
我们可以看到笔者在“体育讲坛/篮球/NBA”和“体育讲坛/篮球/ABC”各发布了信息。结果他都能收到。那么如果我们对主题“体育讲坛/篮球”或是主题“体育讲坛/篮球/NBA/福州专场”发布信息呢?笔者试过了很可惜都不行。
记得我们上面说到有一些人只要跟篮球相关的都喜欢。可是如果使用通配符“+”是可以接近我们的要求。注意是接近。“+”通配符只是表示当前一层的。从当前的第二层就不行了。而本身的层也不算。就像上面的。只有篮球下的子一层才是合非的。讲到这里大家一定会想到用“#“通配符试试。没有错。“#“通配符就是表示当前本身和下面子层所有。如下
实验的结果很终满足了。
对于主题,在文档中有一个要求——主题不能以 ”#“ "+" "$" 为开头。对于”#“ ” +“的话,大家都好理解。那么”$“又是什么鬼。在文档我们可以看到这样子的字符"$SYS"。事实上他们是想说”$“开头的主题一般用于系统内部的一些主题。你们可以去找一些第三方的MQTT服务器。都会有很多以”$“开头的主题。
SUBSCRIBE报文
通过上面的介绍。笔者想你们一定对MQTT通信方式有了一定的概念。而本章的订阅报文就是用于告诉服务器我想要什么的主题了。通过前面几章的了解。我们知道报文的固定报头是少了的。笔者就以MQTT 3.1.1来介绍吧。如下
SUBSCRIBE报文的INT值是8。所以对应的二进制为1000。后面的DUP QOS RETAIN对应是0010。其中QOS是必须是01。对订阅者来讲,他一定希望自己的订阅是成功的。所以订阅报文的QOS是01就相当好理解了。如果不理解QOS是什么的话,请看一下前面几章。
订阅报文也有可变报头,可变报头只有一个消息ID。消息ID是从客端端开始分配的。笔者为什么样子认为呢?主要是看到客户端在发布信息的时候就要求消息ID。所以笔者才会觉得消息ID在客户端进行分配的。当然也不是什么报文都会消息ID的。但是有消息ID一般QOS大于0。
订阅报文的有效载荷里面存在了相关的订阅订题列表。前面说过可以支持一个客户端多个订阅。列表里面每有一主题项只有俩个值。一个表示主题名,一个表示服务质量要求(Requested QoS)。这里的服务质量要求(Requested QoS)和 固定报头的服务质量的值是一样子。但是用意却是不一样子。这里是指这个订阅者接收这主题的服务质量最大等级。举个列子吧。笔者订阅了一个主题主题“体育讲坛/篮球/NBA”,同时他的服务质量要求(Requested QoS)的值为1。这个时候有一个发布者在这个主题上发布一个服务质QOS为2。笔者还是可以收到这个发布者发来的信息。只是信息的服务质量QOS却变为1了。要明白QOS(1)和QOS(2)的执行行为是不样子的。这个后面章节会讲到。当然如果发布者在这个主题上发布一个服务质QOS为0。这就没有什么区别了。如下
对于有效载荷笔者这里就不多讲解了。也没有什么可说的。看文档的图片就够了如下。
宏观上:
微观上:
列表出我们可以看他订阅了俩个主题。一个主题”a/b“,一个主题”c/b“。上面列出大概的图片(宏观上)和比较细的图片(微观上)。如果看不懂也没有关系。笔者接下来会用代码来抓一包看看。相信在对照一下就明白列表出画的是什么。
现在让我们好好想想当服务器接收到来自客户端的订阅报文的时候要做些什么样子的反应呢?首先我们要明白如果服务端接收到一个订阅报文,第一步想到一定是查看订阅报文的格式是不是正确的。相关的主题名是不是为空的。主题名的写法是不是非法。这些一定离不开。当然对应的一些共有的验证笔者就不说了。一切没有问题的情况下,服务器会去看一下当前订阅者前面有没有订阅过相同的主题。如果有就替换当前的。如果没有就创建一下新的。然后服务器在根据当前主题查找一下符合保留的信息。如果有,就发送给当前的订阅者。然后发送一个订阅报文确定(SUBACK )。当然这前后没有规定。先发送一个订阅报文确定(SUBACK ),在处理保留的信息也是可以的。
注意:在发送符合保留的信息就要对QOS进行处理。上面笔者也讲过了。
SUBACK 报文
当服务端处理SUBSCRIBE报文的时候,都会生成一个SUBACK 报文来回应订阅者。笔者这里不想对他太过的讲解。他的内容也很简单。如下
对于SUBACK 报文的可变报头里面也只有一个消息ID。而且跟SUBSCRIBE报文的消息ID是一样子的。有效载何的内容存放是订阅主题的服务质量要求(Requested QoS)。笔者在MQTT 3.1 文档时面可以看到有多个主题的列子。可是在MQTT 3.1.1里面却没有。那么笔者就把MQTT 3.1.1的放在下里吧。读者们可以自行查看。
上面列表里面显示返回码,事实上是主题相关的服务质量要求(Requested QoS)。所以就可以知道他可以会返回四个值。如下
QOS 0:0x00
QOS 1:0x01
QOS2 :0x02
Failure :0x80
代码实现
有了上面的了解之后,笔者就想在通过一些代码来加深理解。当然重新写那是不可能的。笔者就用上一章的代码。并加上订阅报文相关的处理。如下
private void onSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) { if (!this.connected) {
ctx.close();
return;
}
int messageId = msg.variableHeader().messageId(); List<MqttTopicSubscription> requestSubscriptions = msg.payload().topicSubscriptions(); for (MqttTopicSubscription subscription : requestSubscriptions) { if (StringUtils.isEmpty(subscription.topicName())) {
ctx.close();
return;
}
} List<Integer> grantedQosLevels = new ArrayList<Integer>(); requestSubscriptions.forEach(subscription -> {
if (subscription.topicName().startsWith("$")) grantedQosLevels.add(MqttQoS.FAILURE.value());
else grantedQosLevels.add(subscription.qualityOfService().value());
}); BrokerSessionHelper.sendMessage(
ctx,
MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId),
new MqttSubAckPayload(grantedQosLevels)),
this.clientId,
messageId,
true); for (int i = 0; i < requestSubscriptions.size(); i++) { MqttQoS grantedQoS = MqttQoS.valueOf(grantedQosLevels.get(i));
String topic = requestSubscriptions.get(i).topicName(); //1。查看以前有没有订阅过相同的主题,如果有就替换。
//2。查看有没有符合的保留信息,有发送
//读者们自行去实现。是要用redis,还是要用sqllite自去实现。 }
}
订阅报文的实现并不难。难就在对于对保留信息的处理。还有就是服务端要对当前的客户端的订阅进行保留。那么笔者这边做的事情比较简单。主要是为了学习查看相关的报文格式。但是笔者还是要列出来一下。如下
1.判断是否发生过连接。即是连接报文的处理。如果没有的话,断开连接。
if (!this.connected) {
ctx.close();
return;
}
2.获得报文的消息ID和相关的订阅主题。判断主题不为空。当然你也可自定义主题的验证合法规则。笔者这里就不多说了。
int messageId = msg.variableHeader().messageId(); List<MqttTopicSubscription> requestSubscriptions = msg.payload().topicSubscriptions(); for (MqttTopicSubscription subscription : requestSubscriptions) { if (StringUtils.isEmpty(subscription.topicName())) {
ctx.close();
return;
}
}
3.获得相关主题的服务质量要求,用于返回码和处理保留的消息。并返回SUBACK报文
List<Integer> grantedQosLevels = new ArrayList<Integer>(); requestSubscriptions.forEach(subscription -> {
if (subscription.topicName().startsWith("$")) grantedQosLevels.add(MqttQoS.FAILURE.value());
else grantedQosLevels.add(subscription.qualityOfService().value());
}); BrokerSessionHelper.sendMessage(
ctx,
MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(messageId),
new MqttSubAckPayload(grantedQosLevels)),
this.clientId,
messageId,
true);
4.处理保留的信息。这里笔者并没实现。因为这里要接合相关的数据库或是NOSQL。所以这里笔者没有去做。因这里太多的东西的。而且不同的人实现和想法也不一样子。所以笔者就没有列出来。
for (int i = 0; i < requestSubscriptions.size(); i++) { MqttQoS grantedQoS = MqttQoS.valueOf(grantedQosLevels.get(i));
String topic = requestSubscriptions.get(i).topicName(); //1。查看以前有没有订阅过相同的主题,如果有就替换。
//2。查看有没有符合的保留信息,有发送
//读者们自行去实现。是要用redis,还是要用sqllite自去实现。 }
笔者把相关的抓到的报文格列出来。如下
SUBSCRIBE报文:
笔者已经把SUBSCRIBE报文的各个部分用不同的颜色标出耿了。其中的黄色线表示下同主题的长度。就是上面微观图片里面的MSB和LSB。其他的也没有什么。 只是要注意最后一个值也就是服务质量要求(Requested QoS)。笔者这边是1。所以最后的二进制是00000001。
SUBACK 报文:
我们可以看到SUBACK 报文的消息ID和SUBSCRIBE报文的消息是一样子的。还有就是记得最后的服务质量要求。