开发者社区 问答 正文

【Flink SQL】无法启动 env.yaml怎么处理?

在服务器上试用sql-client时,启动指令如下:

./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d /data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml -e /root/flink-sql-client/sql-client-demo.yml

配置如下:

定义表

tables: - name: SourceTable type: source-table update-mode: append connector: type: datagen rows-per-second: 5 fields: f_sequence: kind: sequence start: 1 end: 1000 f_random: min: 1 max: 1000 f_random_str: length: 10 schema: - name: f_sequence data-type: INT - name: f_random data-type: INT - name: f_random_str data-type: STRING

遇到了如下报错:

Reading default environment from: file:/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml Reading session environment from: file:/root/flink-sql-client/sql-client-demo.yml

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Missing properties: format.type=csv Mismatched properties: 'connector.type' expects 'filesystem', but is 'datagen'

The following properties are requested: connector.fields.f_random.max=1000 connector.fields.f_random.min=1 connector.fields.f_random_str.length=10 connector.fields.f_sequence.end=1000 connector.fields.f_sequence.kind=sequence connector.fields.f_sequence.start=1 connector.rows-per-second=5 connector.type=datagen schema.0.data-type=INT schema.0.name=f_sequence schema.1.data-type=INT schema.1.name=f_random schema.2.data-type=STRING schema.2.name=f_random_str update-mode=append

The following factories have been considered: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.filesystem.FileSystemTableFactory at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523) at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:183) at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:136) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859) ... 3 more

看描述是有包找不到,到我看官网上说 json 的解析 jar 在 sql-client 中包含啊,试用 sql-client 也需要自己导包的么?哪里有更详细的资料,求指点,谢谢

*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-06 12:36:45 997 分享 版权
阿里云 AI 助理回答

遇到的这个错误是由于Flink SQL客户端在尝试创建表时,找不到与你配置的datagen连接器类型相匹配的TableSourceFactory。错误信息显示它期望找到一个类型为filesystem的连接器,而你的配置却是datagen,这导致了不匹配。

Flink SQL客户端通常会自带一些基本的依赖和连接器工厂,但如果你使用的是特定的或自定义的连接器(如datagen),可能需要确保所有必要的依赖都已正确添加到类路径中。对于datagen这样的内置连接器,理论上应该不需要额外引入jar包,因为它应当包含在Flink发行版内。

解决这个问题的步骤可以包括:

  1. 确认版本兼容性:首先确认你使用的Flink SQL客户端版本是否支持datagen连接器。查看Flink官方文档或发行说明,确认该功能在你使用的1.11.2版本中是否可用且无已知问题。

  2. 检查环境配置:确认你的Flink安装没有遗漏任何组件或依赖。有时候,虽然某些组件随Flink一起发布,但在某些定制或部分安装中可能会被忽略。

  3. 手动添加依赖:如果确认是依赖缺失的问题,尽管不太常见于内置连接器,你可以尝试手动将相关依赖添加到你的项目或SQL客户端的类路径中。对于datagen,这通常不应该必要,但如果其他方法都无效,可以考虑检查Flink的Maven仓库,找到对应的flink-connector-datagen jar包,并将其加入到你的/root/flink-sql-client/libs/目录下。

  4. 查看官方文档:Flink官网有详细的Table API & SQL文档,其中包含了关于如何使用不同类型的连接器和数据源的信息。针对你的具体需求,查找是否有专门针对datagen连接器的使用指南或示例。

  5. 社区求助:如果上述步骤都不能解决问题,可以在Apache Flink的用户邮件列表或者Stack Overflow上提问,提供详细的问题描述和你已经尝试过的解决步骤,社区成员很可能会给出帮助。

请按照这些步骤进行排查,希望能帮助你解决问题。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答