一、Broker启动
当每个 broker 启动时,会在 ZooKeeper 中的 /brokers/ids 路径下创建⼀个节点来注册⾃⼰,节点 ID 为配置⽂件中的 broker.id 参数,后注册的 broker 会报 NodeExists 的错。如果不指定 broker.id 或者指定成 -1,节点 ID 会从 reserved.broker.max.id 这个参数加 1的值开始,这个参数默认值是 1000,所以经常可以看⻅ 1001、1002 的 broker ID。
kafka的ZooKeeper存储结构如下图所示:
每个 broker 除了注册⾃身之外,还会监听 /brokers/ids 这个节点,当这个节点下增加或删除⼦节点时,ZooKeeper 会通知监听了的 broker。每个 broker 创建的节点都是临时节点,如果 broker 下线, /brokers/ids 下对应的节点就会被删除。
注意:broker 下线,只会删除 /brokers/ids 下的节点,其它的节点中可能还包含这个broker 的 ID,⽐如 /brokers/toics 下的节点会记录每个分区的副本存储在哪些 broker 上,这些节点不会删除这个 broker 的 ID,因为这个 broker 还有可能恢复,如果恢复不了,可以⽤相同的 ID 启动⼀个新的 broker,新的 broker 会代替原来 broker 的位置,开始同步数据。
二、删除Topic
建议 设置 auto.create.topics.enable = false
建议设置 delete.topic.enable = true
删除流程如下图所示:
三、消息的路由策略:
在通过API方式发布消息时,生产者是以Record为消息进行发布的,Record中包含Key和Value,其中value就是我们真正要使用的消息,而key是用于路由消息要存放的位置的。
消息要放入到哪个partition并不是随机的,而是按照以下路由策略进行处理的:
如果制定了partition,则直接写入指定的partition
如果没有指定partition但是指定了key,则通过key的hash值与partition数量进行取模,取模结果就是partition的索引
如果partition和key都未指定,则使用轮询算法选出一个partition
四、消息分区算法
Kafka中提供了多重分区分配算法(PartitionAssignor)的实现:RangeAssignor、RoundRobinAssignor、StickyAssignor。PartitionAssignor接⼝⽤于⽤户定义实现分区分配算法,以实现Consumer之间的分区分配。Kafka默认采⽤RangeAssignor的分配算法。
1、RangeAssignor
RangeAssignor策略的原理是按照消费者总数和分区总数进⾏整除运算来获得⼀个跨度,然后将分区按照跨度进⾏平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每⼀个Topic,RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配⼀个分区。
简单的说,就是平均分配,如果不能平均分配,靠前的分配的多,但是多的和少的相差不会超过一,如果消费者大于分区,那么后面的消费者则分不到partition。
用一张图简单明了的展示一下这种分配策略。
2、RoundRobinAssignor
RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进⾏排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进⾏排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。
如下面的两张图片所示,前面的是RangeAssignor,多topic的情况,其仍然不能保证均衡的消费,其只是保证同一个topic尽可能的均衡;后面的是RoundRobinAssignor,其不但保证了单topic的均衡消费,也保证了多topic的均衡消费。
3、StickyAssignor
尽管RoundRobinAssignor已经在RangeAssignor上做了⼀些优化来更均衡的分配分区,但是在⼀些情况下依旧会产⽣严重的分配偏差,从字⾯意义上看,Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每⼀次分配变更相对上⼀次分配做最少的变动(上⼀次的结果是有粘性的)。
其⽬标有两点:1. 分区的分配尽量的均衡;2. 每⼀次重分配的结果尽量与上⼀次分配结果保持⼀致。
StickyAssignor的分配结果如下图所示(增加RoundRobinAssignor分配作为对⽐):
上⾯的例⼦中,Sticky模式原来分配给C0、C2的分区都没有发⽣变动,且最终C0、C2达到的均衡的⽬的。
4、⾃定义分区策略
只需要继承AbstractPartitionAssignor并复写其中⽅法即可(当然也可以直接实现PartitionAssignor接⼝),其中有两个⽅法需要复写://分区分配⽅案的实现 Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions); //表示了这个分配策略的唯⼀名称 String name();
⽽name()⽅法则表示了这个分配策略的唯⼀名称,⽐如之前提到的range,roundrobin和sticky,这个名字会在和GroupCoordinator的通信中返回,通过它consumer leader来确定整个group的分区⽅案(分区策略是由group中的consumer共同投票决定的,谁使⽤的多,就是⽤哪个策略)。
// 指定分区分配策略 props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
五、消息写入算法
1、producer向broker集群提交连接请求,其所连接上的任意一个broker都会向其发送broker controller的通信URL,即broker controller主机配置文件中的listeners地址
2、当producer指定了要生产消息的topic后,其会向broker contriller发送请求,请求当前topic的所有partition leader
3、broker controller在接收到请求后,会从zk服务器中查找指定topic的所有partition leader返回给producer
4、producer在接收到partition leader列表后,会根据路由策略找到对应的partition leader,将消息发送该partition leader
5、leader将消息写入log,并通知ISR中的followers
6、ISR中的follower从leader中同步消息后向leader发送ACK消息
7、leader收到了所有ISR中的follower的ACK后,增加HW,表示消费者可以消费到该位置;如果leader在等待的follower的ACK超时了,发现还有follower没有发送ACK,则会将这些没有发送ACK的follower从ISR中剔除,然后再增加HW
六、HW(高水位)
上面说到leader收到了所有ISR中的follower的ACK后,就会增加HW,这里的HW是高水位的意思,表示consumer可以消费到的最高partition偏移量。
HW保证了Kafka集群中消息的一致性,确切地说,是在broker集群正常运转的情况下,保证了partition的follower和leader之间数据的一致性。
LEO,Log End Offset,日志最后消息的偏移量,消息是被写入到Kafka的日志文件中的,这是当前最后一个写入的消息在partition中的偏移量。
对于新写入的消息,consumer是不能立刻消费的,leader会等待该消息被所有的ISR中的partition follower同步后才会更新HW,此时消息才能被consumer消费。
1、HW的更新机制
每个broker都存储着当前partition的HW和LEO,但Partition leader中同时还存储着所有Partition Follower的副本数据。
Leader 副本的HW更新原则:取当前leader副本的LEO和所有remote副本的LEO的最⼩值
Follower副本的HW更新原则:取leader副本发送的HW和⾃身的LEO中的最⼩值
上图演示了⽣产者写⼊数据到Kafka的leader副本,然后同步到follower副本的流程。副本同步机制流程:
(1)⽣产者写⼊消息到leader副本
(2)leader副本LEO值更新
(3)follower副本尝试拉取消息,发现有消息可以拉取,更新⾃身LEO
(4)follower副本继续尝试拉取消息,这时会更新remote副本LEO,同时会更新leader副本的HW
(5)完成4步骤后,leader副本会将已更新过的HW发送给所有follower副本
(6)follower副本接收leader副本HW,更新⾃身的HW
七、HW截断机制
1、HW机制的缺陷
如果partition leader接收到了新的消息,ISR中其他follower正在同步过程中,还未同步完毕leader就挂了,此时就需要选举新的leader,若没有HW截断机制,将会导致partition中Leader与Follower数据不一致。
当原leader宕机恢复后,将其LEO回退到宕机时的HW,然后再与新的Leader进行数据同步,这种机制称为HW截断机制。
HW机制会引起数据丢失。
2、HW截断机制
(1)follower 故障
follower 发⽣故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘 记录的上次的 HW,并将 log ⽂件⾼于 HW 的部分截取掉,从 HW 开始向 leader 进⾏同步。 等该 follower 的LEO ⼤于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重 新加⼊ ISR 了。
(2)leader 故障
该机制在leader出现宕机情况然后⼜恢复时,可以防 ⽌ partition leader 与 follower 间出现数据不⼀致。当原 Leader 宕机后⼜恢复时,将其 LEO 回退到其宕机时的 HW,然后再与新的 Leader 进⾏数据同步,这种机制称为 HW 截断机制。
注意:这只能保证副本之间的数据⼀致性,并不能保证数据不丢失或者不重复。
八、消息发送的可靠性机制
生产者向Kafka发送消息时,可以选择可靠性级别,通过acks参数的值进行设置
0:异步发送,生产者向kafka发送消息而不需要kafka反馈成功ack,该方式效率最高,但是可靠性最低,可能会存在数据丢失的问题。
1:同步发送,默认值。生产者发送消息给kafka,broker的partition leader在收到消息后,会反馈ack,生产者收到后才会在发送消息,如果一直未收到kafka的ack,则生产者会认为消息发送失败,会重发消息。该种情况,仍然可能存在数据丢失的问题,因为如果partition leader收到消息并返回了ack,但是在同步partition follower时leader发生宕机,此时需要选举新的leader,即HW截断机制的发生。
-1:同步发送,其值等同于all,生产者发送消息给kafka,kafka收到消息后,要等到ISR列表中的所有副本都同步消息完成后,才向生产者发送ack。该模型的可靠性最高,很少出现数据丢失的情况,但是可能出现部分follower重复接收消息的情况(不是重复消息)。
九、消费者消费过程解析
1、consumer向broker集群提交连接请求,其所连接上的任意broker都会向其发送broker controller的通信URL,即broker controller主机配置文件中的listeners。
2、当consumer指定了要消费的topic后,其会向broker controller发送poll请求
3、broker controller会为consumer分配一个或这几个partition leader,并将该partition的当前offset发送给consumer
4、consumer会按照broker controller分配的partition对其中的消息进行消费
5、当consumer消费完该条数据后,消费者会向broker发送一个消息已被消费的反馈,即该消息的offset。
6、当broker接收到consumer的offset后,会将其更新到__consumer_offset中
7、以上过程一直重复,直到消费者停止请求消息;消费者可以重置offset,从而可以灵活的消费存储在broker上的消息
十、Partititon Leader选举范围
当leader挂了后,broker controller会从ISR中选一个follower成为新的leader,但是如果所有的follower都挂了怎么办?可以通过unclean.leader.election.enable的取值来设置leader的选举范围。
false:必须等待ISR列表中由副本活过来才进行新的选举,该策略可靠性有保证,但是可用性低
true:在ISR中没有副本存活的情况下,可以选择任何一个该topic的partition作为新的leader,该策略可用性高,但是可靠性没有保证,可能会引发大量的消息丢失。
十一、重复消费问题即解决方案
重复消费最常见的有两种情况:同一个consumer的重复消费和不同consumer的重复消费
同一个consumer的重复消费:
当consumer由于消费能力较低而引发消费超时时,则可能会引发重复消费。
解决方案:可以减少读取的消息个数,也可以演唱自动提交的时间,还可以将自动提交转变为手动提交。
不同consumer的重复消费:
当consumer消费了消息但还未提交offset时宕机,则这些已被消费过的消息会被重复消费。
十二、位移重放
消费者在消费消息时,仅仅是从磁盘⽂件上读取数据⽽已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。
1、auto.offset.reset
kafka中没有offset时,不论是什么原因,offset没了,这时auto.offset.reset配置就会起作⽤,auto.offset.reset值含义解释:
earliest:当各分区下有已提交的offset时,从提交的offset开始消费;⽆提交的offset时,从头开始消费
latest:当各分区下有已提交的offset时,从提交的offset开始消费;⽆提交的offset时,消费新产⽣的该分区下的数据
none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有⼀个分区不存在已提交的offset,则抛出异常
默认建议⽤earliest。设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。
2、更新OffsetTopic的作⽤域
--all-topics :为consumer group下所有topic的所有分区调整位移
--topic t1 --topic t2 :为指定的若⼲个topic的所有分区调整位移
--topic t1:0,1,2 :为指定的topic分区调整位移
3、重置策略
--to-earliest :Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不⼀定就是 0,因为在⽣产环境中,很久远的消息会被 Kafka ⾃动删除,所以当前最早位移很可能是⼀个⼤于 0 的值。如果你想要重新消费主题的所有消息,那么可以使⽤ Earliest 策略。
--to-latest :Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使⽤ Latest 策略。
--to-current :Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。
--to-offset <offset> :Specified-Offset 策略则是⽐较通⽤的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使⽤场景是,消费者程序在处理某条错误消息时,你可以⼿动地“跳过”此消息的处理。在实际使⽤过程中,可能会出现 corrupted 消息⽆法被消费的情形,此时消费者程序会抛出异常,⽆法继续⼯作。⼀旦碰到这个问题,你就可以尝试使⽤ Specified-Offset 策略来规避。
--shift-by N : 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动;如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的⼀段消息的距离即可。这⾥的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。⽐如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定N 为 -100。
--to-datetime <datetime> :把位移调整到⼤于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,⽐如2017-08-04T00:00:00.000;DateTime 允许你指定⼀个时间,然后将位移重置到该时间之后的最早位移处。常⻅的使⽤场景是,你想重新消费昨天的数据,那么你可以使⽤该策略重设位移到昨天 0点。
--by-duration <duration> :把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,⽐如PT0H5M0S。Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引⼊的 Duration 类的话,你应该不会对这个格式感到陌⽣。它就是⼀个符合 ISO-8601 规范的 Duration 格式,以字⺟ P 开头,后⾯由 4 部分组成,即 D、H、M 和 S,分别表示天、⼩时、分钟和秒。举个例⼦,如果你想将位移调回到 15分钟前,那么你就可以指定 PT0H15M0S
--from-file <file> :从CSV⽂件中读取调整策略
4、确定执⾏⽅案
什么参数都不加:只是打印出位移调整⽅案,不具体执⾏
--execute :执⾏真正的位移调整
--export :把位移调整⽅案按照CSV格式打印,⽅便⽤户成csv⽂件,供后续直接使⽤
5、注意事项
(1)如果kafka 开启了认证、授权的操作,需要配置赋予了相应权限的⽤户。
(2)需要制定对应的Consumer Group 的id,重置的是Consumer Group 的位移
(3)consumer group状态必须是inactive的,即不能是处于正在⼯作中的状态
(4)要关闭kakfa的⾃动位移提交功能
6、API
实现消息重放需要用到KafkaConsumer的seek方法或者seekToEnd方法。
(1)earliest实现
首先创建的消费者程序,要禁⽌⾃动提交位移
然后组 ID 要设置成要重设的消费者组的组 ID
其次调⽤ seekToBeginning ⽅法时,需要⼀次性构造主题的所有分区对象。
最重要的是,⼀定要调⽤带⻓整型的 poll ⽅法,⽽不要调⽤consumer.poll(Duration.ofSecond(0))。
代码样例如下所示:
consumer.subscribe(Collections.singleton(topic)); consumer.poll(0); consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())) .collect(Collectors.toList())
);
(2)latest实现
consumer.seekToEnd( consumer.partitionsFor(topic).stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())) .collect(Collectors.toList())
);
(3)Current实现
实现 Current 策略的⽅法很简单,需要借助 KafkaConsumer 的 committed ⽅法来获取当前提交的最新位移
consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())) .forEach(tp -> { long committedOffset = consumer.committed(tp).offset(); consumer.seek(tp, committedOffset); });
(4)Specified-Offset实现
long targetOffset = 1234L; for (PartitionInfo info : consumer.partitionsFor(topic)) { TopicPartition tp = new TopicPartition(topic, info.partition()); consumer.seek(tp, targetOffset); }
(5)Shift-By-N 实现
for (PartitionInfo info : consumer.partitionsFor(topic)) { TopicPartition tp = new TopicPartition(topic, info.partition()); // 假设向前跳 123 条消息 long targetOffset = consumer.committed(tp).offset() + 123L; consumer.seek(tp, targetOffset); }
(6)datatime 实现
如果要实现 DateTime 策略,需要借助另⼀个⽅法:KafkaConsumer. offsetsForTimes ⽅法。假设要重设位移到 2020 年 7 ⽉ 20 ⽇晚上 8 点,那么具体代码如下:
long ts = LocalDateTime.of(2020, 7, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli(); Map<TopicPartition, Long> timeToSearch =consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())) .collect(Collectors.toMap(Function.identity(), tp -> ts)
); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) { consumer.seek(entry.getKey(), entry.getValue().offset()); }
(7)Duration 实现
位移回调30分钟前:
Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream() .map(info -> new TopicPartition(topic, info.partition())) .collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000 * 60)
); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) { consumer.seek(entry.getKey(), entry.getValue().offset()); }
总之,使⽤ Java API 的⽅式来实现重设策略的主要⼊⼝⽅法,就是 seek ⽅法。
7、kafka-consumer-groups
位移重设还有另⼀个重要的途径:通过 kafka-consumer-groups 脚本。需要注意的是,这个功能是在Kafka 0.11 版本中新引⼊的。这就是说,如果你使⽤的 Kafka 是 0.11 版本之前的,那么你只能使⽤API 的⽅式来重设位移。
(1)Earliest
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --all-topics --to-earliest –execute
(2)Latest
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --all-topics --to-latest --execute
(3)Current
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --all-topics --to-current --execute
(4)Specified-Offset
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --all-topics --to-offset <offset> --execute
(5)Shift-By-N
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --shift-by <offset_N> --execute
(6)DateTime
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
(7)Duration
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group testgroup --reset-offsets --by-duration PT0H30M0S --execute
十三、消息压缩
压缩就是⽤时间去换空间的经典 trade-off 思想,具体来说就是⽤ CPU 时间去换磁盘空间或⽹络I/O 传输量,希望以较⼩的 CPU 开销带来更少的磁盘占⽤或更少的⽹络 I/O 传输。
消息格式有两⼤类消息格式,分别称之为V1版本和V2版本。V2版本是Kafka 0.11.0.0中正式引⼊的,V2版本都⽐V1版本节省磁盘空间,当启⽤压缩时,这种节省空间的效果更加明显。
在kafka中Producer 端压缩、Broker 端保持、Consumer 端解压缩。
⽣产者程序中配置compression.type参数即表示启⽤指定类型的压缩算法。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 开启GZIP压缩 props.put("compression.type", "gzip"); Producer<String, String> producer = new KafkaProducer<>(props);
这⾥⽐较关键的代码⾏是props.put(“compression.type”, “gzip”),它表明该Producer的压缩算法使⽤的是GZIP。这样Producer启动后⽣产的每个消息集合都是经GZIP压缩过的,故⽽能很好地节省⽹络传输带宽以及Kafka Broker端的磁盘占⽤。在⽣产者端启⽤压缩是很⾃然的想法,那为什么说在Broker端也可能进⾏压缩呢?两种例外情况就可能让Broker重新压缩消息:
(1)Broker端指定了和Producer端不同的压缩算法
(2)Broker端发⽣了消息格式转换
各种压缩算法对⽐
吞吐量对比:LZ4 > Snappy > zstd / GZIP 压缩⽐对比:zstd > LZ4 > GZIP > Snappy十四、⽂件存储机制
消息在磁盘上都是以⽇志的形式保存的,我们这⾥说的⽇志是存放在(config/server.properties:log.dirs=/Users/hadoop/kafka-logs)⽬录中的消息⽇志,即partition与segment。
1、存储信息查看
首先创建一个3个分区、3个备份的topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replicationfactor 3 --partitions 3 --topic two
可以查看一下其存储内容
(1)/brokers/ids⽬录
(2)每个id的数据内容为当前主机的信息
(3)/brokers/topics
2、存储结构
3、segment
log⽂件⼤⼩可以通过log.segment.bytes参数设定,默认值是1073741824。
index⽂件的的格式如下图所示,每个索引项占⽤8个字节,分为两个部分:
1) relativeOffset:相对偏移量,表示消息相对于baseOffset的偏移量,占⽤4个字节,当前索引⽂件的⽂件名即为baseOffset的值。
2) position:物理地址,也就是消息在⽇志分段⽂件中的物理位置,占⽤4个字节
示例: 查找偏移量为23的信息
以上是最简单的⼀种情况。如果要查找偏移量为268的消息,那么应该怎么办呢?⾸先是定位到baseOffset为251的⽇志分段,然后计算相对偏移量relavtiveOffset=268-251=17,之后再在对应的索引⽂件中知道不⼤于17的索引项,最后根据索引项中的postition定位到具体的⽇志分段⽂件位置开始查找消息。
index⽂件中并没有为数据⽂件中的每条消息都建⽴索引,⽽是采⽤了稀疏存储的⽅式(跳跃表),每隔⼀定字节的数据建⽴⼀条索引。 这样避免了索引⽂件占⽤过多的空间,从⽽可以将索引⽂件保留在内存中。
4、mesage
解释说明:
8 byte offset:在parition(分区)内的每条消息都有⼀个有序的id号,这个id号被称为偏移(offset),它可以唯⼀确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message。
4 byte message size:message⼤⼩
4 byte CRC32:⽤crc32校验message
1 byte “magic":表示本次发布Kafka服务程序协议版本号
1 byte “attributes":表示为独⽴版本、或标识压缩类型、或编码类型。
4 byte key length:表示key的⻓度,当key为-1时,K byte key字段不填
K byte key:可选value
4 bytes payload:表示实际消息数据。
5、⾼效⽂件存储设计特点
Kafka把topic中⼀个parition⼤⽂件分成多个⼩⽂件段,通过多个⼩⽂件段,就容易定期清除或删除已经消费完⽂件,减少磁盘占⽤。
通过索引信息可以快速定位message
通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
通过索引⽂件稀疏存储,可以⼤幅降低index⽂件元数据占⽤空间⼤⼩。
顺序写: 操作系统每次从磁盘读写数据的时候,需要先寻址,也就是先要找到数据在磁盘上的物理位置,然后再进⾏数据读写,如果是机械硬盘,寻址就需要较⻓的时间。kafka的设计中,数据其实是存储在磁盘上⾯,⼀般来说,会把数据存储在内存上⾯性能才会好。但是kafka⽤的是顺序写,追加数据是追加到末尾,磁盘顺序写的性能极⾼,在磁盘个数⼀定,转数达到⼀定的情况下,基本和内存速度⼀致随机写的话是在⽂件的某个位置修改数据,性能会较低。
零拷⻉
十五、日志清理策略
kafka log的清理策略有两种:delete,compact,默认是delete,这个对应了kafka中每个topic对于record的管理模式:
delete:⼀般是使⽤按照时间保留的策略,当不活跃的segment的时间戳是⼤于设置的时间的时候,当前segment就会被删除
compact: ⽇志不会被删除,会被去重清理,这种模式要求每个record都必须有key,然后kafka会按照⼀定的时机清理segment中的key,对于同⼀个key只保留最新的那个key.同样的,compact也只针对不活跃的segment
cleanup.policy: delete cleanup.policy: compact
1、delete 相关配置
假如对某个topic(假设为user_topic)设置了 cleanup.policy: delete,那么当前topic使⽤的log删除策略就是 delete,这个策略会周期性的检查partion中的不活跃的segment,根据配置采⽤两种⽅式删除⼀些旧的segment。
retention.bytes:总的segment的⼤⼩限制,达到这个限制后会删除旧的segment,默认值为-1,就是不会删除
retention.ms:当前时间 - segment的最后写⼊record的时间 > retention.ms 的segment会被删除,默认是168h, 7天
⼀些其他的辅助性配置:
log.retention.check.interval.ms: 每隔多久检查⼀次是否有可以删除的log,默认是300s,5分钟这个是broker级别的设置
file.delete.delay.ms: 在彻底删除⽂件前保留的时间,默认为1分钟 这个是broker级别的设置
在delete的⽇志策略下,⽇志保留3天。
retention.ms: 259200000
2、⽇志清理compact策略
使⽤场景:⽇志清理的compact策略,对于那种需要留存⼀份全量数据的需求⽐较有⽤,什么意思呢,⽐如,我⽤flink计算了所有⽤户的粉丝数,⽽且每5分钟更新⼀次,结果都存储到kafka当中。这个时候kafka相当于是⼀个数据总线,任何需要⽤户粉丝数的业务部⻔都可以从kafka中拿到这个数据。这个时候如果数据的保存使⽤delete策略,为了保存所有⽤户的粉丝数,只能设置不删除,也就是这样的话,数据会⽆限膨胀,⽽且,很多数据是⽆意义的,因为业务⽅从kafka中消费数据的时候,实际上只是想知道⽤户的当前粉丝数是多少,不关注⼀个⽉前这个⽤户有多少粉丝数,但是这些数据都在kafka中存储,会造成⽆意义的消费。kafka提供了⼀种叫做compact的清理策略,这个策略可以很好的帮助我们应对这种情况。kafka的compact 策略要求每个record都要有key,kafka是根据key来进⾏去重合并的。每个key⾄少保留⼀个最新的值。
compact的⼯作模式:对于每⼀个kafka partition的⽇志,以segment为单位,都会被分为两部分,已清理和未清理的部分。同时,未清理的那部分⼜分为可以清理的和不可清理的。同时对于清理过后的segment如果太⼩,kafka也会有⼀定的策略去合并这些segemnt,防⽌
segment碎⽚化。通过配置cleanup.policy: compact来开启compact的⽇志清理策略。配套的配置还有:
min.cleanable.dirty.ratio: 可以进⾏compact的脏数据的⽐例,dirtyRatio = dirtyBytes /(cleanBytes + dirtyBytes) 其中dirtyBytes表示可清理部分的⽇志⼤⼩,cleanBytes表示已清理部分的⽇志⼤⼩。这个配置也是为了提升清理的性价⽐设置的,因为清理数据需要对磁盘进⾏读写,开销并不⼩,如果你的数据只有很⼩的重复⽐例,实际上是没有清理的必要的。这个值默认是0.5 也就是脏了的数据达到了总数据的50%才会清理,⼀般情况下我如果开启了compact策略,都会将这个值设置为0.1,感觉这样对于想要消费当前topic的业务⽅更加友好。
min.compaction.lag.ms: 这个设置了在⼀条消息在被produer发送到kafka当中之后,多久时间以内不会被compact,为了满⾜有些想要获取⼀定时间内的历史快照的业务,默认是0,就是不会根据消息投递的时间来决定消息是否应该被compacted
tombstone 消息:
在compact下,还有⼀类⽐较特殊的消息,只有key,value值为null的消息,这⼀类消息如果合并了实际上也是没有意义的,因为没有值,所以kafka在compact的时候会删除value为null的消息,但是并不是在第⼀次去重的时候⽴刻删除,⽽是允许存储的更久⼀些。有⼀个特殊的配置来处理。
delete.retention.ms: 这个配置就是专⻔针对tombstone类型的消息进⾏设置的。默认为24⼩时,也就是这个tombstone在当次compact完成后并不会被清理,在下次compact的时候,他的最后修改时间+delete.retention.ms<当前时间,才会被删掉。
简单总结compact的配置(kafka启⽤delete的清理策略的时候需要注意配置)
cleanup.policy: compact
segment.bytes: 每个segment的⼤⼩,达到这个⼤⼩会产⽣新的segment, 默认是1G
segment.ms: 配置每隔 ms产⽣⼀个新的segment,默认是168h,也就是7天
retention.bytes: 总的segment的⼤⼩限制,达到这个限制后会删除旧的segment,默认值为-1,就是不会删除
retention.ms: segment的最后写⼊record的时间-当前时间 > retention.ms 的segment会被删除,默认是168h, 7天
min.cleanable.dirty.ratio: 脏数据可以容忍的⽐例,如果你的机器性能可以,⽽且数据量较⼤的话,建议这个值设置更⼩⼀些,对consumer更友好
min.compaction.lag.ms: 看业务有需要的话可以设置