Kafka的消费者(五)

本文为博主自学笔记整理,内容来源于互联网,如有侵权,请联系删除。

个人笔记:https://github.com/dbses/TechNotes

01 | 消费者组到底是什么?

消费者组(Consumer Group)特性

消费者组,即 Consumer Group,是 Kafka 提供的可扩展且具有容错性的消费者机制。

组内可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

Consumer Group 有下面这三个特性:

  1. Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。
  2. Group ID 是一个字符串,在一个 Kafka 集群中是唯一的。
  3. 主题的单个分区只能分配给 Consumer Group 的某个 Consumer 实例消费。当然也可以被其他的 Group 消费。

Consumer Group 与传统的消息引擎模型

传统的消息引擎模型有两大类,分别是点对点模型(即消息队列模型)和发布 / 订阅模型。

传统的消息队列模型的缺陷在于消息一旦被消费,就会从队列中删除,而且只能被下游的一个 Consumer 消费。很显然,这种模型的性能很差,因为下游的多个 Consumer 都要“抢”这个共享消息队列的消息。

发布 / 订阅模型倒是允许消息被多个 Consumer 消费,但它的问题是分区伸缩性(scalability)不高,因为每个订阅者都必须要订阅主题的所有分区。

(Kafka 是如何解决的?)

Kafka 的 Consumer Group 避开了这两种模型的缺陷,同时又兼具它们的优点,Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。

如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。当 Consumer Group 订阅了多个主题后,组内的每个实例不要求一定要订阅主题的所有分区,它只会消费部分分区中的消息。

Group 里多少个 Consumer 合适?

那么在实际使用场景中,我怎么知道一个 Group 下该有多少个 Consumer 实例呢?理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。

举个简单的例子,假设一个 Consumer Group 订阅了 3 个主题,分别是 A、B、C,它们的分区数依次是 1、2、3,那么通常情况下,为该 Group 设置 6 个 Consumer 实例是比较理想的情形,因为它能最大限度地实现高伸缩性。

(这里理论描述值与举例值矛盾啊!)

你可能会问,我能设置小于或大于 6 的实例吗?当然可以!如果你有 3 个实例,那么平均下来每个实例大约消费 2 个分区(6 / 3 = 2);如果你设置了 8 个实例,那么很遗憾,有 2 个实例(8 – 6 = 2)将不会被分配任何分区,它们永远处于空闲状态。因此,在实际使用过程中一般不推荐设置大于总分区数的 Consumer 实例。设置多余的实例只会浪费资源,而没有任何好处。

Kafka 是怎么管理位移的?

消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在 Kafka 中,这个位置信息有个专门的术语:位移(Offset)。

其实对于 Consumer Group 而言,它是一组 KV 对,Key 是分区,V 对应 Consumer 消费该分区的最新位移。可以认为是这样的数据结构 Map<TopicPartition, Long>。

在新版本( 0.9版本)的 Consumer Group 中,Kafka 社区重新设计了 Consumer Group 的位移管理方式,采用了将位移保存在 Kafka 内部主题的方法。这个内部主题就是 __consumer_offsets。

Consumer Group 端重平衡

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

Rebalance 的触发条件有 3 个。

  1. 组成员数发生变更。
  2. 订阅主题数发生变更。
  3. 订阅主题的分区数发生变更。

举个例子,假设目前某个 Consumer Group 下有两个 Consumer,比如 A 和 B,当第三个成员 C 加入时,Kafka 会触发 Rebalance,并根据默认的分配策略重新为 A、B 和 C 分配分区,如下图所示:

Kafka的消费者(五)

Rebalance 之后的分配依然是公平的。

不过,Rebalance 也有一些缺陷。在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成,这跟 JVM 垃圾回收的 stop the world 很类似。

其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。这样的话,就不用重新创建连接其他 Broker 的 Socket 资源。

最后,Rebalance 实在是太慢了。曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至少现在还没有特别好的解决方案。所谓“本事大不如不摊上”,也许最好的解决方案就是避免 Rebalance 的发生吧。

02 | 揭开神秘的“位移主题”面纱

__consumer_offsets 在 Kafka 源码中有个更为正式的名字,叫位移主题,即 Offsets Topic。

引入位移主题的背景及原因

老版本 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。

但是,ZooKeeper 其实并不适用于这种高频的写操作,因此,Kafka 社区自 0.8.2.x 版本开始,就在酝酿修改这种设计,并最终在新版本 Consumer 中正式推出了全新的位移管理机制,也包括这个新的位移主题。

位移主题的消息格式

新版本 Consumer 的位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。

那么这个主题存的是什么格式的消息呢?位移主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号 >。位移主题的消息体可以简单地认为就是保存了位移值。

实际上,位移主题的消息格式除了上面这种,还保存下面 2 种格式:

  • 用于保存 Consumer Group 信息的消息

    它是用来注册 Consumer Group 的。

  • 用于删除 Group 过期位移甚至是删除 Group 的消息

    这种格式有个专属的名字:tombstone 消息,即墓碑消息,也称 delete mark,它们指的是一个东西。这些消息只出现在源码中而不暴露给你。它的主要特点是它的消息体是 null,即空消息体。

一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。

位移主题是怎么被创建的?

通常来说,当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。位移主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。但如果是 Kafka 自动创建的,分区数是怎么设置的呢?

这就要看 Broker 端参数 offsets.topic.num.partitions 的取值了。它的默认值是 50,因此 Kafka 会自动创建一个 50 分区的位移主题。

除了分区数,副本数或备份因子是怎么控制的呢?答案也很简单,这就是 Broker 端另一个参数 offsets.topic.replication.factor 要做的事情了。它的默认值是 3。

当然,你也可以选择手动创建位移主题,具体方法就是,在 Kafka 集群尚未启动任何 Consumer 之前,使用 Kafka API 创建它。手动创建的好处在于,你可以创建满足你实际场景需要的位移主题。比如很多人说 50 个分区对我来讲太多了,我不想要这么多分区,那么你可以自己创建它,不用理会 offsets.topic.num.partitions 的值。

不过我给你的建议是,还是让 Kafka 自动创建比较好。目前 Kafka 源码中有一些地方硬编码了 50 分区数,因此如果你自行创建了一个不同于默认分区数的位移主题,可能会碰到各种各种奇怪的问题。这是社区的一个 bug,目前代码已经修复了,但依然在审核中。

什么地方会用到位移主题?

目前 Kafka Consumer 提交位移的方式有两种:自动提交位移和手动提交位移。

Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为你定期提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,就能保证消息消费不会丢失。但这一点同时也是缺点。因为它太省事了,以至于丧失了很大的灵活性和可控性,你完全没法把控 Consumer 端的位移管理。

自动提交位移还可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。

手动提交位移,即设置 enable.auto.commit = false。一旦设置了 false,Consumer 应用就要承担起位移提交的责任。Kafka Consumer API 提供了位移提交的方法,如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。

Kafka 怎么删除位移主题的过期消息?

Kafka 是怎么删除位移主题中的过期消息的呢?答案就是 Compaction。

Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限膨胀。那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。

这一张来自官网的图片,可以说明 Compact 过程。

Kafka的消费者(五)

图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的。

Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。

很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。

03 | 消费者组重平衡能避免吗?

要避免 Rebalance,还是要从 Rebalance 发生的时机入手。我们在前面说过,Rebalance 发生的时机有三个:

  • 组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发 Rebalance,这是 Rebalance 发生的最常见的原因。后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。接下来,我们主要说说因为组成员数量变化而引发的 Rebalance 该如何避免。

通常来说,增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要。它不属于我们要规避的那类“不必要 Rebalance”。

Group 下实例数减少是我们要关注的。Coordinator 会在什么情况下认为某个 Consumer 实例已挂从而要退组呢?

Kafka 如何感知 Consumer 是否挂掉?

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。

除了这个参数,Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。

除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

哪些 Rebalance 是不必要的?

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。

因此,你需要仔细地设置 session.timeout.ms 和 heartbeat.interval.ms 的值。要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。

将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。我之前有一个客户,在他们的场景中,Consumer 消费数据时需要将消息处理之后写入到 MongoDB。显然,这是一个很重的消费逻辑。MongoDB 的一丁点不稳定都会导致 Consumer 程序消费时长的增加。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。就拿 MongoDB 这个例子来说,如果写 MongoDB 的最长时间是 7 分钟,那么你可以将该参数设置为 8 分钟左右。

如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。为什么特意说 GC?那是因为在实际场景中,我见过太多因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance 了。

04 | Kafka中位移提交那些事儿

今天我们要聊的位移是 Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。

切记是下一条消息的位移,而不是目前最新消费消息的位移。

因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

自动提交位移

Consumer 端有个参数 enable.auto.commit,它的默认值是 true。即 Java Consumer 默认就是自动提交位移的。如果启用了自动提交,Consumer 端还有个参数就派上用场了:auto.commit.interval.ms。它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移。

下面代码展示了设置自动提交位移的方法。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records)
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。

虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

手动同步提交

开启手动提交位移的方法就是设置 enable.auto.commit 为 false。可以调用KafkaConsumer#commitSync()同步提交位移。

下面这段代码展示了 commitSync() 的使用方法。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    process(records); // 处理消息
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        handle(e); // 处理提交失败异常
    }
}

手动提交位移的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。

但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。

手动异步提交

鉴于上面这个问题,Kafka 社区为手动提交位移提供了另一个 API 方法:KafkaConsumer#commitAsync()。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。

由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    process(records); // 处理消息
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            handle(exception);
        }
    });
}

commitAsync 是否能够替代 commitSync 呢?

答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

手动同异步结合提交

显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:

  1. 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  2. 我们不希望程序总处于阻塞状态,影响 TPS。

下面这段代码,将两个 API 方法结合使用进行手动提交。

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        process(records); // 处理消息
        consumer.commitAysnc(); // 使用异步提交规避阻塞
    }
} catch (Exception e) {
    handle(e); // 处理异常
} finally {
    try {
        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    } finally {
        consumer.close();
    }
}

对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性。

所以,如果你需要自行编写代码开发一套 Kafka Consumer 应用,那么我推荐你使用上面的代码范例来实现手动的位移提交。

更精细化的位移提交

我们说了自动提交和手动提交,也说了同步提交和异步提交,这些就是 Kafka 位移提交的全部了吗?其实,我们还差一部分。实际上,Kafka Consumer API 还提供了一组更为方便的方法,可以帮助你实现更精细化的位移管理功能。

设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。

在 Kafka 中也是相同的道理。对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个 5000 条消息的例子,你可能希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。

Kafka Consumer API 为手动提交提供了这样的方法:commitSync(Map<TopicPartition, OffsetAndMetadata>) 和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)。它们的参数是一个 Map 对象,键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据。

就拿刚刚提过的那个例子来说,如何每处理 100 条消息就提交一次位移呢?在这里,我以 commitAsync 为例,展示一段代码,实际上,commitSync 的调用方法和它是一模一样的。

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record: records) {
        process(record);  // 处理消息
        offsets.put(new TopicPartition(record.topic(), record.partition()), 
                    new OffsetAndMetadata(record.offset() + 1);
        if(count % 100 == 0){
            consumer.commitAsync(offsets, null); // 回调处理逻辑是 null
        }
        count++;
    }
}

程序先是创建了一个 Map 对象,用于保存 Consumer 消费处理过程中要提交的分区位移,之后开始逐条处理消息,并构造要提交的位移值。

还记得之前我说过要提交下一条消息的位移吗?就是这里构造 OffsetAndMetadata 对象时,使用当前消息位移加 1 的原因。

代码的最后部分是做位移的提交。我在这里设置了一个计数器,每累计 100 条消息就统一提交一次位移。与调用无参的 commitAsync 不同,这里调用了带 Map 对象参数的 commitAsync 进行细粒度的位移提交。这样,这段代码就能够实现每处理 100 条消息就提交一次位移,不用再受 poll 方法返回的消息总数的限制了。

05 | CommitFailedException异常怎么处理?

所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。如果异常是可恢复的瞬时错误,提交位移的 API 自己就能规避它们了,因为很多提交位移的 API 方法是支持自动错误重试的,比如我们在上一期中提到的 commitSync 方法。

下面我们就来讨论下该异常是什么时候被抛出的。从源代码方面来说,CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时。从使用场景来说,有两种典型的场景可能遭遇该异常。

场景一:消息处理时间过长

当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。

下面代码展示了这个异常:

…
Properties props = new Properties();
…
props.put("max.poll.interval.ms", 5000);
consumer.subscribe(Arrays.asList("test-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    // 使用 Thread.sleep 模拟真实的消息处理逻辑
    Thread.sleep(6000L);
    consumer.commitSync();
}

如果要防止这种场景下抛出异常,你需要简化你的消息处理逻辑。具体来说有 4 种方法。

  • 缩短单条消息处理的时间

    比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。

  • 增加 Consumer 端允许下游系统消费一批消息的最大时长

    这取决于 Consumer 端参数 max.poll.interval.ms 的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。

    值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms 参数的值。不幸的是,session.timeout.ms 参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms 参数,将这部分含义从 session.timeout.ms 中剥离出来的原因之一。

  • 减少下游系统一次性消费的消息总数

    这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。

  • 下游系统使用多线程来加速消费

    这应该算是“*”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。

    事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsumerThread 线程,自行处理多线程间的数据消费。不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错。

综合以上这 4 个处理方法,我个人推荐你首先尝试采用方法 1 来预防此异常的发生。优化下游系统的消费逻辑是百利而无一害的法子,不像方法 2、3 那样涉及到 Kafka Consumer 端 TPS 与消费延时(Latency)的权衡。如果方法 1 实现起来有难度,那么你可以按照下面的法则来实践方法 2、3。

场景二:同时出现相同的 GroupId

如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。

Kafka Java Consumer 端还提供了一个名为 Standalone Consumer 的独立消费者。它没有消费者组的概念,每个消费者实例都是独立工作的,彼此之间毫无联系。

不过,你需要注意的是,独立消费者的位移提交机制和消费者组是一样的,因此独立消费者的位移提交也必须遵守之前说的那些规定,比如独立消费者也要指定 group.id 参数才能提交位移。

虽然说这个场景很冷门,但也并非完全不会遇到。在一个大型公司中,特别是那些将 Kafka 作为全公司级消息引擎系统的公司中,每个部门或团队都可能有自己的消费者应用,谁能保证各自的 Consumer 程序配置的 group.id 没有重复呢?一旦出现不凑巧的重复,发生了上面提到的这种场景,你使用之前提到的哪种方法都不能规避该异常。令人沮丧的是,无论是刚才哪个版本的异常说明,都完全没有提及这个场景,因此,如果是这个原因引发的 CommitFailedException 异常,前面的 4 种方法全部都是无效的。

上一篇:kafka简介


下一篇:KAFKA进阶:【十二】能否说一下你对rebalance的了解?全网最详细的rebalance源码分析,基于1.1.0