Rebalance

RebalanceService

run

public void run() {
	log.info(this.getServiceName() + " service started");

	while (!this.isStopped()) {
		this.waitForRunning(waitInterval); // 等待20s,然后超时自动释放锁执行doRebalance
		this.mqClientFactory.doRebalance(); // 具体逻辑
	} 
	log.info(this.getServiceName() + " service end");
}

doRebalance

public void doRebalance() {
	for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
		MQConsumerInner impl = entry.getValue();
		if (impl != null) {
			try { // 其中之一是DefaultMQPushConsumerImpl
				impl.doRebalance(); // 具体逻辑
			} catch (Throwable e) {
				log.error("doRebalance exception", e);
			}
		}
	}
}

DefaultMQPushConsumerImpl

doRebalance

public void doRebalance() {
	if (!this.pause) { // RebalancePushImpl
		this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
	}
}

RebalanceImpl

doRebalance

public void doRebalance(final boolean isOrder) { // isOrder=true 顺序消息
	Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); // key:topic  该topic订阅信息 push方式 consumerGroup=null
	if (subTable != null) {
		for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
			final String topic = entry.getKey();  //topic
			try {
				this.rebalanceByTopic(topic, isOrder);
			} catch (Throwable e) {
				if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
					log.warn("rebalanceByTopic Exception", e);
				}
			}
		}
	}

	this.truncateMessageQueueNotMyTopic();
}

rebalanceByTopic

private void rebalanceByTopic(final String topic, final boolean isOrder) {
	switch (messageModel) {
		case BROADCASTING: { // 广播
			Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
			if (mqSet != null) {
				boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
				if (changed) {
					this.messageQueueChanged(topic, mqSet, mqSet);
					log.info("messageQueueChanged {} {} {} {}",
						consumerGroup,
						topic,
						mqSet,
						mqSet);
				}
			} else {
				log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
			}
			break;
		}
		case CLUSTERING: { // 集群
			Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // 根据topic获得MessageQueue  总的MessageQueue
			List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // todo 得到某个broker下,该consumerGroup下的所有的消费者 (相当于全部消费者)
			if (null == mqSet) {
				if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
					log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
				}
			}

			if (null == cidAll) {
				log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
			}

			if (mqSet != null && cidAll != null) {
				List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
				mqAll.addAll(mqSet);

				Collections.sort(mqAll); // 根据queueId排序
				Collections.sort(cidAll);

				AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

				List<MessageQueue> allocateResult = null;
				try { // todo 重新负载均衡
					allocateResult = strategy.allocate( // todo AllocateMessageQueueStrategy
						this.consumerGroup,
						this.mQClientFactory.getClientId(), // 当前ClientId
						mqAll,
						cidAll);
				} catch (Throwable e) {
					log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
						e);
					return;
				}

				Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
				if (allocateResult != null) {
					allocateResultSet.addAll(allocateResult);
				}

				boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
				if (changed) {
					.........................
					this.messageQueueChanged(topic, mqSet, allocateResultSet);
				}
			}
			break;
		}
		default:
			break;
	}
}
上一篇:《大数据、小数据、无数据:网络世界的数据学术》一 1.4 六项挑战


下一篇:prometheus 监控kafka(章节六)