01 引言
因为最近需要使用自定义Connectors,所以参照了Flink官网的教程,整理了在Flink上如何实现自定义Source和Sink。
官网地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sourcessinks/
首先我们需要知道动态表(dynamic tables)只是一个逻辑概念,Flink本身并不拥有数据。相反,动态表的内容存储在外部系统(如数据库、键值存储、消息队列)或文件中。
动态源(sources)和动态接收器(sink)可用于从外部系统读取数据和向外部系统写入数据,一般源(sources)和接收器(sink)通常用术语连接器(connector)下进行总结。
02 Connector概述
在许多情况下,开发人员不需要从头开始创建新的连接器(connector),只要稍微修改现有的连接器或钩子,就可以加载连接器到现有的堆栈。
Flink 实现Connector的流程如下:
从上图可以看到,Flink
实现Connector
连接器主要分为Metadata
(元数据)、Planning
(规划),Runtime
(运行时提供者)三个部分的内容。
2.1 Metadata - 元数据模块
Table API 和 SQL 都是声明式 API,注意是表的声明。因此,如上图所示,在执行CREATE TABLE语句会导致目标目录Catalog中的元数据更新。
对于大多数目标目录Catalog实现,外部系统中的物理数据不会针对此类操作进行修改,因此Connector连接器的依赖项不必存在于类路径中,SQL语句中声明的选项WITH既不被验证也不被解释。
动态表的元数据(即通过 DDL 创建或由目录提供)表示为CatalogTable, 必要时,将在CatalogTable内部解析表名。
2.2 Planning - 规划模块
如上图所示,目标目录CatalogTable下一步在对表程序进行规划和优化时,CatalogTable需要将a解析为两种动态表类型,分别是:
- DynamicTableSource:用于在查询中读取SELECT
- DynamicTableSink:用于在语句中写入INSERT INTO
DynamicTableSourceFactory和DynamicTableSinkFactory提供特定于连接器的逻辑,用于将元数据转换为DynamicTableSource 和 DynamicTableSink实例。在大多数情况下,工厂的目的是验证选项(例如:DynamicTableSourceDynamicTableSink'port' = '5022'),配置编码/解码格式(如果需要),并创建表连接器的参数化实例。
默认情况下,使用 Java 的服务提供者接口 (SPI)实现DynamicTableSourceFactory和DynamicTableSinkFactory。连接器选项(例如 'connector' = 'custom')必须对应于有效的工厂标识符。
虽然它在类命名中可能并不明显,但DynamicTableSource和DynamicTableSink 可以看作是有状态的工厂,最终产生具体的运行时实现来读取/写入实际数据。
规划器planner使用源source和接收器sink实例来执行特定于连接器connector的双向通信,直到找到最佳逻辑规划。
根据可选声明的能力接口(例如 SupportsProjectionPushDown或SupportsOverwrite),规划器planner可能会将更改应用于实例,从而改变生成的运行时实现。
2.3 Runtime - 运行时模块
一旦逻辑规划完成,规划器planner将从表连接器connector获取运行时实现。运行时逻辑在 Flink 的核心连接器接口中实现,例如InputFormat或SourceFunction。
这些接口按另一个抽象级别分组为ScanRuntimeProvider、 LookupRuntimeProvider和的子类SinkRuntimeProvider。
例如:
- OutputFormatProvider:providing org.apache.flink.api.common.io.OutputFormat
- SinkFunctionProvider:providing org.apache.flink.streaming.api.functions.sink.SinkFunction
都是SinkRuntimeProvider 计划者Planner可以处理的具体实例。
03 Connector相关API
3.1 Dynamic Table Factories - 动态表工厂
动态表工厂用于根据目录和会话信息为外部存储系统配置动态表连接器。
- org.apache.flink.table.factories.DynamicTableSourceFactory :可以实现构造一个DynamicTableSource.
- org.apache.flink.table.factories.DynamicTableSinkFactory:可以实现构造一个DynamicTableSink.
默认情况下,使用connector选项的值作为工厂标识符和Java的服务提供者接口(SPI)来发现工厂。
在 JAR 文件中,可以将对新实现的引用添加到服务文件中:
- META-INF/services/org.apache.flink.table.factories.Factory
该框架由工厂标识符和请求的基类唯一标识的单个匹配工厂检查,例如:DynamicTableSourceFactory。
如有必要,目录实现可以绕过工厂发现过程。为此,目录需要返回一个实例,该实例在org.apache.flink.table.catalog.Catalog#getFactory.
3.2 Dynamic Table Source - 动态表源
根据定义,动态表可以随时间变化。
在读取动态表时,内容可以被认为是:
- 一个更改日志(有限或无限),所有更改都会持续使用,直到更改日志用完,这由ScanTableSource接口表示。
- 一个不断变化的或非常大的外部表,其内容通常不会被完全读取,而是在必要时查询单个值 ,这由LookupTableSource 接口表示。
一个类可以同时实现这两个接口,规划器根据指定的查询决定它们的使用。
3.2.1 Scan Table Source - 扫描表源
ScanTableSource(扫描表源)在运行时扫描来自外部存储系统的所有行。
扫描的行不一定只包含插入,还可以包含更新和删除。因此,表源可用于读取(有限或无限)变更日志。返回的更改日志模式指示计划程序在运行时可以预期的一组更改。
- 对于常规的批处理场景,源可以发出有限的仅插入行流。
- 对于常规流式处理方案,源可以发出无限制的仅插入行流。
- 对于变更数据捕获 (CDC) 方案,源可以发出带有插入、更新和删除行的有界或无界流。
Table Source可以实现进一步的能力接口,SupportsProjectionPushDown在规划期间可能会改变实例。所有能力都可以在org.apache.flink.table.connector.source.abilities 包中找到并列在源能力表中。
运行时实现的ScanTableSource必须产生内部数据结构,因此,记录必须以org.apache.flink.table.data.RowData. 该框架提供了运行时转换器,因此源仍然可以处理常见的数据结构并在最后执行转换。
3.2.2 Lookup Table Source - 查找表源
Lookup TableSource在运行时通过一个或多个键查找外部存储系统的行。
- 与ScanTableSource相比,源不必读取整个表,并且可以在必要时从(可能不断变化的)外部表中懒惰地获取单个值。
- 与ScanTableSource相比, LookupTableSource当前仅支持发出仅插入更改。
LookupTableSource 的运行时由 TableFunction或AsyncTableFunction实现,该函数将在运行时使用给定查找键的值调用。
3.3 Dynamic Table Sink - 动态表接收器
根据定义,动态表可以随时间变化。在编写动态表时,可以始终将内容视为更改日志(有限或无限),其中所有更改都被连续写出,直到更改日志用完为止。返回的更改日志模式 指示接收器在运行时接受的更改集。
- 对于常规批处理场景,接收器可以仅接受仅插入行并写出有界流。
- 对于常规的流式处理方案,接收器只能接受仅插入行,并且可以写出无界流。
- 对于变更数据捕获 (CDC) 场景,接收器可以使用插入、更新和删除行写出有界或无界流。
表接收器可以实现进一步的能力接口,所有能力都可以在org.apache.flink.table.connector.sink.abilities 包装中找到,并列在水槽能力表中。
运行时实现DynamicTableSink必须使用内部数据结构。因此,记录必须被接受为org.apache.flink.table.data.RowData, 该框架提供了运行时转换器,因此接收器仍然可以在通用数据结构上工作并在开始时执行转换。
3.4 Encoding / Decoding Formats - 编码/解码格式
一些表连接器接受对键和/或值进行编码和解码的不同格式。
格式的工作方式类似于模式DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider,工厂负责翻译选项,源负责创建运行时逻辑。
因为格式可能位于不同的模块中,所以使用类似于表工厂的 Java 服务提供者接口来发现它们。为了发现格式工厂,动态表工厂搜索与工厂标识符和特定于连接器的基类相对应的工厂。
例如,Kafka 表源需要一个DeserializationSchema作为运行时接口的解码格式。因此,Kafka 表源工厂使用该value.format选项的值来发现一个DeserializationFormatFactory.
当前支持以下格式工厂:
org.apache.flink.table.factories.DeserializationFormatFactory org.apache.flink.table.factories.SerializationFormatFactory
格式工厂将选项转换为EncodingFormat或DecodingFormat。这些接口是另一种为给定数据类型生成专用格式运行时逻辑的工厂。
例如:对于 Kafka 表源工厂,DeserializationFormatFactory将返回一个EncodingFormat<DeserializationSchema> 可以传递给 Kafka 表源的值。
04 案例演示
需求:表源使用简单的单线程SourceFunction打开一个监听传入字节的socket,原始字节通过可插入格式解码成行,该格式需要一个更改日志标志作为第一列。
下面讲解所有组件是如何协同工作的,比如:
- 创建解析和验证选项的工厂
- 实现表连接器
- 实现和发现自定义格式,
- 使用提供的工具类,例如:数据结构转换器和FactoryUtil.
我们将使用上面提到的大部分接口来启用以下 DDL:
CREATE TABLE UserScores (name STRING, score INT) WITH ( 'connector' = 'socket', 'hostname' = 'localhost', 'port' = '9999', 'byte-delimiter' = '10', 'format' = 'changelog-csv', 'changelog-csv.column-delimiter' = '|' );
由于该格式支持变更日志语义,我们能够在运行时摄取更新并创建一个可以持续评估变化数据的更新视图:
SELECT name, SUM(score) FROM UserScores GROUP BY name;
使用以下命令在终端中抓取数据:
> nc -lk 9999 INSERT|Alice|12 INSERT|Bob|5 DELETE|Alice|12 INSERT|Alice|18
4.1 工厂实现
这里说明如何将来自目录的元数据(Metadata)转换为具体的连接器实例。
注意:两个工厂都已添加到META-INF/services目录中
4.1.1 SocketDynamicTableFactory
这里将SocketDynamicTableFactory目录表转换为表源(因为表源需要解码格式,所以FactoryUtil为了方便起见,我们正在使用提供的格式来发现格式)。
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.DeserializationFormatFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; public class SocketDynamicTableFactory implements DynamicTableSourceFactory { // define all options statically public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname") .stringType() .noDefaultValue(); public static final ConfigOption<Integer> PORT = ConfigOptions.key("port") .intType() .noDefaultValue(); public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter") .intType() .defaultValue(10); // corresponds to '\n' @Override public String factoryIdentifier() { return "socket"; // used for matching to `connector = '...'` } @Override public Set<ConfigOption<?>> requiredOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(HOSTNAME); options.add(PORT); options.add(FactoryUtil.FORMAT); // use pre-defined option for format return options; } @Override public Set<ConfigOption<?>> optionalOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(BYTE_DELIMITER); return options; } @Override public DynamicTableSource createDynamicTableSource(Context context) { // either implement your custom validation logic here ... // or use the provided helper utility final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); // discover a suitable decoding format final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat( DeserializationFormatFactory.class, FactoryUtil.FORMAT); // validate all options helper.validate(); // get the validated options final ReadableConfig options = helper.getOptions(); final String hostname = options.get(HOSTNAME); final int port = options.get(PORT); final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER); // derive the produced data type (excluding computed columns) from the catalog table final DataType producedDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(); // create and return dynamic table source return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType); } }
4.1.2 ChangelogCsvFormatFactory
ChangelogCsvFormatFactory的功能主要是转换为特定的格式。
FactoryUtilinSocketDynamicTableFactory 负责相应地调整选项键(option keys)并处理前缀,如changelog-csv.column-delimiter.
因为这个工厂实现DeserializationFormatFactory了 ,它也可以用于支持反序列化格式的其他连接器,例如 Kafka 连接器。
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.DeserializationFormatFactory; import org.apache.flink.table.factories.DynamicTableFactory; public class ChangelogCsvFormatFactory implements DeserializationFormatFactory { // define all options statically public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter") .stringType() .defaultValue("|"); @Override public String factoryIdentifier() { return "changelog-csv"; } @Override public Set<ConfigOption<?>> requiredOptions() { return Collections.emptySet(); } @Override public Set<ConfigOption<?>> optionalOptions() { final Set<ConfigOption<?>> options = new HashSet<>(); options.add(COLUMN_DELIMITER); return options; } @Override public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { // either implement your custom validation logic here ... // or use the provided helper method FactoryUtil.validateFactoryOptions(this, formatOptions); // get the validated options final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER); // create and return the format return new ChangelogCsvFormat(columnDelimiter); } }
4.2 表源和解码格式
这里讲解如何从计划层的实例转换为交付到集群的运行时实例。
4.2.1 SocketDynamicTableSource
SocketDynamicTableSource在计划期间使用。在我们的示例中,我们没有实现任何可用的能力接口。
因此,主要逻辑(getScanRuntimeProvider(...)) 可以在我们实例化所需SourceFunction及其DeserializationSchema运行时的地方找到。两个实例都被参数化以返回内部数据结构(即RowData)。
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; public class SocketDynamicTableSource implements ScanTableSource { private final String hostname; private final int port; private final byte byteDelimiter; private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat; private final DataType producedDataType; public SocketDynamicTableSource( String hostname, int port, byte byteDelimiter, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, DataType producedDataType) { this.hostname = hostname; this.port = port; this.byteDelimiter = byteDelimiter; this.decodingFormat = decodingFormat; this.producedDataType = producedDataType; } @Override public ChangelogMode getChangelogMode() { // in our example the format decides about the changelog mode // but it could also be the source itself return decodingFormat.getChangelogMode(); } @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { // create runtime classes that are shipped to the cluster final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder( runtimeProviderContext, producedDataType); final SourceFunction<RowData> sourceFunction = new SocketSourceFunction( hostname, port, byteDelimiter, deserializer); return SourceFunctionProvider.of(sourceFunction, false); } @Override public DynamicTableSource copy() { return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType); } @Override public String asSummaryString() { return "Socket Table Source"; } }
4.2.2 ChangelogCsvFormat
ChangelogCsvFormat
是DeserializationSchema
在运行时使用的解码格式。它支持INSERT
和DELETE
更改。
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.types.RowKind; public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> { private final String columnDelimiter; public ChangelogCsvFormat(String columnDelimiter) { this.columnDelimiter = columnDelimiter; } @Override @SuppressWarnings("unchecked") public DeserializationSchema<RowData> createRuntimeDecoder( DynamicTableSource.Context context, DataType producedDataType) { // create type information for the DeserializationSchema final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation( producedDataType); // most of the code in DeserializationSchema will not work on internal data structures // create a converter for conversion at the end final DataStructureConverter converter = context.createDataStructureConverter(producedDataType); // use logical types during runtime for parsing final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren(); // create runtime class return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter); } @Override public ChangelogMode getChangelogMode() { // define that this format can produce INSERT and DELETE rows return ChangelogMode.newBuilder() .addContainedKind(RowKind.INSERT) .addContainedKind(RowKind.DELETE) .build(); } }
4.3 Runtime运行时
接下来将说明SourceFunction和DeserializationSchema的运行时逻辑。
4.3.1 ChangelogCsvDeserializer
ChangelogCsvDeserializer包含一个简单的解析逻辑,用于将字节转换Integer 类型Row行为String,最后的转换步骤将它们转换为内部数据结构。
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.connector.RuntimeConverter.Context; import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> { private final List<LogicalType> parsingTypes; private final DataStructureConverter converter; private final TypeInformation<RowData> producedTypeInfo; private final String columnDelimiter; public ChangelogCsvDeserializer( List<LogicalType> parsingTypes, DataStructureConverter converter, TypeInformation<RowData> producedTypeInfo, String columnDelimiter) { this.parsingTypes = parsingTypes; this.converter = converter; this.producedTypeInfo = producedTypeInfo; this.columnDelimiter = columnDelimiter; } @Override public TypeInformation<RowData> getProducedType() { // return the type information required by Flink's core interfaces return producedTypeInfo; } @Override public void open(InitializationContext context) { // converters must be open converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader())); } @Override public RowData deserialize(byte[] message) { // parse the columns including a changelog flag final String[] columns = new String(message).split(Pattern.quote(columnDelimiter)); final RowKind kind = RowKind.valueOf(columns[0]); final Row row = new Row(kind, parsingTypes.size()); for (int i = 0; i < parsingTypes.size(); i++) { row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1])); } // convert to internal data structure return (RowData) converter.toInternal(row); } private static Object parse(LogicalTypeRoot root, String value) { switch (root) { case INTEGER: return Integer.parseInt(value); case VARCHAR: return value; default: throw new IllegalArgumentException(); } } @Override public boolean isEndOfStream(RowData nextElement) { return false; } }
4.3.2 SocketSourceFunction
SocketSourceFunction打开一个套接字并消费字节。它通过给定的字节分隔符(\n默认情况下)拆分记录,并将解码委托给可插入的DeserializationSchema
,源函数只能在并行度为 1 的情况下工作。
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.table.data.RowData; public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> { private final String hostname; private final int port; private final byte byteDelimiter; private final DeserializationSchema<RowData> deserializer; private volatile boolean isRunning = true; private Socket currentSocket; public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) { this.hostname = hostname; this.port = port; this.byteDelimiter = byteDelimiter; this.deserializer = deserializer; } @Override public TypeInformation<RowData> getProducedType() { return deserializer.getProducedType(); } @Override public void open(Configuration parameters) throws Exception { deserializer.open(() -> getRuntimeContext().getMetricGroup()); } @Override public void run(SourceContext<RowData> ctx) throws Exception { while (isRunning) { // open and consume from socket try (final Socket socket = new Socket()) { currentSocket = socket; socket.connect(new InetSocketAddress(hostname, port), 0); try (InputStream stream = socket.getInputStream()) { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); int b; while ((b = stream.read()) >= 0) { // buffer until delimiter if (b != byteDelimiter) { buffer.write(b); } // decode and emit record else { ctx.collect(deserializer.deserialize(buffer.toByteArray())); buffer.reset(); } } } } catch (Throwable t) { t.printStackTrace(); // print and continue } Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; try { currentSocket.close(); } catch (Throwable t) { // ignore } } }