
Flink 1.9: error when consuming from Kafka

Hi all, I have a few questions after upgrading to 1.9:

1. Our production version is 1.7.2, and the existing code uses FlinkKafkaConsumer011 as its Kafka consumer:

val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)

This class can no longer be found.

2. So I switched to FlinkKafkaConsumer:

val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)

I don't know which Kafka version this consumer corresponds to.

3. After switching to FlinkKafkaConsumer I get errors, and I also have to add flink-table-api-java-bridge_${scala.binary.version}. Otherwise a class is reported as missing:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory

Even after adding flink-table-api-java-bridge_${scala.binary.version}, it still fails:

Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
    at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
    at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
    at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
    at com.test.StreamingJob$.main(StreamingJob.scala:52)
    at com.test.StreamingJob.main(StreamingJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.

Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.

The following properties are requested: batch-mode=false

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory

The dependencies in my pom file are as follows (groupId:artifactId:version, scope):

org.apache.flink:flink-scala_${scala.binary.version}:${flink.version}, provided
org.apache.flink:flink-streaming-scala_${scala.binary.version}:${flink.version}, provided
org.scala-lang:scala-library:${scala.version}, provided
org.apache.flink:flink-connector-kafka_2.11:${flink.version}, compile
org.apache.flink:flink-table-common:${flink.version}
org.apache.flink:flink-table-planner-blink_${scala.binary.version}:${flink.version}
org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}:${flink.version}
org.apache.flink:flink-table-api-java-bridge_${scala.binary.version}:${flink.version}

Thanks, everyone. *From the Flink mailing list archive compiled by volunteers
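On the first point, one likely explanation is that in Flink 1.9 the FlinkKafkaConsumer011 class ships in the separate flink-connector-kafka-0.11 artifact, while the pom in this post only pulls in flink-connector-kafka (the universal connector that provides FlinkKafkaConsumer and is documented as targeting Kafka 1.0.0 and later). A minimal sketch of the extra dependency, assuming the same ${scala.binary.version} and ${flink.version} properties the pom already uses:

```xml
<!-- Assumption: added alongside the existing dependencies; reuses the pom's own properties. -->
<!-- Provides org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
```

This only addresses the missing consumer class. It is not a fix for the ExecutorFactory error in the third point, which concerns which planner module is on the runtime classpath.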

小阿怪 2021-12-07 22:11:40