开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

问一下 设置timestamp读 kafka 一直报错无效偏移量为啥 ?

大佬们问一下 设置timestamp读 kafka 一直报错无效偏移量为啥 ? setStartingOffsets(OffsetsInitializer.timestamp(1697617200000L))这个读kafka

展开
收起
cuicuicuic 2023-10-23 15:07:42 67 0
2 条回答
写回答
取消 提交回答
  • 如果在使用 setStartingOffsets 方法设置 Kafka 的起始偏量时,遇到报错 "Invalid offset",通常有以下几个可能的原因:

    1. 检查时间戳格式:setStartingOffsets 方法中设置的时间戳需要是毫秒级的时间戳。请确保您提供的时间戳格式正确,且精确到毫秒。例如,1697617200000L 是 2023 年 3 月 4 日的时间戳。

    2. 确定时间戳的有效范围:检查您提供的时间戳是否在 Kafka 消息中的有效范围内。如果指定的时间戳超出了 Kafka 中的有效时间范围,那么它将被认为是无效的。请确保您选择的时间戳在 Kafka 中有对应的消息。

    3. 检查 Kafka 主题的分区:如果 Kafka 主题的某个分区在指定时间戳之前没有消息,那么该分区的偏移量将被认为是无效的。请确保您的 Kafka 主题在指定时间戳之前有消息产生。

    4. 确认 Kafka 版本兼容性:某些版本的 Kafka 不支持在起始偏移量中使用时间戳的方式进行读取。请检查您所使用的 Kafka 版本,并确保它支持以时间戳为起始偏移量进行消费。

    如果您已经检查过以上原因仍然遇到问题,建议检查相关日志以获取更详细的错误信息。根据错误信息可以更精确地定位和解决问题。此外,您还可以参考相关的 Flink 和 Kafka 文档,或向 Kafka 社区进行咨询,以获取更详细的技术支持。

    2023-10-24 13:52:09
    赞同 展开评论 打赏
  • 这个问题看起来像是你在尝试使用错误的偏移量值。偏移量是一个长整数,表示消息在Kafka主题中的位置。如果你提供的偏移量值不正确,那么Flink Kafka connector可能会抛出"Invalid offset"的错误。

    在你的代码中,你使用了以下代码来设置起始偏移量:

    setStartingOffsets(OffsetsInitializer.timestamp(1697617200000L))
    

    这个代码的含义是,从给定的时间戳开始(在这个例子中是2022-12-31 12:00:00),向后读取所有的消息。但是,这个代码并不能保证能够读取到具体的偏移量值。

    如果你想从特定的偏移量值开始读取,你可以使用以下代码:

    setStartingOffsets(Collections.singletonMap("topic", offset))
    

    在这个代码中,你需要提供一个Map,其中键是你的Kafka主题名称,值是你想要开始的偏移量值。

    总的来说,如果你想要从特定的偏移量值开始读取,你需要提供正确的偏移量值。如果你不确定偏移量值是多少,你可以使用以下代码来获取当前的偏移量:

    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList("topic"));
    Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(Collections.singleton(new TopicPartition("topic", 0)));
    System.out.println(beginningOffsets);
    

    这段代码将打印出每个主题分区的开始偏移量。

    2023-10-23 16:42:47
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载