开发者社区> 问答> 正文

Flinkjob消费kafka 失败,无法拿到offset值,如何解决

有一个flink job在消费kafka topic消息,该topic存在于kafka两个集群cluster A 和Cluster B,Flink Job消费A集群的topic一切正常,但切换到消费B集群就启动失败。

Flink 集群采用Docker部署,Standalone模式。集群资源充足,Slot充足。报错日志如下:

java.lang.Exception: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition Test-topic-27 could be determined at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132) at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition Test-topic-27 could be determined 查询一圈发现基本都是说slot不够之类的原因,已经kafka broker负载等问题,这些问题已经排除。 请指教*来自志愿者整理的flink

展开
收起
毛毛虫雨 2021-12-05 11:46:27 1273 0
1 条回答
写回答
取消 提交回答
  • 从错误上来看是 Kafka Consumer 没有连上 Kafka Brokers。这些方法可能帮助排查问题: 1. 确认 Flink TaskManager 和 Kafka Broker 之间的网络连通性。 2. Flink TaskManager 与 Kafka Broker 之间网络连通并不意味着能够消费数据,可能需要修改 Kafka Broker 的配置。这篇文章[1] 或许会有帮助,绝大多数 Kafka 的连接问题是由于文章中描述的配置问题导致的。 3. 配置 Log4j 将 org.apache.kafka.clients.consumer 的 Log level 配置为 DEBUG 或 TRACE,在日志中获取到更多的信息以帮助排查。

    希望有所帮助!*来自志愿者整理的flink

    2021-12-05 12:59:13
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载