RockerMQ源码分析——Broker消息发送流程

org.apache.rocketmq.example.quickstart.Producer

  • 创建一个消息的生产者,且指定一个组
  • 设置namesrv地址,可以从此地址获取topic的队列信息
  • 启动生产者实例
  • 循环中创建消息对象,并指定topic、tag和消息体
  • 在循环中发送消息,采用默认的负载策略,
    • 调用org.apache.rocketmq.client.producer.DefaultMQProducer#send
      • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send
        • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:
        • Message:发送消息
        • CommunicationMode:发送方式
        • SendCallback:异步消息发送回调函数
        • timeout:消息发送超时时间
        • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:获取topic的路由信息( Broker负载消息存储,一个topic可以利用负载均衡分布在多台broker上,每个broker包含多个Queue:每个QueueData包含BrokerName,读队列和写队列个数,权限?、同步或异步)
          • 先从本地缓存中 ConcurrentMap<String/* topic */, TopicPublishInfo>中尝试获取
            • ->org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:尝试从Nameserver中获取Topic路由信息,并更新本地缓存
              • 为了避免重复从 NameServer 获取配置信息,添加了锁
              • 从默认的Topic或者指定的Topic中获取配置信息(从Nameserver获取)
              • 获取到最新的Topic信息后,与本地缓存进行对比,有变化的话,需要同步更新消费者、生产者关于该Topic的缓存,更新前是先复制一份信息
          • 如果未找到路由信息,则从默认的Topic中寻找路由配置
        • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue:根据Topic路由负载算法选择一个消息队列进行消息发送
          • ->org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
            • 如果开启了消息延时规避
              • 首先对Topic所有队列进行验证,因为加入了发送异常延时,确保消息队列(MessageQueue)所在的Broker是正常的
              • 关于消息延时机制
            • 没有开启的话,就循环向下一个消息队列发送
        • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl:向MessageQueue消息发送
          • 通过Product与Broker的长连接将消息发送给Broker,然后Broker将消息存储,并返回生产者
        • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem如果失败就更新下容错策略,主要用来规避发生故障的broker
        • 如果是同步调用方式(SYNC),则执行失败重试策略,默认重试两次
          主要分析的是RocketMQ 以同步方式发送消息的过程,异步模式与单向模式实现原理基本一样,异步只是增加了发送成功或失败的回掉方法。

broker消息发送的主要执行流程:
RockerMQ源码分析——Broker消息发送流程

DefaultMQProducerImpl#sendDefaultImpl:producer发送消息
private SendResult sendDefaultImpl(
        Message msg,                                    // 发送消息
        final CommunicationMode communicationMode,      // 发送方式
        final SendCallback sendCallback,                // 异步消息发送回调函数
        final long timeout                              // 消息发送超时时间
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 需要先确保Producer状态正常
    this.makeSureStateOK();
    // 消息参数校验
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // 查询topic路由信息,先尝试从内存中获取,若没有 则从namesrv通过netty远程获取
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;     // 最后选择用于发送消息的队列
        Exception exception = null;
        SendResult sendResult = null;   // 最后一次发送结果
        // 总次数;若是同步模式,则在默认2次的基础上+1,如果是异步和oneway 模式则只有1次,一旦失败就直接返回
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;      // 第几次发送
        String[] brokersSent = new String[timesTotal];  // 存储每次发送消息时选择的broker名称
        for (; times < timesTotal; times++) {       // 重试总次数
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 根据Topic路由负载算法选择一个消息队列进行消息发送
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    // 向MessageQueue 消息发送,消息发送的核心函数
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // 如果失败就更新下容错策略,主要用来规避发生故障的broker
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:         // 异步发送
                            return null;
                        case ONEWAY:        // oneway 模式
                            return null;
                        case SYNC:
                            /**
                             * 状态有4种:
                             * 发送成功、发送成功但刷盘失败、发送成功但同步到slave失败以及发送成功而slave不可用
                             */
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) { // 状态不是OK,说明同步发送成功,但存储出现问题
                                // 是否尝试发送到其他Broker上
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    //  同步发送成功但存储有问题时 && 配置存储异常时允许重新发送时,进行重试
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环
                    endTimestamp = System.currentTimeMillis();
                    // 如果失败就更新下容错策略,主要用来规避发生故障的broker
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {  // 打印异常,更新Broker可用性信息,继续循环
                    endTimestamp = System.currentTimeMillis();
                    // 如果失败就更新下容错策略,主要用来规避发生故障的broker
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
                    endTimestamp = System.currentTimeMillis();
                    // 如果失败就更新下容错策略,主要用来规避发生故障的broker
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        // 如出现以下类型的异常,进行消息发送重试
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                            // 如果有发送结果,进行返回,否则抛出异常
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    // 如果失败就更新下容错策略,主要用来规避发生故障的broker
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        // 返回发送结果
        if (sendResult != null) {
            return sendResult;
        }

        // 根据不同情况,抛出不同异常
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }
    // 检查能否找到NameSrv
    validateNameServerSetting();
    // 找不到消息路由的异常
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
  • 先获取topic路由信息,详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo
  • 计算允许发送直到成功的最大次数,并进行循环。同步默认配置(2+1)次,异步和oneway只有1次
  • 根据topic路由负载算法选择一个消息队列用于发送消息,详细解析见:MQFaultStrategy#selectOneMessageQueue
  • 调用DefaultMQProducerImpl#sendKernelImpl方法,producer将消息通过和Broker之间建立的长连接发送给Broker,Broker存储接收到的消息,并返回给producer发送结果的状态,这是发送方发送消息的核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl
  • 更新Broker可用信息,主要是再次选择用于发送消息的消息队列时,会参考broker发送消息的延迟,详细解析见:MQFaultStrategy
DefaultMQProducerImpl#tryToFindTopicPublishInfo 查询路由信息
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 先从内存中获取可用的topic路由信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // 当内存没有可用的路由信息,尝试从Nameserver中获取Topic路由信息,并更新本地缓存
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    // 如果找到可用的路由信息
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        /**
         * 更新Topic路由信息
         */
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
  • 先从缓存中获取可用的路由信息,topicPublishInfoTable是个ConcurrentMap变量,保存了topic和消息队列的映射关系
  • 从namesrv中获取topic路由信息
  • 如果从缓存和Namesrv中都没有找到有用的路由信息,调用MQClientInstance#updateTopicRouteInfoFromNameServer创建topic路由信息
MQFaultStrategy#selectOneMessageQueue
/**
 * 选择一个消息队列发送消息
 *
 * @param tpInfo
 * @param lastBrokerName
 * @return
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // SendWhichQueue是个本地线程变量 ThreadLocal,保存上一次发送的消息队列下标
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            /**
             * 对Topic所有队列进行验证,因为加入了发送异常延时,确保消息队列(MessageQueue)所在的Broker是正常的
             */
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                /**
                 *  判断当前的消息队列是否可用
                 *  一旦一个 MessageQueue 符合条件,即刻返回,但该 Topic 所在的所有Broker全部标记不可用时,
                 *  进入到下一步逻辑处理
                 */
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }

            /**
             * 根据 Broker 的 startTimestart 进行一个排序,值越小,排前面,然后再选择一个,
             * 返回(此时不能保证一定可用,会抛出异常,如果消息发送方式是同步调用,则有重试机制)。
             */
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
DefaultMQProducerImpl#sendKernelImpl
/**
 * 通过Producer与Broker的长连接将消息发送给Broker,然后Broker将消息存储,并返回生产者
 * producer 发送消息的核心函数
 */
private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    /**
     * 获取Broker地址
     */
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        /**
         * 查询Topic路由信息
         * 先从内存中获取
         */
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                // 设置消息标记类型为TRANSACTION_PREPARED_TYPE;表示消息为预提交
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } 
        ......
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
MQFaultStrategy

上一篇:RabbitMQ入门案例


下一篇:RocketMQ-broker状态管理及数据统计