开发者社区> 问答> 正文

flink表api异常

"我正在使用flink表api,使用kafka作为输入源,使用json作为表模式。提交程序时出现此错误:`程序完成以下异常:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:

Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=localhost:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_table_test
connector.type=kafka
connector.version=0.10
format.fail-on-missing-field=true
format.json-schema={t'type': 'object',t'properties': {tt'uid': {ttt'type': 'number'tt},tt'name': {ttt'type': 'string'tt},tt'age': {ttt'type': 'number'tt},tt'timestamp': {ttt'type': 'long'tt}t}}
format.property-version=1
format.type=json
schema.0.name=uid
schema.0.type=BIGINT
schema.1.name=name
schema.1.type=VARCHAR
schema.2.name=age
schema.2.type=INT
schema.3.name=ts
schema.3.type=TIMESTAMP
update-mode=append

The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:44)
at com.akulaku.data.flink.QueryTable$.main(QueryTable.scala:53)
at com.akulaku.data.flink.QueryTable.main(QueryTable.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 9 more

展开
收起
flink小助手 2018-11-28 16:30:42 11089 0
2 条回答
写回答
取消 提交回答
  • flink助手的方式解决了你的问题吗?


    image


    官网的这段话也只是说了我们配置的连接属性会转化为键值对,table factory 通过键值对配置的属性来解析,通过java来完成factory的找寻工作,如果找不到或找到多个table factory 都能匹配给定的属性,那么就会抛出我们遇到的这个异常。但是却没有说明如果遇到如何解决,请问你目前解决了没有?

    2019-07-17 23:16:52
    赞同 展开评论 打赏
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    尝试添加kafka连接器sql jar(例如flink-connector-kafka-0.11_2.11-1.6.1-sql-jar.jar)。您可以在此处找到连接器依赖项的完整列表。

    2019-07-17 23:16:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载