01 Introduction
In the previous article, "Flink Custom Connectors", we covered the basic workflow of building a custom Flink connector (summarized in the flow chart there). At the code level, the steps for implementing a custom connector are:
- First, define the Flink SQL DDL;
- Create the factories that parse and validate the options (DynamicTableSourceFactory, DeserializationFormatFactory), and make sure both factories are registered under the META-INF/services directory;
- Implement ScanTableSource;
- Implement the concrete business logic in getScanRuntimeProvider (a minimal sketch of the user-facing side follows this list).
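To make the flow concrete before diving into the Kafka sources, here is a minimal sketch of the user-facing side of a custom connector. Everything in the WITH clause below ('my-connector', 'demo.endpoint') is hypothetical; it only works if a matching custom factory declares those options, and is not a real Flink connector.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomConnectorDdlDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // Step 1 of the flow: define the Flink SQL DDL. The planner later looks up a
        // DynamicTableSourceFactory whose factoryIdentifier() equals 'my-connector' and
        // validates 'demo.endpoint' against its requiredOptions()/optionalOptions().
        tEnv.executeSql(
                "CREATE TABLE demo_source (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'my-connector',\n"
                        + "  'demo.endpoint' = 'localhost:9000',\n"
                        + "  'format' = 'json'\n"
                        + ")");
    }
}
```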
To really master custom Flink connectors, this article goes straight to the source code and studies how Flink's Kafka connector is implemented.
02 Kafka Connector Source Code Analysis
2.1 Project Structure
After importing the Flink source code, you can browse the source layout of the Kafka connector module (flink-connector-kafka).
Let's start with its pom file. The other details can be skipped; the dependencies are what matter. Below is a tidied version with descriptions added:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>flink-connectors</artifactId>
        <groupId>org.apache.flink</groupId>
        <version>1.13.6</version>
        <relativePath>..</relativePath>
    </parent>

    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <name>Flink : Connectors : Kafka</name>
    <packaging>jar</packaging>

    <properties>
        <kafka.version>2.4.1</kafka.version>
    </properties>

    <dependencies>
        <!-- Flink dependencies (everything with a "flink" prefix) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Other Flink dependencies, omitted here ... -->

        <!-- Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    </dependencies>

    <build>
        <!-- Packaging plugin definitions, omitted here ... -->
    </build>
</project>
```
2.2 Factories (Entry Point for Reading the Source)
In the src/main/resources/META-INF/services directory there are two files. Recalling the flow chart from the introduction: once the DDL has been defined, the factories are the entry point of the whole flow. Let's look at the contents of both files.
org.apache.flink.table.factories.Factory (Table SQL mode):
```text
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
```
org.apache.flink.table.factories.TableFactory (Table API mode):
```text
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
```
From these service files we know which factory classes to study next (a sketch of how Flink discovers them follows the list):
- KafkaDynamicTableFactory
- UpsertKafkaDynamicTableFactory
- KafkaTableSourceSinkFactory
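How does Flink get from `'connector' = 'kafka'` in the DDL to these classes? Through Java SPI: every factory registered in META-INF/services is loaded, and the one whose factoryIdentifier() matches the connector option wins. The sketch below reproduces that lookup with the real FactoryUtil.discoverFactory utility; the wrapping main method is only for illustration.

```java
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class FactoryDiscoveryDemo {
    public static void main(String[] args) {
        // Scans every factory registered under
        // META-INF/services/org.apache.flink.table.factories.Factory
        // and returns the one whose factoryIdentifier() equals "kafka".
        DynamicTableFactory factory =
                FactoryUtil.discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        DynamicTableFactory.class,
                        "kafka");

        // Prints: org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
        System.out.println(factory.getClass().getName());
    }
}
```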
2.2.1 KafkaDynamicTableFactory
The full code of KafkaDynamicTableFactory is below, with comments already written in to make it easier to follow:
```java
/**
 * The KafkaDynamicTable factory.
 *
 * <p>Description: configures instances of KafkaDynamicSource and KafkaDynamicSink.
 *
 * @author : YangLinWei
 * @createTime: 2022/4/12 11:06 AM
 * @version: 1.0.0
 */
@Internal
public class KafkaDynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "kafka";

    /** Returns a unique identifier (matches `connector = '...'` in Flink SQL). */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /** The set of options this factory requires (mandatory). */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PROPS_BOOTSTRAP_SERVERS);
        return options;
    }

    /** The set of options this factory may use (optional). */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FactoryUtil.FORMAT);
        options.add(KEY_FORMAT);
        options.add(KEY_FIELDS);
        options.add(KEY_FIELDS_PREFIX);
        options.add(VALUE_FORMAT);
        options.add(VALUE_FIELDS_INCLUDE);
        options.add(TOPIC);
        options.add(TOPIC_PATTERN);
        options.add(PROPS_GROUP_ID);
        options.add(SCAN_STARTUP_MODE);
        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
        options.add(SINK_PARTITIONER);
        options.add(SINK_SEMANTIC);
        options.add(SINK_PARALLELISM);
        return options;
    }

    /**
     * Creates the DynamicTableSource.
     *
     * <p>Description: performs validation here and discovers the nested (format) factories.
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // 1. Use FactoryUtil for the custom validation logic...
        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        // 2. Get all options
        final ReadableConfig tableOptions = helper.getOptions();

        // 3. Discover a suitable key decoding format
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                getKeyDecodingFormat(helper);

        // 4. Discover a suitable value decoding format
        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                getValueDecodingFormat(helper);

        // 5. Validate the options (format, topic, startup mode, primary key, etc.;
        //    throws an exception on validation failure)
        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSourceOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);

        // 6. Read the options and copy them into Properties
        final StartupOptions startupOptions = getStartupOptions(tableOptions);
        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
        properties.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                String.valueOf(
                        tableOptions
                                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                                .map(Duration::toMillis)
                                .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

        // 7. Finally, create the DynamicTableSource instance and inject the options
        return createKafkaTableSource(
                physicalDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                KafkaOptions.getSourceTopics(tableOptions),
                KafkaOptions.getSourceTopicPattern(tableOptions),
                properties,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.startupTimestampMillis);
    }

    /**
     * Creates the DynamicTableSink (almost identical to creating the DynamicTableSource).
     *
     * <p>Description: performs validation here and discovers the nested (format) factories.
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // 1. Use FactoryUtil for the custom validation logic...
        final TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        this, autoCompleteSchemaRegistrySubject(context));

        // 2. Get all options
        final ReadableConfig tableOptions = helper.getOptions();

        // 3. Discover a suitable key encoding format
        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                getKeyEncodingFormat(helper);

        // 4. Discover a suitable value encoding format
        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
                getValueEncodingFormat(helper);

        // 5. Validate the options (format, topic, startup mode, primary key, etc.;
        //    throws an exception on validation failure)
        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSinkOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);

        // 6. Read the options
        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
        final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);

        // 7. Finally, create the DynamicTableSink instance and inject the options
        return createKafkaTableSink(
                physicalDataType,
                keyEncodingFormat.orElse(null),
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                tableOptions.get(TOPIC).get(0),
                getKafkaProperties(context.getCatalogTable().getOptions()),
                getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
                getSinkSemantic(tableOptions),
                parallelism);
    }

    // --- The methods below are the validation/instantiation details used above;
    //     a quick read is enough, no need to study them in depth -------------------------------

    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
            TableFactoryHelper helper) {
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                helper.discoverOptionalDecodingFormat(
                        DeserializationFormatFactory.class, KEY_FORMAT);
        keyDecodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyDecodingFormat;
    }

    private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
            TableFactoryHelper helper) {
        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
        keyEncodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyEncodingFormat;
    }

    private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalDecodingFormat(
                        DeserializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverDecodingFormat(
                                        DeserializationFormatFactory.class, VALUE_FORMAT));
    }

    private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalEncodingFormat(
                        SerializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverEncodingFormat(
                                        SerializationFormatFactory.class, VALUE_FORMAT));
    }

    private static void validatePKConstraints(
            ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
        if (catalogTable.getSchema().getPrimaryKey().isPresent()
                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
            Configuration options = Configuration.fromMap(catalogTable.getOptions());
            String formatName =
                    options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
            throw new ValidationException(
                    String.format(
                            "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
                                    + " on the table, because it can't guarantee the semantic of primary key.",
                            tableName.asSummaryString(), formatName));
        }
    }

    // --------------------------------------------------------------------------------------------

    protected KafkaDynamicSource createKafkaTableSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        return new KafkaDynamicSource(
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                false);
    }

    protected KafkaDynamicSink createKafkaTableSink(
            DataType physicalDataType,
            @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            String topic,
            Properties properties,
            FlinkKafkaPartitioner<RowData> partitioner,
            KafkaSinkSemantic semantic,
            Integer parallelism) {
        return new KafkaDynamicSink(
                physicalDataType,
                physicalDataType,
                keyEncodingFormat,
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topic,
                properties,
                partitioner,
                semantic,
                false,
                SinkBufferFlushMode.DISABLED,
                parallelism);
    }
}
```
From the source we can see that KafkaDynamicTableFactory's main responsibilities are:
- discovering the serialization and deserialization format factories for keys and values;
- validating the connector options;
- injecting the options into the DynamicTableSource and DynamicTableSink instances it creates.
Later sections explain how SerializationFormatFactory and DeserializationFormatFactory are implemented in practice, and then the DynamicTableSource and DynamicTableSink implementations. First, a DDL example to tie the options together.
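To connect the factory back to the SQL surface, the fragment below maps each WITH key to the ConfigOption the factory declared above; the topic name and broker address are placeholders. It assumes a TableEnvironment `tEnv` as in the earlier sketch.

```java
// Each option key corresponds to a ConfigOption in KafkaDynamicTableFactory;
// 'properties.bootstrap.servers' is the only required one (PROPS_BOOTSTRAP_SERVERS).
tEnv.executeSql(
        "CREATE TABLE kafka_source (\n"
                + "  id BIGINT,\n"
                + "  name STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"                             // factoryIdentifier()
                + "  'topic' = 'demo-topic',\n"                            // TOPIC
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n" // PROPS_BOOTSTRAP_SERVERS
                + "  'properties.group.id' = 'demo-group',\n"              // PROPS_GROUP_ID
                + "  'scan.startup.mode' = 'earliest-offset',\n"           // SCAN_STARTUP_MODE
                + "  'format' = 'json'\n"                                  // FactoryUtil.FORMAT
                + ")");
```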
2.2.2 UpsertKafkaDynamicTableFactory
UpsertKafkaDynamicTableFactory follows essentially the same flow as KafkaDynamicTableFactory (it is registered in the same services file and lives in the same package), so it is not analyzed separately here; its main difference is that it requires a PRIMARY KEY and works with upsert semantics.
2.2.3 KafkaTableSourceSinkFactory
The Table API mode works on almost the same principles as the declarative Table SQL mode: both serve as the inputs and outputs of queries, differing mainly in how they are written. So studying the Table SQL mode factory code is enough; KafkaTableSourceSinkFactory is not covered here, and skipping it does not hurt the overall understanding.
2.3 Serialization and Deserialization Format Factories
2.3.1 DeserializationFormatFactory
From section 2.2.1 KafkaDynamicTableFactory we know that the factory discovers the SerializationFormatFactory and DeserializationFormatFactory and creates the DynamicTableSource and DynamicTableSink instances. Let's go through them in order, starting with DeserializationFormatFactory.
Pressing Ctrl+T on the interface in the IDE lists the available implementations of DeserializationFormatFactory.
Here we take the typical JsonFormatFactory as an example. The complete code is below, with comments already written in to make it easier to follow:
```java
/**
 * JsonFormatFactory supports converting JSON strings to and from RowData rows.
 *
 * @author : YangLinWei
 * @createTime: 2022/4/12 11:57 AM
 * @version: 1.0.0
 */
public class JsonFormatFactory
        implements DeserializationFormatFactory, SerializationFormatFactory {

    /** Unique identifier (matches 'format' = 'json' in Flink SQL). */
    public static final String IDENTIFIER = "json";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /**
     * Creates the decoding format, whose main job is to create the runtime decoder,
     * i.e. the DeserializationSchema.
     */
    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateDecodingFormatOptions(formatOptions);

        final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);

        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context context, DataType producedDataType) {
                final RowType rowType = (RowType) producedDataType.getLogicalType();
                final TypeInformation<RowData> rowDataTypeInfo =
                        context.createTypeInformation(producedDataType);
                return new JsonRowDataDeserializationSchema(
                        rowType,
                        rowDataTypeInfo,
                        failOnMissingField,
                        ignoreParseErrors,
                        timestampOption);
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    /**
     * Creates the encoding format, whose main job is to create the runtime encoder,
     * i.e. the SerializationSchema.
     */
    @Override
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateEncodingFormatOptions(formatOptions);

        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
        JsonOptions.MapNullKeyMode mapNullKeyMode = JsonOptions.getMapNullKeyMode(formatOptions);
        String mapNullKeyLiteral = formatOptions.get(MAP_NULL_KEY_LITERAL);

        final boolean encodeDecimalAsPlainNumber =
                formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);

        return new EncodingFormat<SerializationSchema<RowData>>() {
            @Override
            public SerializationSchema<RowData> createRuntimeEncoder(
                    DynamicTableSink.Context context, DataType consumedDataType) {
                final RowType rowType = (RowType) consumedDataType.getLogicalType();
                return new JsonRowDataSerializationSchema(
                        rowType,
                        timestampOption,
                        mapNullKeyMode,
                        mapNullKeyLiteral,
                        encodeDecimalAsPlainNumber);
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FAIL_ON_MISSING_FIELD);
        options.add(IGNORE_PARSE_ERRORS);
        options.add(TIMESTAMP_FORMAT);
        options.add(MAP_NULL_KEY_MODE);
        options.add(MAP_NULL_KEY_LITERAL);
        options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
        return options;
    }
}
```
As noted earlier, the idea behind a factory is to define the rules (options), inject them into a newly created instance, and return that instance to the caller.
From the code and comments above: JsonFormatFactory implements two factory interfaces, DeserializationFormatFactory and SerializationFormatFactory, providing createDecodingFormat and createEncodingFormat respectively; the core purpose of these two methods is to produce a DeserializationSchema and a SerializationSchema.
Put simply, JsonFormatFactory's concrete deserialization and serialization logic lives in the implementations of those two interfaces: JsonRowDataDeserializationSchema and JsonRowDataSerializationSchema.
Let's look at JsonRowDataDeserializationSchema first.
2.3.1.1 JsonRowDataDeserializationSchema
JsonRowDataDeserializationSchema's main job is to deserialize JSON into Flink Table/SQL's internal data structures. Its code and comments follow:
```java
/**
 * Deserializes JSON into Flink Table/SQL's internal data structures.
 *
 * <p>Deserializes a byte[] message into a JSON object and reads the specified fields.
 */
@Internal
public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    private static final long serialVersionUID = 1L;

    /** Flag indicating whether to fail if a field is missing. */
    private final boolean failOnMissingField;

    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
    private final boolean ignoreParseErrors;

    /** Type information of the produced records. */
    private final TypeInformation<RowData> resultTypeInfo;

    /** Runtime converter that converts a JsonNode into objects of Flink SQL's internal data structures. */
    private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter;

    /** Object mapper for parsing the JSON. */
    private final ObjectMapper objectMapper = new ObjectMapper();

    /** Timestamp format specification, used for parsing timestamps. */
    private final TimestampFormat timestampFormat;

    /** Constructor. */
    public JsonRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> resultTypeInfo,
            boolean failOnMissingField,
            boolean ignoreParseErrors,
            TimestampFormat timestampFormat) {
        if (ignoreParseErrors && failOnMissingField) {
            throw new IllegalArgumentException(
                    "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
        }
        this.resultTypeInfo = checkNotNull(resultTypeInfo);
        this.failOnMissingField = failOnMissingField;
        this.ignoreParseErrors = ignoreParseErrors;
        this.runtimeConverter =
                new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
                        .createConverter(checkNotNull(rowType));
        this.timestampFormat = timestampFormat;

        boolean hasDecimalType =
                LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
        if (hasDecimalType) {
            objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
        }
        objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
    }

    /** The actual deserialization. */
    @Override
    public RowData deserialize(@Nullable byte[] message) throws IOException {
        if (message == null) {
            return null;
        }
        try {
            return convertToRowData(deserializeToJsonNode(message));
        } catch (Throwable t) {
            if (ignoreParseErrors) {
                return null;
            }
            throw new IOException(
                    format("Failed to deserialize JSON '%s'.", new String(message)), t);
        }
    }

    /** Deserializes the payload into a JsonNode. */
    public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
        return objectMapper.readTree(message);
    }

    /** Converts the JsonNode into Flink Table/SQL's internal data structures. */
    public RowData convertToRowData(JsonNode message) {
        return (RowData) runtimeConverter.convert(message);
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return resultTypeInfo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        JsonRowDataDeserializationSchema that = (JsonRowDataDeserializationSchema) o;
        return failOnMissingField == that.failOnMissingField
                && ignoreParseErrors == that.ignoreParseErrors
                && resultTypeInfo.equals(that.resultTypeInfo)
                && timestampFormat.equals(that.timestampFormat);
    }

    @Override
    public int hashCode() {
        return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
    }
}
```
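To see the contract in action outside of a running job, here is a small hand-driven sketch. Imports assume Flink 1.13, matching the pom above (in older versions TimestampFormat lives in org.apache.flink.formats.json); InternalTypeInfo stands in for the TypeInformation that the planner context normally supplies via context.createTypeInformation.

```java
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

public class JsonDeserializationDemo {
    public static void main(String[] args) throws Exception {
        // The row type a DDL schema (id BIGINT, name STRING) would produce
        DataType dataType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()));
        RowType rowType = (RowType) dataType.getLogicalType();

        JsonRowDataDeserializationSchema schema =
                new JsonRowDataDeserializationSchema(
                        rowType,
                        InternalTypeInfo.of(rowType), // TypeInformation<RowData>
                        false,                        // failOnMissingField
                        false,                        // ignoreParseErrors
                        TimestampFormat.SQL);

        RowData row = schema.deserialize("{\"id\": 1, \"name\": \"flink\"}".getBytes());
        System.out.println(row.getLong(0) + ", " + row.getString(1)); // 1, flink
    }
}
```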
Internally it uses the utility class JsonToRowDataConverters, whose job is to convert a JsonNode into Flink Table/SQL's internal data structures. For space reasons the real code is not reproduced here, but a simplified sketch of the idea follows:
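The real converter code builds one converter per logical type and composes them recursively. The hypothetical, heavily simplified sketch below only illustrates the dispatch idea (pick a conversion function per type root, then apply it to each JsonNode); it is not the actual Flink code.

```java
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;

/** Hypothetical mini version of the JsonToRowDataConverters idea; not Flink code. */
public class SimpleJsonConverters {

    /** A converter turns one JsonNode into one Flink-internal value. */
    public interface SimpleJsonConverter {
        Object convert(JsonNode node);
    }

    /** Picks a conversion function based on the logical type, as the real class does for all SQL types. */
    public static SimpleJsonConverter createConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BIGINT:
                return node -> node.asLong();
            case VARCHAR:
                return node -> StringData.fromString(node.asText());
            // The real class also covers nested ROW/ARRAY/MAP, decimals, timestamps, etc.
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
}
```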
2.3.2 SerializationFormatFactory
The principle is much the same as DeserializationFormatFactory's, so it is not detailed here.
2.3.2.1 JsonRowDataSerializationSchema
JsonRowDataSerializationSchema mirrors JsonRowDataDeserializationSchema: its main job is to serialize Flink Table/SQL's internal data structures into JSON. It is not detailed here either, but a short usage sketch follows.
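For symmetry, a short hedged sketch that drives the serializer by hand, with constructor arguments as in the JsonFormatFactory code above (again assuming Flink 1.13 imports, where the MapNullKeyMode enum lives in JsonOptions):

```java
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;

public class JsonSerializationDemo {
    public static void main(String[] args) {
        RowType rowType =
                (RowType)
                        DataTypes.ROW(
                                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                                        DataTypes.FIELD("name", DataTypes.STRING()))
                                .getLogicalType();

        JsonRowDataSerializationSchema schema =
                new JsonRowDataSerializationSchema(
                        rowType,
                        TimestampFormat.SQL,
                        JsonOptions.MapNullKeyMode.FAIL, // how to handle null map keys
                        "null",                          // literal for null keys (unused with FAIL)
                        false);                          // encodeDecimalAsPlainNumber

        byte[] json = schema.serialize(GenericRowData.of(1L, StringData.fromString("flink")));
        System.out.println(new String(json)); // {"id":1,"name":"flink"}
    }
}
```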
2.4 Dynamic Tables
From section 2.2.1 KafkaDynamicTableFactory we also know that, besides discovering the SerializationFormatFactory and DeserializationFormatFactory, the factory creates the DynamicTableSource and DynamicTableSink instances.
Next up is DynamicTableSource; its implementation class is KafkaDynamicSource, so let's look at the code inside KafkaDynamicSource.
2.4.1 KafkaDynamicSource
KafkaDynamicSource handles the source side of the connector; for example, this is where the Kafka consumer is defined. The code and comments follow:
```java
/** KafkaDynamicSource. */
@Internal
public class KafkaDynamicSource
        implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {

    // --------------------------------------------------------------------------------------------
    // Mutable attributes
    // --------------------------------------------------------------------------------------------

    /** Data type that describes the final output of the source. */
    protected DataType producedDataType;

    /** Metadata that is appended at the end of a physical source row. */
    protected List<String> metadataKeys;

    /** Watermark strategy that is used to generate per-partition watermarks. */
    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;

    // --------------------------------------------------------------------------------------------
    // Format attributes
    // --------------------------------------------------------------------------------------------

    private static final String VALUE_METADATA_PREFIX = "value.";

    /** Data type to configure the formats. */
    protected final DataType physicalDataType;

    /** Optional format for decoding keys from Kafka. */
    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;

    /** Format for decoding values from Kafka. */
    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;

    /** Indices that determine the key fields and their target position in the produced row. */
    protected final int[] keyProjection;

    /** Indices that determine the value fields and their target position in the produced row. */
    protected final int[] valueProjection;

    /** Prefix that needs to be removed from fields when constructing the physical data type. */
    protected final @Nullable String keyPrefix;

    // --------------------------------------------------------------------------------------------
    // Kafka-specific attributes
    // --------------------------------------------------------------------------------------------

    /** The Kafka topics to consume. */
    protected final List<String> topics;

    /** The Kafka topic pattern to consume. */
    protected final Pattern topicPattern;

    /** Properties for the Kafka consumer. */
    protected final Properties properties;

    /** Startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
    protected final StartupMode startupMode;

    /** Specific startup offsets; only relevant when the startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
    protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;

    /** Start timestamp for locating partition offsets; only relevant when the startup mode is {@link StartupMode#TIMESTAMP}. */
    protected final long startupTimestampMillis;

    /** Flag that determines the source mode; in upsert mode, tombstone messages are kept. */
    protected final boolean upsertMode;

    /** Constructor; initializes the fields above. */
    public KafkaDynamicSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis,
            boolean upsertMode) {
        // Format attributes
        this.physicalDataType =
                Preconditions.checkNotNull(
                        physicalDataType, "Physical data type must not be null.");
        this.keyDecodingFormat = keyDecodingFormat;
        this.valueDecodingFormat =
                Preconditions.checkNotNull(
                        valueDecodingFormat, "Value decoding format must not be null.");
        this.keyProjection =
                Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
        this.valueProjection =
                Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
        this.keyPrefix = keyPrefix;
        // Mutable attributes
        this.producedDataType = physicalDataType;
        this.metadataKeys = Collections.emptyList();
        this.watermarkStrategy = null;
        // Kafka-specific attributes
        Preconditions.checkArgument(
                (topics != null && topicPattern == null)
                        || (topics == null && topicPattern != null),
                "Either Topic or Topic Pattern must be set for source.");
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.startupMode =
                Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
        this.specificStartupOffsets =
                Preconditions.checkNotNull(
                        specificStartupOffsets, "Specific offsets must not be null.");
        this.startupTimestampMillis = startupTimestampMillis;
        this.upsertMode = upsertMode;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return valueDecodingFormat.getChangelogMode();
    }

    /**
     * Returns a provider for the runtime implementation that reads the data; this is where the
     * main business logic lives.
     */
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final DeserializationSchema<RowData> keyDeserialization =
                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

        final DeserializationSchema<RowData> valueDeserialization =
                createDeserialization(context, valueDecodingFormat, valueProjection, null);

        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);

        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);

        return SourceFunctionProvider.of(kafkaConsumer, false);
    }

    /** Returns a map of metadata keys and the data types they can produce. */
    @Override
    public Map<String, DataType> listReadableMetadata() {
        final Map<String, DataType> metadataMap = new LinkedHashMap<>();

        // according to convention, the order of the final row must be
        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
        // where the format metadata has highest precedence

        // add value format metadata with prefix
        valueDecodingFormat
                .listReadableMetadata()
                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));

        // add connector metadata
        Stream.of(ReadableMetadata.values())
                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));

        return metadataMap;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // separate connector and format metadata
        final List<String> formatMetadataKeys =
                metadataKeys.stream()
                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
                        .collect(Collectors.toList());
        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
        connectorMetadataKeys.removeAll(formatMetadataKeys);

        // push down format metadata
        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
        if (formatMetadata.size() > 0) {
            final List<String> requestedFormatMetadataKeys =
                    formatMetadataKeys.stream()
                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
                            .collect(Collectors.toList());
            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
        }

        this.metadataKeys = connectorMetadataKeys;
        this.producedDataType = producedDataType;
    }

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public DynamicTableSource copy() {
        final KafkaDynamicSource copy =
                new KafkaDynamicSource(
                        physicalDataType,
                        keyDecodingFormat,
                        valueDecodingFormat,
                        keyProjection,
                        valueProjection,
                        keyPrefix,
                        topics,
                        topicPattern,
                        properties,
                        startupMode,
                        specificStartupOffsets,
                        startupTimestampMillis,
                        upsertMode);
        copy.producedDataType = producedDataType;
        copy.metadataKeys = metadataKeys;
        copy.watermarkStrategy = watermarkStrategy;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "Kafka table source";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final KafkaDynamicSource that = (KafkaDynamicSource) o;
        return Objects.equals(producedDataType, that.producedDataType)
                && Objects.equals(metadataKeys, that.metadataKeys)
                && Objects.equals(physicalDataType, that.physicalDataType)
                && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
                && Arrays.equals(keyProjection, that.keyProjection)
                && Arrays.equals(valueProjection, that.valueProjection)
                && Objects.equals(keyPrefix, that.keyPrefix)
                && Objects.equals(topics, that.topics)
                && Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern))
                && Objects.equals(properties, that.properties)
                && startupMode == that.startupMode
                && Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
                && startupTimestampMillis == that.startupTimestampMillis
                && Objects.equals(upsertMode, that.upsertMode)
                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                producedDataType,
                metadataKeys,
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                upsertMode,
                watermarkStrategy);
    }

    // --------- The actual business logic --------------------------------------------------------

    /** Creates the Kafka consumer. */
    protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
            DeserializationSchema<RowData> keyDeserialization,
            DeserializationSchema<RowData> valueDeserialization,
            TypeInformation<RowData> producedTypeInfo) {

        final MetadataConverter[] metadataConverters =
                metadataKeys.stream()
                        .map(
                                k ->
                                        Stream.of(ReadableMetadata.values())
                                                .filter(rm -> rm.key.equals(k))
                                                .findFirst()
                                                .orElseThrow(IllegalStateException::new))
                        .map(m -> m.converter)
                        .toArray(MetadataConverter[]::new);

        // check if connector metadata is used at all
        final boolean hasMetadata = metadataKeys.size() > 0;

        // adjust physical arity with value format's metadata
        final int adjustedPhysicalArity =
                producedDataType.getChildren().size() - metadataKeys.size();

        // adjust value format projection to include value format's metadata columns at the end
        final int[] adjustedValueProjection =
                IntStream.concat(
                                IntStream.of(valueProjection),
                                IntStream.range(
                                        keyProjection.length + valueProjection.length,
                                        adjustedPhysicalArity))
                        .toArray();

        final KafkaDeserializationSchema<RowData> kafkaDeserializer =
                new DynamicKafkaDeserializationSchema(
                        adjustedPhysicalArity,
                        keyDeserialization,
                        keyProjection,
                        valueDeserialization,
                        adjustedValueProjection,
                        hasMetadata,
                        metadataConverters,
                        producedTypeInfo,
                        upsertMode);

        final FlinkKafkaConsumer<RowData> kafkaConsumer;
        if (topics != null) {
            kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
        } else {
            kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
        }

        switch (startupMode) {
            case EARLIEST:
                kafkaConsumer.setStartFromEarliest();
                break;
            case LATEST:
                kafkaConsumer.setStartFromLatest();
                break;
            case GROUP_OFFSETS:
                kafkaConsumer.setStartFromGroupOffsets();
                break;
            case SPECIFIC_OFFSETS:
                kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
                break;
            case TIMESTAMP:
                kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
                break;
        }

        kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);

        if (watermarkStrategy != null) {
            kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
        }
        return kafkaConsumer;
    }

    /** Creates the deserialization schema. */
    private @Nullable DeserializationSchema<RowData> createDeserialization(
            DynamicTableSource.Context context,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
            int[] projection,
            @Nullable String prefix) {
        if (format == null) {
            return null;
        }
        DataType physicalFormatDataType =
                DataTypeUtils.projectRow(this.physicalDataType, projection);
        if (prefix != null) {
            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
        }
        return format.createRuntimeDecoder(context, physicalFormatDataType);
    }

    // --------------------------------------------------------------------------------------------
    // Metadata handling
    // --------------------------------------------------------------------------------------------

    enum ReadableMetadata {
        TOPIC(
                "topic",
                DataTypes.STRING().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return StringData.fromString(record.topic());
                    }
                }),

        PARTITION(
                "partition",
                DataTypes.INT().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return record.partition();
                    }
                }),

        HEADERS(
                "headers",
                // key and value of the map are nullable to make handling easier in queries
                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
                        .notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        final Map<StringData, byte[]> map = new HashMap<>();
                        for (Header header : record.headers()) {
                            map.put(StringData.fromString(header.key()), header.value());
                        }
                        return new GenericMapData(map);
                    }
                }),

        LEADER_EPOCH(
                "leader-epoch",
                DataTypes.INT().nullable(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return record.leaderEpoch().orElse(null);
                    }
                }),

        OFFSET(
                "offset",
                DataTypes.BIGINT().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return record.offset();
                    }
                }),

        TIMESTAMP(
                "timestamp",
                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return TimestampData.fromEpochMillis(record.timestamp());
                    }
                }),

        TIMESTAMP_TYPE(
                "timestamp-type",
                DataTypes.STRING().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return StringData.fromString(record.timestampType().toString());
                    }
                });

        final String key;

        final DataType dataType;

        final MetadataConverter converter;

        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.converter = converter;
        }
    }
}
```
The most important class used here is the FlinkKafkaConsumer. Its internals are not described in detail; the core takeaway is the inheritance chain: FlinkKafkaConsumer extends FlinkKafkaConsumerBase, which extends RichParallelSourceFunction. RichParallelSourceFunction itself rarely needs to be touched, so read its source on your own if you are curious; a minimal sketch of one is shown below.
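For reference, a minimal hedged sketch of a RichParallelSourceFunction, unrelated to Kafka; it only shows the run/cancel contract that every parallel subtask follows (FlinkKafkaConsumerBase implements this same contract, just with far more machinery):

```java
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/** Minimal example: each parallel subtask emits an increasing counter until cancelled. */
public class CounterSource extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (running) {
            // Emission is synchronized with checkpointing via the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```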
2.4.2 KafkaDynamicSink
Its principle is much the same as KafkaDynamicSource's, so it is not detailed here; a rough sketch of its core method follows.
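For orientation only, here is a rough, abridged sketch of what its core method getSinkRuntimeProvider does, mirroring getScanRuntimeProvider on the source side. This is simplified from the Flink 1.13 sources and is not a drop-in copy; the real method also handles the upsert buffering mode.

```java
// Abridged sketch of KafkaDynamicSink.getSinkRuntimeProvider (simplified):
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
    // Build the runtime key/value serializers from the discovered EncodingFormats
    final SerializationSchema<RowData> keySerialization =
            createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);
    final SerializationSchema<RowData> valueSerialization =
            createSerialization(context, valueEncodingFormat, valueProjection, null);

    // Wrap them in a FlinkKafkaProducer and hand it to the planner
    final FlinkKafkaProducer<RowData> kafkaProducer =
            createKafkaProducer(keySerialization, valueSerialization);
    return SinkFunctionProvider.of(kafkaProducer, parallelism);
}
```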
03 Summary
This article contains a lot of code and takes some patience to work through. To deepen the understanding further, a diagram summarizing the overall relationships was drawn to make everything easier to grasp.
That concludes this walkthrough of the Flink Kafka connector source code; hopefully it helps. Thanks for reading!