1 MQClientInstance 2 3 public void start() throws MQClientException { 4 5 //各种线程的start 6 //NettyRemotingClient实现Netty客户器端功能,接受数据包,在客户器端处理后发送给服务端。 7 this.mQClientAPIImpl.start(); 8 //获取nameserver的地址 默认每3秒更新Topic规则 清除下线的borken并且发送心跳到broken 持久化消费者进度 适配执行消费请求的线程池的核心线程书大小 9 this.startScheduledTask(); 10 //启动消息拉取服务,循环拉取阻塞队列pullRequestQueue 11 this.pullMessageService.start(); 12 //负载均衡服务开始 13 this.rebalanceService.start(); 14 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); 15 this.log.info("the client factory [{}] start OK", this.clientId); 16 this.serviceState = ServiceState.RUNNING; 17 }
解析this.rebalanceService.start();
1 @Override 2 public void run() { 3 log.info(this.getServiceName() + " service started"); 4 5 while (!this.isStopped()) {
//CAS操作 每隔20s会重新负载均衡一次,即如果一个消费者宕机了,最多20s就可以让其他消费者接替继续消费 6 this.waitForRunning(waitInterval); 7 this.mqClientFactory.doRebalance(); 8 } 9 10 log.info(this.getServiceName() + " service end"); 11 }
1 public void doRebalance() {
//key=topic value=DefaultMQPushConsumerImpl 2 Iterator var1 = this.consumerTable.entrySet().iterator(); 3 4 while(var1.hasNext()) { 5 Entry<String, MQConsumerInner> entry = (Entry)var1.next(); 6 MQConsumerInner impl = (MQConsumerInner)entry.getValue(); 7 if (impl != null) { 8 try { 9 impl.doRebalance(); 10 } catch (Throwable var5) { 11 this.log.error("doRebalance exception", var5); 12 } 13 } 14 } 15 16 }
按topic维度进行负载均衡
1 private void rebalanceByTopic(final String topic, final boolean isOrder) { 2 switch (messageModel) { 3 case BROADCASTING: { 4 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 5 if (mqSet != null) { 6 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); 7 if (changed) { 8 this.messageQueueChanged(topic, mqSet, mqSet); 9 log.info("messageQueueChanged {} {} {} {}", 10 consumerGroup, 11 topic, 12 mqSet, 13 mqSet); 14 } 15 } else { 16 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 17 } 18 break; 19 } 20 case CLUSTERING: {
//获取该topic所在的队列信息,因为一个topic可能在多个队列中,所以结果是个集合。topic+brokerName+queueId 21 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//获取此时在消费这个topic的客户端信息 22 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 23 if (null == mqSet) { 24 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 25 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 26 } 27 } 28 29 if (null == cidAll) { 30 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); 31 } 32 33 if (mqSet != null && cidAll != null) { 34 List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); 35 mqAll.addAll(mqSet); 36 37 Collections.sort(mqAll); 38 Collections.sort(cidAll); 39 //获取负载均衡实行策略,默认AllocateMessageQueueAveragely 40 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 41 42 List<MessageQueue> allocateResult = null; 43 try {
//真正去实行策略,获取该客户端下分配到的 消费队列 44 allocateResult = strategy.allocate( 45 this.consumerGroup, 46 this.mQClientFactory.getClientId(), 47 mqAll, 48 cidAll); 49 } catch (Throwable e) { 50 log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), 51 e); 52 return; 53 } 54 55 Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); 56 if (allocateResult != null) { 57 allocateResultSet.addAll(allocateResult); 58 } 59 //更新processQueueTable ,processQueueTable表示当前客户端正在处理的queue。
//比如:负载均衡后发现当前客户端没有分配到可以消费的queue,即allcateResultSet=空,则删除掉processQueueTable中该topic下面的queue数据 60 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 61 if (changed) { 62 log.info( 63 "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", 64 strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), 65 allocateResultSet.size(), allocateResultSet); 66 this.messageQueueChanged(topic, mqSet, allocateResultSet); 67 } 68 } 69 break; 70 } 71 default: 72 break; 73 } 74 }
1 public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, 2 List<String> cidAll) { 3 if (currentCID == null || currentCID.length() < 1) { 4 throw new IllegalArgumentException("currentCID is empty"); 5 } 6 if (mqAll == null || mqAll.isEmpty()) { 7 throw new IllegalArgumentException("mqAll is null or mqAll empty"); 8 } 9 if (cidAll == null || cidAll.isEmpty()) { 10 throw new IllegalArgumentException("cidAll is null or cidAll empty"); 11 } 12 13 List<MessageQueue> result = new ArrayList<MessageQueue>(); 14 if (!cidAll.contains(currentCID)) { 15 log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", 16 consumerGroup, 17 currentCID, 18 cidAll); 19 return result; 20 } 21 //算法,看不懂 22 int index = cidAll.indexOf(currentCID); 23 int mod = mqAll.size() % cidAll.size(); 24 int averageSize = 25 mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() 26 + 1 : mqAll.size() / cidAll.size()); 27 int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; 28 int range = Math.min(averageSize, mqAll.size() - startIndex); 29 for (int i = 0; i < range; i++) { 30 result.add(mqAll.get((startIndex + i) % mqAll.size())); 31 } 32 return result; 33 }
4. Rebalance的潜在危害:机器扩容也有可能会导致重复消费
消费暂停:考虑在只有Consumer 1的情况下,其负责消费所有5个队列;在新增Consumer 2,触发Rebalance时,需要分配2个队列给其消费。那么Consumer 1就需要停止这2个队列的消费,等到这两个队列分配给Consumer 2后,这两个队列才能继续被消费。
重复消费:Consumer 2 在消费分配给自己的2个队列时,必须从 Consumer 1 之前已经消费到的offset继续消费。然而默认情况下,offset是异步提交的:例如 Consumer 1 当前消费到的offset为10,但异步提交给broker的offset只有8;此时 Consumer 2 通过Rebalance后开始消费,如果从offset 8开始,就会有2条消息被重复消费。也就是说,Consumer 2 并不会等待 Consumer 1 提交完offset后再进行Rebalance,因此提交间隔越长,可能造成的重复消费就越多。
消费突刺:由于rebalance可能导致重复消费,如果需要重复消费的消息过多;或者因为rebalance暂停时间过长,导致积压了部分消息。那么都有可能导致在rebalance结束之后瞬间可能需要消费很多消息。
详见:https://blog.csdn.net/cxyatuo/article/details/104426633