注意本文采用最新版本进行Kafka的内核原理剖析,新版本每一个Consumer通过独立的线程,来管理多个Socket连接,即同时与多个broker通信实现消息的并行读取。这就是新版的技术革新。类似于Linux I/O模型或者Select NIO 模型。
Poll为什么要设置一个超时参数
- 条件:
- 1:获取足够多的可用数据
- 2:等待时间超过指定的超时时间。
- 目的在于让Consumer主线程定期的””苏醒”去做其他事情。比如:定期的执行常规任务,(比如写日志,写库等)。
- 获取消息,然后执行业务逻辑。
位移精度
- 最少一次 -> 消息会被重复处理
- 最多一次 -> 消息会丢失,但不会被重复处理。
- 精确一次 -> 一定会被处理,且也只会处理一次。
位移角色
- 上次提交位移 :last committed offset
- 当前位置 :current position
- 水位 : High watermark
- 日志终端位移: (Log End Offset)
位移管理
consumer的位移提交最终会向group coordinator来提交,不过这里重点需要重新说明一下:组协调者coordinator负责管理所有的Consumer实例。而且coordinator运行在broker上(通过选举出某个broker),不过请注意新版本coordinator只负责做组管理。
但是具体的reblance分区分配策略目前已经交由Consumer客户端。这样就解耦了组管理和分区分配。
权利下放的优势:
- 如果需要分配就貌似需要重启整个kafka集群。
- 在Consumer端可以定制分区分配策略。
- 每一个consumer位移提交时,都会向_consumer_offsets对应的分区上追加写入一条消息。如果某一个consumer为同一个group的同一个topic同一个分区提交多次位移,很显然我们只关心最新一次提交的位移。
reblance的触发条件
- 组订阅发生变更,比如基于正则表达式订阅,当匹配到新的topic创建时,组的订阅就会发生变更。
- 组的topic分区数发生变更,通过命令行脚本增加了订阅topic的分区数。
- 组成员发生变更:新加入组以及离开组。
reblance 分配策略
range分区分配策略
举例如下:一个拥有十个分区(0,1,2…..,9)的topic,相同group拥有三个consumerid为a,b,c的消费者:
-
consumer a分配对应的分区号为[0,4),即0,1,2,3前面四个分区
-
consumer b 分配对应分区4,5,6中间三个分区
-
consumer c 分配对应分区7,8,9最后三个分区。
class RangeAssignor() extends PartitionAssignor with Logging {
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37def (ctx: AssignmentContext) = {
val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
val partitionAssignment =
new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
for (topic <- ctx.myTopicThreadIds.keySet) {
val curConsumers = ctx.consumersForTopic(topic)
val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)
val nPartsPerConsumer = curPartitions.size / curConsumers.size
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
" for topic " + topic + " with consumers: " + curConsumers)
for (consumerThreadId <- curConsumers) {
val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
assert(myConsumerPosition >= 0)
val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
* Range-partition the sorted partitions to consumers for better locality.
* The first few consumers pick up an extra partition, if any.
*/
if (nParts <= 0)
warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
else {
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
大专栏