消费者协调器和组协调器
了解了 Kafka 中消费者的分区分配策略之后是否会有这样的疑问:如果消费者客户端中配置了两个分配策略,那么以哪个为准呢?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?多个消费者之间的分区分配是需要协同的,那么这个协同的过程又是怎样的呢?这一切都是交由消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)来完成的,它们之间使用一套组协调协议进行交互。
旧版消费者客户端的问题
消费者协调器和组协调器的概念是针对新版的消费者客户端而言的,Kafka 建立之初并没有它们。旧版的消费者客户端是使用 ZooKeeper 的监听器(Watcher)来实现这些功能的。
每个消费组(<group>)在 ZooKeeper 中都维护了一个 /consumers/<group>/ids 路径,在此路径下使用临时节点记录隶属于此消费组的消费者的唯一标识(consumerIdString),consumerIdString 由消费者启动时创建。消费者的唯一标识由 consumer.id+主机名+时间戳+UUID 的部分信息构成,其中 consumer.id 是旧版消费者客户端中的配置,相当于新版客户端中的 client.id。比如某个消费者的唯一标识为 consumerId_localhost-1510734527562-64b377f5,那么其中 consumerId 为指定的 consumer.id,localhost 为计算机的主机名,1510734527562 代表时间戳,而 64b377f5 表示 UUID 的部分信息。
参考下图,与 /consumers/<group>/ids 同级的还有两个节点:owners 和 offsets,/consumers/<group>/owner 路径下记录了分区和消费者的对应关系,/consumers/<group>/offsets 路径下记录了此消费组在分区中对应的消费位移。
每个 broker、主题和分区在 ZooKeeper 中也都对应一个路径:/brokers/ids/<id> 记录了 host、port 及分配在此 broker 上的主题分区列表;/brokers/topics/<topic> 记录了每个分区的 leader 副本、ISR 集合等信息。/brokers/topics/<topic>/partitions/<partition>/state记录了当前 leader 副本、leader_epoch 等信息。
每个消费者在启动时都会在 /consumers/<group>/ids 和 /brokers/ids 路径上注册一个监听器。当 /consumers/<group>/ids 路径下的子节点发生变化时,表示消费组中的消费者发生了变化;当 /brokers/ids 路径下的子节点发生变化时,表示 broker 出现了增减。这样通过 ZooKeeper 所提供的 Watcher,每个消费者就可以监听消费组和 Kafka 集群的状态了。
这种方式下每个消费者对 ZooKeeper 的相关路径分别进行监听,当触发再均衡操作时,一个消费组下的所有消费者会同时进行再均衡操作,而消费者之间并不知道彼此操作的结果,这样可能导致 Kafka 工作在一个不正确的状态。与此同时,这种严重依赖于 ZooKeeper 集群的做法还有两个比较严重的问题。
- 羊群效应(Herd Effect):所谓的羊群效应是指ZooKeeper 中一个被监听的节点变化,大量的 Watcher 通知被发送到客户端,导致在通知期间的其他操作延迟,也有可能发生类似死锁的情况。
- 脑裂问题(Split Brain):消费者进行再均衡操作时每个消费者都与 ZooKeeper 进行通信以判断消费者或broker变化的情况,由于 ZooKeeper 本身的特性,可能导致在同一时刻各个消费者获取的状态不一致,这样会导致异常问题发生。
再均衡的原理
新版的消费者客户端对此进行了重新设计,将全部消费组分成多个子集,每个消费组的子集在服务端对应一个 GroupCoordinator 对其进行管理,GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。而消费者客户端中的 ConsumerCoordinator 组件负责与 GroupCoordinator 进行交互。
ConsumerCoordinator 与 GroupCoordinator 之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。就目前而言,一共有如下几种情形会触发再均衡的操作:
- 有新的消费者加入消费组。
- 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者已经下线。
- 有消费者主动退出消费组(发送 LeaveGroupRequest 请求)。比如客户端调用了 unsubscrible() 方法取消对某些主题的订阅。
- 消费组所对应的 GroupCoorinator 节点发生了变更。
- 消费组内所订阅的任一主题或者主题的分区数量发生变化。
下面就以一个简单的例子来讲解一下再均衡操作的具体内容。当有消费者加入消费组时,消费者、消费组及组协调器之间会经历一下几个阶段。
第一阶段(FIND_COORDINATOR)
消费者需要确定它所属的消费组对应的 GroupCoordinator 所在的 broker,并创建与该 broker 相互通信的网络连接。如果消费者已经保存了与消费组对应的 GroupCoordinator 节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的 GroupCoordinator,这里的“某个节点”并非是集群中的任意节点,而是负载最小的节点。
如下图所示,FindCoordinatorRequest 请求体中只有两个域(Field):coordinator_key 和 coordinator_type。coordinator_key 在这里就是消费组的名称,即 groupId,coordinator_type 置为0。这个 FindCoordinatorRequest 请求还会在 Kafka 事务中提及,为了便于说明问题,这里我们暂且忽略它。
Kafka 在收到 FindCoordinatorRequest 请求之后,会根据 coordinator_key(也就是 groupId)查找对应的 GroupCoordinator 节点,如果找到对应的 GroupCoordinator 则会返回其相对应的 node_id、host 和 port信息。
具体查找 GroupCoordinator 的方式是先根据消费组 groupId 的哈希值计算 __consumer_offsets 中的分区编号,具体算法如代码清单12-1所示:
代码清单12-1 消费组所对应的分区号的计算方式
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
其中 groupId.hashCode 就是使用 Java 中 String 类的 hashCode() 方法获得的,groupMetadataTopicPartitionCount 为主题 __consumer_offsets 的分区个数,这个可以通过 broker 端参数 offsets.topic.num.partitions 来配置,默认值为50。
找到对应的 __consumer_offsets 中的分区之后,再寻找此分区 leader 副本所在的 broker 节点,该 broker 节点即为这个 groupId 所对应的 GroupCoordinator 节点。消费者 groupId 最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给此分区 leader 副本所在的 broker 节点,让此 broker 节点既扮演 GroupCoordinator 的角色,又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。
第二阶段(JOIN_GROUP)
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。
如上图所示,JoinGroupRequest 的结构包含多个域:
- group_id 就是消费组的 id,通常也表示为 groupId。
- session_timout 对应消费端参数 session.timeout.ms,默认值为10000,即10秒。GroupCoordinator 超过 session_timeout 指定的时间内没有收到心跳报文则认为此消费者已经下线。
- rebalance_timeout 对应消费端参数 max.poll.interval.ms,默认值为300000,即5分钟。表示当消费组再平衡的时候,GroupCoordinator 等待各个消费者重新加入的最长等待时间。
- member_id 表示 GroupCoordinator 分配给消费者的id标识。消费者第一次发送 JoinGroupRequest 请求的时候此字段设置为 null。
- protocol_type 表示消费组实现的协议,对于消费者而言此字段值为“consumer”。
JoinGroupRequest 中的 group_protocols 域为数组类型,其中可以囊括多个分区分配策略,这个主要取决于消费者客户端参数 partition.assignment.strategy 的配置。如果配置了多种策略,那么 JoinGroupRequest 中就会包含多个 protocol_name 和 protocol_metadata。其中 protocol_name 对应于 PartitionAssignor 接口中的 name() 方法,我们在讲述消费者分区分配策略的时候提及过相关内容。而 protocol_metadata 和 PartitionAssignor 接口中的 subscription() 方法有直接关系,protocol_metadata 是一个 bytes 类型,其实质上还可以更细粒度地划分为 version、topics 和 user_data,如下图所示。
version 占2个字节,目前其值固定为0;topics 对应 PartitionAssignor 接口的 subscription() 方法返回值类型 Subscription 中的 topics,代表一个主题列表;user_data 对应 Subscription 中的 userData,可以为空。
如果是原有的消费者重新加入消费组,那么在真正发送 JoinGroupRequest 请求之前还要执行一些准备工作:
- 如果消费端参数 enable.auto.commit 设置为 true(默认值也为 true),即开启自动提交位移功能,那么在请求加入消费组之前需要向 GroupCoordinator 提交消费位移。这个过程是阻塞执行的,要么成功提交消费位移,要么超时。
- 如果消费者添加了自定义的再均衡监听器(ConsumerRebalanceListener),那么此时会调用 onPartitionsRevoked() 方法在重新加入消费组之前实施自定义的规则逻辑,比如清除一些状态,或者提交消费位移等。
- 因为是重新加入消费组,之前与 GroupCoordinator 节点之间的心跳检测也就不需要了,所以在成功地重新加入消费组之前需要禁止心跳检测的运作。
消费者在发送 JoinGroupRequest 请求之后会阻塞等待 Kafka 服务端的响应。服务端在收到 JoinGroupRequest 请求后会交由 GroupCoordinator 来进行处理。GroupCoordinator 首先会对 JoinGroupRequest 请求做合法性校验,比如 group_id 是否为空、当前 broker 节点是否是请求的消费者组所对应的组协调器、rebalance_timeout 的值是否在合理的范围之内。如果消费者是第一次请求加入消费组,那么 JoinGroupRequest 请求中的 member_id 值为 null,即没有它自身的唯一标志,此时组协调器负责为此消费者生成一个 member_id。这个生成的算法很简单,具体如以下伪代码所示。
String memberId = clientId + “-” + UUID.randomUUID().toString();
其中 clientId 为消费者客户端的 clientId,对应请求头中的 client_id。由此可见消费者的 member_id 由 clientId 和 UUID 用“-”字符拼接而成。
选举消费组的leader
GroupCoordinator 需要为消费组内的消费者选举出一个消费组的 leader,这个选举的算法也很简单,分两种情况分析。如果消费组内还没有 leader,那么第一个加入消费组的消费者即为消费组的 leader。如果某一时刻 leader 消费者由于某些原因退出了消费组,那么会重新选举一个新的 leader,这个重新选举 leader 的过程又更“随意”了,相关代码如下:
private val members = new mutable.HashMap[String, MemberMetadata]
var leaderId = members.keys.head
解释一下这2行代码:在 GroupCoordinator 中消费者的信息是以 HashMap 的形式存储的,其中 key 为消费者的 member_id,而 value 是消费者相关的元数据信息。leaderId 表示 leader 消费者的 member_id,它的取值为 HashMap 中的第一个键值对的 key,这种选举的方式基本上和随机无异。总体上来说,消费组的 leader 选举过程是很随意的。
选举分区分配策略
每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配。这个分区分配的选举并非由 leader 消费者决定,而是根据消费组内的各个消费者投票来决定的。这里所说的“根据组内的各个消费者投票来决定”不是指 GroupCoordinator 还要再与各个消费者进行进一步交互,而是根据各个消费者呈报的分配策略来实施。最终选举的分配策略基本上可以看作被各个消费者支持的最多的策略,具体的选举过程如下:
- 收集各个消费者支持的所有分配策略,组成候选集 candidates。
- 每个消费者从候选集 candidates 中找出第一个自身支持的策略,为这个策略投上一票。
- 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
如果有消费者并不支持选出的分配策略,那么就会报出异常 IllegalArgumentException:Member does not support protocol。需要注意的是,这里所说的“消费者所支持的分配策略”是指 partition.assignment.strategy 参数配置的策略,如果这个参数值只配置了 RangeAssignor,那么这个消费者客户端只支持 RangeAssignor 分配策略,而不是消费者客户端代码中实现的3种分配策略及可能的自定义分配策略。
在此之后,Kafka 服务端就要发送 JoinGroupResponse 响应给各个消费者,leader 消费者和其他普通消费者收到的响应内容并不相同,首先我们看一下 JoinGroupResponse 的具体结构,如下图所示。
JoinGroupResponse 包含了多个域,其中 generation_id 用来标识当前消费组的年代信息,避免受到过期请求的影响。leader_id 表示消费组 leader 消费者的 member_id。
Kafka 发送给普通消费者的 JoinGroupResponse 中的 members 内容为空,而只有 leader 消费者的 JoinGroupResponse 中的 members 包含有效数据。members 为数组类型,其中包含各个成员信息。member_metadata 为消费者的订阅信息,与 JoinGroupRequest 中的 protocol_metadata 内容相同,不同的是 JoinGroupRequest 可以包含多个 <protocol_name, protocol_metadata> 的键值对,在收到 JoinGroupRequest 之后,GroupCoordinator 已经选举出唯一的分配策略。也就是说,protocol_name 已经确定(group_protocol),那么对应的 protocol_metadata 也就确定了,最终各个消费者收到的 JoinGroupResponse 响应中的 member_metadata 就是这个确定了的 protocol_metadata。由此可见,Kafka 把分区分配的具体分配交还给客户端,自身并不参与具体的分配细节,这样即使以后分区分配的策略发生了变更,也只需要重启消费端的应用即可,而不需要重启服务端。
本阶段的内容可以简要概括为下面2张图。
第三阶段(SYNC_GROUP)
leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,此时 leader 消费者并不是直接和其余的普通消费者同步分配方案,而是通过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的。在第三阶段,也就是同步阶段,各个消费者会向 GroupCoordinator 发送 SyncGroupRequest 请求来同步分配方案,如下图所示。
我们再来看一下 SyncGroupRequest 请求的具体结构,如下图所示。SyncGroupRequest 中的 group_id、generation_id 和 member_id 前面都有涉及,这里不再赘述。只有 leader 消费者发送的 SyncGroupRequest 请求中才包含具体的分区分配方案,这个分配方案保存在 group_assignment 中,而其余消费者发送的 SyncGroupRequest 请求中的 group_assignment 为空。
group_assignment 是一个数组类型,其中包含了各个消费者对应的具体分配方案:member_id 表示消费者的唯一标识,而 member_assignment 是与消费者对应的分配方案,它还可以做更具体的划分,member_assignment 的结构如下图所示。
与 JoinGroupRequest 请求中的 protocol_metadata 类似,都可以细分为3个更具体的字段,只不过 protocol_metadata 存储的是主题的列表信息,而 member_assignment 存储的是分区信息,member_assignment 中可以包含多个主题的多个分区信息。
服务端在收到消费者发送的 SyncGroupRequest 请求之后会交由 GroupCoordinator 来负责具体的逻辑处理。GroupCoordinator 同样会先对 SyncGroupRequest 请求做合法性校验,在此之后会将从 leader 消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入 Kafka 的 __consumer_offsets 主题中,最后发送响应给各个消费者以提供给各个消费者各自所属的分配方案。
目录
这里所说的响应就是指 SyncGroupRequest 请求对应的 SyncGroupResponse,SyncGroupResponse 的内容很简单,里面包含的就是消费者对应的所属分配方案,SyncGroupResponse 的结构如下图所示,具体字段的释义可以从前面的内容中推测出来,这里就不赘述了。
当消费者收到所属的分配方案之后会调用 PartitionAssignor 中的 onAssignment() 方法。随后再调用 ConsumerRebalanceListener 中的 OnPartitionAssigned() 方法。之后开启心跳任务,消费者定期向服务端的 GroupCoordinator 发送 HeartbeatRequest 来确定彼此在线。
消费组元数据信息
我们知道消费者客户端提交的消费位移会保存在 Kafka 的 __consumer_offsets 主题中,这里也一样,只不过保存的是消费组的元数据信息(GroupMetadata)。具体来说,每个消费组的元数据信息都是一条消息,不过这类消息并不依赖于具体版本的消息格式,因为它只定义了消息中的 key 和 value 字段的具体内容,所以消费组元数据信息的保存可以做到与具体的消息格式无关。
上图中对应的就是消费组元数据信息的具体内容格式,上面是消息的 key,下面是消息的 value。可以看到 key 和 value 中都包含 version 字段,用来标识具体的 key 和 value 的版本信息,不同的版本对应的内容格式可能并不相同,就目前版本而言,key 的 version 为2,而 value 的 version 为1,读者在理解时其实可以忽略这个字段而探究其他具备特定含义的内容。key 中除了 version 就是 group 字段,它表示消费组的名称,和 JoinGroupRequest 或 SyncGroupRequest 请求中的 group_id 是同一个东西。虽然 key 中包含了 version 字段,但确定这条信息所要存储的分区还是根据单独的 group 字段来计算的,这样就可以保证消费组的元数据信息与消费组对应的 GroupCoordinator 处于同一个 broker 节点上,省去了中间轮转的开销。
value 中包含的内容有很多,可以参照和 JoinGroupRequest 或 SyncGroupRequest 请求中的内容来理解,具体各个字段的释义如下。
- protocol_type:消费组实现的协议,这里的值为“consumer”。
- generation:标识当前消费组的年代信息,避免收到过期请求的影响。
- protocol:消费组选取的分区分配策略。
- leader:消费组的 leader 消费者的名称。
- members:数组类型,其中包含了消费组的各个消费者成员信息,上图中右边部分就是消费者成员的具体信息,每个具体字段都比较容易辨别,需要着重说明的是 subscription 和 assignment 这两个字段,分别代码消费者的订阅信息和分配信息。
第四阶段(HEARTBEAT)
进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了 GroupCoordinator,并且 GroupCoordinator 将其保存到了 Kafka 内部的 __consumer_offsets 主题中,此时消费者可以通过 OffsetFetchRequest 请求获取上次提交的消费位移并从此处继续消费。
消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长,则整个会话就被判定为过期,GroupCoordinator 也会认为这个消费者已经死亡,就会触发一次再均衡行为。
消费者的心跳间隔时间由参数 heartbeat.interval.ms 指定,默认值为3000,即3秒,这个参数必须比 session.timeout.ms 参数设定的值要小,一般情况下 heartbeat.interval.ms 的配置值不能超过 session.timeout.ms 配置值的1/3。这个参数可以调整得更低,以控制正常重新平衡的预期时间。
如果一个消费者发生崩溃,并停止读取消息,那么 GroupCoordinator 会等待一小段时间,确认这个消费者死亡之后才会触发再均衡。在这一小段时间内,死掉的消费者并不会读取分区里的消息。这个一小段时间由 session.timeout.ms 参数控制,该参数的配置值必须在broker端参数 group.min.session.timeout.ms(默认值为6000,即6秒)和 group.max.session.timeout.ms(默认值为300000,即5分钟)允许的范围内。
还有一个参数 max.poll.interval.ms,它用来指定使用消费者组管理时 poll() 方法调用之间的最大延迟,也就是消费者在获取更多消息之前可以空闲的时间量的上限。如果此超时时间期满之前 poll() 没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。
除了被动退出消费组,还可以使用 LeaveGroupRequest 请求主动退出消费组,比如客户端调用了 unsubscrible() 方法取消对某些主题的订阅,这个比较简单,这里就不再赘述了。