https://blog.csdn.net/qq_26323323/article/details/84938892
这篇文章对spring-kafka消费端源码分析较为详细,可查看其customer初始化的过程。
整个初始化的开始是从@EnableKafka开始讲起的
初始化的工程归纳如下:
得到一个含有KafkaListener基本信息的Endpoint,最后Endpoint被封装到KafkaListenerEndpointDescriptor,KafkaListenerEndpointDescriptor被添加到KafkaListenerEndpointRegistrar.endpointDescriptors中,也就是一个list中
在afterPropertiesSet方法中遍历endpointDescriptors,并执行对应的方法。
- 创建Endpoint对应的MessageListenerContainer,将创建好的MessageListenerContainer放入listenerContainers
- 初始化Container的"topics", "topicPartitions", "topicPattern","messageListener", "ackCount", "ackTime"等参数
-
默认ConcurrentKafkaListenerContainerFactory.createContainerInstance()
-
由该方法可知,最终创建的容器是ConcurrentMessageListenerContainer,根据用户设定的参数
-
- 初始化Container的"topics", "topicPartitions", "topicPattern","messageListener", "ackCount", "ackTime"等参数
-
如果我们的KafkaListener注解中有对应的group信息,则将container添加到对应的group中
@KafkaListener注解变为ConcurrentMessageListenerContainer类,这个Container中包含了我们所需要的topic相关信息
ConcurrentMessageListenerContainer的分析
该类实现了lifeCycle接口
启动
- 将concurrency设置为topicPartitions大小
- 根据分片数创建KafkaMessageListenerContainer,每一个分片创建一个Container
- 启动Container,并添加其到containers中
- 获取对应的监听类,被包装到BatchMessagingMessageListenerAdapter中
- 根据this.listener, this.acknowledgingMessageListener创建对应的listenerConsumer
- 将listenerConsumer作为一个任务提交
最终将我们@KafkaListener中的topicPartitions
@KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0","1","2" })})
转换为ListenerConsumer,当concurrency的值大于partitions的数量时,每一个partition生成一个ListenerConsumer。
并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者
if (topicPartitions != null
&& this.concurrency > topicPartitions.length) {
this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
ListenerConsumer的工作
- 构造Consumer
- subscribe对应的topic
总结:通过构造方法,ListenerConsumer完成了Consumer的创建以及topic和partition的监听
轮询拉取的业务逻辑
- 不停轮询实现topic的消费
- 使用poll方法拉取数据
- 获取到数据后执行监听器的回调方法
提交偏移量:
那么消费者是如何提交偏移量的呢?消费者往一个 叫作 _consumer_offset 的特殊主题发送 消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有 什么用处,因为每个消费者都会在内存中记录自己消费到哪里了。不过,如果悄费者发生崩溃或者有新 的消费者加入群组,就会触发再均衡,完 成再均衡之后,每个消费者可能分配到新 的分区,而不是之前处理的那个。为了能够继续 之前的工作,消费者需要读取每个分区最后一次提交 的偏移量,然后从偏移量指定的地方 继续处理。
提交偏移量有很多种方式
kafka自动提交
最简单的提交方式是让悄费者自动提交偏移量。如果enable.auto.commit被设为 true,那么每过5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西 一样,自动提交也是在轮询(poll() )里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。所以这里auto.commit.interval.ms参数是两次提交偏移量的最小时间间隔,因为提交偏移量也是在poll中实现的,是每次poll的时候判断当前时间和上次提交的时间差是否大于等于5s,是,则需要提交偏移量,否则,本次轮询不需要提交偏移量。源码如下:
不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。
假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的 。
在使用自动提交时 ,每次调用轮询方法都会把上一次调用返 回的偏移量提交上去,它并不 知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回 的消息 都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。 一般情况下不会有什么问 题,不过在处理异常或提前退出轮询时要格外小心
提交偏移量除了自动提交,还有手动提交,AckMode中是所有的提交模式枚举
public enum AckMode {
// 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
RECORD,
// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
BATCH,
// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
TIME,
// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
COUNT,
// TIME | COUNT 有一个条件满足时提交
COUNT_TIME,
// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
MANUAL,
// 手动调用Acknowledgment.acknowledge()后立即提交
MANUAL_IMMEDIATE,
拉取消息的过程
这篇博文写消费者分析很不错 https://blog.csdn.net/liyiming2017/article/details/89187474
KafkaConsumer有几个主要的组件:
1、消费者要自己记录消费的位置(但也需要提交到服务端保存,为了rebalance后的消费能衔接上),所以我们需要SubScriptionState来保存消费的状态。
2、ConsumerCoordinator负责和GroupCoordinator通讯,例如在leader选举,入组,分区分配等过程。
3、ConsumerNetworkClient是对NetworkClient的封装,他对nio的组件进行封装,实现网络IO。
4、PartitionAssignor,这是分区分配策略,在进行分区分配的时候会用到。
5、Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。
可参考的文献:
源码分析Kafka 消息拉取流程
https://cloud.tencent.com/developer/article/1551705
https://blog.csdn.net/evasnowind/article/details/108534598
kafka消费者--加入consumergroup流程
https://blog.csdn.net/asdfsadfasdfsa/article/details/104883173
自动提交是调用poll方法
的时候顺便提交的,如果没有调用poll,时间到了也不会提交.
https://zhuanlan.zhihu.com/p/112745985
Kafka consumer消息的拉取及偏移的管理
https://blog.csdn.net/E_Possible/article/details/109564700
Kafka消费者源码解析之二Fetcher
https://blog.csdn.net/lt793843439/article/details/89634643
concurrency问题
https://blog.csdn.net/u010634066/article/details/109778491
https://hengheng.blog.csdn.net/article/details/107468648
https://blog.csdn.net/weixin_39672680/article/details/111744351