开发者社区> 问答> 正文

Flink job消费kafka 失败,无法拿到offset值,请指教

有一个flink job在消费kafka topic消息,该topic存在于kafka两个集群cluster A 和Cluster B,Flink Job消费A集群的topic一切正常,但切换到消费B集群就启动失败。

Flink 集群采用Docker部署,Standalone模式。集群资源充足,Slot充足。报错日志如下:

java.lang.Exception: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition Test-topic-27 could be determined at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132) at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403) at org.apache.flink.runtime.taskmanage有一个flink job在消费kafka topic消息,该topic存在于kafka两个集群cluster A 和Cluster B,Flink Job消费A集群的topic一切正常,但切换到消费B集群就启动失败。

Flink 集群采用Docker部署,Standalone模式。集群资源充足,Slot充足。报错日志如下:

java.lang.Exception: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition Test-topic-27 could be determined at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132) at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition Test-topic-27 could be determined

查询一圈发现基本都是说slot不够之类的原因,已经kafka broker负载等问题,这些问题已经排除。

请指教*来自志愿者整理的flinkr.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition Test-topic-27 could be determined

查询一圈发现基本都是说slot不够之类的原因,已经kafka broker负载等问题,这些问题已经排除。

请指教*来自志愿者整理的flink

展开
收起
毛毛虫雨 2021-12-05 11:45:47 1890 0
1 条回答
写回答
取消 提交回答
  • 一个查Flink与集群B的网络连通性,一个是查集群B本身服务状态,比如是所有topic都连不上还是某个topic连不上,是不是有节点挂掉了,另外一个再看看kafka的_offset_consumer是不是单节点正好挂了,如果是单个topic看分区问题。*来自志愿者整理的flink

    2021-12-05 12:58:51
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载