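RocketMQ Source Code Analysis - Producer: Message Retry and Latency Fault Tolerance (2)

Questions

1. What happens when sending a message fails (network problems, a broker going down)? How does the producer achieve high availability?
2. How is the message queue chosen, i.e., which message queue does the producer send a message to?
3. Why does RocketMQ need a separate broker fault latency mechanism?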

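Producer message retry

The three communication modes handle retries differently. In DefaultMQProducer, a synchronous send is retried up to retryTimesWhenSendFailed times (default 2), an asynchronous send up to retryTimesWhenSendAsyncFailed times (default 2), and a oneway send is never retried.
Retrying makes it more likely that a message is sent successfully and not lost, but it can also produce duplicate messages and therefore duplicate consumption. RocketMQ does not guarantee idempotency, so developers must make their consumers idempotent themselves.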
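Since idempotency is left to the application, here is a minimal sketch of an idempotent handler, assuming every message carries a unique business key (for example an order ID placed in the message keys). IdempotentHandler and handle are hypothetical names, not RocketMQ APIs, and the in-memory set stands in for what would normally be a durable store such as a database unique index.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of RocketMQ): processes each business key at most once.
// In-memory only; production code would keep the keys in a durable store.
class IdempotentHandler {
    // Keys that have been claimed for processing
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Runs action the first time key is seen and returns true; returns false for duplicates. */
    boolean handle(String key, Runnable action) {
        // add() is atomic on a concurrent key set and returns false if the key is already present
        if (!processedKeys.add(key)) {
            return false; // duplicate delivery, e.g. caused by a producer retry
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            // Processing failed: release the key so a redelivery can try again
            processedKeys.remove(key);
            throw e;
        }
    }
}
```

Removing the key on failure matters: otherwise a message whose first processing attempt threw would be silently skipped when it is delivered again.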

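Configuring message retry in RocketMQ:

    // Number of retries when a synchronous send fails (default 2)
    producer.setRetryTimesWhenSendFailed(2);
    // Number of retries when an asynchronous send fails (default 2)
    producer.setRetryTimesWhenSendAsyncFailed(2);
    // Whether to retry on another broker when the message was not stored successfully (send status != SEND_OK); default false
    producer.setRetryAnotherBrokerWhenNotStoreOK(true);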

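1. Message retry in synchronous mode

When a synchronous send fails, the producer retries, and each retry essentially means trying a different broker. This loop-based retry applies only to synchronous sends: for ASYNC and ONEWAY, timesTotal is 1 (asynchronous sends are retried separately in onExceptionImpl). The logic lives in the main send method, sendDefaultImpl.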

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // ... code omitted ...
        // SYNC: 1 + retryTimesWhenSendFailed attempts; ASYNC and ONEWAY: a single attempt in this loop
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        // Names of the brokers the message was sent to, one entry per attempt
        String[] brokersSent = new String[timesTotal];
        // Send loop, one iteration per attempt
        for (; times < timesTotal; times++) {
            // lastBrokerName is the broker of the previous attempt (set in the omitted code).
            // Pick a queue whose brokerName != lastBrokerName: this is where a retry switches broker
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            // Send the message
            // ... code omitted (exceptions such as RemotingException are caught and the loop continues) ...
            switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    // Result is not SEND_OK and retryAnotherBrokerWhenNotStoreOK is on: retry on another broker
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    return sendResult;
                default:
                    break;
            }
        }
        // ... code omitted ...
    }

    // TopicPublishInfo#selectOneMessageQueue(lastBrokerName)
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // lastBrokerName == null: first attempt, plain selection
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // lastBrokerName != null: this is a retry
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                // Move round-robin to the next queue and accept it only if brokerName != lastBrokerName
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            // No queue on another broker was found: fall back to a plain selection
            return selectOneMessageQueue();
        }
    }
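The sync retry loop and the broker-skipping selection can be simulated without a broker. The sketch below is a simplified model, not RocketMQ code: SimQueue, SyncRetrySketch and the failingBrokers set are hypothetical, a plain AtomicInteger replaces the producer's thread-local sendWhichQueue index, Math.floorMod replaces the Math.abs plus `pos < 0` guard, and a send "fails" whenever it targets a broker in failingBrokers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MessageQueue: only brokerName and queueId
final class SimQueue {
    final String brokerName;
    final int queueId;

    SimQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    @Override
    public String toString() {
        return brokerName + "#" + queueId;
    }
}

// Simplified model of the SYNC retry loop in sendDefaultImpl (not RocketMQ code)
class SyncRetrySketch {
    private final List<SimQueue> queues;
    // Plain counter standing in for the producer's thread-local sendWhichQueue index
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    SyncRetrySketch(List<SimQueue> queues) {
        this.queues = queues;
    }

    // Round-robin pick; floorMod keeps the position non-negative even after int overflow
    private SimQueue next() {
        return queues.get(Math.floorMod(sendWhichQueue.incrementAndGet(), queues.size()));
    }

    // Like selectOneMessageQueue(lastBrokerName): prefer a queue on a different broker
    SimQueue select(String lastBrokerName) {
        if (lastBrokerName != null) {
            for (int i = 0; i < queues.size(); i++) {
                SimQueue mq = next();
                if (!mq.brokerName.equals(lastBrokerName)) {
                    return mq;
                }
            }
        }
        return next();
    }

    // Returns the queues tried, in order; a send "fails" if its broker is in failingBrokers
    List<String> send(int retryTimesWhenSendFailed, Set<String> failingBrokers) {
        int timesTotal = 1 + retryTimesWhenSendFailed; // SYNC: first attempt + retries
        List<String> attempts = new ArrayList<>();
        String lastBrokerName = null;
        for (int times = 0; times < timesTotal; times++) {
            SimQueue mq = select(lastBrokerName);
            lastBrokerName = mq.brokerName;
            attempts.add(mq.toString());
            if (!failingBrokers.contains(mq.brokerName)) {
                break; // success, stop retrying
            }
        }
        return attempts;
    }
}
```

With the queues broker-a#0, broker-a#1, broker-b#0, broker-b#1 and broker-a failing, the first attempt lands on broker-a#1 and the retry moves straight to broker-b#0 instead of trying broker-a again.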

2. Message retry in asynchronous mode

For asynchronous sends, the number of retries is set by the producer's retryTimesWhenSendAsyncFailed, and the retry logic is in MQClientAPIImpl#onExceptionImpl.

    private void onExceptionImpl(final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int timesTotal,
        final AtomicInteger curTimes,
        final Exception e,
        final SendMessageContext context,
        final boolean needRetry,
        final DefaultMQProducerImpl producer
    ) {
        // Increment the retry counter
        int tmp = curTimes.incrementAndGet();
        // Retry only if retrying is allowed and the counter is still <= the configured total
        if (needRetry && tmp <= timesTotal) {
            // By default, retry on the same broker, i.e. the one the last attempt failed on
            String retryBrokerName = brokerName;
            // If topic route info is available, select a message queue to decide which broker to retry on
            if (topicPublishInfo != null) {
                // Select a MessageQueue, passing the failed broker as lastBrokerName
                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
                // Retry on the broker of the chosen queue
                retryBrokerName = mqChosen.getBrokerName();
            }
            // Look up the address of that broker
            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
                retryBrokerName);
            try {
                // Give the resent request a new request id
                request.setOpaque(RemotingCommand.createNewRequestId());
                // Send asynchronously again
                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    timesTotal, curTimes, context, producer);
            } catch (InterruptedException e1) {
                // Interrupted: no further retry
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingConnectException e1) {
                // Connection failure: isolate the broker, then allow another retry
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            } catch (RemotingTooMuchRequestException e1) {
                // Too many requests: no further retry
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingException e1) {
                // Other remoting errors: isolate the broker, then allow another retry
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            }
        } else {
            try {
                // Retries exhausted or not allowed: invoke the user's exception callback
                sendCallback.onException(e);
            } catch (Exception ignored) {
            }
        }
    }
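The retry decision in onExceptionImpl can be modeled the same way. In this hypothetical sketch the network call is replaced by a direct method call, so the "asynchronous" retries run synchronously and recursively; AsyncRetrySketch, brokerFails and pickBroker are made-up names, pickBroker is a simplified stand-in for selectOneMessageQueue, and the needRetry flag and fault-item updates are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Simplified, synchronous model of the async retry path (not RocketMQ code)
class AsyncRetrySketch {
    final List<String> attempts = new ArrayList<>(); // brokers tried, in order
    private final List<String> brokers;              // hypothetical broker list
    private final Predicate<String> brokerFails;     // hypothetical: does a send to this broker fail?
    private int rr = 0;                              // round-robin position for pickBroker

    AsyncRetrySketch(List<String> brokers, Predicate<String> brokerFails) {
        this.brokers = brokers;
        this.brokerFails = brokerFails;
    }

    // Stand-in for sendMessageAsync: "completes" immediately instead of on a callback thread
    void sendAsync(String brokerName, int timesTotal, AtomicInteger curTimes,
                   Consumer<String> onSuccess, Consumer<Exception> onException) {
        attempts.add(brokerName);
        if (!brokerFails.test(brokerName)) {
            onSuccess.accept(brokerName);
            return;
        }
        onExceptionImpl(brokerName, timesTotal, curTimes,
                new RuntimeException("send to " + brokerName + " failed"), onSuccess, onException);
    }

    // Mirrors the counter check in MQClientAPIImpl#onExceptionImpl
    private void onExceptionImpl(String brokerName, int timesTotal, AtomicInteger curTimes, Exception e,
                                 Consumer<String> onSuccess, Consumer<Exception> onException) {
        int tmp = curTimes.incrementAndGet();
        if (tmp <= timesTotal) {
            sendAsync(pickBroker(brokerName), timesTotal, curTimes, onSuccess, onException);
        } else {
            onException.accept(e); // retries exhausted: report the last error
        }
    }

    // Simplified queue selection: next broker (round-robin) that differs from lastBrokerName
    private String pickBroker(String lastBrokerName) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get(Math.floorMod(rr++, brokers.size()));
            if (!candidate.equals(lastBrokerName)) {
                return candidate;
            }
        }
        return lastBrokerName; // only one broker: retry on the same one
    }
}
```

Assuming timesTotal is the configured retryTimesWhenSendAsyncFailed (default 2) and every broker fails, the message is tried three times in total before the exception callback fires exactly once.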

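Message queue selection

The queue selection logic is in selectOneMessageQueue (MQFaultStrategy). Roughly:

  • Without fault tolerance, queues are chosen round-robin; on a retry, queues on the broker that just failed are skipped.
  • With fault tolerance enabled (sendLatencyFaultEnable), RocketMQ uses latencyFaultTolerance to predict whether a broker is available; if none is, it picks a relatively good broker from the fault table via pickOneAtLeast.
  • If all of the above fails, it falls back to tpInfo.selectOneMessageQueue(), which picks a queue without filtering any broker.
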
    // MQFaultStrategy#selectOneMessageQueue
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Is the broker fault latency mechanism enabled? Default: false
        if (this.sendLatencyFaultEnable) {
            try {
                // Walk the queues round-robin
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // Is this broker available? latencyFaultTolerance keeps a map of FaultItem per broker
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        // Available: use this queue
                        return mq;
                }
                // No available broker: pick a relatively good one from the FaultItem table
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // Number of writable queues of that broker
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                // The broker still has writable queues
                if (writeQueueNums > 0) {
                    // Take a queue
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        // Point it at the chosen broker
                        mq.setBrokerName(notBestBroker);
                        // Reset the queue id to one of that broker's writable queues
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    // The broker has no writable queues: drop it from the fault table
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // Fallback: plain selection without filtering any broker
            return tpInfo.selectOneMessageQueue();
        }
        // Fault tolerance disabled: round-robin, skipping lastBrokerName
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
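
    // LatencyFaultToleranceImpl#isAvailable
    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        // No fault record: the broker is considered available
        return true;
    }

    // FaultItem#isAvailable
    public boolean isAvailable() {
        // Available once the current time has reached startTimestamp
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }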

    // LatencyFaultToleranceImpl#pickOneAtLeast
    public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        // Copy every entry of faultItemTable into a list
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            // Shuffle first, then sort (FaultItem is Comparable: available brokers first, then lower latency)
            Collections.shuffle(tmpList);
            Collections.sort(tmpList);

            final int half = tmpList.size() / 2;
            if (half <= 0) {
                // Only one element
                return tmpList.get(0).getName();
            } else {
                // Rotate among the better half of the sorted list
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }
        return null;
    }
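The fault-aware branch can also be sketched in isolation. This is a simplified model under stated assumptions, not RocketMQ code: FaultAwareSelectorSketch, record and selectBroker are hypothetical names, the current time is passed in explicitly to keep it testable, it returns a broker name rather than a MessageQueue, and it skips the writeQueueNums check. The sort order (available first, then lower latency, then earlier startTimestamp) is assumed to match FaultItem.compareTo and should be checked against the RocketMQ version in use.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the fault-tolerant branch of MQFaultStrategy#selectOneMessageQueue
class FaultAwareSelectorSketch {
    // Reduced FaultItem: broker name, last latency, time until which it counts as faulty
    static final class Item {
        final String name;
        final long latency;
        final long startTimestamp;

        Item(String name, long latency, long startTimestamp) {
            this.name = name;
            this.latency = latency;
            this.startTimestamp = startTimestamp;
        }

        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    private final List<String> queueBrokers; // broker of each queue (stands in for the MessageQueue list)
    private final Map<String, Item> faultTable = new HashMap<>();
    private final AtomicInteger sendWhichQueue = new AtomicInteger();
    private final AtomicInteger whichItemWorst = new AtomicInteger();

    FaultAwareSelectorSketch(List<String> queueBrokers) {
        this.queueBrokers = queueBrokers;
    }

    void record(String broker, long latency, long startTimestamp) {
        faultTable.put(broker, new Item(broker, latency, startTimestamp));
    }

    // A broker without a fault record is considered available
    boolean isAvailable(String broker, long now) {
        Item item = faultTable.get(broker);
        return item == null || item.isAvailable(now);
    }

    // Shuffle (randomizes ties), sort best-first, then rotate through the better half
    String pickOneAtLeast(long now) {
        List<Item> items = new ArrayList<>(faultTable.values());
        if (items.isEmpty()) {
            return null;
        }
        Collections.shuffle(items);
        items.sort(Comparator.comparing((Item it) -> !it.isAvailable(now)) // false (available) sorts first
                .thenComparingLong(it -> it.latency)
                .thenComparingLong(it -> it.startTimestamp));
        int half = items.size() / 2;
        if (half <= 0) {
            return items.get(0).name;
        }
        return items.get(Math.floorMod(whichItemWorst.getAndIncrement(), half)).name;
    }

    // Round-robin over the queues, returning the first broker that is currently available
    String selectBroker(long now) {
        int index = sendWhichQueue.incrementAndGet();
        for (int i = 0; i < queueBrokers.size(); i++) {
            String broker = queueBrokers.get(Math.floorMod(index++, queueBrokers.size()));
            if (isAvailable(broker, now)) {
                return broker;
            }
        }
        // Nobody is available: fall back to the "least bad" broker from the fault table
        String notBestBroker = pickOneAtLeast(now);
        return notBestBroker != null
                ? notBestBroker
                : queueBrokers.get(Math.floorMod(sendWhichQueue.incrementAndGet(), queueBrokers.size()));
    }
}
```

If broker-a has a fault record valid until 5000 and the clock reads 1000, selectBroker skips broker-a's queues and returns broker-b; once both brokers are faulty, pickOneAtLeast prefers the one with the lower recorded latency.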

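Broker fault latency mechanism

1. Updating broker latency information: updateFaultItem

Recall the send flow: sendDefaultImpl calls updateFaultItem after each send attempt, passing the measured latency with isolation = false when the send returns, and isolation = true when it fails with an exception such as RemotingException. Let's look at it in detail.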

    // DefaultMQProducerImpl#updateFaultItem
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }

    // MQFaultStrategy#updateFaultItem
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        // Only when the fault latency mechanism is enabled (default false)
        if (this.sendLatencyFaultEnable) {
            // Derive how long the broker counts as unavailable from the send latency;
            // isolation == true treats the latency as 30000 ms
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            // Update the fault record
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    // MQFaultStrategy fields used by computeNotAvailableDuration
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    private long computeNotAvailableDuration(final long currentLatency) {
        // Example: a send latency of 4000 ms matches the latencyMax entry 3000L,
        // so the broker is considered unavailable for 180000 ms
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }

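The latency mechanism therefore boils down to two steps:

  1. Compute how long the broker is unavailable (duration) from the send latency (currentLatency): the longer a send takes, the longer the producer considers the broker unavailable. These durations are empirical values, and any latency below 550 ms maps to 0, so the broker stays available. If isolation is true, the latency is treated as 30000 ms, which makes the broker unavailable for 600000 ms (10 minutes).
  2. Call latencyFaultTolerance.updateFaultItem to update the broker's fault tolerance record.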
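The mapping is easy to check with a standalone copy of the two arrays. NotAvailableDurationSketch and durationFor are hypothetical names; the thresholds are the latencyMax and notAvailableDuration values of MQFaultStrategy shown above, and durationFor applies the same `isolation ? 30000 : currentLatency` substitution as updateFaultItem.

```java
// Standalone copy of the latency -> unavailable-duration lookup (not RocketMQ code)
class NotAvailableDurationSketch {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; the first threshold <= latency decides the duration
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // isolation == true treats the send as if it took 30000 ms
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }
}
```

For example, 4000 ms maps to 180000 ms, 600 ms to 30000 ms, 100 ms to 0, and any isolated failure to 600000 ms.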

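2. Latency fault tolerance: LatencyFaultTolerance

LatencyFaultTolerance decides whether a broker is available; its default implementation is LatencyFaultToleranceImpl. It maintains a map of FaultItem objects: when the fault latency mechanism is enabled, each update stores a record with the brokerName as key and a FaultItem as value, meaning the broker is treated as faulty until a certain point in time.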

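FaultItem is an inner class of LatencyFaultToleranceImpl with three main fields:

| Field | Type | Description |
| --- | --- | --- |
| name | String | brokerName |
| currentLatency | long | Latency of the last send to this broker |
| startTimestamp | long | The broker is treated as faulty until this point in time |
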
    // LatencyFaultToleranceImpl#updateFaultItem
    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // Look up the existing fault record for this broker
        FaultItem old = this.faultItemTable.get(name);
        // No record yet
        if (null == old) {
            // Create a new FaultItem
            final FaultItem faultItem = new FaultItem(name);
            // Record the current send latency
            faultItem.setCurrentLatency(currentLatency);
            // The broker becomes available again at: now + notAvailableDuration
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            // Insert into the map; if another thread inserted first, update that record instead
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            // Existing record: overwrite the latency and the time it becomes available again
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
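A compact, testable version of this fault table follows. FaultTableSketch is a hypothetical name; the clock is injected through a LongSupplier instead of calling System.currentTimeMillis() directly, and ConcurrentHashMap.computeIfAbsent is swapped in for the get + putIfAbsent sequence above, with the same goal of keeping exactly one FaultItem per broker under concurrent updates.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Simplified model of LatencyFaultToleranceImpl's fault table (not RocketMQ code)
class FaultTableSketch {
    static final class FaultItem {
        final String name;
        volatile long currentLatency;
        volatile long startTimestamp; // faulty until this time

        FaultItem(String name) {
            this.name = name;
        }

        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>();
    private final LongSupplier clock; // e.g. System::currentTimeMillis

    FaultTableSketch(LongSupplier clock) {
        this.clock = clock;
    }

    void updateFaultItem(String name, long currentLatency, long notAvailableDuration) {
        // computeIfAbsent creates at most one FaultItem per broker, even with concurrent callers
        FaultItem item = faultItemTable.computeIfAbsent(name, FaultItem::new);
        item.currentLatency = currentLatency;
        item.startTimestamp = clock.getAsLong() + notAvailableDuration;
    }

    boolean isAvailable(String name) {
        FaultItem item = faultItemTable.get(name);
        return item == null || item.isAvailable(clock.getAsLong());
    }
}
```

Passing System::currentTimeMillis as the clock gives behavior equivalent to the original; in a test, a mutable clock lets you move time past startTimestamp and watch the broker become available again.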