3、RocketMQ 源码解析之 源代码环境搭建

在之前的文章中我们知道了 RocketMQ 里面的核心功能、架构和概念。并且也介绍了它的简单使用与 Spring Boot 的集成。下面开始我们对 RocketMQ 的源码探索,首先我们先在自己本地搭建 RocketMQ 的源代码环境。

1、下载源代码

首先我们可以在 rocketmq github 网站下载 Rocket MQ 的源代码,它是以 maven 进行项目管理的。接着把项目导入到自己的开发工具中,博主使用的是 Idea。导入 idea 如下图显示:
3、RocketMQ 源码解析之 源代码环境搭建
下面我们来介绍一下项目中的各个 Module:

名称 作用
acl RocketMQ 的访问权限控制
broker RocketMQ 的核心模块之一:用于接收消息发送方的请求,主要包括消息处理、存储、以及高可用相关的功能
client RocketMQ 的核心模块之一:消息发送方依赖该模块把消息发送至 broker
common RocketMQ 的基础模块:提供一些基础工具以及功能定义
distribution RocketMQ 中的一些核心配置以及服务的功能脚本
example RocketMQ 的功能示例
filter RocketMQ 的消息过滤机制
logappender RocketMQ 的日志 Appender 适配
logging RocketMQ 的日志框架适配
namesrv RocketMQ 的名称服务,主要保存服务的元信息,包括: broker、主题相关的信息
openmessaging RocketMQ 对于 openmessaging 规范的实现
remoting RocketMQ 各个进程之间的相互通信模块,主要是通过 NIO 框架 Netty 来实现的
srvutil 服务的一些工具类
store RocketMQ 消息的日志持久化
test RocketMQ 项目的测试用例
tools RocketMQ 项目提供的一些工具类

2、启动 Name Server

步骤一:在本地创建一个目录用于 RocketMQ 的运行环境,/Users/zhaoyong/rocketmq,然后在这个目录之下创建:conflogs 以及 store 目录。

步骤二:从 distributionbroker.conflogback_broker.xmllogback_namesrv.xm 配置Copy 到 /Users/zhaoyong/rocketmq/conf 目录下。并且修改 broker.conf

broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
#nameServer 地址,分号分割
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 存储路径
storePathRootDir=/Users/zhaoyong/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/Users/zhaoyong/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/Users/zhaoyong/rocketmq/store/consumequeue
# 消息索引|存储路径
storePathindex=/Users/zhaoyong/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/Users/zhaoyong/rocketmq/store/checkpoint
#abort 文件存储路径 abortFile=/Users/zhaoyong/rocketmq/store/abort

步骤三:配置 Name Server 的启动信息:

3、RocketMQ 源码解析之 源代码环境搭建
主要是配置以下二点:

  • 启动类:org.apache.rocketmq.namesrv.NamesrvStartup
  • 环境变量:ROCKETMQ_HOME=/Users/zhaoyong/rocketmq

步骤四:启动 Name Server,运行结果如下:

3、RocketMQ 源码解析之 源代码环境搭建

三、启动 Broker

步骤一:配置 Broker 的启动信息
3、RocketMQ 源码解析之 源代码环境搭建
主要配置为以下几点:

  • 启动类:org.apache.rocketmq.broker.BrokerStartup
  • 指定 Broker 配置文件:-c /Users/zhaoyong/rocketmq/conf/broker.conf
  • 环境变量:ROCKETMQ_HOME=/Users/zhaoyong/rocketmq

步骤二:启动 Broker,运行结果如下:
3、RocketMQ 源码解析之 源代码环境搭建

4、使用本地启动的消息服务

4.1 发送消息。

修改 org.apache.rocketmq.example.quickstart.Producer 类,在 在 producer.start() 之前设置命名服务地址:

producer.setNamesrvAddr("localhost:9876");
producer.start();

然后运行 org.apache.rocketmq.example.quickstart.Producer,运行效果如下:
3、RocketMQ 源码解析之 源代码环境搭建

4.2 接收消息

修改 org.apache.rocketmq.example.quickstart.Consumer 类,在 在 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); 之前设置命名服务地址:

producer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

然后运行 org.apache.rocketmq.example.quickstart.Consumer,运行效果如下:

3、RocketMQ 源码解析之 源代码环境搭建

4.3 查看 RocketMQ 运行目录

下面是我们运行消息发送与消息消费后的 RocketMQ 的工具目录,其中 + 后面代表目录,后面代码文件。

/Users/zhaoyong/rocketmq
	└ abort
	+ conf
		└ broker.conf
		└ logback_broker.xml
		└ logback_namesrv.xml
	+ logs
	+ store
		└ abort
		└ checkpoint
		+ commitlog
			└ 00000000000000000000
		+ config
			└ consumerFilter.json
			└ consumerFilter.json.bak
			└ consumerOffset.json
			└ consumerOffset.json.bak
			└ delayOffset.json
			└ delayOffset.json.bak
			└ subscriptionGroup.json
			└ topics.json
			└ topics.json.bak
		+ consumequeue
			+ TopicTest
				+ 0
					└ 00000000000000000000
				+ 1
					└ 00000000000000000000
				+ 2
					└ 00000000000000000000
				+ 3
					└ 00000000000000000000
		+ index
			└ 20211222211320060
		└ lock
  • abort文件:用于判断broker是否正常关闭,在broker启动时创建,关闭时删除,如果broker异常退出,则文件会一直存在,在启动时会走其他流程进行文件修复等。
  • checkpoint文件:文件检测点,存储commitlog文件、consumequeue、index索引文件最后一次刷盘时间戳。
  • commitlog目录:存放消息实体,所有topic的消息都会通过追加的方式往commitlog文件中写入,单文件大小默认为1G,文件名为起始偏移量,长度为20位,左边补零。
  • consumequeue目录:存放消息消费队列,只有写入到这里的数据,才能够被消费者消费,每个topic都会在此目录下创建一个同名目录,每个队列会建立对应的索引文件,用于加快消息的检索和节省磁盘空间,里面存放了消息的关键信息,如commitog文件中的偏移量、消息长度、tag的hashcode值等。
  • index文件目录:存放消息索引文件,只有写入到这里的数据,才能够通过key或者msgId等进行查询。
  • config目录:存放topic和订阅组的配置信息,以及消费进度等。
  • lock文件:锁文件,在consumer进行rebalance的时候。

下面是一个消息的运动轨迹:
3、RocketMQ 源码解析之 源代码环境搭建

上一篇:RocketMQ安装与启动


下一篇:RocketMQ-什么是死信队列?怎么解决