kafka的Rebalance机制

选举机制
如果kafka集群有多个broker节点,消费组会选择哪个partition节点作为Coordinator节点呢?它会通过如下公式,其中的50代表着kafka内部主题consumer offset的分区总数

Math.abs(hash(groupID)) % 50
那么当前Consumer Group的Coordinator就是上述公式计算出的partition的leader partition

  1. Rebalance流程
    Coordinator发生Rebalance的时候,Coordinator并不会主动通知组内的所有Consumer重新加入组,而是当Consumer向Coordinator发送心跳的时候,Coordinator将Rebalance的状况通过心跳响应告知Consumer。Rebalance机制整体可以分为两个步骤,一个是Joining the Group,另外一个是分配Synchronizing Group State

3.1 Joining the Group
在当前这个步骤中,所有的消费者会和Coordinator交互,请求Coordinator加入当前消费组。Coordinator会从所有的消费者中选择一个消费者作为leader consumer, 选择的算法是随机选择

3.2 Synchronizing Group State
leader Consumer从Coordinator获取所有的消费者的信息,并将消费组订阅的partition分配结果封装为SyncGroup请求,需要注意的是leader Consumer不会直接与组内其它的消费者交互,leader Consumer会将SyncGroup发送给Coordinator,Coordinator再将分配结果发送给各个Consumer。分配partition有如下3种策略RangeAssignor,RoundRobinAssignor,StickyAssignor,关于这三种分配方案更详细的资料请看上一篇文章

如果leader consumer因为一些特殊原因导致分配分区失败(Coordinator通过超时的方式检测),那么Coordinator会重新要求所有的Consumer重新进行步骤Joining the Group状态

  1. Coordinator生命周期
    为了更好的了解Coordinator的职责以及Rebalance机制,笔者详细介绍一下Coordinator的生命周期

Coordinator生命周期中总共有5种状态,Down,Initialize,Stable,Joining,AwaitingSync

Down:Coordinator不会维护任何消费组状态

Initialize:Coordinator处于初始化状态,Coordinator从Zookeeper中读取相关的消费组数据,这个时候Coordinator对接受到消费者心跳或者加入组的请求都会返回错误

Stable:Coordinator处理消费者心跳请求,但是还未开始初始化generation,Coordinator正在等待消费者加入组的请求

Joining:Coordinator正在处理组内成员加入组的请求

AwaitingSync:等待leader consumer分配分区,并将分区分配结果发送给各个Consumer

这五个状态相互转换流程图示如下,其中的重点用红框标出,它们对应着Rebalance的流程步骤
kafka的Rebalance机制

  1. Generation机制
    在上文中提到消费者消费消息超时之后,如果再次尝试提交offset,就会出现如下的异常

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

出现该异常的原因是Coordinator消费组的保护机制。上文提到如果消费者消费超时,笔者称其为TimeoutConsumer,那么TimeoutConsumer就会被Coordinator从消费组中剔除,Coordinator就会进行Rebalance,将当前消费者负责的partition重新分配给其它的消费者,如果TimeoutConsumer完成了消息的消费,假设TimeoutConsumer成功提交partition的offset,那么就会出现混乱,因为TimeoutConsumer负责的partition已经被分配给了其它的消费者。Generation(代际)机制就是上述的保护机制。

Coordinator每进行一次Rebalance,就会为当前的Rebalance设置一个Generation标记,比如说第一次Rebalance标记是1,如果再次Rebalance,该标记就会成为2,消费者在提交offset的时候会将generation一同提交,Coordinator在发现TimeoutConsumer的标记已经超时的情况下会拒绝消费者提交generation标记。

Generation的机制可能会导致上一代际消费者和当前代际消费者消费相同的消息,所以消费者在消费消息的时候需要实现消息消费的幂等性,关于幂等性消费的问题笔者将会写一瓶文章详细介绍。

  1. Leader Consumer
    上文提到Leader Consumer是Coordinator在Joining the Group步骤的时候随机选择的,Leader Consumer负责组内各个Consumer的partition分配,除此之外Leader Consumer还负责整个消费组订阅的主题的监控,Leader Consumer会定期更新消费组订阅的主题信息,一旦发现主题信息发生了变化,Leader Consumer会通知Coordinator触发Rebalance机制。
上一篇:【软件测试】盘一盘工作中遇到的 MQ 异常测试


下一篇:rocketmq源码解析namesrvController启动③