
Flink 1.9 SQL job fails on submission

Code:

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings);

tableEnv.connect(
        new Kafka()
            .version("0.10")
            .topic("installmentdb_t_user")
            .startFromEarliest()
            .property("zookeeper.connect", "risk-kafka.aku:2181")
            .property("bootstrap.servers", "risk-kafka.aku:9092"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("business", Types.STRING)
        .field("type", Types.STRING)
        .field("es", Types.LONG))
    .inAppendMode()
    .registerTableSource("installmentdb_t_user");

Starting execution of program


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at feature.flinktask.sqltest.main(sqltest.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 12 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=risk-kafka.aku:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=risk-kafka.aku:9092
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=installmentdb_t_user
connector.type=kafka
connector.version=0.10
format.derive-schema=true
format.property-version=1
format.type=json
schema.0.name=business
schema.0.type=VARCHAR
schema.1.name=type
schema.1.type=VARCHAR
schema.2.name=es
schema.2.type=BIGINT
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.formats.json.JsonRowFormatFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
    ... 20 more

*From the Flink mailing list archive, compiled by volunteers

Asked by 毛毛虫雨 on 2021-12-08 11:10:04
1 answer
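  • Judging from your error, Kafka010TableSourceSinkFactory is not on the classpath: the list of factories that were considered contains no Kafka factory at all, so nothing can match connector.type=kafka. You need to add the Kafka connector jar to your dependencies (for Kafka 0.10 that is flink-connector-kafka-0.10_2.11 or flink-connector-kafka-0.10_2.12, depending on your Scala version). *From the Flink mailing list archive, compiled by volunteers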

    2021-12-08 19:28:36
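
    To confirm the diagnosis before repackaging, a quick check along the following lines may help. It is only a sketch: the class name ConnectorClasspathCheck and the helper isOnClasspath are made up for illustration, and the fully qualified factory name is an assumption based on how the flink-connector-kafka-0.10 artifact is laid out in Flink 1.9, so verify it against the jar you actually use.

```java
public class ConnectorClasspathCheck {

    // Assumed fully qualified name of the Kafka 0.10 table factory (not stated in the
    // original thread; check it against your flink-connector-kafka-0.10 jar).
    static final String KAFKA_010_FACTORY =
            "org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactory";

    /** Returns true if the named class can be located by this class's class loader. */
    public static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ConnectorClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Either the class itself or one of its dependencies is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        if (isOnClasspath(KAFKA_010_FACTORY)) {
            System.out.println("Kafka 0.10 table factory found on the classpath.");
        } else {
            System.err.println("Kafka 0.10 table factory NOT found; "
                    + "add flink-connector-kafka-0.10_2.11 (or _2.12) to the job's dependencies.");
        }
    }
}
```

    Calling isOnClasspath(KAFKA_010_FACTORY) at the top of the job's main method, and submitting with the same flink run command, shows whether the connector classes reach the classpath the job really runs with. Note that seeing the class is necessary but may not be sufficient: factories are discovered through service entries under META-INF/services, so a fat jar whose build overwrites those entries can still produce the same NoMatchingTableFactoryException.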