
Flink 1.11.2 SQL job consuming Kafka and writing to Hive fails with an error

Dear All,

I am getting an error when using Flink SQL to consume Kafka and write into Hive. The code and error message are below.

The Flink version is 1.11.2, and the Kafka connector in use is flink-sql-connector-kafka. I have searched for a long time without finding a solution. The same SQL statements run fine in the Flink SQL Client, but when the job is submitted to the Hadoop cluster from Java code, it fails with the error below.

Any advice would be appreciated.

Java Code

TableResult tableResult = tableEnvironment.executeSql("DROP TABLE IF EXISTS user_behavior_kafka_table");
tableResult.print();

TableResult tableResult2 = tableEnvironment.executeSql(
    "CREATE TABLE user_behavior_kafka_table (\r\n"
    + "  user_id STRING,\r\n"
    + "  item_id STRING\r\n"
    + ") WITH (\r\n"
    + "  'connector' = 'kafka',\r\n"
    + "  'topic' = 'TestTopic',\r\n"
    + "  'properties.bootstrap.servers' = 'localhost:9092',\r\n"
    + "  'properties.group.id' = 'consumerTest',\r\n"
    + "  'scan.startup.mode' = 'earliest-offset',\r\n"
    + "  'format' = 'json'\r\n"
    + ")");
tableResult2.print();

// Write the data into Hive
tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnvironment.executeSql(
    "INSERT INTO user_behavior_hive_table SELECT user_id, item_id FROM user_behavior_kafka_table");

POM File

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-shaded-hadoop-2-uber</artifactId>
  <version>2.7.5-10.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <scope>provided</scope>
</dependency>

Error Message

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSource(HiveDynamicTableFactory.java:81) ~[flink-connector-hive_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:524) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) ~[flink-table-blink_2.11-1.11.2.jar:1.11.2]
	at com.newegg.flink.sqlkafka.Main.main(Main.java:66) ~[flink-0.0.1-SNAPSHOT.jar:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_191-ojdkbuild]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_191-ojdkbuild]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191-ojdkbuild]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191-ojdkbuild]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

*From the volunteer-compiled Flink mailing-list archive

EXCEED 2021-12-02 14:46:27
1 answer
  • Maybe you are missing the Kafka connector dependency in your pom; you can refer to this page: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html *From the volunteer-compiled Flink mailing-list archive

    2021-12-02 15:01:34
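A note on this answer: the POM above does already list flink-sql-connector-kafka, so if the dependency itself is present, another common cause of "Could not find any factory for identifier 'kafka'" is the packaging step. Flink discovers connector factories through Java's SPI files under META-INF/services/, and if the job is built as a fat jar with maven-shade-plugin without merging those files, one jar's service file overwrites another's and the Kafka factory entry is lost. A minimal sketch of the relevant shade configuration (the plugin version here is illustrative, not from the original thread):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merge META-INF/services files from all dependency jars
               instead of letting one overwrite another, so every
               connector factory (including the Kafka one) stays
               registered in the fat jar. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After rebuilding, one quick check is to open the fat jar and confirm the merged service file under META-INF/services/ contains the Kafka factory class.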