前言
本文记录博主如何设置kafka的offset过期时间并测试其效果
1、offsets.retention.minutes
通过修改offsets.retention.minutes的值即可改变kafka offset的过期时间,单位为分钟,改完之后需要重启kafka。具体的配置文件为$KAFKA_HOME/config/server.properties,原生的kafka配置文件里可能没有这个配置项,自己添加上即可,比如设置过期时间为一小时,那么按如下配置即可
offsets.retention.minutes=60
2、官方文档
网上有的博客说官网文档对于这个配置的说明有点错误,将offsets.retention.minutes错写成了offsets.topic.retention.minutes,但是我查看了一下,官方文档上并没有写错,可能是之前的版本写错了,而且很多博客按之前的版本写的,大家注意一下。官网文档地址http://kafka.apache.org/documentation/
3、ambari的bug
因本人用ambari管理大数据集群的各个组件,所以在界面上直接修改kafka的配置,在界面上查看kafka的配置offsets.retention.minutes为86400000,因为kafka offset默认过期时间为一天,那么根据这个86400000来看offsets.retention.minutes的单位为毫秒才对,所以一开始误认为单位为毫秒,所以修改配置后的时间设置的很大,导致一开始测试不成功,经过一点点的验证,发现单位实际上为分钟,而ambari上显示的86400000应该是个bug,因为kafka默认的配置文件里是没有这个配置项的,所以我估计ambari一开始也没有配置只是搜索的时候将其显示为86400000,而并没有真正的生效,只有将这个配置项修改之后,才会生效,并且单位为分钟(看了一下ambari的大部分默认时间单位都是毫秒~)。
后来在官网上看到offsets.retention.minutes的default为1440也证实了这一点。
4、测试效果
虽然本人的需求是将默认的一天的时间改长一点,但是时间长了测试太慢,所以将时间改短一点测试效果即可,测试代码见Spark Streamming+Kafka提交offset实现有且仅有一次,经过多次测试,得出结论,在修改重启之后,不管是新增加的topic还是之前的topic,只要是新保存的offset都会生效,而之前保存的offset,比如之前是一天才会删除,那么修改重启后,之前保存的offset还是会一天后才能删掉。
注:spark保存offset代码
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
5、注意
offset的过期时间是不精确的,实际上大于等于你设置的时间,假如设置的时间为10分钟,那么可能在10-20之后才会删掉,原因我想应该是kafka会定期的检查offset被标记为应该清理的offset,可能offsets.retention.check.interval.ms这个配置项有关,因为其默认时间为十分钟,但是没有去验证这一点。
- offsets.retention.check.interval.ms 600000 offset管理器检查陈旧offsets的频率