Flink Kafka Connector Source Code Walkthrough (Very Detailed)

Summary: To master custom Flink connectors, this article goes straight to the source code and examines how Flink's Kafka connector is implemented.

01 Introduction


The previous article, "Flink Custom Connector", covered the basic process of building a custom Flink connector. Its flow chart is shown below:

[Figure: flow chart of the custom connector process]

At the code level, a developer builds a custom connector as follows:


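  1. First, define the Flink SQL DDL;
  2. Create the factories that parse and validate the options (DynamicTableSourceFactory, DeserializationFormatFactory); note that both factories must be registered under the META-INF/services directory;
  3. Implement ScanTableSource;
  4. Implement the actual business logic in getScanRuntimeProvider.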


To master custom Flink connectors, this article goes straight to the source code and examines how Flink's Kafka connector is implemented.


Appendix: Flink source code download address


02 Kafka Connector Source Code Analysis


2.1 Project Structure


After importing the Flink source code, the source structure of the Kafka connector looks like this:

[Figure: source tree of the Kafka connector module]


Start with the pom file. Most of it can be skipped; the dependencies are what matter. The version below has been trimmed and annotated:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>flink-connectors</artifactId>
    <groupId>org.apache.flink</groupId>
    <version>1.13.6</version>
    <relativePath>..</relativePath>
  </parent>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <name>Flink : Connectors : Kafka</name>
  <packaging>jar</packaging>
  <properties>
    <kafka.version>2.4.1</kafka.version>
  </properties>
  <dependencies>
    <!-- Flink dependencies: every artifact with "flink" in its name -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Other Flink dependencies are omitted here ... -->
    <!-- Kafka dependencies -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
  </dependencies>
  <build>
    <!-- Packaging plugin definitions are omitted here ... -->
  </build>
</project>


2.2 Factories (Entry Point for Reading the Source)


Under the src/main/resources/META-INF/services directory there are two files:

[Figure: the two files under META-INF/services]


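From the flow chart in the introduction, we know that once the DDL is defined, the factory is the entry point of the whole process. Here is the content of the two files.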


org.apache.flink.table.factories.Factory (Table SQL mode)

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory

org.apache.flink.table.factories.TableFactory (Table API mode)

org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory


These configuration files tell us which factory classes to read next:


  • KafkaDynamicTableFactory
  • UpsertKafkaDynamicTableFactory
  • KafkaTableSourceSinkFactory
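
To make the entry point concrete, here is a minimal sketch of how the connector value from the DDL can be matched against the factoryIdentifier() of the registered factories. It is an illustration only: Flink loads the candidates listed in META-INF/services through Java's ServiceLoader, whereas this sketch passes them in as a plain list so that it runs standalone. MiniFactory, FactoryLookupSketch, named and discover are hypothetical names, not Flink APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FactoryLookupSketch {

    /** Builds a hypothetical factory that only reports its identifier. */
    static MiniFactory named(String identifier) {
        return () -> identifier;
    }

    /** Finds the single factory whose identifier equals the table's 'connector' option. */
    static MiniFactory discover(List<MiniFactory> candidates, Map<String, String> tableOptions) {
        String wanted = tableOptions.get("connector");
        if (wanted == null) {
            throw new IllegalArgumentException("Table options do not contain 'connector'.");
        }
        List<MiniFactory> matches =
                candidates.stream()
                        .filter(f -> f.factoryIdentifier().equals(wanted))
                        .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one factory for '" + wanted + "' but found " + matches.size());
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        // Assumption: a fixed list replaces the ServiceLoader lookup.
        List<MiniFactory> candidates = List.of(named("kafka"), named("upsert-kafka"));
        Map<String, String> ddlOptions = Map.of("connector", "kafka", "topic", "orders");
        System.out.println(discover(candidates, ddlOptions).factoryIdentifier()); // prints "kafka"
    }
}

// Hypothetical stand-in for org.apache.flink.table.factories.Factory.
interface MiniFactory {
    String factoryIdentifier();
}
```

This is why the IDENTIFIER constant of each factory has to be unique: the lookup fails when no factory, or more than one, claims the requested connector name.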


2.2.1 KafkaDynamicTableFactory


The full code of KafkaDynamicTableFactory is below, with comments added to make it easier to follow:

/**
 * Factory for Kafka dynamic tables.
 * <p>
 * Description: creates configured instances of KafkaDynamicSource and KafkaDynamicSink.
 *
 * @author : YangLinWei
 * @createTime: 2022/4/12 11:06 AM
 * @version: 1.0.0
 */
@Internal
public class KafkaDynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String IDENTIFIER = "kafka";
    /**
     * Returns a unique identifier (matches `connector = '...'` in Flink SQL)
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
    /**
     * The set of options this factory requires (mandatory)
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PROPS_BOOTSTRAP_SERVERS);
        return options;
    }
    /**
     * The set of options this factory accepts in addition (optional)
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FactoryUtil.FORMAT);
        options.add(KEY_FORMAT);
        options.add(KEY_FIELDS);
        options.add(KEY_FIELDS_PREFIX);
        options.add(VALUE_FORMAT);
        options.add(VALUE_FIELDS_INCLUDE);
        options.add(TOPIC);
        options.add(TOPIC_PATTERN);
        options.add(PROPS_GROUP_ID);
        options.add(SCAN_STARTUP_MODE);
        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
        options.add(SINK_PARTITIONER);
        options.add(SINK_SEMANTIC);
        options.add(SINK_PARALLELISM);
        return options;
    }
    /**
     * Creates the DynamicTableSource
     * <p>
     * Description: validation and discovery of nested factories happen in this method
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // 1. Create the helper from FactoryUtil, which provides the validation logic
        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // 2. Get all table options
        final ReadableConfig tableOptions = helper.getOptions();
        // 3. Discover a suitable key decoding format
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                getKeyDecodingFormat(helper);
        // 4. Discover a suitable value decoding format
        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                getValueDecodingFormat(helper);
        // 5. Validate the options (format, topic, startup mode, primary key, etc.); a failure throws an exception
        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSourceOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
        // 6. Read the options and collect the Kafka settings into a Properties object
        final StartupOptions startupOptions = getStartupOptions(tableOptions);
        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
        properties.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                String.valueOf(
                        tableOptions
                                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                                .map(Duration::toMillis)
                                .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
        // 7. Finally, create the DynamicTableSource instance and inject the parameters
        return createKafkaTableSource(
                physicalDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                KafkaOptions.getSourceTopics(tableOptions),
                KafkaOptions.getSourceTopicPattern(tableOptions),
                properties,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.startupTimestampMillis);
    }
    /**
     * Creates the DynamicTableSink (works almost exactly like creating the DynamicTableSource)
     * <p>
     * Description: validation and discovery of nested factories happen in this method
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // 1. Create the helper from FactoryUtil, which provides the validation logic
        final TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        this, autoCompleteSchemaRegistrySubject(context));
        // 2. Get all table options
        final ReadableConfig tableOptions = helper.getOptions();
        // 3. Discover a suitable key encoding format
        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                getKeyEncodingFormat(helper);
        // 4. Discover a suitable value encoding format
        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
                getValueEncodingFormat(helper);
        // 5. Validate the sink options (format, topic, sink semantic, primary key, etc.); a failure throws an exception
        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSinkOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
        // 6. Read the options
        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
        final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
        // 7. Finally, create the DynamicTableSink instance and inject the parameters
        return createKafkaTableSink(
                physicalDataType,
                keyEncodingFormat.orElse(null),
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                tableOptions.get(TOPIC).get(0),
                getKafkaProperties(context.getCatalogTable().getOptions()),
                getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
                getSinkSemantic(tableOptions),
                parallelism);
    }
    // --- The methods below are validation and instantiation details for the code above; a skim is enough ---
    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
            TableFactoryHelper helper) {
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                helper.discoverOptionalDecodingFormat(
                        DeserializationFormatFactory.class, KEY_FORMAT);
        keyDecodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyDecodingFormat;
    }
    private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
            TableFactoryHelper helper) {
        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
        keyEncodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyEncodingFormat;
    }
    private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalDecodingFormat(
                DeserializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverDecodingFormat(
                                        DeserializationFormatFactory.class, VALUE_FORMAT));
    }
    private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalEncodingFormat(
                SerializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverEncodingFormat(
                                        SerializationFormatFactory.class, VALUE_FORMAT));
    }
    private static void validatePKConstraints(
            ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
        if (catalogTable.getSchema().getPrimaryKey().isPresent()
                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
            Configuration options = Configuration.fromMap(catalogTable.getOptions());
            String formatName =
                    options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
            throw new ValidationException(
                    String.format(
                            "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
                                    + " on the table, because it can't guarantee the semantic of primary key.",
                            tableName.asSummaryString(),
                            formatName));
        }
    }
    // --------------------------------------------------------------------------------------------
    protected KafkaDynamicSource createKafkaTableSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        return new KafkaDynamicSource(
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                false);
    }
    protected KafkaDynamicSink createKafkaTableSink(
            DataType physicalDataType,
            @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            String topic,
            Properties properties,
            FlinkKafkaPartitioner<RowData> partitioner,
            KafkaSinkSemantic semantic,
            Integer parallelism) {
        return new KafkaDynamicSink(
                physicalDataType,
                physicalDataType,
                keyEncodingFormat,
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topic,
                properties,
                partitioner,
                semantic,
                false,
                SinkBufferFlushMode.DISABLED,
                parallelism);
    }
}


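From the source we can see that KafkaDynamicTableFactory mainly does the following:

  • Discovers the format factories that serialize and deserialize the key and the value
  • Validates the connection options
  • Injects the connection options into the DynamicTableSource and DynamicTableSink instances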
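
The validation and injection steps can be illustrated with a small standalone sketch. It assumes, based on the calls visible in the listing, that helper.validateExcept(PROPERTIES_PREFIX) skips every key under the properties. prefix and that getKafkaProperties copies those keys into a Properties object with the prefix removed. OptionValidationSketch, its validateExcept and toKafkaProperties are hypothetical simplifications, not the Flink implementations, and the sketch lists connector as an ordinary known key.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

class OptionValidationSketch {

    static final String PROPERTIES_PREFIX = "properties.";

    /** Rejects missing required keys and unknown keys, skipping keys under the exempt prefix. */
    static void validateExcept(
            Map<String, String> options,
            Set<String> required,
            Set<String> optional,
            String exemptPrefix) {
        for (String key : required) {
            if (!options.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        Set<String> known = new HashSet<>(required);
        known.addAll(optional);
        for (String key : options.keySet()) {
            if (!key.startsWith(exemptPrefix) && !known.contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
    }

    /** Copies every 'properties.*' option into a Properties object with the prefix removed. */
    static Properties toKafkaProperties(Map<String, String> options) {
        Properties props = new Properties();
        options.forEach(
                (key, value) -> {
                    if (key.startsWith(PROPERTIES_PREFIX)) {
                        props.setProperty(key.substring(PROPERTIES_PREFIX.length()), value);
                    }
                });
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options =
                Map.of(
                        "connector", "kafka",
                        "topic", "orders",
                        "properties.bootstrap.servers", "localhost:9092",
                        "properties.group.id", "demo");
        validateExcept(
                options,
                Set.of("properties.bootstrap.servers"),
                Set.of("connector", "topic", "format"),
                PROPERTIES_PREFIX);
        // The resulting Properties hold bootstrap.servers and group.id without the prefix.
        System.out.println(toKafkaProperties(options));
    }
}
```

Exempting the prefix lets users pass arbitrary Kafka client settings through the DDL without the factory having to declare each one as an option.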


The following sections cover how SerializationFormatFactory and DeserializationFormatFactory are implemented, as well as the DynamicTableSource and DynamicTableSink implementations.


2.2.2 UpsertKafkaDynamicTableFactory


The author did not find UpsertKafkaDynamicTableFactory in the source at hand; its code flow is presumably the same as that of KafkaDynamicTableFactory.


2.2.3 KafkaTableSourceSinkFactory


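The Table API mode works on almost the same principle as the Table SQL (declarative) mode: both serve as the input and output of queries and differ only in how they are written. (TableFactory is the older, legacy factory interface, while Factory is the newer one.) It is therefore enough to read the factory code for the Table SQL mode. KafkaTableSourceSinkFactory is not covered here, which does not affect the overall understanding.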


2.3 Serialization and Deserialization Factories

2.3.1 DeserializationFormatFactory


As section 2.2.1 KafkaDynamicTableFactory showed, the factory relies on SerializationFormatFactory and DeserializationFormatFactory, as well as on the DynamicTableSource and DynamicTableSink implementations. They are covered in order below, starting with DeserializationFormatFactory.


In the IDE, press Ctrl+T to list the implementations of DeserializationFormatFactory.

[Figure: implementations of DeserializationFormatFactory]


Take the typical JsonFormatFactory as an example. The full code is below, with comments added to make it easier to follow:

/**
 * JsonFormatFactory supports converting JSON strings into RowData rows
 *
 * @author : YangLinWei
 * @createTime: 2022/4/12 11:57 AM
 * @version: 1.0.0
 */
public class JsonFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
    /**
     * Unique identifier (matches 'format' = 'json' in Flink SQL)
     */
    public static final String IDENTIFIER = "json";
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
    /**
     * Creates the decoding format, whose main job is to create the runtime decoder, i.e. the DeserializationSchema
     */
    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateDecodingFormatOptions(formatOptions);
        final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context context, DataType producedDataType) {
                final RowType rowType = (RowType) producedDataType.getLogicalType();
                final TypeInformation<RowData> rowDataTypeInfo =
                        context.createTypeInformation(producedDataType);
                return new JsonRowDataDeserializationSchema(
                        rowType,
                        rowDataTypeInfo,
                        failOnMissingField,
                        ignoreParseErrors,
                        timestampOption);
            }
            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }
    /**
     * Creates the encoding format, whose main job is to create the runtime encoder, i.e. the SerializationSchema
     */
    @Override
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateEncodingFormatOptions(formatOptions);
        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
        JsonOptions.MapNullKeyMode mapNullKeyMode = JsonOptions.getMapNullKeyMode(formatOptions);
        String mapNullKeyLiteral = formatOptions.get(MAP_NULL_KEY_LITERAL);
        final boolean encodeDecimalAsPlainNumber =
                formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
        return new EncodingFormat<SerializationSchema<RowData>>() {
            @Override
            public SerializationSchema<RowData> createRuntimeEncoder(
                    DynamicTableSink.Context context, DataType consumedDataType) {
                final RowType rowType = (RowType) consumedDataType.getLogicalType();
                return new JsonRowDataSerializationSchema(
                        rowType,
                        timestampOption,
                        mapNullKeyMode,
                        mapNullKeyLiteral,
                        encodeDecimalAsPlainNumber);
            }
            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FAIL_ON_MISSING_FIELD);
        options.add(IGNORE_PARSE_ERRORS);
        options.add(TIMESTAMP_FORMAT);
        options.add(MAP_NULL_KEY_MODE);
        options.add(MAP_NULL_KEY_LITERAL);
        options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
        return options;
    }
}


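As seen earlier, the idea behind a factory is to define the rules, inject them into a newly created instance, and return that instance to the caller.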


As the code comments show, JsonFormatFactory implements both DeserializationFormatFactory and SerializationFormatFactory, providing their methods createDecodingFormat and createEncodingFormat respectively. The core purpose of these two methods is to produce a DeserializationSchema and a SerializationSchema.


Put simply, the actual serialization and deserialization logic behind JsonFormatFactory lives in the implementations of the DeserializationSchema and SerializationSchema interfaces, namely JsonRowDataDeserializationSchema and JsonRowDataSerializationSchema.
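
The layering of factory, format and runtime decoder can be sketched in plain Java. The example below is a deliberately simplified illustration: MiniDecodingFormat, UpperCaseFormatFactory, the upper identifier and the trim option are all invented for the sketch and do not exist in Flink. The point is only that options are read once when the format is created, while the decoder that touches the bytes is created later from that format.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

// Hypothetical format factory; "upper" is an invented identifier.
class UpperCaseFormatFactory {

    static final String IDENTIFIER = "upper";

    /** Stage 1 (planning): read the options once and capture them in the returned format. */
    MiniDecodingFormat<String> createDecodingFormat(Map<String, String> formatOptions) {
        final boolean trim = Boolean.parseBoolean(formatOptions.getOrDefault("trim", "false"));
        // Stage 2 (runtime) happens later, when createRuntimeDecoder() is called.
        return () ->
                bytes -> {
                    String text =
                            new String(bytes, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
                    return trim ? text.trim() : text;
                };
    }

    public static void main(String[] args) {
        MiniDecodingFormat<String> format =
                new UpperCaseFormatFactory().createDecodingFormat(Map.of("trim", "true"));
        Function<byte[], String> decoder = format.createRuntimeDecoder();
        System.out.println(decoder.apply(" hello ".getBytes(StandardCharsets.UTF_8))); // prints "HELLO"
    }
}

// Hypothetical, simplified stand-in for DecodingFormat: it only builds the runtime decoder.
interface MiniDecodingFormat<T> {
    Function<byte[], T> createRuntimeDecoder();
}
```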


Let's look at JsonRowDataDeserializationSchema first.


2.3.1.2.1 JsonRowDataDeserializationSchema


The main job of JsonRowDataDeserializationSchema is to deserialize JSON into the internal data structures of Flink Table/SQL. Its code and comments are as follows:

/**
 * Deserializes JSON into Flink Table/SQL internal data structures
 * <p>
 * Deserializes a byte[] message as a JSON object and reads the specified fields
 */
@Internal
public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    private static final long serialVersionUID = 1L;
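    /** Flag indicating whether to fail if a field is missing. */
    private final boolean failOnMissingField;
    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
    private final boolean ignoreParseErrors;
    /** Type information describing the result type. */
    private final TypeInformation<RowData> resultTypeInfo;
    /**
     * Runtime converter that converts a JsonNode into objects of Flink SQL internal data structures.
     */
    private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter;
    /** Object mapper for parsing the JSON. */
    private final ObjectMapper objectMapper = new ObjectMapper();
    /** Timestamp format specification used to parse timestamps. */
    private final TimestampFormat timestampFormat;
    /** Constructor */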
    public JsonRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> resultTypeInfo,
            boolean failOnMissingField,
            boolean ignoreParseErrors,
            TimestampFormat timestampFormat) {
        if (ignoreParseErrors && failOnMissingField) {
            throw new IllegalArgumentException(
                    "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
        }
        this.resultTypeInfo = checkNotNull(resultTypeInfo);
        this.failOnMissingField = failOnMissingField;
        this.ignoreParseErrors = ignoreParseErrors;
        this.runtimeConverter =
                new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
                        .createConverter(checkNotNull(rowType));
        this.timestampFormat = timestampFormat;
        boolean hasDecimalType =
                LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
        if (hasDecimalType) {
            objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
        }
        objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
    }
    /**
     * The actual deserialization
     */
    @Override
    public RowData deserialize(@Nullable byte[] message) throws IOException {
        if (message == null) {
            return null;
        }
        try {
            return convertToRowData(deserializeToJsonNode(message));
        } catch (Throwable t) {
            if (ignoreParseErrors) {
                return null;
            }
            throw new IOException(
                    format("Failed to deserialize JSON '%s'.", new String(message)), t);
        }
    }
    /** Deserializes the message into a JsonNode */
    public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
        return objectMapper.readTree(message);
    }
    /** Converts a JsonNode into Flink Table/SQL internal data structures */
    public RowData convertToRowData(JsonNode message) {
        return (RowData) runtimeConverter.convert(message);
    }
    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }
    @Override
    public TypeInformation<RowData> getProducedType() {
        return resultTypeInfo;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        JsonRowDataDeserializationSchema that = (JsonRowDataDeserializationSchema) o;
        return failOnMissingField == that.failOnMissingField
                && ignoreParseErrors == that.ignoreParseErrors
                && resultTypeInfo.equals(that.resultTypeInfo)
                && timestampFormat.equals(that.timestampFormat);
    }
    @Override
    public int hashCode() {
        return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
    }
}


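The class uses the utility class JsonToRowDataConverters, whose main job is to convert a JsonNode into Flink Table/SQL internal data structures. For reasons of space its code is not covered in detail here, but a snippet is shown below: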

[Figure: code excerpt from JsonToRowDataConverters]
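
The error-handling contract of the deserialize method in the listing above (null input yields null, a parse failure either yields null or is wrapped in an IOException, and the two flags cannot both be enabled) can be reproduced in a small standalone sketch. LenientDeserializerSketch is a hypothetical class, and parsing a decimal integer stands in for the Jackson parsing and row conversion so that the example needs no external libraries.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class LenientDeserializerSketch {

    private final boolean ignoreParseErrors;

    LenientDeserializerSketch(boolean failOnMissingField, boolean ignoreParseErrors) {
        // Same guard as in the listing: the two flags contradict each other.
        if (failOnMissingField && ignoreParseErrors) {
            throw new IllegalArgumentException(
                    "failOnMissingField and ignoreParseErrors cannot both be enabled.");
        }
        this.ignoreParseErrors = ignoreParseErrors;
    }

    /** Toy payload: a decimal integer encoded as UTF-8 text (stand-in for a JSON record). */
    Integer deserialize(byte[] message) throws IOException {
        if (message == null) {
            return null;
        }
        String text = new String(message, StandardCharsets.UTF_8);
        try {
            return Integer.parseInt(text.trim());
        } catch (NumberFormatException e) {
            if (ignoreParseErrors) {
                return null; // the bad record is silently skipped
            }
            throw new IOException(String.format("Failed to deserialize '%s'.", text), e);
        }
    }

    public static void main(String[] args) throws IOException {
        LenientDeserializerSketch lenient = new LenientDeserializerSketch(false, true);
        System.out.println(lenient.deserialize("42".getBytes(StandardCharsets.UTF_8)));   // prints 42
        System.out.println(lenient.deserialize("oops".getBytes(StandardCharsets.UTF_8))); // prints null
    }
}
```

Returning null for a bad record keeps the job running at the cost of dropping data without notice, which is why the listing throws by default.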


2.3.2 SerializationFormatFactory


It works much like DeserializationFormatFactory, so it is not covered in detail here.


2.3.2.2 JsonRowDataSerializationSchema


JsonRowDataSerializationSchema works much like JsonRowDataDeserializationSchema. Its main job is to serialize Flink Table/SQL internal data structures into JSON, so it is not covered in detail here.


2.4 DynamicTable (Dynamic Tables)


As section 2.2.1 KafkaDynamicTableFactory showed, besides relying on SerializationFormatFactory and DeserializationFormatFactory, the factory also creates the DynamicTableSource and DynamicTableSink implementations.


Next comes DynamicTableSource. Its implementation class is KafkaDynamicSource, so let's look at the code inside KafkaDynamicSource.


2.4.1 KafkaDynamicSource


KafkaDynamicSource mainly handles the source side; for example, this is where the Kafka consumer is defined. The code and comments are as follows:

/** KafkaDynamicSource. */
@Internal
public class KafkaDynamicSource
        implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
    // --------------------------------------------------------------------------------------------
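    // Mutable attributes
    // --------------------------------------------------------------------------------------------
    /** Data type that describes the final output of the source. */
    protected DataType producedDataType;
    /** Metadata that is appended at the end of a physical source row. */
    protected List<String> metadataKeys;
    /** Watermark strategy that is used to generate per-partition watermarks. */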
    protected @Nullable
    WatermarkStrategy<RowData> watermarkStrategy;
    // --------------------------------------------------------------------------------------------
    // Format attributes
    // --------------------------------------------------------------------------------------------
    private static final String VALUE_METADATA_PREFIX = "value.";
    /** Data type to configure the formats. */
    protected final DataType physicalDataType;
    /** Optional format for decoding keys from Kafka. */
    protected final @Nullable
    DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
    /** Format for decoding values from Kafka. */
    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
    /** Indices that determine the key fields and the target position in the produced row. */
    protected final int[] keyProjection;
    /** Indices that determine the value fields and the target position in the produced row. */
    protected final int[] valueProjection;
    /** Prefix that needs to be removed from fields when constructing the physical data type. */
    protected final @Nullable
    String keyPrefix;
    // --------------------------------------------------------------------------------------------
    // Kafka-specific attributes
    // --------------------------------------------------------------------------------------------
    /** The Kafka topics to consume. */
    protected final List<String> topics;
    /** The Kafka topic pattern to consume. */
    protected final Pattern topicPattern;
    /** Properties for the Kafka consumer. */
    protected final Properties properties;
    /**
     * The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
     */
    protected final StartupMode startupMode;
    /**
     * Specific startup offsets; only relevant when the startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
     */
    protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
    /**
     * The start timestamp to locate partition offsets; only relevant when the startup mode is {@link StartupMode#TIMESTAMP}.
     */
    protected final long startupTimestampMillis;
    /** Flag to determine the source mode. In upsert mode, it keeps the tombstone messages. */
    protected final boolean upsertMode;
    /** Constructor; initializes the fields */
    public KafkaDynamicSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis,
            boolean upsertMode) {
        // Format attributes
        this.physicalDataType =
                Preconditions.checkNotNull(
                        physicalDataType, "Physical data type must not be null.");
        this.keyDecodingFormat = keyDecodingFormat;
        this.valueDecodingFormat =
                Preconditions.checkNotNull(
                        valueDecodingFormat, "Value decoding format must not be null.");
        this.keyProjection =
                Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
        this.valueProjection =
                Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
        this.keyPrefix = keyPrefix;
        // Mutable attributes
        this.producedDataType = physicalDataType;
        this.metadataKeys = Collections.emptyList();
        this.watermarkStrategy = null;
        // Kafka-specific attributes
        Preconditions.checkArgument(
                (topics != null && topicPattern == null)
                        || (topics == null && topicPattern != null),
                "Either Topic or Topic Pattern must be set for source.");
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.startupMode =
                Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
        this.specificStartupOffsets =
                Preconditions.checkNotNull(
                        specificStartupOffsets, "Specific offsets must not be null.");
        this.startupTimestampMillis = startupTimestampMillis;
        this.upsertMode = upsertMode;
    }
    @Override
    public ChangelogMode getChangelogMode() {
        return valueDecodingFormat.getChangelogMode();
    }
    /**
     * Returns a provider of the runtime implementation for reading the data; the main logic lives here
     */
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final DeserializationSchema<RowData> keyDeserialization =
                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
        final DeserializationSchema<RowData> valueDeserialization =
                createDeserialization(context, valueDecodingFormat, valueProjection, null);
        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);
        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
        return SourceFunctionProvider.of(kafkaConsumer, false);
    }
    /**
     * Returns the map of metadata keys and the corresponding data types that can be produced
     */
    @Override
    public Map<String, DataType> listReadableMetadata() {
        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
        // according to convention, the order of the final row must be
        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
        // where the format metadata has highest precedence
        // add value format metadata with prefix
        valueDecodingFormat
                .listReadableMetadata()
                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
        // add connector metadata
        Stream.of(ReadableMetadata.values())
                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
        return metadataMap;
    }
    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // separate connector and format metadata
        final List<String> formatMetadataKeys =
                metadataKeys.stream()
                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
                        .collect(Collectors.toList());
        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
        connectorMetadataKeys.removeAll(formatMetadataKeys);
        // push down format metadata
        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
        if (formatMetadata.size() > 0) {
            final List<String> requestedFormatMetadataKeys =
                    formatMetadataKeys.stream()
                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
                            .collect(Collectors.toList());
            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
        }
        this.metadataKeys = connectorMetadataKeys;
        this.producedDataType = producedDataType;
    }
    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }
    @Override
    public DynamicTableSource copy() {
        final KafkaDynamicSource copy =
                new KafkaDynamicSource(
                        physicalDataType,
                        keyDecodingFormat,
                        valueDecodingFormat,
                        keyProjection,
                        valueProjection,
                        keyPrefix,
                        topics,
                        topicPattern,
                        properties,
                        startupMode,
                        specificStartupOffsets,
                        startupTimestampMillis,
                        upsertMode);
        copy.producedDataType = producedDataType;
        copy.metadataKeys = metadataKeys;
        copy.watermarkStrategy = watermarkStrategy;
        return copy;
    }
    @Override
    public String asSummaryString() {
        return "Kafka table source";
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final KafkaDynamicSource that = (KafkaDynamicSource) o;
        return Objects.equals(producedDataType, that.producedDataType)
                && Objects.equals(metadataKeys, that.metadataKeys)
                && Objects.equals(physicalDataType, that.physicalDataType)
                && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
                && Arrays.equals(keyProjection, that.keyProjection)
                && Arrays.equals(valueProjection, that.valueProjection)
                && Objects.equals(keyPrefix, that.keyPrefix)
                && Objects.equals(topics, that.topics)
                && Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern))
                && Objects.equals(properties, that.properties)
                && startupMode == that.startupMode
                && Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
                && startupTimestampMillis == that.startupTimestampMillis
                && Objects.equals(upsertMode, that.upsertMode)
                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
    }
    @Override
    public int hashCode() {
        return Objects.hash(
                producedDataType,
                metadataKeys,
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                upsertMode,
                watermarkStrategy);
    }
    // --------- Concrete business logic ------------------------------------------------------------
    /** Creates the Kafka consumer. */
    protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
            DeserializationSchema<RowData> keyDeserialization,
            DeserializationSchema<RowData> valueDeserialization,
            TypeInformation<RowData> producedTypeInfo) {
        final MetadataConverter[] metadataConverters =
                metadataKeys.stream()
                        .map(
                                k ->
                                        Stream.of(ReadableMetadata.values())
                                                .filter(rm -> rm.key.equals(k))
                                                .findFirst()
                                                .orElseThrow(IllegalStateException::new))
                        .map(m -> m.converter)
                        .toArray(MetadataConverter[]::new);
        // check if connector metadata is used at all
        final boolean hasMetadata = metadataKeys.size() > 0;
        // adjust physical arity with value format's metadata
        final int adjustedPhysicalArity =
                producedDataType.getChildren().size() - metadataKeys.size();
        // adjust value format projection to include value format's metadata columns at the end
        final int[] adjustedValueProjection =
                IntStream.concat(
                        IntStream.of(valueProjection),
                        IntStream.range(
                                keyProjection.length + valueProjection.length,
                                adjustedPhysicalArity))
                        .toArray();
        final KafkaDeserializationSchema<RowData> kafkaDeserializer =
                new DynamicKafkaDeserializationSchema(
                        adjustedPhysicalArity,
                        keyDeserialization,
                        keyProjection,
                        valueDeserialization,
                        adjustedValueProjection,
                        hasMetadata,
                        metadataConverters,
                        producedTypeInfo,
                        upsertMode);
        final FlinkKafkaConsumer<RowData> kafkaConsumer;
        if (topics != null) {
            kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
        } else {
            kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
        }
        switch (startupMode) {
            case EARLIEST:
                kafkaConsumer.setStartFromEarliest();
                break;
            case LATEST:
                kafkaConsumer.setStartFromLatest();
                break;
            case GROUP_OFFSETS:
                kafkaConsumer.setStartFromGroupOffsets();
                break;
            case SPECIFIC_OFFSETS:
                kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
                break;
            case TIMESTAMP:
                kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
                break;
        }
        kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
        if (watermarkStrategy != null) {
            kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
        }
        return kafkaConsumer;
    }
    /** Creates the deserialization schema for the given format, or returns null if no format is configured. */
    private @Nullable
    DeserializationSchema<RowData> createDeserialization(
            DynamicTableSource.Context context,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
            int[] projection,
            @Nullable String prefix) {
        if (format == null) {
            return null;
        }
        DataType physicalFormatDataType =
                DataTypeUtils.projectRow(this.physicalDataType, projection);
        if (prefix != null) {
            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
        }
        return format.createRuntimeDecoder(context, physicalFormatDataType);
    }
    // --------------------------------------------------------------------------------------------
    // Metadata handling
    // --------------------------------------------------------------------------------------------
    enum ReadableMetadata {
        TOPIC(
                "topic",
                DataTypes.STRING().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return StringData.fromString(record.topic());
                    }
                }),
        PARTITION(
                "partition",
                DataTypes.INT().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return record.partition();
                    }
                }),
        HEADERS(
                "headers",
                // key and value of the map are nullable to make handling easier in queries
                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
                        .notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        final Map<StringData, byte[]> map = new HashMap<>();
                        for (Header header : record.headers()) {
                            map.put(StringData.fromString(header.key()), header.value());
                        }
                        return new GenericMapData(map);
                    }
                }),
        LEADER_EPOCH(
                "leader-epoch",
                DataTypes.INT().nullable(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return record.leaderEpoch().orElse(null);
                    }
                }),
        OFFSET(
                "offset",
                DataTypes.BIGINT().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return record.offset();
                    }
                }),
        TIMESTAMP(
                "timestamp",
                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return TimestampData.fromEpochMillis(record.timestamp());
                    }
                }),
        TIMESTAMP_TYPE(
                "timestamp-type",
                DataTypes.STRING().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return StringData.fromString(record.timestampType().toString());
                    }
                });
        final String key;
        final DataType dataType;
        final MetadataConverter converter;
        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.converter = converter;
        }
    }
}
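
The index arithmetic in applyReadableMetadata and createKafkaConsumer is easier to follow with concrete numbers. The following is a minimal, stdlib-only sketch and not the connector's code: the class RowLayoutSketch, its method names, and the sample keys are hypothetical, and the prefix value "value." is an assumption because the constant's definition is not shown in this excerpt.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class RowLayoutSketch {
    // Assumption: the real constant is defined outside this excerpt; "value." is illustrative.
    static final String VALUE_METADATA_PREFIX = "value.";

    /** Keys handled by the value format, with the prefix stripped before push-down. */
    static List<String> formatKeys(List<String> requestedKeys) {
        return requestedKeys.stream()
                .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
                .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
                .collect(Collectors.toList());
    }

    /** Keys the connector itself must fill in, kept in their requested order. */
    static List<String> connectorKeys(List<String> requestedKeys) {
        return requestedKeys.stream()
                .filter(k -> !k.startsWith(VALUE_METADATA_PREFIX))
                .collect(Collectors.toList());
    }

    /** Fields produced by the key/value formats: all produced fields minus connector metadata. */
    static int adjustedPhysicalArity(int producedFieldCount, int connectorKeyCount) {
        return producedFieldCount - connectorKeyCount;
    }

    /** Value projection extended with the positions of the value format's metadata columns. */
    static int[] adjustedValueProjection(
            int[] keyProjection, int[] valueProjection, int adjustedPhysicalArity) {
        return IntStream.concat(
                        IntStream.of(valueProjection),
                        IntStream.range(
                                keyProjection.length + valueProjection.length,
                                adjustedPhysicalArity))
                .toArray();
    }

    public static void main(String[] args) {
        // Hypothetical table: 1 key column, 2 value columns, then 3 requested metadata columns.
        List<String> requested = Arrays.asList("value.source.timestamp", "topic", "offset");
        List<String> connector = connectorKeys(requested);
        int producedFieldCount = 1 + 2 + requested.size();
        int arity = adjustedPhysicalArity(producedFieldCount, connector.size());
        int[] projection = adjustedValueProjection(new int[] {0}, new int[] {1, 2}, arity);
        System.out.println(formatKeys(requested));
        System.out.println(connector);
        System.out.println(arity + " " + Arrays.toString(projection));
    }
}
```

In this layout the produced row has six fields, two of which are connector metadata, so the formats are responsible for four of them. The value format keeps its own positions 1 and 2 and additionally receives position 3 for its metadata column. When no format metadata is requested, the range is empty and the value projection is unchanged.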


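Of course, the most important piece here is still the FlinkKafkaConsumer class. Rather than walking through its code line by line, the two diagrams below annotate its core logic: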

7dbaa23a372949cb863c14fc1bd1f149.png

8ce0a588a3124185bb262734099f9bb2.png
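
One detail of how KafkaDynamicSource configures the consumer is worth isolating: offsets are committed back to Kafka on checkpoints only when the Kafka properties contain a group.id. The sketch below restates that single rule using only java.util.Properties. The class ConsumerSetupSketch, the broker address, and the group name are hypothetical and exist purely for illustration.

```java
import java.util.Properties;

final class ConsumerSetupSketch {
    /**
     * Mirrors the argument passed to setCommitOffsetsOnCheckpoints in createKafkaConsumer:
     * committing is enabled only if a consumer group is configured.
     */
    static boolean commitOffsetsOnCheckpoints(Properties kafkaProperties) {
        return kafkaProperties.getProperty("group.id") != null;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        System.out.println(commitOffsetsOnCheckpoints(props));

        props.setProperty("group.id", "demo-group"); // hypothetical group name
        System.out.println(commitOffsetsOnCheckpoints(props));
    }
}
```

The practical consequence is that a table declared without a consumer group will not show its progress in Kafka's committed offsets, which matters mainly for monitoring tools that read those offsets.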


As for RichParallelSourceFunction, read its source on your own; you generally won't need to change it.


2.4.2 KafkaDynamicSink


It works on much the same principle as KafkaDynamicSource, so it is not covered in detail here.

03 Summary


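This article contains a lot of code and takes some patience to read through. To help tie everything together, here is a diagram that should make the overall flow easier to follow: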

28e7bfd4dbea4371ae346ffce924f5bd.png


That wraps up this walkthrough of the Flink Kafka Connector source code. I hope it helps, and thanks for reading!


