有关EMQX桥接的配置工作

前言

桥接是一种连接多个 EMQ X 或者其他 MQTT 消息中间件的方式。不同于集群,工作在桥接模式下的节点之间不会复制主题树和路由表。桥接模式所做的是:

  • 按照规则把消息转发至桥接节点;
  • 从桥接节点订阅主题,并在收到消息后在本节点/集群中转发该消息。

环境:

emqx1 | v4.1.2 | 192.168.5.207:2883 | 节点node
emqx2 | v3.1.1 | 192.168.5.209:2883 | 远程broker

用Docker在两个服务器上面各自跑了一个emqx的容器,暴露在宿主机的端口分别都是2883;

桥接

前提

使用桥接方式之前,需要了解到的一点是,基于Docker的运行方式,如果需要使用桥接的功能,那么需要修改emqx.conf文件,该文件在容器的具体目录为/opt/emqx/etc/emqx.conf,需要挂载该文件出来,但是EMQX版本需要大于v3.1.1,如果处于并且小于这个版本,那么是无法挂载该文件的,这是emqx的一个bug,也即是在v3.1.2才可以挂载emqx.conf文件,所以我们用了当前最新的v4.1.2版本;

该版本有关桥接部分的配置已被插件化,所以需要挂载另一个配置文件进行配置,容器内的/opt/emqx/etc/plugins/emqx_bridge_mqtt.conf文件需要被挂载;

docker-compose.yml文件配置如下所示:

volumes:
      - ./mqtt/volumes/plugins/emqx_bridge_mqtt.conf:/opt/emqx/etc/plugins/emqx_bridge_mqtt.conf
      - ./mqtt/volumes/data/loaded_plugins:/opt/emqx/data/loaded_plugins

emqx_bridge_mqtt.conf文件在容器中的具体位置如图所示:

有关EMQX桥接的配置工作

emqx_bridge_mqtt.conf文件内容如下:

##====================================================================
## Configuration for EMQ X MQTT Broker Bridge
##====================================================================

##--------------------------------------------------------------------
## Bridges to aws
##--------------------------------------------------------------------

## Bridge address: node name for local bridge, host:port for remote.
##
## Value: String
## Example: emqx@127.0.0.1,  127.0.0.1:1883
bridge.mqtt.aws.address = 192.168.5.209:2883

## Protocol version of the bridge.
##
## Value: Enum
## - mqttv5
## - mqttv4
## - mqttv3
bridge.mqtt.aws.proto_ver = mqttv4

## Start type of the bridge.
##
## Value: enum
## manual
## auto
bridge.mqtt.aws.start_type = manual

## Whether to enable bridge mode for mqtt bridge
##
## This option is prepared for the mqtt broker which does not
## support bridge_mode such as the mqtt-plugin of the rabbitmq
##
## Value: boolean
bridge.mqtt.aws.bridge_mode = true

## The ClientId of a remote bridge.
##
## Value: String
bridge.mqtt.aws.clientid = bridge_aws

## The Clean start flag of a remote bridge.
##
## Value: boolean
## Default: true
##
## NOTE: Some IoT platforms require clean_start
##       must be set to 'true'
bridge.mqtt.aws.clean_start = true

## The username for a remote bridge.
##
## Value: String
bridge.mqtt.aws.username = user

## The password for a remote bridge.
##
## Value: String
bridge.mqtt.aws.password = passwd

## Topics that need to be forward to AWS IoTHUB
##
## Value: String
## Example: topic1/#,topic2/#
bridge.mqtt.aws.forwards = topic1/#,topic2/#

## Forward messages to the mountpoint of an AWS IoTHUB
##
## Value: String
#bridge.mqtt.aws.forward_mountpoint = bridge/aws/${node}/

## Need to subscribe to AWS topics
##
## Value: String
 bridge.mqtt.aws.subscription.1.topic = get/#

## Need to subscribe to AWS topics QoS.
##
## Value: Number
 bridge.mqtt.aws.subscription.1.qos = 1

## A mountpoint that receives messages from AWS IoTHUB
##
## Value: String
## bridge.mqtt.aws.receive_mountpoint = receive/aws/


## Bribge to remote server via SSL.
##
## Value: on | off
bridge.mqtt.aws.ssl = off

## PEM-encoded CA certificates of the bridge.
##
## Value: File
bridge.mqtt.aws.cacertfile = etc/certs/cacert.pem

## Client SSL Certfile of the bridge.
##
## Value: File
bridge.mqtt.aws.certfile = etc/certs/client-cert.pem

## Client SSL Keyfile of the bridge.
##
## Value: File
bridge.mqtt.aws.keyfile = etc/certs/client-key.pem

## SSL Ciphers used by the bridge.
##
## Value: String
bridge.mqtt.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA

## Ciphers for TLS PSK.
## Note that 'bridge.${BridgeName}.ciphers' and 'bridge.${BridgeName}.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#bridge.mqtt.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA

## Ping interval of a down bridge.
##
## Value: Duration
## Default: 10 seconds
bridge.mqtt.aws.keepalive = 60s

## TLS versions used by the bridge.
##
## Value: String
bridge.mqtt.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1

## Bridge reconnect time.
##
## Value: Duration
## Default: 30 seconds
bridge.mqtt.aws.reconnect_interval = 30s

## Retry interval for bridge QoS1 message delivering.
##
## Value: Duration
bridge.mqtt.aws.retry_interval = 20s

## Publish messages in batches, only RPC Bridge supports
##
## Value: Integer
## default: 32
bridge.mqtt.aws.batch_size = 32

## Inflight size.
##
## Value: Integer
bridge.mqtt.aws.max_inflight_size = 32

## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined,
## replayq works in a mem-only manner.
##
## Value: String
bridge.mqtt.aws.queue.replayq_dir = data/emqx_aws_bridge/

## Replayq segment size
##
## Value: Bytesize
bridge.mqtt.aws.queue.replayq_seg_bytes = 10MB

## Replayq max total size
##
## Value: Bytesize
bridge.mqtt.aws.queue.max_total_size = 5GB

如果不知道怎么修改的情况下,那么直接把bridge.mqtt.aws.address的地址和mqtt的远程端口修改为你需要转发的地址和端口,如207服务器的mqtt broker是作为转发节点的,209是作为接收端的mqtt broker,那么就填入bridge.mqtt.aws.address = 192.168.5.209:2883(这里的209的mqtt端口我使用的是2883,默认是1883)

## Bridge address: node name for local bridge, host:port for remote.
##
## Value: String
## Example: emqx@127.0.0.1,  127.0.0.1:1883
bridge.mqtt.aws.address = 192.168.5.209:2883

设置需要转发的topic,转发topic1和topic2主题,也就是任何一个向207发布topic1主题和topic2主题的mqtt消息都会转发到209mqtt broker去;

这里需要注意一个点,就是mountpoint配置,该配置在官网介绍是作为一个消息发布的主题前缀信息来配置的,但是这里测试的时候发现如果这句配置不注释,那么转发会不成功;

向209订阅主题,如有人向209的主题发布消息,那么207broker相应的也会收到消息并发布给向207订阅了该主题的对象,这里207向209订阅了test/#主题,qos为1;

## Topics that need to be forward to AWS IoTHUB
##
## Value: String
## Example: topic1/#,topic2/#
bridge.mqtt.aws.forwards = topic1/#,topic2/#

## Forward messages to the mountpoint of an AWS IoTHUB
##
## Value: String
#bridge.mqtt.aws.forward_mountpoint = bridge/aws/${node}/

## Need to subscribe to AWS topics
##
## Value: String
 bridge.mqtt.aws.subscription.1.topic = test/#

## Need to subscribe to AWS topics QoS.
##
## Value: Number
 bridge.mqtt.aws.subscription.1.qos = 1

## A mountpoint that receives messages from AWS IoTHUB
##
## Value: String
## bridge.mqtt.aws.receive_mountpoint = receive/aws/

配置文件启动插件(loaded_plugins)

说完emqx_bridge_mqtt.conf,我们来看看loaded_plugins文件,默认启动emqx的时候,emqx_bridge_mqtt插件是不启动的,需要在控制面板中的插件那里点击启动才行,如何使其在启动emqx启动的时候,bridge插件随之启动呢,我们在emqx.conf配置文件中找到这段一个配置,有关插件的,这里提示,插件自启动的配置在/opt/emqx/data/loaded_plugins这里,如图:
有关EMQX桥接的配置工作

然后修改loaded_plugins文件,将emqx_bridge_mqtt的自启动设置为true即可
有关EMQX桥接的配置工作

将emqx_bridge_mqtt的自启动设置为true之后,运行容器的时候,该插件就默认启动了,如果没有修改该配置的话,那么就得进入emqx的控制台界面去自己启动该插件,每一次都要人为启动的话,效率不高,所以这里设置了自启动,以下截图为控制台启动该插件的图示:有关EMQX桥接的配置工作

启用配置好的桥接网络

bridge插件启动之后,还得进入容器对已配置好的桥接网络进行启动,这里就不进入容器内操作了,直接宿主机执行容器内指令,如下操作:有关EMQX桥接的配置工作

容器外执行容器内指令,可使用docker的exec指令。emqx是可以在配置文件里面配置多个桥接网络的,通过bin目录下面的emqx_ctl可以查询配置了哪些桥接网络,还可以启动配置好的桥接网络,转发的主题和订阅的主题等操作;

# 查询配置了哪些桥接网络
docker exec emqx /bin/sh -c "bin/emqx_ctl bridges list"
# 启动配置好的aws桥接网络
docker exec emqx /bin/sh -c "bin/emqx_ctl bridges start aws"
# 查询已启动的aws桥接网络转发的主题
docker exec emqx /bin/sh -c "bin/emqx_ctl bridges forwards aws"
# 查询已启动的aws桥接网络订阅的主题
docker exec emqx /bin/sh -c "bin/emqx_ctl bridges subscriptions aws"

测试桥接网络

通过图示描述,如下图手绘所示:有关EMQX桥接的配置工作

  • fx1:mqttfx客户端,向207服务器订阅sub主题,推送to1和to2主题到207服务器;
  • 207服务器:转发任何客户端push过来的to1、to2主题的消息到209服务器,向209服务器订阅sub主题,接收209服务器推送过来的sub主题,向那些订阅了sub主题的客户端推送sub主题;
  • 209服务器:接收207服务器转发过来的to1、to2主题的消息,向所有订阅了sub主题的客户端推送消息,向所有订阅了to1、to2主题的客户端推送消息;
  • fx2:mqttfx客户端,向209服务器订阅to1、to2主题,推送sub主题;

如上述过程无异常,即为测试成功;

上一篇:Hello Docker(四)——Docker网络


下一篇:There is no Layer2 option in WireGuard