开发者社区> 董可伦> 正文

通过offsets.retention.minutes设置kafka offset的过期时间

简介: 版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.
+关注继续查看
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80943281

我的原创地址:https://dongkelun.com/2018/06/21/modifyKafkaOffsetTime/

前言

本文记录博主如何设置kafka的offset过期时间并测试其效果

1、offsets.retention.minutes

通过修改offsets.retention.minutes的值即可改变kafka offset的过期时间,单位为分钟,改完之后需要重启kafka。具体的配置文件为$KAFKA_HOME/config/server.properties,原生的kafka配置文件里可能没有这个配置项,自己添加上即可,比如设置过期时间为一小时,那么按如下配置即可

offsets.retention.minutes=60

2、官方文档

网上有的博客说官网文档对于这个配置的说明有点错误,将offsets.retention.minutes错写成了offsets.topic.retention.minutes,但是我查看了一下,官方文档上并没有写错,可能是之前的版本写错了,而且很多博客按之前的版本写的,大家注意一下。官网文档地址http://kafka.apache.org/documentation/

3、ambari的bug

因本人用ambari管理大数据集群的各个组件,所以在界面上直接修改kafka的配置,在界面上查看kafka的配置offsets.retention.minutes为86400000,因为kafka offset默认过期时间为一天,那么根据这个86400000来看offsets.retention.minutes的单位为毫秒才对,所以一开始误认为单位为毫秒,所以修改配置后的时间设置的很大,导致一开始测试不成功,经过一点点的验证,发现单位实际上为分钟,而ambari上显示的86400000应该是个bug,因为kafka默认的配置文件里是没有这个配置项的,所以我估计ambari一开始也没有配置只是搜索的时候将其显示为86400000,而并没有真正的生效,只有将这个配置项修改之后,才会生效,并且单位为分钟(看了一下ambari的大部分默认时间单位都是毫秒~)。
后来在官网上看到offsets.retention.minutes的default为1440也证实了这一点。

4、测试效果

虽然本人的需求是将默认的一天的时间改长一点,但是时间长了测试太慢,所以将时间改短一点测试效果即可,测试代码见Spark Streamming+Kafka提交offset实现有且仅有一次,经过多次测试,得出结论,在修改重启之后,不管是新增加的topic还是之前的topic,只要是新保存的offset都会生效,而之前保存的offset,比如之前是一天才会删除,那么修改重启后,之前保存的offset还是会一天后才能删掉。

注:spark保存offset代码

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

5、注意

offset的过期时间是不精确的,实际上大于等于你设置的时间,假如设置的时间为10分钟,那么可能在10-20之后才会删掉,原因我想应该是kafka会定期的检查offset被标记为应该清理的offset,可能offsets.retention.check.interval.ms这个配置项有关,因为其默认时间为十分钟,但是没有去验证这一点。
* offsets.retention.check.interval.ms 600000 offset管理器检查陈旧offsets的频率

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
flink 手动维护kafka offset
flink 手动维护kafka offset
195 0
通过offsets.retention.minutes设置kafka offset的过期时间
通过offsets.retention.minutes设置kafka offset的过期时间
115 0
Spark Streaming+Kafka提交offset实现有且仅有一次(exactly-once)
Spark Streaming+Kafka提交offset实现有且仅有一次(exactly-once)
101 0
【Kafka】(二十)Kafka Consumer 重置 Offset
【Kafka】(二十)Kafka Consumer 重置 Offset
506 0
Kafka到底有几个Offset?——Kafka核心之偏移量机制
Kafka是由LinkIn开源的实时数据处理框架,目前已经更新到2.3版本。不同于一般的消息中间件,Kafka通过数据持久化和磁盘读写获得了极高的吞吐量,并可以不依赖Storm,SparkStreaming的流处理平台,自己进行实时的流处理。 Kakfa的Offset机制是其最核心机制之一,由于API对于部分功能的实现,我们有时并没有手动去设置Offset,那么Kafka到底有几个Offset呢?
268 0
Kafka offset commit 分析工具
订阅Kafka内部Topic __consumer_offsets 中的消息 统计consumer group提交数 分析异常提交情况 并定位问题服务
708 0
Kafka的offset管理
消费者需要自己保留一个offset,从kafka 获取消息时,只拉去当前offset 以后的消息。Kafka 的scala/java 版的client 已经实现了这部分的逻辑,将offset 保存到zookeeper 上 1.
3040 0
Spark Streamming+Kafka提交offset实现有且仅有一次
版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.
950 0
+关注
董可伦
大数据、Spark、前端、机器学习 个人博客:https://dongkelun.com/ 专业:信息与计算科学
文章
问答
文章排行榜
最热
最新
相关电子书
更多
消息队列 Kafka 版差异化特性
立即下载
2019大数据技术公开课第五季—kafka 数据如何同步到 MaxCompute
立即下载
任庆盛|Flink CDC + Kafka 加速业务实时化
立即下载