文章目录
基础概念
RocketMQ主要由 消息生产者(Producer)、代理服务器(Broker Server)、消息消费者(Consumer) 三部分组成,
其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,主题(Topic)一类消息的集合,
每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker.
角色理解
Broker
理解成RocketMQ魔法
经纪人主要用于生产者和消费者接收和发送消息
经纪人会定时向nameserver提交自己的信息
是消息中间件的消息存储、转发服务器
每个Broker节点,在启动时,都遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报
Nameserver
理解成zookeeper的效果,只是他没用zk,用nameserver来替代zk
由媒体实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点
域名服务器是服务发现者,会员中的人物角色(制作人、经纪人、消费者)都需要定时服务器域名上向自己的状态,以便互相发现彼此,超时不上报的话,域名服务器可能会从列表中剔除除名
nameserver可以有多个,当多个nameserver出现的时候,其他角色同时给他们上报信息,以保证高可用,
NameServer 网络间互不通信,没有主备的概念
nameserver 内存式存储,nameserver 中的broker、topic 等信息默认无不会持久化,所以他是状态节点
Producer
消息的生产者
随机选择其中一个NameServer节点建立长连接,获得等热门消息信息(包括topic下的队列,这些queue发布在哪些broker上)
接下来向提供主题服务的master建立长连接(因为rocketmq只有master才能写),并且定时向master发送心跳消息
Comsumer
消息的消费者
通过NameServer,获得Topic的路由信息连接到对应的Broker上消费
因为主从都可以读取消息,因此消费者会与主从都建立连接进行消费消息
核心概念
Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
Message
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。
系统提供了通过Message ID和Key查询消息的功能
queue
1个topic会被分成N个queue(队列),数量是可配置的。
message(消息)本身其实是存储到queue上的,消费者消费的也是queue上的消息。
Tag
为消息设置的标志,用于同一主题下区分不同类型的消息。
来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。
标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。
消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性
Producer Group
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送事务消息之后崩溃,则Broker服务器会联系同一Producer组的其他Producer实例以提交或回溯消费
Consumer Group
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。
消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。消费者组的消费者实例必须订阅完全相同的Topic。
Message Model
集群消费模式: 相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费模式: 相同Consumer Group的每个Consumer实例都接收全量的消息。
Message Ordered
普通顺序消费模式: 消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
严格顺序消息模式: 消费者收到的所有消息均是有顺序的。
ACK
首先要明确一点:ACK机制是消费者端的,不是在Producer端的。想要消费者消费完后消息要进行ACK确认,如果未确认代表则是消费失败,这时候Broker会进行重试策略(仅集结)群模式会重试)。
ACK的英文就是:消费者说:好的,我消费了。消息给我标记成功已消费吧。
单机环境
环境要求:
64 位操作系统,推荐使用 Linux/Unix/Mac;
64 位 JDK 1.8+;
Maven 3.2.x;
git;
Broker 服务器 4g+ 可用磁盘
准备环境
安装jdk 1.8+
#下载jdk
wget https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/8/jdk/x64/linux/OpenJDK8U-jdk_x64_linux_openj9_8u292b10_openj9-0.26.0.tar.gz
#解压jdk
mkdir -p /usr/local/java/
tar -xvf OpenJDK8U-jdk_x64_linux_openj9_8u292b10_openj9-0.26.0.tar.gz
mv jdk8u292-b10 /usr/local/java/jdk8
#设置环境变量
cat ~/.bashrc
…
export JAVA_HOME=/usr/local/java/jdk8
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
#生效环境变量
source ~/.bashrc
安装meven
#下载
wget https://apache.website-solution.net/maven/maven-3/3.8.1/binaries/apache-maven-3.8.1-bin.tar.gz
#解压
tar -xvf apache-maven-3.8.1-bin.tar.gz -C /usr/lib/jvm/
#设置环境变量
cat >> /etc/profile <<EOF
export MEVEN_HOME=/usr/lib/jvm/apache-maven-3.8.1
export PATH=/usr/lib/jvm/apache-maven-3.8.1/bin:$PATH
EOF
source /etc/profile
开始安装recoketMQ
#下载
wget https://mirror-hk.koddos.net/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip
#解压
unzip rocketmq-all-4.8.0-source-release.zip
mv rocketmq-all-4.8.0-source-release/ recoketMQ
#编译安装
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0
启动nameserver
#修改runserver.sh 不知为何它会显示为[[]]的样子,根据shell 语法这是不对的,取消一对中括号即可
if [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
#默认占用端口9876
nohup sh bin/mqnamesrv topicList -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动broker
#先修改启动内存,否者启动包memroy 不足
vim bin/runbroker.sh
...
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
...
vim bin/runserver.sh
...
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
...
#启动服务
nohup sh bin/mqbroker -n localhost:9876 &
#broker 启动后会在用户家目录创建一个logs文件夹
tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
发送和接收消息
简单起见,使用环境变量NAMESRV_ADDR
#设置环境变量 (临时的)
export NAMESRV_ADDR=localhost:9876
#消息创建者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
#消息消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
停止服务
#先停止broker
sh bin/mqshutdown broker
#停止namesrv
sh bin/mqshutdown namesrv
多Master多Slave模式-同步双写
主机规划
IP | 部署服务 | 角色 |
---|---|---|
192.168.0.100 | NameServer | – |
192.168.0.208 | NameServer | – |
192.168.0.100 | broker-a | master |
192.168.0.100 | broker-b-s | slave |
192.168.0.208 | broker-b | master |
192.168.0.208 | broker-a-s | slave |
软件依赖
安装jdk8
#下载jdk
wget https://mirrors.tuna.tsinghua.edu.cn/AdoptOpenJDK/8/jdk/x64/linux/OpenJDK8U-jdk_x64_linux_openj9_8u292b10_openj9-0.26.0.tar.gz
#解压jdk
mkdir -p /usr/local/java/
tar -xvf OpenJDK8U-jdk_x64_linux_openj9_8u292b10_openj9-0.26.0.tar.gz
mv jdk8u292-b10 /usr/local/java/jdk8
#设置环境变量
cat ~/.bashrc
…
export JAVA_HOME=/usr/local/java/jdk8
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
#生效环境变量
source ~/.bashrc
安装Rocketmq
wget https://apache.website-solution.net/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
unzip rocketmq-all-4.8.0-bin-release.zip
mv rocketmq-all-4.8.0-bin-release /usr/local/rocketmq-4.8
#设置 rocketmq 环境变量
export ROCKETMQ_HOME=/usr/local/rocketmq-4.8
export PATH=$PATH::$ROCKETMQ_HOME/bin
#创建存储路径
mkdir -p /usr/local/rocketmq-4.8/data/{store,store-s}/{commitlog,consumequeue,index}
修改日志配置
#创建日志目录
mkdir -p /usr/local/rocketmq-4.8/logs
#替换*.xml文件中的{user.home}为自己指定的目录
cd /usr/local/rocketmq-4.8/conf && sed -i 's#${user.home}#/usr/local/rocketmq-4.8#g' *.xml
修改jvm 启动参数
vim bin/runserver.sh
#修改runserver.sh 不知为何它会显示为[[ ]]的样子,根据shell 语法这是不对的,取消一对中括号即可
if [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
#修改启动内存,否者启动包memroy 不足
vim bin/runbroker.sh
...
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
vim bin/runserver.sh
...
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改配置文件
broker-a.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
#0 表示master,>0 表示slave
brokerId=0
#nameServer地址,分号隔开
namesrvAddr=192.168.0.100:9876;192.168.0.208:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许broker自动创建topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许broker自动创建订阅组,建议线下开始,线上关闭
autoCreateSubscriptionGroup=true
#broker对外服务的监听端口,
#同一台机器部署多个broker,端口号要不同,且端口号之间要相距大些
listenPort=10911
#删除文件的时间节点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小,默认大小1g
mapedFileSizeCommitLog=1073741824
#consumeQueue每个文件默认存30w条,根据自身业务进行调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileInterval=120000
redeleteHangedFileInterval=120000
#检查物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#store存储路径,master与slave目录要不同
storePathRootDir=/usr/local/rocketmq-4.8/data/store
#commitLog存储路径
storePathCommitLog=/usr/local/rocketmq-4.8/data/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.8/data/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.8/data/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.8/data/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq-4.8/data/store/abort
#限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
checkTransactionMessageEnable=false
#发消息线程池数
sendMessageThreadPoolNums=16
#拉去消息线程池数
pullMessageThreadPoolNums=16
#broker角色:
brokerRole=SYSC_MASTER
#刷盘方式
flushDiskType=ASYNC_FLUSH
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=192.168.0.100
broker-b-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
namesrvAddr=192.168.0.100:9876;192.168.0.208:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10950
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
destroyMapedFileInterval=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq-4.8/data/store-s
storePathCommitLog=/usr/local/rocketmq-4.8/data/store-s/commitlog
storePathConsumeQueue=/usr/local/rocketmq-4.8/data/store-s/consumequeue
storePathIndex=/usr/local/rocketmq-4.8/data/store-s/index
storeCheckpoint=/usr/local/rocketmq-4.8/data/store-s/checkpoint
abortFile=/usr/local/rocketmq-4.8/data/store-s/abort
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
checkTransactionMessageEnable=false
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=192.168.0.100
broker-b.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
namesrvAddr=192.168.0.100:9876;192.168.0.208:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
destroyMapedFileInterval=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq-4.8/data/store
storePathCommitLog=/usr/local/rocketmq-4.8/data/store/commitlog
storePathConsumeQueue=/usr/local/rocketmq-4.8/data/store/consumequeue
storePathIndex=/usr/local/rocketmq-4.8/data/store/index
storeCheckpoint=/usr/local/rocketmq-4.8/data/store/checkpoint
abortFile=/usr/local/rocketmq-4.8/data/store/abort
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
checkTransactionMessageEnable=false
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16
brokerRole=SYSC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=192.168.0.208
broker-a-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
namesrvAddr=192.168.0.100:9876;192.168.0.208:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10950
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
destroyMapedFileInterval=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq-4.8/data/store-s
storePathCommitLog=/usr/local/rocketmq-4.8/data/store-s/commitlog
storePathConsumeQueue=/usr/local/rocketmq-4.8/data/store-s/consumequeue
storePathIndex=/usr/local/rocketmq-4.8/data/store-s/index
storeCheckpoint=/usr/local/rocketmq-4.8/data/store-s/checkpoint
abortFile=/usr/local/rocketmq-4.8/data/store-s/abort
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
checkTransactionMessageEnable=false
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=192.168.0.208
启动服务
#启动nameserver
nohup sh bin/mqnamesrv topicList -n localhost:9876 &
#启动192.168.0.100 上的 broker
nohup sh bin/mqbroker -c /usr/local/rocketmq-4.8/conf/2m-2s-sync/broker-a.properties &
nohup sh bin/mqbroker -c /usr/local/rocketmq-4.8/conf/2m-2s-sync/broker-b-s.properties &
#启动192.168.0.208 上的 broker
nohup sh bin/mqbroker -c /usr/local/rocketmq-4.8/conf/2m-2s-sync/broker-b.properties &
nohup sh bin/mqbroker -c /usr/local/rocketmq-4.8/conf/2m-2s-sync/broker-a-s.properties &
查看集群&测试
cd /usr/local/rocketmq-4.8
sh bin/mqadmin clusterList -n 127.0.0.1:9876
官方的测试发送接收
# 设置环境变量 (临时的)
export NAMESRV_ADDR=localhost:9876
# 消息创建者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 消息消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
使用mqadmin进行测试
# 设置环境变量(临时的)
export NAMESRV_ADDR=localhost:9876
# 发送消息
sh bin/mqadmin sendMessage -n 192.168.0.100:9876 -t TopicTest -p "hello word" -c test -k one
# 接收消息
sh bin/mqadmin consumeMessage -n 192.168.0.100:9876 -t TopicTest
Dledger集群搭建
DLedger 应用场景
DLedger 其中一个应用就是在分布式消息系统中,RocketMQ 4.5 版本发布后,可以采用 RocketMQ on DLedger 方式进行部署。
DLedger commitlog 代替了原来的 commitlog,使得 commitlog 拥有了选举复制能力,然后通过角色透传的方式,raft 角色透传给外部 broker 角色,leader 对应原来的 master,follower 和 candidate 对应原来的 slave。
因此 RocketMQ 的 broker 拥有了自动故障转移的能力,在一组 broker 中如果 Master 挂了,能够依靠 DLedger 自动选主能力重新选出一个 leader,然后通过角色透传变成新的 Master。
DLedger 还可以构建高可用的嵌入式 KV 存储。我们把对一些数据的操作记录到 DLedger 中,然后根据数据量或者实际需求,恢复到hashmap 或者 rocksdb 中,从而构建一致的、高可用的 KV 存储系统,应用到元信息管理等场景。
RocketMQ Dledger 的方案简介
RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,组中至少需要 3 个 Broker 节点来保证集群能够运行,在 Broker 启动时候,通过 raft 算法能够自动选举出一个 Broker 为 Leader 节点,其余为 Follower 节点。
这种模式下 Leader 和 Follower 之间复制数据以保证高可用,如果 Leader 节点出现问题是可以自动进行容灾切换并保证数据一致性。
且不仅仅如此,该模式也支持 Broker 节点水平扩展来增加吞吐量。所以该模式将会是部署 RocketMQ 常用模式之一。
整体架构:高可用、高并发、可伸缩 、海量消息
高可用
三个 NameServer 极端情况下,确保集群的可用性,任何两个 NameServer 挂掉也不会影响信息的整体使用。
在上图中每个 Master Broker 都有两个 Slave Broker,这样可以保证可用性,如在同一个 Dledger Group 中 Master Broker 宕机后,Dledger 会去行投票将剩下的节点晋升为 Master Broker。
高并发
假设某个Topic的每秒十万消息的写入, 可以增加 Master Broker 然后十万消息的写入会分别分配到不同的 Master Broker ,如有5台 Master Broker 那每个 Broker 就会承载2万的消息写入。
可伸缩
如果消息数量增大,需要存储更多的数量和最高的并发,完全可以增加 Broker ,这样可以线性扩展集群。
海量消息
数据都是分布式存储的,每个Topic的数据都会分布在不同的 Broker 中,如果需要存储更多的数据,只需要增加 Master Broker 就可以了。
基本配置参数说明:
参数名称 | 参数描述 | 参数示例 |
---|---|---|
brokerClusterName | Broker 集群名称 | RaftCluster |
brokerName | Broker 名称 | RaftNode00 |
listenPort | Broker 监听端口 | 30911 |
namesrvAddr | Broker Namesrv 地址 | 127.0.0.1:9876 |
storePathRootDir | Broker 存储目录 | /tmp/rmqstore/node00 |
storePathCommitLog | Commitlog 存储目录 | /tmp/rmqstore/node00/commitlog |
Dledger 配置参数说明:
name | 含义 | 举例 |
---|---|---|
enableDLegerCommitLog | 是否启动 DLedger | true |
dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 |
dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 | n0-127.0.0.1:40911; n1-127.0.0.1:40912; n2-127.0.0.1:40913 |
dLegerSelfId | 节点 id, 必须属于dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 |
conf/dledger/broker-n0.conf 的配置举例
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
#启动nameserver
nohup sh bin/mqnamesrv
#启动broker
nohup sh bin/mqbroker -c conf/dledger/broker-n0.conf &
nohup sh bin/mqbroker -c conf/dledger/broker-n1.conf &
nohup sh bin/mqbroker -c conf/dledger/broker-n2.conf &
#查看集群
sh bin/mqadmin clusterList -n 127.0.0.1:9876
BID 为 0 的表示 Master,其余都是 Follower)
测试发送和接收消息
简单起见,使用环境变量NAMESRV_ADDR
#设置环境变量 (临时的)
export NAMESRV_ADDR=localhost:9876
#消息创建者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
#消息消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
docker快速安装RocketMQ 控制台
docker run -d --name rmqconsole \
-p 8080:8080 \
--restart=always \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.0.100:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
apacherocketmq/rocketmq-console:2.0.0
看到下面谁是master
测试高可用性
停止master,扮演master宕机
发现立马就有从主机顶替上来成为了master,大大提高了服务的高可用