那位大佬用过flink1.17.1版本,有没有遇到wartermark在flink webui显示和通过ctx.currentWatermark()获取的不一致的问题?
在flink1.13.2的版本中没这种问题,是使用方法不一样吗
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink 1.17.1 版本在获取 offset 的时候可能会出现不一致的问题。原因是 Flink 在获取 offset 的时候,会使用 offset 的存储时间来进行判断,如果存储时间和当前时间差距太大,那么 Flink 会认为 offset 已经过期,并重新从头开始消费。这可能会导致数据丢失。
为了避免这个问题,可以使用 offset.storage.timeWindow 配置项来指定 offset 的存储时间。该配置项的单位是毫秒,默认值为 86400000 毫秒(即 1 天)。如果设置为 0,那么 Flink 会始终使用最新的 offset。
如果您需要获取 offset 的准确时间,可以使用 offset.storage.timestamps.enabled 配置项来启用 timestamp 机制。启用 timestamp 机制后,Flink 会在 offset 中存储事件的处理时间,而不是 offset 的存储时间。这样可以确保 Flink 获取的 offset 是准确的。
以下是 Flink CDC 使用 offset 的示例代码:
// 创建 DebeziumSource
DebeziumSource source = new DebeziumSource<>(
new JsonDebeziumDeserializationSchema(),
"my-connector",
"my-database",
"my-table",
"my-server",
"my-port",
new KafkaOffsetBackingStore("my-kafka-topic", "my-offset-group"));
// 获取 offset
Map offset = source.getOffset();
// 从 offset 中获取时间戳
long timestamp = (long) offset.get("timestamp");
// 使用 timestamp 检索数据
SourceRecord record = source.fetch(timestamp, timestamp + 1000000L);
使用 timestamp 机制可以确保 Flink 获取的 offset 是准确的。但是,需要注意的是,timestamp 机制会增加 Flink CDC 的计算成本。因此,如果您不需要获取 offset 的准确时间,建议您使用默认的机制。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。