flink1.17.1版本,有没有遇到获取的不一致的问题?

那位大佬用过flink1.17.1版本,有没有遇到wartermark在flink webui显示和通过ctx.currentWatermark()获取的不一致的问题?image.png
image.png
在flink1.13.2的版本中没这种问题,是使用方法不一样吗

展开
收起
真的很搞笑 2023-09-28 10:36:35 111 分享 版权
1 条回答
写回答
取消 提交回答
  • Flink 1.17.1 版本在获取 offset 的时候可能会出现不一致的问题。原因是 Flink 在获取 offset 的时候,会使用 offset 的存储时间来进行判断,如果存储时间和当前时间差距太大,那么 Flink 会认为 offset 已经过期,并重新从头开始消费。这可能会导致数据丢失。

    为了避免这个问题,可以使用 offset.storage.timeWindow 配置项来指定 offset 的存储时间。该配置项的单位是毫秒,默认值为 86400000 毫秒(即 1 天)。如果设置为 0,那么 Flink 会始终使用最新的 offset。

    如果您需要获取 offset 的准确时间,可以使用 offset.storage.timestamps.enabled 配置项来启用 timestamp 机制。启用 timestamp 机制后,Flink 会在 offset 中存储事件的处理时间,而不是 offset 的存储时间。这样可以确保 Flink 获取的 offset 是准确的。

    以下是 Flink CDC 使用 offset 的示例代码:

    // 创建 DebeziumSource
    DebeziumSource source = new DebeziumSource<>(
    new JsonDebeziumDeserializationSchema(),
    "my-connector",
    "my-database",
    "my-table",
    "my-server",
    "my-port",
    new KafkaOffsetBackingStore("my-kafka-topic", "my-offset-group"));

    // 获取 offset
    Map offset = source.getOffset();

    // 从 offset 中获取时间戳
    long timestamp = (long) offset.get("timestamp");

    // 使用 timestamp 检索数据
    SourceRecord record = source.fetch(timestamp, timestamp + 1000000L);
    使用 timestamp 机制可以确保 Flink 获取的 offset 是准确的。但是,需要注意的是,timestamp 机制会增加 Flink CDC 的计算成本。因此,如果您不需要获取 offset 的准确时间,建议您使用默认的机制。

    2023-10-18 10:18:15
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理