一、客户端API
生产者(Producer)
-
send(ProducerRecord<K, V> record)
发送一条消息到broker,异步发送,无回调函数 -
send(ProducerRecord<K, V> record, Callback callback)
发送一条消息到broker,异步发送,有回调函数 -
partitionsFor(String topic)
获取topic的分区元数据 -
metrics()
获取生产者的指标数据 -
flush()
把缓存中的数据强制刷到磁盘 -
initTransactions()
初始化当前生产者是事务型生产者 -
beginTransaction()
开启事务 -
commitTransaction()
提交事务 -
abortTransaction()
事务回滚 -
close()
关闭生产者
消费者(Consumer)
-
subscribe(Collection<String> topics)
订阅主题列表 -
subscribe(Collection<String> topics, ConsumerRebalanceListener callback)
订阅主题列表,有一个重平衡监听器,当发生重平衡时触发 -
subscribe(Pattern pattern)
根据正则表达式订阅主题 -
subscribe(Pattern pattern, ConsumerRebalanceListener callback)
根据正则表达式订阅主题,有一个重平衡监听器,当发生重平衡时触发 -
unsubscribe()
取消订阅主题 -
subscription()
获取当前消费者订阅的主题列表 -
listTopics()
获取所有主题的分区元数据 -
assign(Collection<TopicPartition> partitions)
分配当前消费者的分区 -
assignment()
获取分配到当前消费者的分区 -
poll(Duration timeout)
拉取数据 -
commitSync()
同步提交所有订阅主题的偏移量,提交失败会进行重试,直到提交成功或遇到不可重试的错误 -
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
同步提交指定主题分区的偏移量 -
commitAsync()
异步提交所有订阅主题的偏移量 -
commitAsync(OffsetCommitCallback callback)
异步提交所有订阅主题的偏移量,并且有回调函数 -
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
异步提交指定主题分区的偏移量,并且有回调函数 -
position(TopicPartition partition)
获取将要提取的下一条记录的偏移量 -
committed(Set<TopicPartition> partitions)
获取指定分区已消费消息的最大偏移量 -
seek(TopicPartition partition, long offset)
重置偏移量到指定的位置 -
seekToBeginning(Collection<TopicPartition> partitions)
重置偏移量到分区的第一条消息的位置 -
seekToEnd(Collection<TopicPartition> partitions)
重置偏移量到分区的最后一条消息的位置 -
beginningOffsets(Collection<TopicPartition> partitions)
获取分区第一条消息的偏移量 -
endOffsets(Collection<TopicPartition> partitions)
获取分区最后一条消息的偏移量 -
enforceRebalance()
消费者主动发起一次重平衡 -
close()
关闭消费者
二、客户端实践及原理剖析
生产者(Producer)
1、分区机制
分区的作用
Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息,主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份,分区的作用就是提供负载均衡的能力。
分区策略
- 轮询策略:能保证最大限度地把数据平均分配到各个分区上。
- 随机策略:把数据随机的打散到各个分区上去。
- 按消息键保序策略:把一批有序的消息键值设置为一样的,这样能保证这批消息能发送到同一个分区上去,保证了消息的有序性。
2、避免消息丢失的配置
消息丢失的场景
-
生产者程序丢失数据
目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。所以一般都需要使用带回调通知的producer.send(msg,callback),回调可以准确的判断是否发送成功,然后进行针对性的处理,比如重试,这里注意一下,生产者重试只会发送到原分区)。 -
消费者程序丢失数据
多线程异步处理消息时,Consumer程序如果使用自动提交偏移量,很容易造成数据丢失,由于每个线程的处理速度可能不一样,有可能有些线程消费速度较慢卡住了或者超时了,其他线程提交了比它大的偏移量,那么前面那个线程如果没消费成功,那它后面也没办法再继续消费了,因为偏移量已经提交到它后面了。
最佳实践
- 发送消息时使用 producer.send(msg, callback)
- 设置 acks=all,表明所有副本 Broker 都要接收到消息,该消息才算是已提交。
- 设置 retries 为一个较大的值,当消息发送失败时能够自动重试,避免消息丢失。
- 设置 unclean.leader.election.enable = false,Broker 端的参数,控制哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失,所以一般设置为false。
- 设置 replication.factor>=3,Broker 端的参数,设置副本的数量。
- 设置 min.insync.replicas>1,Broker 端的参数,消息至少要写入几个副本才算已提交。
- 确保 replication.factor > min.insync.replicas,如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了,推荐设置成 replication.factor=min.insync.replicas + 1。
- 使用手动提交偏移量的方式,确保消息消费成功再提交。
3、压缩算法
产生压缩的场景
- 生产者配置了compression.type 参数即表示启用指定类型的压缩算法,启用压缩能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。
- 生产者和 Broker 端使用了不一样的压缩算法,导致 Broker 收到消息之后可能需要进行压缩和解压缩操作,这个有可能使 Broker 端 CPU 使用率飙升。
产生解压缩的场景
- 如果生产者使用了启用压缩算法,那么消费者接收到消息之后自然是要进行解压缩的。
- Broker 接收到生产者发送的消息时,需要进行一次解压缩操作,目的就是为了对消息执行各种验证。
最佳实践
Producer 端压缩、Broker 端保持、Consumer 端解压缩。即使用同一种压缩算法,避免进行多余的压缩和解压缩操作。
常见压缩算法性能比较
- 吞吐量:LZ4 > Snappy > zstd 和 GZIP
- 压缩比:zstd > LZ4 > GZIP > Snappy
4、幂等生产者
消息交付可靠性保障
- 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
- 至少一次(at least once):消息不会丢失,但有可能被重复发送。(Kafka默认)
- 精确一次(exactly once):消息不会丢失,也不会被重复发送。
幂等生产者的作用
Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。我们可以设置一个参数props.put(“enable.idempotence”, ture),这样生产者就变成一个幂等生产者了,Kafka能够自动做消息的去重操作。
作用范围
- 只能保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
- 它只能实现单会话上的幂等性,不能实现跨会话的幂等性,例如重启 Producer 进程幂等性就消失了。
5、事务生产者
使用方法
- 和幂等性 Producer 一样,开启 enable.idempotence = true。
- 设置 Producer 端参数 transactional. id。
- 调用了一些事务 API,如 initTransaction、beginTransaction、commitTransaction 和 abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
作用范围
- 能够保证将消息原子性地写入到多个分区中,这批消息要么全部写入成功,要么全部失败。
- 能实现跨会话,重启 Producer 进程,事务还是生效。
消费者(Consumer)
1、消费者组(Consumer Group)
什么是消费者组
消费者组是 Kafka 提供的可扩展且具有容错性的消费者机制。
消费者组特性
- Consumer Group 下可以有一个或多个 Consumer 实例,这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
- Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
- Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费,这个分区当然也可以被其他的 Group 消费。
最佳实践
- 理想情况下,消费者组的 Consumer 实例的数量应该等于该 Group 订阅主题的分区总数,每个实例消费一个分区的数据。
- 在实际使用过程中一般不推荐设置大于总分区数的 Consumer 实例,设置多余的实例只会浪费资源。
2、重平衡(Rebalance)
什么是重平衡
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
重平衡触发条件
- 组成员数发生变更
- 订阅主题数发生变更
- 订阅主题的分区数发生变更
重平衡带来的后果
- 在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成,类似垃圾回收机制的STW。
- 目前 Rebalance 的设计是所有 Consumer 实例共同参与,默认情况下,原先的方案都不会保留,全部实例重新分配所有分区。(社区于 0.11.0.0 版本推出了 StickyAssignor,即有粘性的分区分配策略。所谓的有粘性,是指每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动)
两类非必要的重平衡
- 因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。
- Consumer 消费时间过长导致的。
避免重平衡的最佳实践
- 设置 session.timeout.ms = 6s
- 设置 heartbeat.interval.ms = 2s
- 设置 max.poll.interval.ms 大于下游系统的处理时间
- 调整 GC 参数,避免频繁 Full GC
3、位移(Offset)
自动提交
Kafka Consumer 在后台默默地提交位移,这是 Kafka 默认的提交位移的方式,自动提交主要关注下面两个参数:
- enable.auto.commit,默认是true,开启自动提交
- auto.commit.interval.ms,自动提交位移的频率,默认是每5秒提交一次位移
手动提交
- enable.auto.commit,设置为false
- 手动调用 KafkaConsumer#commitSync() 或其他提交位移的方法
同步提交
同步提交位移会一直等待,直到位移被成功提交才会返回,如果提交过程中出现异常,该方法会将异常信息抛出。该操作支持自动重试,但是会导致程序阻塞,影响TPS。
异步提交
异步提交位移会立即返回结果,不会阻塞,Kafka 提供了回调函数,可以实现提交之后的逻辑,比如记录日志或处理异常等,但是不支持自动重试。
Kafka Consumer API 为手动提交提供了指定消费分区和位移的方法,可以更精细度的管理位移。
4、位移提交失败异常(CommitFailedException)
CommitFailedException 异常,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。
参数优化
- 增加期望的时间间隔 max.poll.interval.ms 参数值。
- 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值。
业务优化
- 缩短单条消息处理的时间
- 增加 Consumer 端允许下游系统消费一批消息的最大时长
- 减少下游系统一次性消费的消息总数
- 下游系统使用多线程来加速消费
5、位移主题(__consumer_offsets)
管理机制
将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。
创建时机
- 当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题,默认主题的分区数是 50,副本数是 3。可以通过修改 Broker 端的参数 offsets.topic.num.partitions 和 offsets.topic.replication.factor 设置分区数和副本数。
- 位移主题也支持手动创建,在 Consumer 程序启动前手动创建,但是一般情况下不建议这么做,目前 Kafka 源码中有一些地方硬编码了 50 分区数,手动创建可能出现bug,所以还是建议自动创建。
删除过期数据
Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。具体做法是 Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner,很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,可能是这个线程挂了导致的。
6、多线程消费方案
消费者设计原理
Consumer 设计了单线程 + 轮询的机制,这种设计能够较好地实现非阻塞式的消息获取,获取到消息之后,是否使用多线程消费消息完全由用户自己决定。
多线程方案
首选需要明确一点,KafkaConsumer 类不是线程安全的,所有的网络 I/O 处理都是发生在用户主线程中,因此不能在多个线程*享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
-
方案一
消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程,如图所示: -
方案二
消费者程序使用单或多线程获取消息,同时创建多个消费线程(线程池)执行消息处理逻辑,如图所示:
两种方案的优缺点:
7、消费者组进度监控
监控指标
- Lag:滞后程度,就是指消费者当前落后于生产者的程度。
- Lead:指消费者最新消费消息的位移与分区当前第一条消息位移的差值。
两者之间的关系:Lag 越大的话,Lead 就越小,反之也是同理。
一般情况下,Lag越小越好,说明消费速度接近于生产速度。
监控的重要性
在实际生产环境中,一定要同时监控 Lag 值和 Lead 值,否则可能会造成比较严重的后果。比如说当消费速度很慢时,慢到它要消费的数据快被 Kafka 删除了,这时就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。这可能产生两个后果:
- 一个是消费者从头消费一遍数据。
- 另一个是消费者从最新的消息位移处开始消费,之前没来得及消费的消息全部被跳过了,从而造成丢消息的假象。
参考:极客时间《Kafka 核心技术与实践》专栏。