通过offsets.retention.minutes设置kafka offset的过期时间

前言

本文记录博主如何设置kafka的offset过期时间并测试其效果

1、offsets.retention.minutes

通过修改offsets.retention.minutes的值即可改变kafka offset的过期时间,单位为分钟,改完之后需要重启kafka。具体的配置文件为$KAFKA_HOME/config/server.properties,原生的kafka配置文件里可能没有这个配置项,自己添加上即可,比如设置过期时间为一小时,那么按如下配置即可

offsets.retention.minutes=60

2、官方文档

网上有的博客说官网文档对于这个配置的说明有点错误,将offsets.retention.minutes错写成了offsets.topic.retention.minutes,但是我查看了一下,官方文档上并没有写错,可能是之前的版本写错了,而且很多博客按之前的版本写的,大家注意一下。官网文档地址http://kafka.apache.org/documentation/

3、ambari的bug

因本人用ambari管理大数据集群的各个组件,所以在界面上直接修改kafka的配置,在界面上查看kafka的配置offsets.retention.minutes为86400000,因为kafka offset默认过期时间为一天,那么根据这个86400000来看offsets.retention.minutes的单位为毫秒才对,所以一开始误认为单位为毫秒,所以修改配置后的时间设置的很大,导致一开始测试不成功,经过一点点的验证,发现单位实际上为分钟,而ambari上显示的86400000应该是个bug,因为kafka默认的配置文件里是没有这个配置项的,所以我估计ambari一开始也没有配置只是搜索的时候将其显示为86400000,而并没有真正的生效,只有将这个配置项修改之后,才会生效,并且单位为分钟(看了一下ambari的大部分默认时间单位都是毫秒~)。
后来在官网上看到offsets.retention.minutes的default为1440也证实了这一点。

4、测试效果

虽然本人的需求是将默认的一天的时间改长一点,但是时间长了测试太慢,所以将时间改短一点测试效果即可,测试代码见Spark Streamming+Kafka提交offset实现有且仅有一次,经过多次测试,得出结论,在修改重启之后,不管是新增加的topic还是之前的topic,只要是新保存的offset都会生效,而之前保存的offset,比如之前是一天才会删除,那么修改重启后,之前保存的offset还是会一天后才能删掉。

注:spark保存offset代码

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

5、注意

offset的过期时间是不精确的,实际上大于等于你设置的时间,假如设置的时间为10分钟,那么可能在10-20之后才会删掉,原因我想应该是kafka会定期的检查offset被标记为应该清理的offset,可能offsets.retention.check.interval.ms这个配置项有关,因为其默认时间为十分钟,但是没有去验证这一点。

  • offsets.retention.check.interval.ms 600000 offset管理器检查陈旧offsets的频率

转自:https://dongkelun.com/2018/06/21/modifyKafkaOffsetTime/
上一篇:hive元数据格式化 在hive中执行sql语句:SemanticException org.apache.hadoop.hive.ql.metadata.HiveException:


下一篇:C# 过滤sql特殊字符方法集合