RocketMQ-broker存储机制
该篇主要讲解rocketmq-store模块,了解其存储机制,文件读写原理。
为何Rocketmq存储写文件这么快呢?
简单来说,总结两点:
1)pagecache+虚拟内存 2)零拷贝+java文件映射
Broker存储目录结构
commitlog 文件名是一个20个字符,代表该文件存储的起始偏移量,文件大小通过MappedFileSizeCommitLog配置。
consumequeue 中每一个消息就是一个索引,直到到commitlog中的message,用来作为消费者拉取消息,更新点位使用。
Index 按照消息key创建的hash索引,文件名是创建时的时间戳。
config 保存了当前broker中的全部的topic,订阅关系和消费进度,这些数据会定时的从内存中持久化到磁盘,以便宕机后恢复。
abort Broker是否异常关闭的标志,正常启动会删除该文件
checkpoint broker最后一次正常运行的状态,保存了最后一次刷盘时间,最后一次正确索引的时间。
先整体看一下rocketmq都有哪些功能,后面逐一介绍
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { // 消息到达监听 在消费长轮询中使用 this.messageArrivingListener = messageArrivingListener; this.brokerConfig = brokerConfig; this.messageStoreConfig = messageStoreConfig; // broker状态管理 this.brokerStatsManager = brokerStatsManager; // 内存映射文件创建服务 this.allocateMappedFileService = new AllocateMappedFileService(this); this.commitLog = new CommitLog(this); // topic 与 多个comsumerqueue对应关系 this.consumeQueueTable = new ConcurrentHashMap<>(32); // 刷盘服务 this.flushConsumeQueueService = new FlushConsumeQueueService(); // 清理commit log this.cleanCommitLogService = new CleanCommitLogService(); // 清理ConsumeQueue this.cleanConsumeQueueService = new CleanConsumeQueueService(); //存储状态管理 this.storeStatsService = new StoreStatsService(); // 索引服务 this.indexService = new IndexService(this); // 主从同步服务 this.haService = new HAService(this); // reputMessageService是用来把commitlog中的数据写到consumerqueue和index中 this.reputMessageService = new ReputMessageService(); this.scheduleMessageService = new ScheduleMessageService(this); // 直接内存池,避免频繁创建,在异步刷盘的时候作为与pagecache的数据交换区,通过commit操作完成数据传输到pagecache this.transientStorePool = new TransientStorePool(messageStoreConfig); if (messageStoreConfig.isTransientStorePoolEnable()) { this.transientStorePool.init(); } // 启动mappedFile创建服务 this.allocateMappedFileService.start(); // 启动索引服务 this.indexService.start(); // 在上面的reputMessageService中的reput方法中,调用下面的2个的dodispatch方法 this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); }
加载与启动
@Override public boolean load() { boolean result = true; try { // 根据abort异常文件 判断服务上次是否正常退出 boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); // TODO 延迟消息服务相关 将在后面介绍 if (null != scheduleMessageService) { result = result && this.scheduleMessageService.load(); } // load Commit Log 把几个commitlog文件加载进来 result = result && this.commitLog.load(); // load Consume Queue result = result && this.loadConsumeQueue(); if (result) { // 加载checkpoint this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); //加载索引文件 this.indexService.load(lastExitOK); // 恢复 this.recover(lastExitOK); log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); } } catch (Exception e) { log.error("load exception", e); result = false; } if (!result) { this.allocateMappedFileService.shutdown(); } return result; }
@Override public void start() throws Exception { // ConsumeQueue 刷盘服务 this.flushConsumeQueueService.start(); // 内部会启动 同步刷盘服务和异步转存服务 this.commitLog.start(); this.storeStatsService.start(); if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { this.scheduleMessageService.start(); } // 根据是否可重复 来设置dispatch的起始offset if (this.getMessageStoreConfig().isDuplicationEnable()) { // 从commitlog的ConfirmOffset开始存储消息 this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); } else { // 从commitlog的最大偏移量处开始存储消息 this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); } // 内部会创建consumerqueue 内部文件 this.reputMessageService.start(); // 启动数据同步 this.haService.start(); //创建abort文件 this.createTempFile(); // 一些定时任务 文件清理 dump锁信息 以及校验文件大小与偏移量间是否一致 this.addScheduleTask(); this.shutdown = false; }
接下来将会重点介绍 commitlog consumerqueue index 事务消息与延迟消息 HA 刷盘机制