Rocketmq broker 消息仓库

名词:

commitLog : 消息存储的地方,持久化到磁盘中,保存着生产者发送的完整消息。
consumequeue: 逻辑消费队列,每一个队列中保存着消息在 commitLog 文件中的偏移量,消费时据此定位消息。
index:  索引文件。

初始化:

在 Broker 启动的时候,会调用 org.apache.rocketmq.broker.BrokerController#initialize 进行初始化,其中与消息仓库相关的有两步:

  1.实例化默认的消息仓库。

  2.加载 commitLog、consumequeue、index 磁盘文件,进行维护。

public boolean initialize() throws CloneNotSupportedException {
        boolean result = this.topicConfigManager.load();
        //加载对应管理器的配置文件
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();

        if (result) {
            try {
         //1.实例化消息仓库 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; log.error("Failed to initialize", e); } } //2.加载CommitLog 文件,consumerqueue result = result && this.messageStore.load();

.......
}

 

1.实例化默认的消息仓库:

/**
 * Creates the default message store.
 *
 * <p>Records the supplied configuration objects and listener, instantiates the
 * store's internal services, starts the mapped-file allocation service and the
 * index service, registers the two CommitLog dispatchers, and opens the store
 * lock file in {@code "rw"} mode.
 *
 * <p>NOTE(review): {@code this} is handed to several service constructors
 * before construction finishes, so the order of the statements below is
 * presumably significant — confirm before reordering.
 *
 * @param messageStoreConfig      store configuration (root directory, transient pool switch, ...)
 * @param brokerStatsManager      broker statistics manager kept for later use
 * @param messageArrivingListener listener kept for later use
 * @param brokerConfig            broker configuration kept for later use
 * @throws IOException declared by the signature; presumably raised while preparing
 *         or opening the lock file — confirm against the helpers
 */
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        this.messageArrivingListener = messageArrivingListener;
        this.brokerConfig = brokerConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.brokerStatsManager = brokerStatsManager;
        this.allocateMappedFileService = new AllocateMappedFileService(this);
        // CommitLog: per the article, manages the files under the /store/commitLog directory
        this.commitLog = new CommitLog(this);
        // Table of logical consume queues; per the article, the topic queue files under
        // /store/consumerqueue are loaded into it
        this.consumeQueueTable = new ConcurrentHashMap<>(32);
        // Service that flushes consume queue data to disk
        this.flushConsumeQueueService = new FlushConsumeQueueService();
        // Service that cleans up CommitLog files
        this.cleanCommitLogService = new CleanCommitLogService();
        // Service that cleans up consume queue files
        this.cleanConsumeQueueService = new CleanConsumeQueueService();
        // Store statistics service
        this.storeStatsService = new StoreStatsService();
        // Index service
        this.indexService = new IndexService(this);
        // High-availability service
        this.haService = new HAService(this);
        // Per the article, propagates CommitLog entries to the consume queues and the index
        this.reputMessageService = new ReputMessageService();

        // NOTE(review): presumably handles scheduled (delayed) messages — confirm
        this.scheduleMessageService = new ScheduleMessageService(this);

        // The pool object is always created, but only initialized when enabled in the config
        this.transientStorePool = new TransientStorePool(messageStoreConfig);

        if (messageStoreConfig.isTransientStorePoolEnable()) {
            this.transientStorePool.init();
        }

        // These two services are started here, inside the constructor
        this.allocateMappedFileService.start();

        this.indexService.start();
        // Dispatcher list: the consume queue builder is registered first, then the index builder
        this.dispatcherList = new LinkedList<>();
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

        // Make sure the lock file's parent directory is usable, then open the lock file read/write
        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
        MappedFile.ensureDirOK(file.getParent());
        lockFile = new RandomAccessFile(file, "rw");
    }

2. messageStore.load

  加载磁盘上的commitLog 消息存储文件,consumerqueue 逻辑队列文件,index 索引文件,映射成 mappedByteBuffer 内存映射文件,进行快速读写

/**
 * Loads the store's on-disk state: schedule message data (when that service
 * exists), the CommitLog, the consume queues, and finally the checkpoint and
 * index, followed by recovery.
 *
 * <p>Each step runs only if all previous steps succeeded. Any exception is
 * logged and treated as a failed load. On failure the mapped-file allocation
 * service is shut down.
 *
 * @return {@code true} when everything loaded successfully, {@code false} otherwise
 */
public boolean load() {
    boolean loaded = true;

    try {
        // The temp file still existing means the previous run did not shut down cleanly.
        final boolean exitedCleanly = !this.isTempFileExist();
        final String shutdownKind = exitedCleanly ? "normally" : "abnormally";
        log.info("last shutdown {}", shutdownKind);

        if (null != scheduleMessageService) {
            loaded = this.scheduleMessageService.load();
        }

        // Step 1: load the CommitLog (per the article, the files under store/commitLog).
        if (loaded) {
            loaded = this.commitLog.load();
        }

        // Step 2: load the consume queues (per the article, every topic's queues under
        // store/consumerqueue, kept in the consume queue table).
        if (loaded) {
            loaded = this.loadConsumeQueue();
        }

        if (loaded) {
            this.storeCheckpoint = new StoreCheckpoint(
                StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

            // Step 3: load the index files (per the article, ./store/index).
            this.indexService.load(exitedCleanly);

            this.recover(exitedCleanly);

            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        loaded = false;
    }

    if (!loaded) {
        this.allocateMappedFileService.shutdown();
    }

    return loaded;
}

 

上一篇:RocketMQ官方为什么“异步刷盘建议用自旋锁,同步刷盘建议用重入锁”?


下一篇:RocketMQ源码解析之broker文件清理