1、总体架构介绍
下面我们搭建一个双主双从的集群,并且采用同步的方式来同步主从之间的信息,总体架构如下:
2、集群工作流程
集群工作流程如下:
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
3、搭建集群
3.1、服务器准备
先准备两台服务器,比如两台虚拟机,可以分别在这两台服务器上放不同的主节点和从节点,但注意同一组的 broker 不要放在同一台服务器上,避免服务器宕机后主从节点一起崩溃。信息如下:
序号 | IP | 角色 | 架构模式 |
---|---|---|---|
1 | 192.168.25.130 | nameserver、brokerserver | Master1、Slave2 |
2 | 192.168.25.131 | nameserver、brokerserver | Master2、Slave1 |
3.2、修改域名映射
修改 hosts 文件:
vim /etc/hosts
往 hosts 文件中添加以下信息:
# nameserver 192.168.25.130 rocketmq-nameserver1 192.168.25.131 rocketmq-nameserver2
# broker 192.168.25.130 rocketmq-master1 192.168.25.130 rocketmq-slave2 192.168.25.131 rocketmq-master2 192.168.25.131 rocketmq-slave1
3.3、关闭防火墙或者开放端口
宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙
# 关闭防火墙 systemctl stop firewalld.service # 查看防火墙的状态 firewall-cmd --state # 禁止firewall开机启动 systemctl disable firewalld.service
或者为了安全,只开放特定的端口号,RocketMQ默认使用3个端口:9876 、10911 、11011 。如果防火墙没有关闭的话,那么防火墙就必须开放这些端口:
-
nameserver
默认使用 9876 端口 -
master
默认使用 10911 端口 -
slave
默认使用11011 端口
执行以下命令:
# 开放name server默认端口 firewall-cmd --remove-port=9876/tcp --permanent # 开放master默认端口 firewall-cmd --remove-port=10911/tcp --permanent # 开放slave默认端口 (当前集群模式可不开启) firewall-cmd --remove-port=11011/tcp --permanent # 重启防火墙 firewall-cmd --reload
3.4、配置环境变量
修改 profile 文件:
vim /etc/profile
直接在该文件最后添加以下配置即可:
# set rocketmq 注意:下面的ROCKETMQ_HOME指定的路径应该是你服务器上rocketmq实际安装的路径 ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release PATH=$PATH:$ROCKETMQ_HOME/bin export ROCKETMQ_HOME PATH
修改后执行命令重新加载配置,使得配置立刻生效:
source /etc/profile
3.5、创建消息存储路径
mkdir /usr/local/rocketmq/store mkdir /usr/local/rocketmq/store/commitlog mkdir /usr/local/rocketmq/store/consumequeue mkdir /usr/local/rocketmq/store/index
3.6、修改broker配置文件
在安装目录下的 conf 目录下,我们可以看到以下目录结构:
因为我们是采用双主双从,同步更新的集群,所以修改 2m-2s-sync 下的配置文件。
- master1
服务器:192.168.25.130 下:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties
修改配置如下:
# 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a # 0 表示 Master,>0 表示 Slave brokerId=0 # nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # Broker 对外服务的监听端口 listenPort=10911 # 删除文件时间点,默认凌晨 4点 deleteWhen=04 # 文件保留时间,默认 48 小时 fileReservedTime=120 # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 # 存储路径 storePathRootDir=/usr/local/rocketmq/store # commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog # 消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue # 消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index # checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint # abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=SYNC_MASTER # 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH # checkTransactionMessageEnable=false # 发消息线程池数量 # sendMessageThreadPoolNums=128 # 拉消息线程池数量 # pullMessageThreadPoolNums=128
- slave2
服务器:192.168.25.130 下:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties
修改配置如下:
# 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-b # 0 表示 Master,>0 表示 Slave brokerId=1 # nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # Broker 对外服务的监听端口 listenPort=11011 # 删除文件时间点,默认凌晨 4点 deleteWhen=04 # 文件保留时间,默认 48 小时 fileReservedTime=120 # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 # 存储路径 storePathRootDir=/usr/local/rocketmq/store # commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog # 消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue # 消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index # checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint # abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=SLAVE # 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH # checkTransactionMessageEnable=false # 发消息线程池数量 # sendMessageThreadPoolNums=128 # 拉消息线程池数量 # pullMessageThreadPoolNums=128
- master2
服务器:192.168.25.131
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties
修改配置如下:
# 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-b # 0 表示 Master,>0 表示 Slave brokerId=0 # nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # Broker 对外服务的监听端口 listenPort=10911 # 删除文件时间点,默认凌晨 4点 deleteWhen=04 # 文件保留时间,默认 48 小时 fileReservedTime=120 # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 # 存储路径 storePathRootDir=/usr/local/rocketmq/store # commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog # 消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue # 消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index # checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint # abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=SYNC_MASTER # 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH # checkTransactionMessageEnable=false # 发消息线程池数量 # sendMessageThreadPoolNums=128 # 拉消息线程池数量 # pullMessageThreadPoolNums=128
-
slave1
服务器:192.168.25.131:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties
修改配置如下:
# 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a # 0 表示 Master,>0 表示 Slave brokerId=1 # nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # Broker 对外服务的监听端口 listenPort=11011 # 删除文件时间点,默认凌晨 4点 deleteWhen=04 # 文件保留时间,默认 48 小时 fileReservedTime=120 # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 # 存储路径 storePathRootDir=/usr/local/rocketmq/store # commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog # 消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue # 消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index # checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint # abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort # 限制的消息大小 maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=SLAVE # 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH # checkTransactionMessageEnable=false # 发消息线程池数量 # sendMessageThreadPoolNums=128 # 拉消息线程池数量 # pullMessageThreadPoolNums=128
3.7、修改启动脚本文件
根据内存大小对JVM参数进行适当的调整:
vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/bin/runbroker.sh vi /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/bin/runserver.sh
参考修改如下:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
3.8、启动服务
先关闭 rocketmq,然后再重新启动集群:
# 关闭NameServer sh mqshutdown namesrv # 关闭Broker sh mqshutdown broker
- 启动nameserve集群
分别在192.168.25.130和192.168.25.131启动NameServer:
# 由于已经配置了环境变量,所以可以直接执行以下命令,不然需要在 bin 目录下执行 nohup sh mqnamesrv &
- 启动broker集群
在192.168.25.130上启动master1和slave2:
# 启动master1: nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties & # 启动slave2: nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
在192.168.25.131上启动master2和slave1:
# 启动master2: nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties & # 启动slave1: nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
启动后可通过JPS查看启动进程