RebalanceService
run
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval); // 等待20s,然后超时自动释放锁执行doRebalance
this.mqClientFactory.doRebalance(); // 具体逻辑
}
log.info(this.getServiceName() + " service end");
}
doRebalance
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try { // 其中之一是DefaultMQPushConsumerImpl
impl.doRebalance(); // 具体逻辑
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
DefaultMQPushConsumerImpl
doRebalance
public void doRebalance() {
if (!this.pause) { // RebalancePushImpl
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
RebalanceImpl
doRebalance
public void doRebalance(final boolean isOrder) { // isOrder=true 顺序消息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); // key:topic 该topic订阅信息 push方式 consumerGroup=null
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey(); //topic
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: { // 广播
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: { // 集群
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // 根据topic获得MessageQueue 总的MessageQueue
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // todo 得到某个broker下,该consumerGroup下的所有的消费者 (相当于全部消费者)
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll); // 根据queueId排序
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try { // todo 重新负载均衡
allocateResult = strategy.allocate( // todo AllocateMessageQueueStrategy
this.consumerGroup,
this.mQClientFactory.getClientId(), // 当前ClientId
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
.........................
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}