Thingsboard MQTT权限校验源码解读

第一次读源码,理解不到位,请多批评

1、接收MQTT连接请求

首先找到MQTT的模块,./common/transport/mqtt,我们可以看到该模块是一个使用Netty封装的mqttServer,通过读取配置文件来初始化这个mqttServer

1. MqttTransportService

@Service("MqttTransportService")
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true' && '${transport.mqtt.enabled}'=='true')")
@Slf4j
public class MqttTransportService {

    @Value("${transport.mqtt.bind_address}")
    private String host;
    @Value("${transport.mqtt.bind_port}")
    private Integer port;

    @Value("${transport.mqtt.netty.leak_detector_level}")
    private String leakDetectorLevel;
    @Value("${transport.mqtt.netty.boss_group_thread_count}")
    private Integer bossGroupThreadCount;
    @Value("${transport.mqtt.netty.worker_group_thread_count}")
    private Integer workerGroupThreadCount;
    @Value("${transport.mqtt.netty.so_keep_alive}")
    private boolean keepAlive;

    @Autowired
    private MqttTransportContext context;

    private Channel serverChannel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @PostConstruct
    public void init() throws Exception {
        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        log.info("Starting MQTT transport...");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new MqttTransportServerInitializer(context))
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);

        serverChannel = b.bind(host, port).sync().channel();
        log.info("Mqtt transport started!");
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        log.info("Stopping MQTT transport!");
        try {
            serverChannel.close().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
        log.info("MQTT transport stopped!");
    }
}

代码中这里初始化了一个handler,来具体处理接收到的消息。(好像是netty的使用方式)
Thingsboard MQTT权限校验源码解读

2. MqttTransportHandler

Degbug启动Thingsboard,并让MqttClinet发送连接请求。发现消息是从channelRead函数读入。
Thingsboard MQTT权限校验源码解读
进入processMqttMsgh函数Thingsboard MQTT权限校验源码解读
看到此时header中的消息类型是CONNECT
Thingsboard MQTT权限校验源码解读
进入processConnect函数
我们这里使用的authToken的方式进行连接的,也就是在设备中定义的token。该token作为用户名传入。不需要密码。
Thingsboard MQTT权限校验源码解读
进入processAuthTokenConnect函数
在本函数中,消息被封装成了ValidateBasicMqttCredRequestMsg类型的消息,封装完成之后,调用transportService的process函数,进行消息的传输,将消息传到core模块进行权限校验。
我们看到在process函数中传入了传输类型MQTT,封装好的登录信息,以及校验完成的回调函数。
Thingsboard MQTT权限校验源码解读

3. 总结

至此,MQTT登录消息就处理完毕了,封装好的消息将被传输至core模块进行处理。

2、消息传输准备

1. DefaultTransportService

在process函数中,消息被进一步封装起来,封装成了一个TbProtoQueueMsg<TransportApiRequestMsg>类型的信息,并增加了一个UUID。Thingsboard MQTT权限校验源码解读
消息被传输到了doProcess函数
我们先将细节屏蔽看看结构。
Thingsboard MQTT权限校验源码解读
第一步是下面的代码
Thingsboard MQTT权限校验源码解读
第一个参数是函数的原返回值,第二个参数是转换函数(也就是如何将原返回值进行转化的函数),第三个参数是Executor(详见Guava)
也就是第一步,将send函数的返回结果从ListenableFuture<Response>转换成了ListenableFuture<ValidateDeviceCredentialsResponse>类型的结果
第二步是
Thingsboard MQTT权限校验源码解读
点进去发现实际上是为上一步转换来的response注册listener,也就是注册回调函数。
Thingsboard MQTT权限校验源码解读
而这回调函数,就是之前process函数中传入的回调。
Thingsboard MQTT权限校验源码解读
综上,doProcess函数将返回值转换成合适的格式,并设置回调函数。下面我们进入transportApiRequestTemplate.send(protoMsg)函数

2. DefaultTbQueueRequestTemplate的send函数

(TbQueueRequestTemplate实现类)
Thingsboard MQTT权限校验源码解读

  1. 判断pending状态的请求数量是否超出限制
    Thingsboard MQTT权限校验源码解读
  2. 生成requestId并组好header信息
    可以看到requestId被存放在了请求头
    Thingsboard MQTT权限校验源码解读
  3. pendingRequests存储
    Thingsboard MQTT权限校验源码解读
    pendingRequests以requestId为key,以一个包含了一个可手动添加future值的future句柄和一个过期时间。这个future句柄目前是没有任何设置的。这个函数被返回出去,在DefaultTransportService的doProcess函数中转换了格式并设置了回调函数。
  4. 真正的发送消息
    Thingsboard MQTT权限校验源码解读

3. 总结

消息传输准备阶段,我们从里向外看,DefaultTbQueueRequestTemplate的send函数生成了一个可手动设置值的future操作句柄,并这次请求的requestId为key,包含操作句柄future和本次请求的过期时间的对象为value存储在pendingRequests中(pendingRequests可以保证requestId唯一)。而后send函数将这个future句柄返回,在上层的调用函数中被转换格式并设置了对应的回调函数。

3、消息传输

1. responseTemplate.send()

从DefaultTbQueueRequestTemplate的send函数中的responseTemplate.send()看起。
Thingsboard MQTT权限校验源码解读
可以看到有三个入参

  1. 应该是topic等相关信息
    Thingsboard MQTT权限校验源码解读
    可以看到消息将会被发到一个tb_transport.api.requests的topic中。

  2. reuqest
    就是前面封装后的登录信息

  3. 队列回调
    Thingsboard MQTT权限校验源码解读

应该是看是否成功发送消息到指定topic的,成功做什么事,失败做什么事。可以看到失败的时候,pendingRequests删掉了对应requestId的信息,并直接给future设置了失败信息。

2. InMemoryTbQueueProducer.send

Thingsboard MQTT权限校验源码解读
可以看到信息传输可以通过很多种方式,我们这里先选择内存队列来看。
Thingsboard MQTT权限校验源码解读
可以看到send函数将前面传入的request放入了topicName对应的队列中。topicName和其对应的队列存在一个单例的ConcurrentHashMap中。
Thingsboard MQTT权限校验源码解读
这样就相当于将消息放到了topicName对应的队列中了。

3. 总结

responseTemplate.send()有很多中实现,其结果就是将封装好的登录信息送到对应topic的队列中(可能是内存的也可能是消息中间件或其他形式)等待校验逻辑的消费。当传送成功时,会返回成功信息。失败时将会直接给future句柄设置为失败。

4、消息的消费

前文说明了使用内存队列消息的生产过程,接下来说明一下消息的消费过程。
消息消费的入口在DefaultTbQueueResponseTemplate的init函数中。

1. DefaultTbQueueResponseTemplate的init函数

init函数中有一个while循环,当stopped不为true的时候,while循环将一直运行
Thingsboard MQTT权限校验源码解读

2. 获取所有的登录request

Thingsboard MQTT权限校验源码解读
requestTemplate是一个下面类型的接口。
Thingsboard MQTT权限校验源码解读
调用其poll(轮询)函数。对于我们的内存队列,其有单独的实现类。
Thingsboard MQTT权限校验源码解读
其中关键一步就是
Thingsboard MQTT权限校验源码解读
根据topic的名称,从单例的storage中取出该topic下的所有之前消息生产中存放的请求。

3. request的处理

  1. 解析header信息,获取超时时间requestId和responseTopic
    Thingsboard MQTT权限校验源码解读
  2. 对于不超时的请求进入下面的处理步骤
    Thingsboard MQTT权限校验源码解读
    AsyncCallbackTemplate的withCallbackAndTimeout函数定义如下
    Thingsboard MQTT权限校验源码解读
    使用Futures.withTimeout生成了一个会超时的future句柄,如果超时将自被中断或者取消。Thingsboard MQTT权限校验源码解读
    之后调用了AsyncCallbackTemplate.withCallback前面用过。withCallback里给这个会超时的future添加了withTimeout传入的回调函数。
    下面我们详细看看这三个参数的来龙去脉。

4. 登录信息校验

Thingsboard MQTT权限校验源码解读
这里的handler是transportApiService,在核心服务初始化的时候被设置的。
Thingsboard MQTT权限校验源码解读

在DefaultTransportApiService是其实现类。其handle函数如下。
Thingsboard MQTT权限校验源码解读
我们找到我们对应的类型
Thingsboard MQTT权限校验源码解读
进入validateCredentials函数
Thingsboard MQTT权限校验源码解读
终于我们看到了校验的逻辑,在这里使用userName也就是我们传入的auth_token,经过业务校验获取到相关设备信息。
校验成功之后,将返回一个包含设备信息的future句柄。

5. 成功回调

Thingsboard MQTT权限校验源码解读
这一块是成功后的回调。回调主要是要将设备信息发送回去。
我们可以看到,response的header里设置了REQUEST_ID_HEADER,这个request_id是之前消息发送过来的request中设置的。
Thingsboard MQTT权限校验源码解读
responseTopic也是requestHeader中带的,是mqtt模块接收core信息的topic。
response就是handle里面返回的信息。

6. 总结

DefaultTbQueueResponseTemplate的init函数将不停地获取对应topic的队列中的request信息,并将所有的request进行处理校验,之后,将带有requestId和设备信息的response重新发送到内存队列中responseTopic对应的队列里。等待进一步处理。

5、校验成功后设备信息的返回

responseTemplate.send()函数,同样我们使用内存队列。
Thingsboard MQTT权限校验源码解读
与消息发送过来的方式相同,将对应的responseTopic的队列中,放置刚获得的设备信息。

6、获取设备信息

TbQueueRequestTemplat接口中同样有一个init方法
Thingsboard MQTT权限校验源码解读
其中有一个while循环,不停地获取responseTopic对应队列的信息,这些response信息就是设备相关信息

Thingsboard MQTT权限校验源码解读
对于每个response信息,我们获取其header中的requestId,并通过requestId获取pendingRequests中的对应的ResponseMetaData<Response> 类型的expectedResponse信息,并将其中的future句柄的内容设置为response中的设备信息。
再看看最开始的回调信息
Thingsboard MQTT权限校验源码解读
当成功获取到response之后,调用onValidateDeviceResponse进行后续的工作

7、总结

总的来说大致的流程是这样的,mqtt连接发送到mqtt-server,mqtt-server通过消息队列,将连接请求相关信息发送至core进行权限校验和认证,core进行校验之后,如果成功将设备信息发送到消息队列,消息消费后被预先注册的成功回调函数处理。否则被失败回调函数处理。

参考资料

https://www.baeldung.com/guava-futures-listenablefuture
https://www.jianshu.com/p/33ac5d394f68

上一篇:.NET Core中的鉴权授权正确方式(.NET5)


下一篇:mysql数据库使用的基本命令