RocketMQ 消息存储

引言

前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息存储部分的整体实现思路。

消息存储

通过前面的知识,我们已经知道了topic是如何分配到Broker的,以及消息发送方是如何决定把消息发送给哪个Broker的,接下来我们看一看Broker介绍到消息后,是怎么存储消息的。

RocketMQ主要存储的文件包括CommitLog文件、ConsumeQueue文件、IndexFile文件。RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。但由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从CommitLog文件中检索消息。

磁盘有时候会比你想象的快很多,有时候也会比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度,这是磁盘比想象的快的地方。但是磁盘随机写的速度只有大概1OOKB/s, 和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。

存储方案

RocketMQ 消息存储
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog, ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

CommitLog以物理文件的方式存放,每台Broker上的CommitLog被本机器所有ConsumeQueue共享。在CommitLog中,一个消息的存储长度是不固定的,RocketMQ采取一些机制,尽量向CommitLog中顺序写,但是随机读。ConsumeQueue的内容也会被写到磁盘里作持久存储,只不过是通过异步刷盘的方式进行。

这样设计的优点:

  1. CommitLog顺序写,可以大大提高写入效率。接收到消息时,只有CommitLog是需要同步刷盘的(根据配置,可能也不需要同步刷盘),其他文件都是异步保存,如果发生了宕机,RocketMQ可以根据CommitLog恢复ConsumeQueue文件和IndexFile。
  2. 虽然CommitLog是随机读,但是利用操作系统的page cache机制,可以批量地从磁盘读取,cache存到内存中之后,加速后续的读取速度。
  3. 为了保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性,CommitLog里存储了Consume Queues、Message keys、Tags等所有信息,即使ConsumeQueue丢失,也可以通过commitLog完全恢复出来。

下图是一个Broker在文件系统中存储的各个文件。我们可以看到CommitLog文件夹、ConsumeQueue文件夹,还有在config文件夹中Topic、Consumer的相关信息。最下面那个文件夹index存的是索引文件,这个文件用来加快消息查询的速度。
RocketMQ 消息存储

  • commitlog:消息存储目录。
  • config:运行期间一些配置信息,主要包括下列信息。
    • consumerFilter.json:主题消息过滤信息。
    • consumerOffset.json:集群消费模式消息消费进度。
    • delayOffset.json:延时消息队列拉取进度。
    • subscriptionGroup:消息消费组配置信息。
    • topic.json:topic配置属性。
  • consumequeue:消息消费队列存储目录。
  • index:消息索引文件存储目录。
  • abort:如果存在 abort文件说明Broker非正常关闭,该文件默认启动时创建,正常退出之前删除。
  • checkpoint:文件检测点存储commitlog文件最后一次刷盘时间戳、consumequeue最后一次刷盘时间、index索引文件最后一次刷盘时间戳。

存储流程

  1. 如果当前Broker停止工作或Broker为SLAVE角色或当前Broker不支持写入则拒绝消息写入;如果消息主题长度超过256个字符、消息属性长度超过65536个字符将拒绝该消息写入。
  2. 如果消息的延迟级别大于0,将消息的原主题名称与原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC、消息队列ID更新原先消息的主题与队列。
  3. 获取当前可以写入的CommitLog文件
  4. 在写入CommitLog之前,先申请putMessageLock,也就是将消息存储到CommitLog文件中是串行的
  5. 设置消息的存储时间,如果CommitLog文件不存在就需要创建新的文件
  6. 创建全局唯一消息ID
  7. 获取该消息在消息队列的偏移量
  8. 计算消息总长度,并写入CommitLog
  9. 如果计算发现CommitLog无法存储所有内容,则创建新的CommitLog,文件名为即将插入消息的偏移
  10. 将消息内容写入CommitLog文件后根据配置进行同步或者异步刷盘
  11. 更新逻辑偏移量,并释放putMessageLock
  12. 根据CommitLog偏移量,消息存储大小,tag的hash值插入一条Message到ConsumeQueue
  13. 根据Key的hash值,CommitLog偏移量,插入一条数据到IndexFile
  14. ConsumerQueue每隔一段时间自动刷盘、IndexFile在每次创建新indexFile时刷盘之前的索引文件、checkpoint文件在刷盘CommotLog,ConsumeQueue和IndexFile时进行更新

RocketMQ 消息存储

参考内容

[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 文件 IO 操作的一些最佳实践
[7] 海量数据处理之Bloom Filter详解
[8] rocketmq GitHub Wiki

RocketMQ 消息存储

上一篇:RocketMQ文件存储体系介绍


下一篇:rocketMq 消息偏移量 Offset