RocketMQ 消费端负载均衡(Rebalance)解析

MQClientInstance#start():

 3 public void start() throws MQClientException {
 4     
 5            //各种线程的start
 6           //NettyRemotingClient实现Netty客户器端功能,接受数据包,在客户器端处理后发送给服务端。
 7            this.mQClientAPIImpl.start();
 8           //获取nameserver的地址 默认每3秒更新Topic规则 清除下线的borken并且发送心跳到broken  持久化消费者进度 适配执行消费请求的线程池的核心线程书大小
 9                 this.startScheduledTask();
10           //启动消息拉取服务,循环拉取阻塞队列pullRequestQueue
11                 this.pullMessageService.start();
12           //负载均衡服务开始
13                 this.rebalanceService.start();
14                 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
15                 this.log.info("the client factory [{}] start OK", this.clientId);
16                 this.serviceState = ServiceState.RUNNING;
17 }    

 解析 this.rebalanceService.start():它启动 RebalanceService 线程,该线程执行的 run() 方法如下:

 1     @Override
 2     public void run() {
 3         log.info(this.getServiceName() + " service started");
 4 
 5         while (!this.isStopped()) {
        //CAS操作 每隔20s会重新负载均衡一次,即如果一个消费者宕机了,最多20s就可以让其他消费者接替继续消费 6 this.waitForRunning(waitInterval); 7 this.mqClientFactory.doRebalance(); 8 } 9 10 log.info(this.getServiceName() + " service end"); 11 }

 1    public void doRebalance() {
      //key=topic value=DefaultMQPushConsumerImpl 2 Iterator var1 = this.consumerTable.entrySet().iterator(); 3 4 while(var1.hasNext()) { 5 Entry<String, MQConsumerInner> entry = (Entry)var1.next(); 6 MQConsumerInner impl = (MQConsumerInner)entry.getValue(); 7 if (impl != null) { 8 try { 9 impl.doRebalance(); 10 } catch (Throwable var5) { 11 this.log.error("doRebalance exception", var5); 12 } 13 } 14 } 15 16 }

在 topic 维度上进行负载均衡(rebalanceByTopic):
 1     private void rebalanceByTopic(final String topic, final boolean isOrder) {
 2         switch (messageModel) {
 3             case BROADCASTING: {
 4                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
 5                 if (mqSet != null) {
 6                     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
 7                     if (changed) {
 8                         this.messageQueueChanged(topic, mqSet, mqSet);
 9                         log.info("messageQueueChanged {} {} {} {}",
10                             consumerGroup,
11                             topic,
12                             mqSet,
13                             mqSet);
14                     }
15                 } else {
16                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
17                 }
18                 break;
19             }
20             case CLUSTERING: {
           //获取该topic所在的队列信息,因为一个topic可能在多个队列中,所以结果是个集合。topic+brokerName+queueId 21 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
           //获取此时在消费这个topic的客户端信息 22 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 23 if (null == mqSet) { 24 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 25 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 26 } 27 } 28 29 if (null == cidAll) { 30 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); 31 } 32 33 if (mqSet != null && cidAll != null) { 34 List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); 35 mqAll.addAll(mqSet); 36 37 Collections.sort(mqAll); 38 Collections.sort(cidAll); 39             //获取负载均衡实行策略,默认AllocateMessageQueueAveragely 40 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 41 42 List<MessageQueue> allocateResult = null; 43 try {
                //真正去实行策略,获取该客户端下分配到的 消费队列 44 allocateResult = strategy.allocate( 45 this.consumerGroup, 46 this.mQClientFactory.getClientId(), 47 mqAll, 48 cidAll); 49 } catch (Throwable e) { 50 log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), 51 e); 52 return; 53 } 54 55 Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); 56 if (allocateResult != null) { 57 allocateResultSet.addAll(allocateResult); 58 } 59             //更新processQueueTable ,processQueueTable表示当前客户端正在处理的queue。
              //比如:负载均衡后发现当前客户端没有分配到可以消费的queue,即allcateResultSet=空,则删除掉processQueueTable中该topic下面的queue数据 60 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 61 if (changed) { 62 log.info( 63 "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", 64 strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), 65 allocateResultSet.size(), allocateResultSet); 66 this.messageQueueChanged(topic, mqSet, allocateResultSet); 67 } 68 } 69 break; 70 } 71 default: 72 break; 73 } 74 }

 1     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
 2         List<String> cidAll) {
 3         if (currentCID == null || currentCID.length() < 1) {
 4             throw new IllegalArgumentException("currentCID is empty");
 5         }
 6         if (mqAll == null || mqAll.isEmpty()) {
 7             throw new IllegalArgumentException("mqAll is null or mqAll empty");
 8         }
 9         if (cidAll == null || cidAll.isEmpty()) {
10             throw new IllegalArgumentException("cidAll is null or cidAll empty");
11         }
12 
13         List<MessageQueue> result = new ArrayList<MessageQueue>();
14         if (!cidAll.contains(currentCID)) {
15             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
16                 consumerGroup,
17                 currentCID,
18                 cidAll);
19             return result;
20         }
21      //算法,看不懂
22         int index = cidAll.indexOf(currentCID);
23         int mod = mqAll.size() % cidAll.size();
24         int averageSize =
25             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
26                 + 1 : mqAll.size() / cidAll.size());
27         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
28         int range = Math.min(averageSize, mqAll.size() - startIndex);
29         for (int i = 0; i < range; i++) {
30             result.add(mqAll.get((startIndex + i) % mqAll.size()));
31         }
32         return result;
33     }

 

 

 

 

 

4. Rebalance的潜在危害:机器扩容也有可能会导致重复消费
消费暂停:假设最初只有 Consumer 1,由它负责消费全部 5 个队列;新增 Consumer 2 并触发 Rebalance 后,需要把其中 2 个队列分配给 Consumer 2 消费。此时 Consumer 1 必须先停止消费这 2 个队列,直到它们被分配给 Consumer 2 之后,这两个队列才能继续被消费。
重复消费:Consumer 2 在消费分配给自己的 2 个队列时,必须从 Consumer 1 之前已经消费到的 offset 接着消费。然而默认情况下 offset 是异步提交的:例如 Consumer 1 当前已消费到 offset 10,但异步提交给 broker 的 offset 只有 8。此时 Consumer 2 经过 Rebalance 后从 broker 记录的 offset 8 开始消费,就会有 2 条消息被重复消费。也就是说,Consumer 2 并不会等 Consumer 1 提交完 offset 之后再进行 Rebalance,因此提交间隔越长,可能造成的重复消费就越多。
消费突刺:如果 Rebalance 导致需要重复消费的消息过多,或者 Rebalance 期间暂停时间过长导致消息积压,那么在 Rebalance 结束后的瞬间都可能需要消费大量消息。

详见:https://blog.csdn.net/cxyatuo/article/details/104426633

上一篇:RocketMQ如何保证消息的可靠性投递?


下一篇:rocketmq实现延迟队列精确到秒级实现方案3-时间轮和秒级文件实现