RocketMQ集群搭建(3m-3s-async)
各角色介绍
角色 | 作用 |
---|---|
Producer |
消息发送者,将消息发送到 Broker 。无状态,其与NameServer 集群中的一个节点建立长连接,定期从NameServer 获取Topic 路由信息,并与提供Topic 服务的Master 建立连接,定时向Master 发送心跳 |
Consumer |
消息消费者,从 Broker 中获取消息消费。其与NameServer 集群中的一个节点建立长连接,定期从NameServer 获取Topic 路由信息,并与提供Topic 服务的Master ,Slave 建立长连接,定时向Master ,Slave 发送心跳。Consumer 既可以从Master 订阅消息,也可以从Slave 订阅消息,订阅规则由Broker 配置 |
Broker |
存储和传输消息。分为Master 和Slave ,并且Master 和Slave 是一对多的关系,Master 与 Slave 对应关系通过相同的BrokerName ,不同的BrokerId 来定义(为0 表示Master ,非0 表示Slave )每个Broker 与NameServer 集群中的所有节点建立长连接,定时注册Topic 到所有NameServer
|
NameServer |
负责管理Broker ,比如生产者发送到哪个 Broker ,由 NameServer 告知。集群中节点之间无信息同步,是无状态节点 |
Topic |
消息类别,一个发送者可以发送消息给一个或多个Topic ,一个消息的接受者可以订阅一个或者多个 Topic
|
Message Queue |
用于并行发送和接收消息 |
前提背景
3台服务器如下:
10.28.82.137 简称服务器A
10.28.82.135 简称服务器B
10.28.82.134 简称服务器C
rocketmq-console访问地址:124.196.2.107:3033
每台服务器必须环境:
JDK 本次采用:jdk1.8.0_161
Maven 本次采用:apache-maven-3.6.3
rocketmq 本次采用:rocketmq-all-4.7.1-bin-release
本次rocketmq集群在安装在/wls文件下
JDK安装
切换到/wls: cd /wls
解压jdk1.8.0_161
tar -zxvf jdk-8u161-linux-x64.tar.gz
设置java环境变量:
修改sudo vim /etc/profile,最后一行添加:
export JAVA_HOME=/wls/jdk1.8.0_161 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$JAVA_HOME/bin:$PATH
更新后生效:source /etc/profile
Maven安装
切换到/wls: cd /wls
解压maven
tar -zxvf apache-maven-3.6.3-bin.tar.gz
设置maven环境变量
修改sudo vim /etc/profile,最后一行添加:
export MAVEN_HOME=/wls/apache-maven-3.6.3
export PATH=$JAVA_HOME/bin:$PATH:$MAVEN_HOME/bin
更新后生效:source /etc/profile
rocketmq安装
去官网下载好安装包或者直接使用wget下载
wget -c https://mirror.bit.edu.cn/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
新建文件夹
mkdir -p /wls/rocketmq
将压缩rocketmq文件解压到指定目录
unzip rocketmq-all-4.7.1-bin-release.zip -d /wls/rocketmq
重命名解压后的目录
mv rocketmq-all-4.7.1-bin-release/ rocketmq-4.7.1
配置环境变量,增加RocketMQ NameServer地址
vim /etc/profile
export ROCKETMQ_HOME=/wls/rocketmq/rocketmq-4.7.1
export PATH=$ROCKETMQ_HOME/bin:$PATH
生效环境变量
source /etc/profile
到此已经完成集群前的准备工作。
搭建rocketmq集群(3-master-3-slave)
集群规划
主机 | 角色 | 架构 |
---|---|---|
服务器A(10.28.82.137) |
nameserver ,brokerserver
|
broker-a ,broker-b-s
|
服务器B(10.28.82.135) |
nameserver ,brokerserver
|
broker-b ,broker-c-s
|
服务器C(10.28.82.134) |
nameserver ,brokerserver
|
broker-c ,broker-a-s
|
服务器A(10.28.82.137)
1.配置master broker-a
切换路径
cd /wls/rocketmq/rocketmq-4.7.1/conf/2m-2s-async
修改broker配置
vi broker-a.properties
# 集群名,不同broker节点集群名是一样的
brokerClusterName=DefaultCluster
# broker名字,不同broker节点brokerName是唯一的
brokerName=broker-a
# 0表示Master,大于0表示 Slave
brokerId=0
#nameServer地址,多个用英文分号分割
namesrvAddr=10.28.82.134:9876;10.28.82.135:9876;10.28.82.137:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# 当前节点角色,ASYNC_MASTER=异步复制Master模式,SYNC_MASTER=同步双写Master模式
brokerRole=ASYNC_MASTER
# 刷盘模式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
flushDiskType=ASYNC_FLUSH
# 数据存储路径
storePathRootDir=/wls/rocketmq/store/3astore
# commitLog 存储路径
storePathCommitLog=/wls/rocketmq/store/3astore/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/wls/rocketmq/store/3astore/consumequeue
# 消息索引存储路径
storePathIndex=/wls/rocketmq/store/3astore/index
# checkpoint 文件存储路径
storeCheckpoint=/wls/rocketmq/store/3astore/checkpoint
# abort 文件存储路径
abortFile=/wls/rocketmq/store/3astore/abort
# 是否允许 Broker 自动创建Topic,建议关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议关闭
autoCreateSubscriptionGroup=false
# 设置brokerIP,如果不设置当服务器有很多网卡,默认会读取第一个网卡的IP地址(如安装docker后),会导致客户端无法连接
brokerIP1=10.28.82.137
# broker对外服务的监听端口,默认10911
listenPort=10911
2.配置slave broker-b-s(即把服务器A作为服务器B的从机)
vi broker-b-s.properties
# 集群名,不同broker节点集群名是一样的
brokerClusterName=DefaultCluster
# broker名字,不同broker节点brokerName是唯一的
brokerName=broker-b
# 0表示Master,大于0表示 Slave
brokerId=1
#nameServer地址,多个用英文分号分割
namesrvAddr=10.28.82.134:9876;10.28.82.135:9876;10.28.82.137:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# 当前节点角色,ASYNC_MASTER=异步复制Master模式,SYNC_MASTER=同步双写Master模式
brokerRole=SLAVE
# 刷盘模式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
flushDiskType=ASYNC_FLUSH
# 数据存储路径
storePathRootDir=/wls/rocketmq/store/3bstore
# commitLog 存储路径
storePathCommitLog=/wls/rocketmq/store/3bstore/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/wls/rocketmq/store/3bstore/consumequeue
# 消息索引存储路径
storePathIndex=/wls/rocketmq/store/3bstore/index
# checkpoint 文件存储路径
storeCheckpoint=/wls/rocketmq/store/3bstore/checkpoint
# abort 文件存储路径
abortFile=/wls/rocketmq/store/3bstore/abort
# 是否允许 Broker 自动创建Topic,建议关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议关闭
autoCreateSubscriptionGroup=false
# 设置brokerIP,如果不设置当服务器有很多网卡,默认会读取第一个网卡的IP地址(如安装docker后),会导致客户端无法连接
brokerIP1=10.28.82.137
# broker对外服务的监听端口,默认10911
listenPort=10811
此处一定要注意:两个broker的存储路径不要一样
修改默认broker和nameserver的默认内存配置(此处仅供参考,按实际情况自行修改)
# 修改broker内存配置,将默认4G最低内存改为1g
vim /wls/rocketmq/rocketmq-4.7.1/bin/runbroker.sh
#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
# 修改runserver.sh,将默认2G最低内存改为512
vim /wls/rocketmq/rocketmq-4.7.1/bin/runserver.sh
#JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
服务器B(10.28.82.135)
1.配置master broker-b
切换路径
cd /wls/rocketmq/rocketmq-4.7.1/conf/2m-2s-async
修改broker配置
vi broker-b.properties
# 集群名,不同broker节点集群名是一样的
brokerClusterName=DefaultCluster
# broker名字,不同broker节点brokerName是唯一的
brokerName=broker-b
# 0表示Master,大于0表示 Slave
brokerId=0
#nameServer地址,多个用英文分号分割
namesrvAddr=10.28.82.134:9876;10.28.82.135:9876;10.28.82.137:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# 当前节点角色,ASYNC_MASTER=异步复制Master模式,SYNC_MASTER=同步双写Master模式
brokerRole=ASYNC_MASTER
# 刷盘模式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
flushDiskType=ASYNC_FLUSH
# 数据存储路径
storePathRootDir=/wls/rocketmq/store/3bstore
# commitLog 存储路径
storePathCommitLog=/wls/rocketmq/store/3bstore/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/wls/rocketmq/store/3bstore/consumequeue
# 消息索引存储路径
storePathIndex=/wls/rocketmq/store/3bstore/index
# checkpoint 文件存储路径
storeCheckpoint=/wls/rocketmq/store/3bstore/checkpoint
# abort 文件存储路径
abortFile=/wls/rocketmq/store/3bstore/abort
# 是否允许 Broker 自动创建Topic,建议关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议关闭
autoCreateSubscriptionGroup=false
# 设置brokerIP,如果不设置当服务器有很多网卡,默认会读取第一个网卡的IP地址(如安装docker后),会导致客户端无法连接
brokerIP1=10.28.82.135
# broker对外服务的监听端口,默认10911
listenPort=10911
2.配置slave broker-c-s(即把服务器B作为服务器C的从机)
vi broker-c-s.properties(原文件夹中无此文件,需要自己新加,或者将其他文件名修改为此文件名)
# 集群名,不同broker节点集群名是一样的
brokerClusterName=DefaultCluster
# broker名字,不同broker节点brokerName是唯一的
brokerName=broker-c
# 0表示Master,大于0表示 Slave
brokerId=1
#nameServer地址,多个用英文分号分割
namesrvAddr=10.28.82.134:9876;10.28.82.135:9876;10.28.82.137:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# 当前节点角色,ASYNC_MASTER=异步复制Master模式,SYNC_MASTER=同步双写Master模式
brokerRole=SLAVE
# 刷盘模式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
flushDiskType=ASYNC_FLUSH
# 数据存储路径
storePathRootDir=/wls/rocketmq/store/3cstore
# commitLog 存储路径
storePathCommitLog=/wls/rocketmq/store/3cstore/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/wls/rocketmq/store/3cstore/consumequeue
# 消息索引存储路径
storePathIndex=/wls/rocketmq/store/3cstore/index
# checkpoint 文件存储路径
storeCheckpoint=/wls/rocketmq/store/3cstore/checkpoint
# abort 文件存储路径
abortFile=/wls/rocketmq/store/3cstore/abort
# 是否允许 Broker 自动创建Topic,建议关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议关闭
autoCreateSubscriptionGroup=false
# 设置brokerIP,如果不设置当服务器有很多网卡,默认会读取第一个网卡的IP地址(如安装docker后),会导致客户端无法连接
brokerIP1=10.28.82.135
# broker对外服务的监听端口,默认10911
listenPort=10811
此处一定要注意:两个broker的存储路径一定不要一样
修改默认broker和nameserver的默认内存配置(此处仅供参考,按实际情况自行修改)
# 修改broker内存配置,将默认4G最低内存改为1g
vim /wls/rocketmq/rocketmq-4.7.1/bin/runbroker.sh
#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
# 修改runserver.sh,将默认2G最低内存改为512
vim /wls/rocketmq/rocketmq-4.7.1/bin/runserver.sh
#JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
服务器C(10.28.82.134)
1.配置master broker-c
切换路径
cd /wls/rocketmq/rocketmq-4.7.1/conf/2m-2s-async
修改broker配置
vi broker-c.properties
# 集群名,不同broker节点集群名是一样的
brokerClusterName=DefaultCluster
# broker名字,不同broker节点brokerName是唯一的
brokerName=broker-c
# 0表示Master,大于0表示 Slave
brokerId=0
#nameServer地址,多个用英文分号分割
namesrvAddr=10.28.82.134:9876;10.28.82.135:9876;10.28.82.137:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# 当前节点角色,ASYNC_MASTER=异步复制Master模式,SYNC_MASTER=同步双写Master模式
brokerRole=ASYNC_MASTER
# 刷盘模式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
flushDiskType=ASYNC_FLUSH
# 数据存储路径
storePathRootDir=/wls/rocketmq/store/3cstore
# commitLog 存储路径
storePathCommitLog=/wls/rocketmq/store/3cstore/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/wls/rocketmq/store/3cstore/consumequeue
# 消息索引存储路径
storePathIndex=/wls/rocketmq/store/3cstore/index
# checkpoint 文件存储路径
storeCheckpoint=/wls/rocketmq/store/3cstore/checkpoint
# abort 文件存储路径
abortFile=/wls/rocketmq/store/3cstore/abort
# 是否允许 Broker 自动创建Topic,建议关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议关闭
autoCreateSubscriptionGroup=false
# 设置brokerIP,如果不设置当服务器有很多网卡,默认会读取第一个网卡的IP地址(如安装docker后),会导致客户端无法连接
brokerIP1=10.28.82.134
# broker对外服务的监听端口,默认10911
listenPort=10911
2.配置slave broker-a-s(即把服务器C作为服务器A的从机)
vi broker-a-s.properties
# 集群名,不同broker节点集群名是一样的
brokerClusterName=DefaultCluster
# broker名字,不同broker节点brokerName是唯一的
brokerName=broker-a
# 0表示Master,大于0表示 Slave
brokerId=1
#nameServer地址,多个用英文分号分割
namesrvAddr=10.28.82.134:9876;10.28.82.135:9876;10.28.82.137:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# 当前节点角色,ASYNC_MASTER=异步复制Master模式,SYNC_MASTER=同步双写Master模式
brokerRole=SLAVE
# 刷盘模式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
flushDiskType=ASYNC_FLUSH
# 数据存储路径
storePathRootDir=/wls/rocketmq/store/3astore
# commitLog 存储路径
storePathCommitLog=/wls/rocketmq/store/3astore/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/wls/rocketmq/store/3astore/consumequeue
# 消息索引存储路径
storePathIndex=/wls/rocketmq/store/3astore/index
# checkpoint 文件存储路径
storeCheckpoint=/wls/rocketmq/store/3astore/checkpoint
# abort 文件存储路径
abortFile=/wls/rocketmq/store/3astore/abort
# 是否允许 Broker 自动创建Topic,建议关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议关闭
autoCreateSubscriptionGroup=false
# 设置brokerIP,如果不设置当服务器有很多网卡,默认会读取第一个网卡的IP地址(如安装docker后),会导致客户端无法连接
brokerIP1=10.28.82.134
# broker对外服务的监听端口,默认10911
listenPort=10811
此处一定要注意:两个broker的存储路径一定不要一样
修改默认broker和nameserver的默认内存配置(此处仅供参考,按实际情况自行修改)
# 修改broker内存配置,将默认4G最低内存改为1g
vim /wls/rocketmq/rocketmq-4.7.1/bin/runbroker.sh
#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
# 修改runserver.sh,将默认2G最低内存改为512
vim /wls/rocketmq/rocketmq-4.7.1/bin/runserver.sh
#JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
服务器ABC的broker配置注意
1.注意每个主broker和它的从broker的brokerName需要一致。
2.注意主broker的brokerId=0,从broker的brokerId=1,0表示Master,大于0表示 Slave,不然broker启动会因冲突而失败
3.无论主从,每个broker,都应该注册到所有的nameServer中,如下
namesrvAddr=10.28.82.134:9876;10.28.82.135:9876;10.28.82.137:9876
4.注意主broker的brokerRole=ASYNC_MASTER,从broker的brokerRole=SLAVE
5.brokerIP1按照所在服务器ip来配置
6.同一个服务器的两个broker监听端口不能一样,但是不同服务器的监听端口可以一样
启动集群
启动namesrv(每台都要启动)
cd /wls/rocketmq/rocketmq-4.7.1/bin
# 启动NameServer,NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer
nohup sh mqnamesrv &
# 验证Name Server 是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log
看到The Name Server boot success...即启动成功
启动broker(每台都要启动,此处以服务器A为例)
# 启动Broker集群
# 在机器A,启动Master broker-a,-c 参数指定broker配置文件地址
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
# 在机器A,启动slave broker-b-s,-c 参数指定broker配置文件地址
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
# 验证Name Server 是否启动成功
tail -f ~/logs/rocketmqlogs/broker.log
查看集群
# 查看集群状态
sh mqadmin clusterlist -n 10.28.82.137:9876
或sh mqadmin clusterlist -n 10.28.82.135:9876
或sh mqadmin clusterlist -n 10.28.82.134:9876
# 关闭服务的命令
sh mqshutdown broker
sh mqshutdown namesrv
查看rocketmq进程:
ps -ef |grep rocketmq
安装rocketmq-console(安装到一台服务器上即可)
以服务器A为例:
cd /wls/rocketmq
# 下载源码
git clone https://github.com/apache/rocketmq-externals
# 修改配置文件,修改以下几个配置即可
vim /wls/rocketmq/rocketmq-externals/rocketmq-console/src/main/resources/application.properties
# 服务端口号
server.port=3033
# NameServer服务地址,多台服务器用英文分号分割
rocketmq.config.namesrvAddr=10.28.82.137:9876;10.28.82.135:9876;10.28.82.134:9876
# mq数据路径,可以自己修改
rocketmq.config.dataPath=/wls/rocketmq/rocketmq-console/data
# maven编译
cd /wls/rocketmq/rocketmq-externals/rocketmq-console
mvn clean package -Dmaven.test.skip=true
# 运行rocketmq-console
# 该方式启动关闭服务器连接的时候同时也会关闭启动的进程,适用于测试的时候使用
java -jar target/rocketmq-console-ng-2.0.0.jar
# 该方式将java程序设置为后台运行,nohup表示不挂断运行,命令结尾的&表示在后台运行
# 0:stdin (standard input),1:stdout (standard output),2:stderr (standard error)
# 2>&1是将标准错误(2)重定向到标准输出(&1),标准输出(&1)再被重定向输入到console.log文件中
nohup java -jar target/rocketmq-console-ng-2.0.0.jar >console.log 2>&1 &
集群搭建成功
测试mqadmin管理工具
RocketMQ的源代码中并没有为我们提供类似于Nacos或者RabbitMQ那样的控制台,只提供了一个mqadmin指令来管理RocketMQ,命令在bin目录下。使用方式是 ./mqadmin {command} {args}
所有指令如下:
Topic相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
updateTopic | 创建更新Topic配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port |
-c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询) | ||
-h- | 打印帮助 | ||
-n | NameServer服务地址,格式 ip:port | ||
-p | 指定新topic的读写权限( W=2|R=4|WR=6 ) | ||
-r | 可读队列数(默认为 8) | ||
-w | 可写队列数(默认为 8) | ||
-t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
deleteTopic | 删除Topic | -c | cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询) |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
topicList | 查看 Topic 列表信息 | -h | 打印帮助 |
-c | 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数 | ||
-n | NameServer 服务地址,格式 ip:port | ||
topicRoute | 查看 Topic 路由信息 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
topicStatus | 查看 Topic 消息队列offset | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
topicClusterList | 查看 Topic 所在集群列表 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
updateTopicPerm | 更新 Topic 读写权限 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port | ||
-p | 指定新 topic 的读写权限( W=2|R=4|WR=6 ) | ||
-c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令 | ||
updateOrderConf | 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic,键 | ||
-v | orderConf,值 | ||
-m | method,可选get、put、delete | ||
allocateMQ | 以平均负载算法计算消费者列表负载消息队列的负载结果 | -t | topic 名称 |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-i | ipList,用逗号分隔,计算这些ip去负载Topic的消息队列 | ||
statsAll | 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-a | 是否只打印活跃topic | ||
-t | 指定topic |
集群相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
clusterList | 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 | -m | 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday) |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-i | 打印间隔,单位秒 | ||
clusterRT | 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 | -a | amount,每次探测的总数,RT = 总时间 / amount |
-s | 消息大小,单位B | ||
-c | 探测哪个集群 | ||
-p | 是否打印格式化日志,以|分割,默认不打印 | ||
-h | 打印帮助 | ||
-m | 所属机房,打印使用 | ||
-i | 发送间隔,单位秒 | ||
-n | NameServer 服务地址,格式 ip:port |
Broker相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
updateBrokerConfig | 更新 Broker 配置文件,会修改Broker.conf | -b | Broker 地址,格式为ip:port |
-c | cluster 名称 | ||
-k | key 值 | ||
-v | value 值 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
brokerStatus | 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) | -b | Broker 地址,地址为ip:port |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
brokerConsumeStats | Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 | -b | Broker 地址,地址为ip:port |
-t | 请求超时时间 | ||
-l | diff阈值,超过阈值才打印 | ||
-o | 是否为顺序topic,一般为false | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
getBrokerConfig | 获取Broker配置 | -b | Broker 地址,地址为ip:port |
-n | NameServer 服务地址,格式 ip:port | ||
wipeWritePerm | 从NameServer上清除 Broker写权限 | -b | Broker 地址,地址为ip:port |
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
cleanExpiredCQ | 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker 地址,地址为ip:port | ||
-c | 集群名称 | ||
cleanUnusedTopic | 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker 地址,地址为ip:port | ||
-c | 集群名称 | ||
sendMsgStatus | 向Broker发消息,返回发送状态和RT | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | BrokerName,注意不同于Broker地址 | ||
-s | 消息大小,单位B | ||
-c | 发送次数 |
消息相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
queryMsgById | 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 | -i | msgId |
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
queryMsgByKey | 根据消息 Key 查询消息 | -k | msgKey |
-t | Topic 名称 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
queryMsgByOffset | 根据 Offset 查询消息 | -b | Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到) |
-i | query 队列 id | ||
-o | offset 值 | ||
-t | topic 名称 | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
queryMsgByUniqueKey | 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-i | uniqe msg id | ||
-g | consumerGroup | ||
-d | clientId | ||
-t | topic名称 | ||
checkMsgSendRT | 检测向topic发消息的RT,功能类似clusterRT | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-a | 探测次数 | ||
-s | 消息大小 | ||
sendMessage | 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-p | body,消息体 | ||
-k | keys | ||
-c | tags | ||
-b | BrokerName | ||
-i | queueId | ||
consumeMessage | 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-b | BrokerName | ||
-o | 从offset开始消费 | ||
-i | queueId | ||
-g | 消费者分组 | ||
-s | 开始时间戳,格式详见-h | ||
-d | 结束时间戳 | ||
-c | 消费多少条消息 | ||
printMsg | 从Broker消费消息并打印,可选时间段 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-c | 字符集,例如UTF-8 | ||
-s | subExpress,过滤表达式 | ||
-b | 开始时间戳,格式参见-h | ||
-e | 结束时间戳 | ||
-d | 是否打印消息体 | ||
printMsgByQueue | 类似printMsg,但指定Message Queue | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-t | topic名称 | ||
-i | queueId | ||
-a | BrokerName | ||
-c | 字符集,例如UTF-8 | ||
-s | subExpress,过滤表达式 | ||
-b | 开始时间戳,格式参见-h | ||
-e | 结束时间戳 | ||
-p | 是否打印消息 | ||
-d | 是否打印消息体 | ||
-f | 是否统计tag数量并打印 | ||
resetOffsetByTime | 按时间戳重置offset,Broker和consumer都会重置 | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-g | 消费者分组 | ||
-t | topic名称 | ||
-s | 重置为此时间戳对应的offset | ||
-f | 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系 | ||
-c | 是否重置c++客户端offset |
消费者和消费者组相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
consumerProgress | 查看订阅组消费状态,可以查看具体的client IP的消息积累量 | -g | 消费者所属组名 |
-s | 是否打印client IP | ||
-h | 打印帮助 | ||
-n | NameServer 服务地址,格式 ip:port | ||
consumerStatus | 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand | -h | 打印帮助 |
-n | NameServer 服务地址,格式 ip:port | ||
-g | consumer group | ||
-i | clientId | ||
-s | 是否执行jstack | ||
getConsumerStatus | 获取 Consumer 消费进度 | -g | 消费者所属组名 |
-t | 查询主题 | ||
-i | Consumer 客户端 ip | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
updateSubGroup | 更新或创建订阅关系 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker地址 | ||
-c | 集群名称 | ||
-g | 消费者分组名称 | ||
-s | 分组是否允许消费 | ||
-m | 是否从最小offset开始消费 | ||
-d | 是否是广播模式 | ||
-q | 重试队列数量 | ||
-r | 最大重试次数 | ||
-i | 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费 | ||
-w | 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1 | ||
-a | 当消费者数量变化时是否通知其他消费者负载均衡 | ||
deleteSubGroup | 从Broker删除订阅关系 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-b | Broker地址 | ||
-c | 集群名称 | ||
-g | 消费者分组名称 | ||
cloneGroupOffset | 在目标群组中使用源群组的offset | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-s | 源消费者组 | ||
-d | 目标消费者组 | ||
-t | topic名称 | ||
-o | 暂未使用 |
连接相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
consumerConnec tion | 查询 Consumer 的网络连接 | -g | 消费者所属组名 |
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
producerConnec tion | 查询 Producer 的网络连接 | -g | 生产者所属组名 |
-t | 主题名称 | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 |
NameServer相关
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
updateKvConfig | 更新NameServer的kv配置,目前还未使用 | -s | 命名空间 |
-k | key | ||
-v | value | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
deleteKvConfig | 删除NameServer的kv配置 | -s | 命名空间 |
-k | key | ||
-n | NameServer 服务地址,格式 ip:port | ||
-h | 打印帮助 | ||
getNamesrvConfig | 获取NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
updateNamesrvConfig | 修改NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 | ||
-k | key | ||
-v | value |
其他
名称 | 含义 | 命令选项 | 说明 |
---|---|---|---|
startMonitoring | 开启监控进程,监控消息误删、重试队列消息数等 | -n | NameServer 服务地址,格式 ip:port |
-h | 打印帮助 |
注意:
1、几乎所有指令都需要通过-n参数配置nameServer地址,格式为ip:port
2、几乎所有执行都可以通过-h参数获得帮助
3、当既有Broker地址(-b)又有集群名称clustername(-c)配合项,则优先以Broker地址执行指令。如果不配置Broker地址,则对集群中所有主机执行指令。