kafka Poll轮询机制与消费者组的重平衡分区策略剖析

注意本文采用最新版本进行Kafka的内核原理剖析,新版本每一个Consumer通过独立的线程,来管理多个Socket连接,即同时与多个broker通信实现消息的并行读取。这就是新版的技术革新。类似于Linux I/O模型或者Select NIO 模型。

Poll为什么要设置一个超时参数

  • 条件:
  • 1:获取足够多的可用数据
  • 2:等待时间超过指定的超时时间。
  • 目的在于让Consumer主线程定期的””苏醒”去做其他事情。比如:定期的执行常规任务,(比如写日志,写库等)。
  • 获取消息,然后执行业务逻辑。

位移精度

  • 最少一次 -> 消息会被重复处理
  • 最多一次 -> 消息会丢失,但不会被重复处理。
  • 精确一次 -> 一定会被处理,且也只会处理一次。

位移角色

  • 上次提交位移 :last committed offset
  • 当前位置 :current position
  • 水位 : High watermark
  • 日志终端位移: (Log End Offset)

位移管理

consumer的位移提交最终会向group coordinator来提交,不过这里重点需要重新说明一下:组协调者coordinator负责管理所有的Consumer实例。而且coordinator运行在broker上(通过选举出某个broker),不过请注意新版本coordinator只负责做组管理。

但是具体的reblance分区分配策略目前已经交由Consumer客户端。这样就解耦了组管理和分区分配。

权利下放的优势:

  • 如果需要分配就貌似需要重启整个kafka集群。
  • 在Consumer端可以定制分区分配策略。
  • 每一个consumer位移提交时,都会向_consumer_offsets对应的分区上追加写入一条消息。如果某一个consumer为同一个group的同一个topic同一个分区提交多次位移,很显然我们只关心最新一次提交的位移。

reblance的触发条件

  • 组订阅发生变更,比如基于正则表达式订阅,当匹配到新的topic创建时,组的订阅就会发生变更。
  • 组的topic分区数发生变更,通过命令行脚本增加了订阅topic的分区数。
  • 组成员发生变更:新加入组以及离开组。

reblance 分配策略

range分区分配策略

举例如下:一个拥有十个分区(0,1,2…..,9)的topic,相同group拥有三个consumerid为a,b,c的消费者:

  • consumer a分配对应的分区号为[0,4),即0,1,2,3前面四个分区

  • consumer b 分配对应分区4,5,6中间三个分区

  • consumer c 分配对应分区7,8,9最后三个分区。

    class RangeAssignor() extends PartitionAssignor with Logging {

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    def (ctx: AssignmentContext) = {
    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
    val partitionAssignment =
    new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
    for (topic <- ctx.myTopicThreadIds.keySet) {
    val curConsumers = ctx.consumersForTopic(topic)
    val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

    val nPartsPerConsumer = curPartitions.size / curConsumers.size
    val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

    info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
    " for topic " + topic + " with consumers: " + curConsumers)

    for (consumerThreadId <- curConsumers) {
    val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
    assert(myConsumerPosition >= 0)
    val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
    val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)


    * Range-partition the sorted partitions to consumers for better locality.
    * The first few consumers pick up an extra partition, if any.
    */
    if (nParts <= 0)
    warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
    else {
    for (i <- startPart until startPart + nParts) {
    val partition = curPartitions(i)
    大专栏
上一篇:驱动篇:支持轮询操作的 globalfifo 驱动


下一篇:2.do_select函数分析