在之前的文章中我们知道了 RocketMQ 里面的核心功能、架构和概念。并且也介绍了它的简单使用与 Spring Boot 的集成。下面开始我们对 RocketMQ 的源码探索,首先我们先在自己本地搭建 RocketMQ 的源代码环境。
1、下载源代码
首先我们可以在 rocketmq github 网站下载 Rocket MQ 的源代码,它是以 maven 进行项目管理的。接着把项目导入到自己的开发工具中,博主使用的是 Idea。导入 idea 如下图显示:
下面我们来介绍一下项目中的各个 Module:
名称 | 作用 |
---|---|
acl | RocketMQ 的访问权限控制 |
broker | RocketMQ 的核心模块之一:用于接收消息发送方的请求,主要包括消息处理、存储、以及高可用相关的功能 |
client | RocketMQ 的核心模块之一:消息发送方依赖该模块把消息发送至 broker |
common | RocketMQ 的基础模块:提供一些基础工具以及功能定义 |
distribution | RocketMQ 中的一些核心配置以及服务的功能脚本 |
example | RocketMQ 的功能示例 |
filter | RocketMQ 的消息过滤机制 |
logappender | RocketMQ 的日志 Appender 适配 |
logging | RocketMQ 的日志框架适配 |
namesrv | RocketMQ 的名称服务,主要保存服务的元信息,包括: broker、主题相关的信息 |
openmessaging | RocketMQ 对于 openmessaging 规范的实现 |
remoting | RocketMQ 各个进程之间的相互通信模块,主要是通过 NIO 框架 Netty 来实现的 |
srvutil | 服务的一些工具类 |
store | RocketMQ 消息的日志持久化 |
test | RocketMQ 项目的测试用例 |
tools | RocketMQ 项目提供的一些工具类 |
2、启动 Name Server
步骤一:在本地创建一个目录用于 RocketMQ 的运行环境,/Users/zhaoyong/rocketmq
,然后在这个目录之下创建:conf
、logs
以及 store
目录。
步骤二:从 distribution
把 broker.conf
、logback_broker.xml
和 logback_namesrv.xm
配置Copy 到 /Users/zhaoyong/rocketmq/conf
目录下。并且修改 broker.conf
:
broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
#nameServer 地址,分号分割
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 存储路径
storePathRootDir=/Users/zhaoyong/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/Users/zhaoyong/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/Users/zhaoyong/rocketmq/store/consumequeue
# 消息索引|存储路径
storePathindex=/Users/zhaoyong/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/Users/zhaoyong/rocketmq/store/checkpoint
#abort 文件存储路径 abortFile=/Users/zhaoyong/rocketmq/store/abort
步骤三:配置 Name Server 的启动信息:
主要是配置以下二点:
-
启动类:
org.apache.rocketmq.namesrv.NamesrvStartup
-
环境变量:
ROCKETMQ_HOME=/Users/zhaoyong/rocketmq
步骤四:启动 Name Server,运行结果如下:
三、启动 Broker
步骤一:配置 Broker 的启动信息
主要配置为以下几点:
-
启动类:
org.apache.rocketmq.broker.BrokerStartup
-
指定 Broker 配置文件:
-c /Users/zhaoyong/rocketmq/conf/broker.conf
-
环境变量:
ROCKETMQ_HOME=/Users/zhaoyong/rocketmq
步骤二:启动 Broker,运行结果如下:
4、使用本地启动的消息服务
4.1 发送消息。
修改 org.apache.rocketmq.example.quickstart.Producer
类,在 在 producer.start()
之前设置命名服务地址:
producer.setNamesrvAddr("localhost:9876");
producer.start();
然后运行 org.apache.rocketmq.example.quickstart.Producer
,运行效果如下:
4.2 接收消息
修改 org.apache.rocketmq.example.quickstart.Consumer
类,在 在 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
之前设置命名服务地址:
producer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
然后运行 org.apache.rocketmq.example.quickstart.Consumer
,运行效果如下:
4.3 查看 RocketMQ 运行目录
下面是我们运行消息发送与消息消费后的 RocketMQ 的工具目录,其中 +
后面代表目录,└
后面代码文件。
/Users/zhaoyong/rocketmq
└ abort
+ conf
└ broker.conf
└ logback_broker.xml
└ logback_namesrv.xml
+ logs
+ store
└ abort
└ checkpoint
+ commitlog
└ 00000000000000000000
+ config
└ consumerFilter.json
└ consumerFilter.json.bak
└ consumerOffset.json
└ consumerOffset.json.bak
└ delayOffset.json
└ delayOffset.json.bak
└ subscriptionGroup.json
└ topics.json
└ topics.json.bak
+ consumequeue
+ TopicTest
+ 0
└ 00000000000000000000
+ 1
└ 00000000000000000000
+ 2
└ 00000000000000000000
+ 3
└ 00000000000000000000
+ index
└ 20211222211320060
└ lock
- abort文件:用于判断broker是否正常关闭,在broker启动时创建,关闭时删除,如果broker异常退出,则文件会一直存在,在启动时会走其他流程进行文件修复等。
- checkpoint文件:文件检测点,存储commitlog文件、consumequeue、index索引文件最后一次刷盘时间戳。
- commitlog目录:存放消息实体,所有topic的消息都会通过追加的方式往commitlog文件中写入,单文件大小默认为1G,文件名为起始偏移量,长度为20位,左边补零。
- consumequeue目录:存放消息消费队列,只有写入到这里的数据,才能够被消费者消费,每个topic都会在此目录下创建一个同名目录,每个队列会建立对应的索引文件,用于加快消息的检索和节省磁盘空间,里面存放了消息的关键信息,如commitog文件中的偏移量、消息长度、tag的hashcode值等。
- index文件目录:存放消息索引文件,只有写入到这里的数据,才能够通过key或者msgId等进行查询。
- config目录:存放topic和订阅组的配置信息,以及消费进度等。
- lock文件:锁文件,在consumer进行rebalance的时候。
下面是一个消息的运动轨迹: