我有这样一个业务场景,在checkpoint disable的情况下,不想用auto.commit 功能在等待interval时间后提交offsets, 想要在flink job sink 完成后及时提交offset, 我阅读了一下flinkKafkaConsumer的源码,没有找到实现方式,麻烦请教大神有什么建议的实现方式吗? 或者还有什么其他的建议? 谢谢! 附件中是我写的测试代码。
53行我想new 一个kafkaConsumer出来,调用commitSync()方法但不管用。
KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.commitSync();*来自志愿者整理的flink邮件归档
我猜测,要是可以在invoke的时候,拿到sink当前对应的barriar的话,从这个barriar取出offset,或者在richSinkFunction时,调用getRuntimeContext()上下文对象,从这个上下文中,拿到当前的task metrics,从而取出task的offset*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。