第一次读源码,理解不到位,请多批评
1、接收MQTT连接请求
首先找到MQTT的模块,./common/transport/mqtt,我们可以看到该模块是一个使用Netty封装的mqttServer,通过读取配置文件来初始化这个mqttServer
1. MqttTransportService
@Service("MqttTransportService")
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true' && '${transport.mqtt.enabled}'=='true')")
@Slf4j
public class MqttTransportService {
@Value("${transport.mqtt.bind_address}")
private String host;
@Value("${transport.mqtt.bind_port}")
private Integer port;
@Value("${transport.mqtt.netty.leak_detector_level}")
private String leakDetectorLevel;
@Value("${transport.mqtt.netty.boss_group_thread_count}")
private Integer bossGroupThreadCount;
@Value("${transport.mqtt.netty.worker_group_thread_count}")
private Integer workerGroupThreadCount;
@Value("${transport.mqtt.netty.so_keep_alive}")
private boolean keepAlive;
@Autowired
private MqttTransportContext context;
private Channel serverChannel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@PostConstruct
public void init() throws Exception {
log.info("Setting resource leak detector level to {}", leakDetectorLevel);
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
log.info("Starting MQTT transport...");
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(context))
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");
}
@PreDestroy
public void shutdown() throws InterruptedException {
log.info("Stopping MQTT transport!");
try {
serverChannel.close().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
log.info("MQTT transport stopped!");
}
}
代码中这里初始化了一个handler,来具体处理接收到的消息。(好像是netty的使用方式)
2. MqttTransportHandler
Degbug启动Thingsboard,并让MqttClinet发送连接请求。发现消息是从channelRead函数读入。
进入processMqttMsgh函数
看到此时header中的消息类型是CONNECT
进入processConnect函数
我们这里使用的authToken的方式进行连接的,也就是在设备中定义的token。该token作为用户名传入。不需要密码。
进入processAuthTokenConnect函数
在本函数中,消息被封装成了ValidateBasicMqttCredRequestMsg类型的消息,封装完成之后,调用transportService的process函数,进行消息的传输,将消息传到core模块进行权限校验。
我们看到在process函数中传入了传输类型MQTT,封装好的登录信息,以及校验完成的回调函数。
3. 总结
至此,MQTT登录消息就处理完毕了,封装好的消息将被传输至core模块进行处理。
2、消息传输准备
1. DefaultTransportService
在process函数中,消息被进一步封装起来,封装成了一个TbProtoQueueMsg<TransportApiRequestMsg>
类型的信息,并增加了一个UUID。
消息被传输到了doProcess函数
我们先将细节屏蔽看看结构。
第一步是下面的代码
第一个参数是函数的原返回值,第二个参数是转换函数(也就是如何将原返回值进行转化的函数),第三个参数是Executor(详见Guava)
也就是第一步,将send函数的返回结果从ListenableFuture<Response>
转换成了ListenableFuture<ValidateDeviceCredentialsResponse>
类型的结果
第二步是
点进去发现实际上是为上一步转换来的response注册listener,也就是注册回调函数。
而这回调函数,就是之前process函数中传入的回调。
综上,doProcess函数将返回值转换成合适的格式,并设置回调函数。下面我们进入transportApiRequestTemplate.send(protoMsg)函数
2. DefaultTbQueueRequestTemplate的send函数
(TbQueueRequestTemplate实现类)
- 判断pending状态的请求数量是否超出限制
- 生成requestId并组好header信息
可以看到requestId被存放在了请求头
- pendingRequests存储
pendingRequests以requestId为key,以一个包含了一个可手动添加future值的future句柄和一个过期时间。这个future句柄目前是没有任何设置的。这个函数被返回出去,在DefaultTransportService的doProcess函数中转换了格式并设置了回调函数。 - 真正的发送消息
3. 总结
消息传输准备阶段,我们从里向外看,DefaultTbQueueRequestTemplate的send函数生成了一个可手动设置值的future操作句柄,并这次请求的requestId为key,包含操作句柄future和本次请求的过期时间的对象为value存储在pendingRequests中(pendingRequests可以保证requestId唯一)。而后send函数将这个future句柄返回,在上层的调用函数中被转换格式并设置了对应的回调函数。
3、消息传输
1. responseTemplate.send()
从DefaultTbQueueRequestTemplate的send函数中的responseTemplate.send()看起。
可以看到有三个入参
-
应该是topic等相关信息
可以看到消息将会被发到一个tb_transport.api.requests
的topic中。 -
reuqest
就是前面封装后的登录信息 -
队列回调
应该是看是否成功发送消息到指定topic的,成功做什么事,失败做什么事。可以看到失败的时候,pendingRequests删掉了对应requestId的信息,并直接给future设置了失败信息。
2. InMemoryTbQueueProducer.send
可以看到信息传输可以通过很多种方式,我们这里先选择内存队列来看。
可以看到send函数将前面传入的request放入了topicName对应的队列中。topicName和其对应的队列存在一个单例的ConcurrentHashMap中。
这样就相当于将消息放到了topicName对应的队列中了。
3. 总结
responseTemplate.send()有很多中实现,其结果就是将封装好的登录信息送到对应topic的队列中(可能是内存的也可能是消息中间件或其他形式)等待校验逻辑的消费。当传送成功时,会返回成功信息。失败时将会直接给future句柄设置为失败。
4、消息的消费
前文说明了使用内存队列消息的生产过程,接下来说明一下消息的消费过程。
消息消费的入口在DefaultTbQueueResponseTemplate的init函数中。
1. DefaultTbQueueResponseTemplate的init函数
init函数中有一个while循环,当stopped不为true的时候,while循环将一直运行
2. 获取所有的登录request
requestTemplate是一个下面类型的接口。
调用其poll(轮询)函数。对于我们的内存队列,其有单独的实现类。
其中关键一步就是
根据topic的名称,从单例的storage中取出该topic下的所有之前消息生产中存放的请求。
3. request的处理
- 解析header信息,获取超时时间requestId和responseTopic
- 对于不超时的请求进入下面的处理步骤
AsyncCallbackTemplate的withCallbackAndTimeout函数定义如下
使用Futures.withTimeout生成了一个会超时的future句柄,如果超时将自被中断或者取消。
之后调用了AsyncCallbackTemplate.withCallback前面用过。withCallback里给这个会超时的future添加了withTimeout传入的回调函数。
下面我们详细看看这三个参数的来龙去脉。
4. 登录信息校验
这里的handler是transportApiService,在核心服务初始化的时候被设置的。
在DefaultTransportApiService是其实现类。其handle函数如下。
我们找到我们对应的类型
进入validateCredentials函数
终于我们看到了校验的逻辑,在这里使用userName也就是我们传入的auth_token,经过业务校验获取到相关设备信息。
校验成功之后,将返回一个包含设备信息的future句柄。
5. 成功回调
这一块是成功后的回调。回调主要是要将设备信息发送回去。
我们可以看到,response的header里设置了REQUEST_ID_HEADER
,这个request_id是之前消息发送过来的request中设置的。
responseTopic也是requestHeader中带的,是mqtt模块接收core信息的topic。
response就是handle里面返回的信息。
6. 总结
DefaultTbQueueResponseTemplate的init函数将不停地获取对应topic的队列中的request信息,并将所有的request进行处理校验,之后,将带有requestId和设备信息的response重新发送到内存队列中responseTopic对应的队列里。等待进一步处理。
5、校验成功后设备信息的返回
responseTemplate.send()函数,同样我们使用内存队列。
与消息发送过来的方式相同,将对应的responseTopic的队列中,放置刚获得的设备信息。
6、获取设备信息
TbQueueRequestTemplat接口中同样有一个init方法
其中有一个while循环,不停地获取responseTopic对应队列的信息,这些response信息就是设备相关信息
对于每个response信息,我们获取其header中的requestId,并通过requestId获取pendingRequests中的对应的ResponseMetaData<Response> 类型的
expectedResponse信息,并将其中的future句柄的内容设置为response中的设备信息。
再看看最开始的回调信息
当成功获取到response之后,调用onValidateDeviceResponse进行后续的工作
7、总结
总的来说大致的流程是这样的,mqtt连接发送到mqtt-server,mqtt-server通过消息队列,将连接请求相关信息发送至core进行权限校验和认证,core进行校验之后,如果成功将设备信息发送到消息队列,消息消费后被预先注册的成功回调函数处理。否则被失败回调函数处理。
参考资料
https://www.baeldung.com/guava-futures-listenablefuture
https://www.jianshu.com/p/33ac5d394f68