名词:
commitLog : 消息存储的地方,持久化到磁盘中,保存着生产者发送的完整消息。
consumerqueue: 逻辑消费队列,每一个队列中维护着commitLog 文件中的消息偏移量进行消费。
index: 索引文件。
初始化:
在Broker 启动的时候,org.apache.rocketmq.broker.BrokerController#initialize 初始化
1.实例化默认的消息仓库。
2.加载commitLog consumerquque index 磁盘文件,进行维护。
public boolean initialize() throws CloneNotSupportedException { boolean result = this.topicConfigManager.load(); //加载对应管理器的配置文件 result = result && this.consumerOffsetManager.load(); result = result && this.subscriptionGroupManager.load(); result = result && this.consumerFilterManager.load(); if (result) { try {
//1.实例化消息仓库 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; log.error("Failed to initialize", e); } } //2.加载CommitLog 文件,consumerqueue result = result && this.messageStore.load();
.......
}
1.实例化默认的消息仓库:
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this.messageArrivingListener = messageArrivingListener; this.brokerConfig = brokerConfig; this.messageStoreConfig = messageStoreConfig; this.brokerStatsManager = brokerStatsManager; this.allocateMappedFileService = new AllocateMappedFileService(this); //commitLog ,维护/store/commitLog 文件夹下的文件 this.commitLog = new CommitLog(this); //消息逻辑队列,会加载/store/consumerqueue 文件夹下的topic 队列信息 this.consumeQueueTable = new ConcurrentHashMap<>(32); //刷新 磁盘 consumerqueue 队列信息 this.flushConsumeQueueService = new FlushConsumeQueueService(); //清理commitLog 服务 this.cleanCommitLogService = new CleanCommitLogService(); //清理consumerQueue服务 this.cleanConsumeQueueService = new CleanConsumeQueueService(); //消息状态服务 this.storeStatsService = new StoreStatsService(); //索引服务 this.indexService = new IndexService(this); //高可用服务 this.haService = new HAService(this); //commitlog 信息同步consumerqueue 以及index this.reputMessageService = new ReputMessageService(); this.scheduleMessageService = new ScheduleMessageService(this); this.transientStorePool = new TransientStorePool(messageStoreConfig); if (messageStoreConfig.isTransientStorePoolEnable()) { this.transientStorePool.init(); } this.allocateMappedFileService.start(); this.indexService.start(); //dispatchList 索引以及consumerqueue 队列的更新 this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir())); MappedFile.ensureDirOK(file.getParent()); lockFile = new RandomAccessFile(file, "rw"); }
2. messageStore.load
加载磁盘上的commitLog 消息存储文件,consumerqueue 逻辑队列文件,index 索引文件,映射成 mappedByteBuffer 内存映射文件,进行快速读写
public boolean load() { boolean result = true; try { boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); if (null != scheduleMessageService) { result = result && this.scheduleMessageService.load(); } // load Commit Log 1.加载对应磁盘的store/commitLog 文件加里的文件 result = result && this.commitLog.load(); // load Consume Queue 2.加载对应磁盘store/consumerqueue 文件夹下面所有topic 对应的消息队列,会加载进入 consumerQueueTable中进行维护,逻辑消息队列 result = result && this.loadConsumeQueue(); if (result) { this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); //3。加载Index 索引文件./store/index 索引文件 this.indexService.load(lastExitOK); this.recover(lastExitOK); log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); } } catch (Exception e) { log.error("load exception", e); result = false; } if (!result) { this.allocateMappedFileService.shutdown(); } return result; }