flink表api异常-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

flink表api异常

2018-11-28 16:30:42 10275 2

"我正在使用flink表api,使用kafka作为输入源,使用json作为表模式。提交程序时出现此错误:`程序完成以下异常:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:

Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=localhost:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_table_test
connector.type=kafka
connector.version=0.10
format.fail-on-missing-field=true
format.json-schema={t'type': 'object',t'properties': {tt'uid': {ttt'type': 'number'tt},tt'name': {ttt'type': 'string'tt},tt'age': {ttt'type': 'number'tt},tt'timestamp': {ttt'type': 'long'tt}t}}
format.property-version=1
format.type=json
schema.0.name=uid
schema.0.type=BIGINT
schema.1.name=name
schema.1.type=VARCHAR
schema.2.name=age
schema.2.type=INT
schema.3.name=ts
schema.3.type=TIMESTAMP
update-mode=append

The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:44)
at com.akulaku.data.flink.QueryTable$.main(QueryTable.scala:53)
at com.akulaku.data.flink.QueryTable.main(QueryTable.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 9 more
取消 提交回答
全部回答(2)
  • 西北笑无常
    2019-07-17 23:16:52

    flink助手的方式解决了你的问题吗?


    image


    官网的这段话也只是说了我们配置的连接属性会转化为键值对,table factory 通过键值对配置的属性来解析,通过java来完成factory的找寻工作,如果找不到或找到多个table factory 都能匹配给定的属性,那么就会抛出我们遇到的这个异常。但是却没有说明如果遇到如何解决,请问你目前解决了没有?

    0 0
  • flink小助手
    2019-07-17 23:16:52

    尝试添加kafka连接器sql jar(例如flink-connector-kafka-0.11_2.11-1.6.1-sql-jar.jar)。您可以在此处找到连接器依赖项的完整列表。

    0 0
添加回答
相关问答

1

回答

Flink SQL常用的内置函数有哪些?

2021-12-08 21:15:57 96浏览量 回答数 1

1

回答

Flink SQL中的GROUP BY的作用及示例有什么?

2021-12-08 21:08:23 194浏览量 回答数 1

1

回答

Flink SQL中的SELECT的作用是什么?

2021-12-08 21:05:40 145浏览量 回答数 1

1

回答

Flink SQL 的编程模型是什么?

2021-12-08 21:02:21 88浏览量 回答数 1

1

回答

Flink SQL中的时态表是什么?

2021-12-08 21:01:21 214浏览量 回答数 1

1

回答

Flink SQL的背景是什么?

2021-12-08 20:59:23 99浏览量 回答数 1

1

回答

如何用 Flink SQL 做简单的数据去重?

2021-12-07 17:22:10 362浏览量 回答数 1

1

回答

请教大神们关于flink-sql中数据赋值问题

2021-12-07 10:53:25 409浏览量 回答数 1

1

回答

flink sql实时计算分位数如何实现

2021-12-06 11:44:19 748浏览量 回答数 1

1

回答

Flink SQL读取复杂JSON格式

2021-12-06 12:16:54 564浏览量 回答数 1
+关注
flink小助手
flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。
0
文章
377
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载