开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink读kafka,从某个时间戳读,配置不生效

配置不生效: .setStartingOffsets(OffsetsInitializer.timestamp(1706321869181L))

大家有遇到过这种问题吗?
配置:


        <properties>
            <hbase.version>2.4.4</hbase.version>
            <hadoop.version>3.2.1</hadoop.version>
            <flink.version>1.13.2</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
        </properties>
return KafkaSource.<Tuple4<String, String, byte[], Long>>builder()
                .setProperties(properties)
                .setTopics(topics)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.timestamp(1706321869181L))
                .setDeserializer(new DefaultRecordDeserializationSchema())
                .build();

展开
收起
飞行砖家 2024-02-02 15:17:11 250 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    根据您提供的配置和代码,问题可能出在setStartingOffsets方法的参数上。您设置的起始偏移量为1706321869181L,这个时间戳对应的日期是2024-01-01 00:00:00,这可能超出了您的Kafka数据的时间范围。

    为了解决这个问题,您可以尝试以下方法:

    1. 检查您的Kafka数据的时间范围,确保起始偏移量在数据的范围内。
    2. 如果您希望从最新的数据开始消费,可以使用OffsetsInitializer.latest()方法来设置起始偏移量。

    修改后的代码如下:

    return KafkaSource.<Tuple4<String, String, byte[], Long>>builder()
                    .setProperties(properties)
                    .setTopics(topics)
                    .setGroupId(groupId)
                    .setStartingOffsets(OffsetsInitializer.latest()) // 修改为从最新数据开始消费
                    .setDeserializer(new DefaultRecordDeserializationSchema())
                    .build();
    

    如果问题仍然存在,请检查其他配置项是否正确,以及您的Kafka和Flink环境是否正常。

    2024-02-04 13:39:11
    赞同 展开评论 打赏
  • Flink 1.13.2版本中设置起始偏移量为指定时间戳的方法看起来是正确的。若配置未生效,请检查以下方面:

    • 确保时间戳对应的偏移量在Kafka topic的partition中有对应的数据,即该时间戳不是早于topic最早消息的时间。
    • KafkaSource的时间戳偏移量功能依赖于Kafka broker是否开启log.message.timestamp.type参数,并且该时间戳类型需是CreateTimeLogAppendTime之一。
    • 确认Flink作业提交后,相关配置确实传递到了KafkaSource实例中,可以通过查看Flink运行时的日志来验证。
    2024-02-02 15:51:56
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
    MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
    消息队列kafka介绍 立即下载