RocketMQ源码解析十一(Consumer上报消费进度流程(集群模式))

RocketMQ版本4.6.0,记录自己看源码的过程

Consumer

在消费者启动过程中,会启动MQClientInstance,而MQClientInstance中会启动多个定时任务,其中就包括定时上报消费进度:

/**
 * Registers the client's periodic background tasks. Only the task that persists
 * consumer offsets is shown in this excerpt.
 */
private void startScheduledTask() {

    // Other scheduled tasks omitted ...

    // Task that persists the offsets of all consumers. The exception is caught and
    // logged here because an exception escaping run() would suppress every later
    // execution of a scheduleAtFixedRate task.
    final Runnable persistOffsetsTask = new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    };

    // First run after 10 seconds, then repeated at the interval supplied by
    // clientConfig.getPersistConsumerOffsetInterval() (milliseconds; the article
    // states the default is 5s, which is not visible in this excerpt).
    final long initialDelayMillis = 1000 * 10;
    this.scheduledExecutorService.scheduleAtFixedRate(
        persistOffsetsTask,
        initialDelayMillis,
        this.clientConfig.getPersistConsumerOffsetInterval(),
        TimeUnit.MILLISECONDS);

    // Other scheduled tasks omitted ...
}

/**
 * Persists the consume offsets of every consumer registered in {@code consumerTable}.
 */
private void persistAllConsumerOffset() {
    // Delegate to each registered consumer's own persistence hook.
    for (MQConsumerInner consumer : this.consumerTable.values()) {
        consumer.persistConsumerOffset();
    }
}

调用各个消费者的上报接口
DefaultMQPushConsumerImpl

/**
 * Persists the consume offsets of this consumer.
 *
 * <p>Any exception (including a failed state check) is logged together with the
 * consumer group and is not propagated to the caller.
 */
@Override
public void persistConsumerOffset() {
    try {
        this.makeSureStateOK();
        // Copy the message queues currently present in the rebalance component's
        // process-queue table, then hand the copy to the offset store.
        final Set<MessageQueue> assignedQueues =
            new HashSet<MessageQueue>(this.rebalanceImpl.getProcessQueueTable().keySet());
        this.offsetStore.persistAll(assignedQueues);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}

使用消费进度组件(集群模式下为RemoteBrokerOffsetStore)上报该消费者所有消费队列的消费进度

/**
 * Reports the locally tracked consume offsets of the given queues to the broker.
 *
 * <p>Every entry of {@code offsetTable} whose queue is contained in {@code mqs} is
 * sent to the broker; entries whose queue is not in {@code mqs} are removed from
 * {@code offsetTable} afterwards. A failure for one queue is logged and does not
 * stop the remaining queues from being processed.
 *
 * @param mqs the queues whose offsets should be reported; nothing happens when it
 *            is {@code null} or empty
 */
@Override
public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty()) {
        return;
    }

    final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();

    // Walk every queue for which an offset is tracked locally.
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        AtomicLong offset = entry.getValue();
        if (offset == null) {
            continue;
        }

        if (!mqs.contains(mq)) {
            // Not in the requested set: remember it for removal after the loop.
            unusedMQ.add(mq);
            continue;
        }

        // Read the counter once so the value written to the log is exactly the value
        // that was sent, even if the counter advances in the meantime.
        final long currentOffset = offset.get();
        try {
            this.updateConsumeOffsetToBroker(mq, currentOffset);
            log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                this.groupName,
                this.mQClientFactory.getClientId(),
                mq,
                currentOffset);
        } catch (Exception e) {
            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
        }
    }

    // Drop the offsets of queues that were not in the requested set.
    for (MessageQueue mq : unusedMQ) {
        this.offsetTable.remove(mq);
        log.info("remove unused mq, {}, {}", mq, this.groupName);
    }
}

/**
 * Sends the consume offset of {@code mq} to the broker as a one-way request by
 * delegating to the three-argument overload with {@code isOneway = true}.
 */
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    final boolean oneway = true;
    this.updateConsumeOffsetToBroker(mq, offset, oneway);
}

/**
 * Sends the consume offset of a queue to the broker that owns it.
 *
 * <p>The broker address is looked up by broker name; when the lookup fails, the
 * topic route is refreshed from the name server and the lookup is retried once.
 *
 * @param mq       the queue whose offset is reported
 * @param offset   the offset to commit
 * @param isOneway {@code true} to send via {@code updateConsumerOffsetOneway},
 *                 {@code false} to send via {@code updateConsumerOffset}
 * @throws MQClientException if no address can be found for the queue's broker,
 *                           even after refreshing the route
 */
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    // Resolve the broker address (the article notes this is normally the master
    // broker; that is not visible in this excerpt).
    FindBrokerResult brokerLookup = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (brokerLookup == null) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        brokerLookup = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (brokerLookup == null) {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    // Build the request header carrying the queue's new offset.
    final UpdateConsumerOffsetRequestHeader header = new UpdateConsumerOffsetRequestHeader();
    header.setTopic(mq.getTopic());
    header.setConsumerGroup(this.groupName);
    header.setQueueId(mq.getQueueId());
    header.setCommitOffset(offset);

    final String brokerAddr = brokerLookup.getBrokerAddr();
    final int timeoutMillis = 1000 * 5;
    if (isOneway) {
        // One-way send of the offset update.
        this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(brokerAddr, header, timeoutMillis);
    } else {
        this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, header, timeoutMillis);
    }
}

以单向的方式发送请求,请求码为UPDATE_CONSUMER_OFFSET

/**
 * Sends an {@code UPDATE_CONSUMER_OFFSET} request to the broker using a one-way invocation.
 *
 * @param addr          broker address; passed through {@code MixAll.brokerVIPChannel}
 *                      together with the client's VIP-channel setting before sending
 * @param requestHeader header carrying group, topic, queue id and the offset to commit
 * @param timeoutMillis timeout handed to {@code invokeOneway}, in milliseconds
 */
public void updateConsumerOffsetOneway(
    final String addr,
    final UpdateConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
    ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
InterruptedException {
    final RemotingCommand offsetUpdate =
        RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
    final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);

    this.remotingClient.invokeOneway(targetAddr, offsetUpdate, timeoutMillis);
}

Broker

Broker端负责处理更新消费进度请求的处理器是ConsumerManageProcessor

/**
 * Dispatches a consumer-management request to its handler by request code.
 *
 * @return the handler's response, or {@code null} when the request code is not
 *         one of the codes handled here
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            // Offset-progress report sent by a consumer.
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            // No handler for this request code.
            return null;
    }
}

/**
 * Handles an offset-update request: decodes the request header, commits the
 * offset to the broker's consumer offset manager and answers with {@code SUCCESS}.
 *
 * @param ctx     channel context; its remote address is recorded as the client host
 * @param request the incoming request carrying an {@code UpdateConsumerOffsetRequestHeader}
 * @return the response command with code {@code SUCCESS} and a {@code null} remark
 */
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand reply =
        RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
    final UpdateConsumerOffsetRequestHeader header = (UpdateConsumerOffsetRequestHeader)
        request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);

    // Hand the reported offset to the consumer offset manager.
    final String clientHost = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    this.brokerController.getConsumerOffsetManager().commitOffset(
        clientHost,
        header.getConsumerGroup(),
        header.getTopic(),
        header.getQueueId(),
        header.getCommitOffset());

    reply.setCode(ResponseCode.SUCCESS);
    reply.setRemark(null);
    return reply;
}

向消费进度管理组件提交消费进度,消费进度统一由消费进度管理器管理

/**
 * Manages the consume offsets reported by consumers.
 */

public class ConsumerOffsetManager extends ConfigManager {

    /**
     * Offset table: {@code topic@group} -> (queue id -> committed offset).
     */
    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

    /**
     * Records the consume offset reported by a consumer.
     *
     * @param clientHost address of the reporting consumer (used for logging only)
     * @param group      consumer group
     * @param topic      topic
     * @param queueId    queue id
     * @param offset     the offset to store
     */
    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
        final long offset) {
        // Key format: topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }

    /**
     * Stores {@code offset} for {@code queueId} under {@code key}, creating the
     * per-key queue map on first use, and warns when the new offset is smaller than
     * the previously stored one.
     */
    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            // First report for this key. Install the queue map atomically: with a plain
            // get-then-put, two threads reporting concurrently could each install their
            // own map and the offset stored in the losing map would be lost.
            ConcurrentMap<Integer, Long> created = new ConcurrentHashMap<Integer, Long>(32);
            ConcurrentMap<Integer, Long> existing = this.offsetTable.putIfAbsent(key, created);
            map = existing != null ? existing : created;
        }

        // Store the new offset; put returns the previously stored value, if any.
        Long storeOffset = map.put(queueId, offset);
        if (storeOffset != null && offset < storeOffset) {
            log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
        }
    }
}

这部分逻辑还是很清晰的。
RocketMQ源码解析十一(Consumer上报消费进度流程(集群模式))

参考资料

《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》

上一篇:优美的暴力—分块


下一篇:MySqlBulkLoader 中文乱码