Message Queues (5): RocketMQ Message Storage, Part 3

Questions:

  • How does the consumeQueue work?
  • How does the flush mechanism work?

Overview

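In this part we look at how the consumeQueue works, starting from the overall message sending flow.

[Figure: overview of the message sending flow]

Why is a consumeQueue needed at all? All topics share one commitLog, so without an index, finding the messages of a particular topic would mean scanning the entire commitLog. The consumeQueue exists to make that lookup cheap. Its on-disk layout is shown below, where TopicTest and TopicTest1 are topics and the directories 0 to 3 are the four queues under each topic.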

├─TopicTest
│  ├─0
│  ├─1
│  ├─2
│  └─3
└─TopicTest1
    ├─0
    ├─1
    ├─2
    └─3

ConsumeQueue internals

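When DefaultMessageStore starts, it launches a thread whose job is to **write each newly produced message's position into the consumeQueue**. The consumeQueue is then flushed back to disk for persistence. So the work falls into two parts: updating the consumeQueue and flushing it.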

Let's see where this thread is started.

    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }

        ...

    }

The DefaultMessageStore#start method

        //No.3 start reputMessageService
        if (this.getMessageStoreConfig().isDuplicationEnable()) {
            this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
        } else {
            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
        }
        this.reputMessageService.start();



The ReputMessageService#run method

        @Override
        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    // keep calling doReput in a loop
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }

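The summary below is taken from the reference articles (it will be removed on request). doReput does the following.

1: Fetch the new messages stored in the CommitLog.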

SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

reputFromOffset records the CommitLog offset of the next message to pull. It is passed to the CommitLog, which returns the messages stored at reputFromOffset.

2: If step 1 returns data, new messages have been stored in the CommitLog, and the ConsumeQueue is notified to update its message offsets.

DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
......

DefaultMessageStore.this.doDispatch(dispatchRequest);

3: Update reputFromOffset to the CommitLog offset of the next message to pull.

this.reputFromOffset = result.getStartOffset();
......
int size = dispatchRequest.getMsgSize();
......
this.reputFromOffset += size;
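To make the three steps concrete, here is a minimal, self-contained sketch of the offset bookkeeping. It is not RocketMQ code: the record layout (a 4-byte total size followed by a UTF-8 body) and the names `ReputSketch`, `appendRecord`, and `dispatched` are assumptions made for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; the record layout and all names are assumptions.
class ReputSketch {
    // Stand-in for the CommitLog: an append-only byte buffer.
    private final ByteBuffer log = ByteBuffer.allocate(1024);
    // Offset of the next record that has not been dispatched yet.
    private long reputFromOffset = 0;
    // Stand-in for doDispatch: remembers "offset:body" for every record seen.
    final List<String> dispatched = new ArrayList<>();

    // Append one record: a 4-byte total size, then the UTF-8 body.
    void appendRecord(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        log.putInt(4 + bytes.length);
        log.put(bytes);
    }

    // Dispatch every record stored since the last call; returns how many were found.
    int doReput() {
        int count = 0;
        int writePosition = log.position();
        while (reputFromOffset < writePosition) {
            int start = (int) reputFromOffset;
            // Step 1: read the record stored at reputFromOffset.
            int size = log.getInt(start);
            byte[] body = new byte[size - 4];
            for (int i = 0; i < body.length; i++) {
                body[i] = log.get(start + 4 + i);
            }
            // Step 2: hand the record to the dispatcher.
            dispatched.add(start + ":" + new String(body, StandardCharsets.UTF_8));
            // Step 3: advance to the next record.
            reputFromOffset += size;
            count++;
        }
        return count;
    }

    long getReputFromOffset() {
        return reputFromOffset;
    }

    public static void main(String[] args) {
        ReputSketch sketch = new ReputSketch();
        sketch.appendRecord("hello");
        sketch.appendRecord("mq");
        int found = sketch.doReput();
        System.out.println(found + " " + sketch.dispatched + " " + sketch.getReputFromOffset());
    }
}
```

Calling `doReput` a second time without appending anything finds no new records, which is why the real service can simply run it in a loop.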

The key is step 2, which calls DefaultMessageStore.this.doDispatch(dispatchRequest) to notify the ConsumeQueue.

DefaultMessageStore holds a dispatcherList containing several CommitLogDispatcher objects, all of which listen for new messages stored in the CommitLog.

this.dispatcherList = new LinkedList<>();
// dispatches to the ConsumeQueue
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
// dispatches to the Index
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
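The dispatcher list is a plain fan-out: each request read from the CommitLog is handed to every registered dispatcher in order. The sketch below mirrors that shape with simplified, hypothetical types (`Request`, `Dispatcher`, `DispatchSketch`); the real DispatchRequest and CommitLogDispatcher carry far more state, and the two maps here only stand in for the consume queue and index files.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Illustrative model only; these types are simplified stand-ins, not RocketMQ classes.
class DispatchSketch {
    // Minimal stand-in for DispatchRequest.
    static final class Request {
        final String topic;
        final int queueId;
        final long commitLogOffset;
        final String key;

        Request(String topic, int queueId, long commitLogOffset, String key) {
            this.topic = topic;
            this.queueId = queueId;
            this.commitLogOffset = commitLogOffset;
            this.key = key;
        }
    }

    interface Dispatcher {
        void dispatch(Request request);
    }

    // "topic-queueId" -> commit log offsets in arrival order (plays the consume queue role).
    final Map<String, List<Long>> consumeQueues = new HashMap<>();
    // message key -> commit log offset (plays the index role).
    final Map<String, Long> index = new HashMap<>();
    private final LinkedList<Dispatcher> dispatcherList = new LinkedList<>();

    DispatchSketch() {
        // First dispatcher: record the position in the per-queue list.
        dispatcherList.addLast(r -> consumeQueues
            .computeIfAbsent(r.topic + "-" + r.queueId, k -> new ArrayList<>())
            .add(r.commitLogOffset));
        // Second dispatcher: record the position under the message key.
        dispatcherList.addLast(r -> index.put(r.key, r.commitLogOffset));
    }

    // Hand one request to every dispatcher in registration order.
    void doDispatch(Request request) {
        for (Dispatcher dispatcher : dispatcherList) {
            dispatcher.dispatch(request);
        }
    }

    public static void main(String[] args) {
        DispatchSketch sketch = new DispatchSketch();
        sketch.doDispatch(new Request("TopicTest", 0, 0L, "k1"));
        sketch.doDispatch(new Request("TopicTest", 1, 100L, "k2"));
        sketch.doDispatch(new Request("TopicTest", 0, 250L, "k3"));
        System.out.println(sketch.consumeQueues + " " + sketch.index);
    }
}
```

Adding another consumer of CommitLog events in this model means appending one more dispatcher to the list, without touching the loop that reads the log.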

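Step 2 is the important one. We can also see that the commitLog has two dispatchers: one for the ConsumeQueue and one for the Index. The ConsumeQueue dispatcher is shown below.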

    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }


    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        // find the ConsumeQueue for this topic and queue id
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        // delegate to the ConsumeQueue's store logic
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }

    // Wrapper around the store operation (a ConsumeQueue method): retries up to maxRetries times
    public void putMessagePositionInfoWrapper(DispatchRequest request) {
        final int maxRetries = 30;
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();
            if (isExtWriteEnable()) {
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                cqExtUnit.setFilterBitMap(request.getBitMap());
                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                cqExtUnit.setTagsCode(request.getTagsCode());

                long extAddr = this.consumeQueueExt.put(cqExtUnit);
                if (isExtAddr(extAddr)) {
                    tagsCode = extAddr;
                } else {
                    log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                        topic, queueId, request.getCommitLogOffset());
                }
            }
            // store the entry
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                // XXX: warn and notify me
                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                    + " failed, retry " + i + " times");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.warn("", e);
                }
            }
        }

        // XXX: warn and notify me
        log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }

    // The actual store operation
    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset <= this.maxPhysicOffset) {
            return true;
        }

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);

        //offset -> size -> tagsCode 
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        // find the MappedFile that covers this logical offset
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            this.maxPhysicOffset = offset;
            // append the entry directly through the MappedFile
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }
 

From the code above we can see that the consumeQueue is ultimately stored through MappedFile.
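The three put calls in putMessagePositionInfo write a fixed-size entry: an 8-byte CommitLog offset, a 4-byte message size, and an 8-byte tags code, 20 bytes in total. Because every entry has the same size, entry number cqOffset starts at byte cqOffset * 20, which is what expectLogicOffset computes. Here is a minimal sketch of encoding and reading such entries; the class and method names (`CqEntrySketch`, `encode`, `logicOffset`, `read`) are illustrative assumptions, and a plain byte array stands in for the mapped file.

```java
import java.nio.ByteBuffer;

// Illustrative model only; a byte array stands in for the mapped consume queue file.
class CqEntrySketch {
    // 8 bytes (commit log offset) + 4 bytes (message size) + 8 bytes (tags code).
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Encode one entry in the order offset, size, tagsCode.
    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buffer.putLong(commitLogOffset);
        buffer.putInt(size);
        buffer.putLong(tagsCode);
        return buffer.array();
    }

    // Byte position at which entry number cqOffset starts.
    static long logicOffset(long cqOffset) {
        return cqOffset * CQ_STORE_UNIT_SIZE;
    }

    // Read entry number cqOffset as {commitLogOffset, size, tagsCode}.
    static long[] read(byte[] queue, long cqOffset) {
        ByteBuffer buffer = ByteBuffer.wrap(queue);
        int position = (int) logicOffset(cqOffset);
        return new long[] {
            buffer.getLong(position),
            buffer.getInt(position + 8),
            buffer.getLong(position + 12)
        };
    }

    public static void main(String[] args) {
        // Build a queue holding two entries back to back.
        byte[] queue = new byte[2 * CQ_STORE_UNIT_SIZE];
        System.arraycopy(encode(0L, 120, 11L), 0, queue, 0, CQ_STORE_UNIT_SIZE);
        System.arraycopy(encode(120L, 80, 22L), 0, queue, CQ_STORE_UNIT_SIZE, CQ_STORE_UNIT_SIZE);

        long[] second = read(queue, 1);
        System.out.println(second[0] + " " + second[1] + " " + second[2]);
    }
}
```

The fixed entry size is what makes lookup by queue offset a single multiplication instead of a scan.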

Flush mechanism

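Flushing can be asynchronous or synchronous, and RocketMQ defaults to asynchronous flushing. As the earlier analysis showed, messages ultimately land on disk, so every flush path should end in MappedFile's flush method.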

Asynchronous flush

The service responsible for asynchronous flushing is FlushRealTimeService, which lives inside the CommitLog class. Its run method is shown below.

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

                int flushPhysicQueueThoroughInterval =
                        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

                boolean printFlushProgress = false;

                // Print flush progress
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                    this.lastFlushTimestamp = currentTimeMillis;
                    flushPhysicQueueLeastPages = 0;
                    printFlushProgress = (printTimes++ % 10) == 0;
                }

                // everything above only reads the flush timing settings
                try {
                    if (flushCommitLogTimed) {
                        Thread.sleep(interval);
                    } else {
                        this.waitForRunning(interval);
                    }

                    if (printFlushProgress) {
                        this.printFlushProgress();
                    }

                    long begin = System.currentTimeMillis();
                    // core logic: flush the mapped files
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }

            // Normal shutdown, to ensure that all the flush before exit
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.flush(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }

            this.printFlushProgress();

            CommitLog.log.info(this.getServiceName() + " service end");
        }
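The loop above combines two rules: normally a flush is only worthwhile once at least flushPhysicQueueLeastPages pages are dirty, but once flushPhysicQueueThoroughInterval has elapsed since the last thorough flush, the threshold drops to 0 so that whatever is dirty gets written. The sketch below isolates that decision. The 4 KiB page size and the `ableToFlush` rule are assumptions about how the threshold is applied further down (that code is not shown in this article), and the names `FlushPolicySketch` and `leastPagesForRound` are hypothetical.

```java
// Illustrative model only; page size and the ableToFlush rule are assumptions.
class FlushPolicySketch {
    static final int PAGE_SIZE = 4096; // assumed page size in bytes

    private long lastFlushTimestamp;

    FlushPolicySketch(long startMillis) {
        this.lastFlushTimestamp = startMillis;
    }

    // Page threshold for this round; 0 means "flush whatever is dirty".
    int leastPagesForRound(long nowMillis, int configuredLeastPages, int thoroughIntervalMillis) {
        if (nowMillis >= lastFlushTimestamp + thoroughIntervalMillis) {
            lastFlushTimestamp = nowMillis;
            return 0;
        }
        return configuredLeastPages;
    }

    // Whether a flush should happen given the write and flushed positions in bytes.
    static boolean ableToFlush(long writePosition, long flushedPosition, int leastPages) {
        if (leastPages > 0) {
            return (writePosition / PAGE_SIZE) - (flushedPosition / PAGE_SIZE) >= leastPages;
        }
        return writePosition > flushedPosition;
    }

    public static void main(String[] args) {
        FlushPolicySketch policy = new FlushPolicySketch(0L);

        // Shortly after start: the configured threshold applies.
        int early = policy.leastPagesForRound(500L, 4, 10_000);
        System.out.println(early + " " + ableToFlush(8192L, 0L, early));

        // After the thorough interval: the threshold drops to 0.
        int late = policy.leastPagesForRound(10_000L, 4, 10_000);
        System.out.println(late + " " + ableToFlush(100L, 0L, late));
    }
}
```

The threshold keeps small writes from triggering a disk flush every interval, while the thorough interval bounds how long a small amount of dirty data can stay unflushed.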

Synchronous flush

Synchronous flushing uses GroupCommitService.
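The article does not show GroupCommitService itself, so the following is only a sketch of the general group-commit idea that the name suggests, not RocketMQ's implementation: each writer registers a request for the offset it needs on disk, and one flush satisfies every request waiting at that moment. All names (`GroupCommitSketch`, `FlushRequest`, `flushOnce`) are hypothetical, and the "flush" is just a counter update standing in for a real disk force.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative model of group commit; not RocketMQ's GroupCommitService.
class GroupCommitSketch {
    // A writer's request: "tell me once everything up to nextOffset is on disk".
    static final class FlushRequest {
        final long nextOffset;
        final CompletableFuture<Boolean> done = new CompletableFuture<>();

        FlushRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private List<FlushRequest> pending = new ArrayList<>();
    private long writtenTo = 0; // bytes appended so far
    private long flushedTo = 0; // bytes known to be flushed
    private int flushCalls = 0; // how many flushes were actually issued

    // Append size bytes and register a request to be notified when they are flushed.
    synchronized FlushRequest append(int size) {
        writtenTo += size;
        FlushRequest request = new FlushRequest(writtenTo);
        pending.add(request);
        return request;
    }

    // One pass of the flusher: a single flush completes every waiting request.
    synchronized void flushOnce() {
        if (pending.isEmpty()) {
            return;
        }
        List<FlushRequest> batch = pending;
        pending = new ArrayList<>();
        flushedTo = writtenTo; // stand-in for forcing the data to disk
        flushCalls++;
        for (FlushRequest request : batch) {
            request.done.complete(flushedTo >= request.nextOffset);
        }
    }

    synchronized int getFlushCalls() {
        return flushCalls;
    }

    synchronized long getFlushedTo() {
        return flushedTo;
    }

    public static void main(String[] args) {
        GroupCommitSketch log = new GroupCommitSketch();
        FlushRequest first = log.append(100);
        FlushRequest second = log.append(50);
        log.flushOnce();
        System.out.println(first.done.join() + " " + second.done.join() + " " + log.getFlushCalls());
    }
}
```

In a real broker the writer would block on its request (here, on `done`) before acknowledging the producer, which is what makes the flush synchronous from the producer's point of view.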

Summary

References

  • https://zhuanlan.zhihu.com/p/59516998
  • https://zhuanlan.zhihu.com/p/58728454
  • https://juejin.im/post/5d3f00aaf265da03e1685097 (in-depth analysis)