假设我们定义了一个自定义的TableSource和TableSink,那么如何与SQL Client集成?我是否需要手动注册下面的自定义TableSource Sink名称?如果不手动注册,连接器类型custom1 map 如何与custom1TableSource相关?
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
TableSource custom1TableSource = new custom1TableSource ( );
tableEnv.registerTableSource("custom1", custom1TableSource);
然后配置下面的环境文件?
tables:
- name: custom1TableSource
type: Source
update-mode: append
connector:
property-version: 1
type: ***custom1****
我声明的源和接收器:
package com.abc;
public static class custom1TableSource implements StreamTableSource, DefinedRowtimeAttributes, DefinedProctimeAttribute {
package com.abc;
public static class custom1TableSink implements TableSink, AppendStreamTableSink {
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sqlClient.html#configuration
更新:
在从源代码检查后,我发现Flink通过实现StreamTableSinkFactory和ServiceLoader创建的Factory创建接收器和源实例,但是如何将接收器和源名称注册到TableSource和TabSink类?
SQL Client和Table&SQL API TableFactory都使用Java的服务提供程序接口(SPI)发现的所谓的s。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。