Flink Kafka Connector Source Code Walkthrough (In Depth)


01 Introduction

In the previous article, "Flink Custom Connectors", we walked through the basic workflow for building a custom Flink connector (summarized there with a flow chart).

At the code level, the steps for a developer to implement a custom connector are:

  1. Define the Flink SQL DDL (a sketch follows this list);
  2. Create the factories that parse and validate the options (DynamicTableSourceFactory, DeserializationFormatFactory), and register both factories under the META-INF/services directory;
  3. Implement ScanTableSource;
  4. Put the concrete business logic in getScanRuntimeProvider.
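To make step 1 concrete, here is a minimal sketch of such a DDL for the built-in kafka connector, wrapped in a small Java program. The table name, topic, group id and broker address are placeholders chosen for illustration:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaDdlExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Step 1 of the flow above: declare a Kafka-backed table via DDL.
        // 'connector' = 'kafka' is resolved to KafkaDynamicTableFactory through factoryIdentifier().
        tEnv.executeSql(
                "CREATE TABLE user_behavior (\n"
                        + "  user_id BIGINT,\n"
                        + "  item_id BIGINT,\n"
                        + "  ts TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'user_behavior',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'demo-group',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");
    }
}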

To really internalize how custom connectors work, this article goes straight to the source code and studies how the built-in Flink Kafka connector is implemented.

Appendix: Flink source code download link

02 Kafka Connector Source Code Analysis

2.1 Project Structure

After importing the Flink source code, you can see how the Kafka connector module is laid out in the project tree.

Let's start with the pom file. Most of its details can be skipped; what matters are the dependencies. Below is a trimmed version with some added comments:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>flink-connectors</artifactId>
    <groupId>org.apache.flink</groupId>
    <version>1.13.6</version>
    <relativePath>..</relativePath>
  </parent>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <name>Flink : Connectors : Kafka</name>
  <packaging>jar</packaging>
  <properties>
    <kafka.version>2.4.1</kafka.version>
  </properties>
  <dependencies>
    <!-- Flink-related dependencies (everything prefixed with "flink") -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
      <dependency>
        <!-- Other Flink-related dependencies, not listed here ... -->
      </dependency>
    <!-- Kafka-related dependencies -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
  </dependencies>
  <build>
    <!-- Packaging plugin definitions ... -->
  </build>
</project>

2.2 Factories (Entry Point of the Source Code)

Under the src/main/resources/META-INF/services directory we can see two files:

Combined with the flow chart mentioned in the introduction, we know that once the DDL has been defined, the factories become the entry point of the whole flow. Let's look at the contents of the two files.

org.apache.flink.table.factories.Factory (Table SQL mode)

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory

org.apache.flink.table.factories.TableFactory (Table API mode)

org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

From these configuration files we know that the following factory classes deserve a closer read:

  • KafkaDynamicTableFactory
  • UpsertKafkaDynamicTableFactory
  • KafkaTableSourceSinkFactory
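These factories are discovered through the standard Java SPI mechanism. As a rough illustration (FactoryUtil's real matching logic is more involved), the discovery step can be sketched like this:

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class FactoryDiscoverySketch {
    public static void main(String[] args) {
        // Every Factory listed under META-INF/services/org.apache.flink.table.factories.Factory
        // is loaded here; FactoryUtil then matches the 'connector' / 'format' options against
        // each factory's factoryIdentifier().
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier() + " -> " + factory.getClass().getName());
        }
    }
}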

2.2.1 KafkaDynamicTableFactory

The full code of KafkaDynamicTableFactory is shown below, with inline comments added to make it easier to follow:

/**
 * Factory for KafkaDynamicTable.
 * <p>
 * Purpose: configures and creates instances of KafkaDynamicSource and KafkaDynamicSink.
 *
 * @author : YangLinWei
 * @createTime: 2022/4/12 11:06 AM
 * @version: 1.0.0
 */
@Internal
public class KafkaDynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    public static final String IDENTIFIER = "kafka";
    /**
     * Returns a unique identifier (this is what `connector = '...'` in Flink SQL maps to).
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
    /**
     * The set of options this factory requires (mandatory).
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PROPS_BOOTSTRAP_SERVERS);
        return options;
    }
    /**
     * The set of options this factory may use (optional).
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FactoryUtil.FORMAT);
        options.add(KEY_FORMAT);
        options.add(KEY_FIELDS);
        options.add(KEY_FIELDS_PREFIX);
        options.add(VALUE_FORMAT);
        options.add(VALUE_FIELDS_INCLUDE);
        options.add(TOPIC);
        options.add(TOPIC_PATTERN);
        options.add(PROPS_GROUP_ID);
        options.add(SCAN_STARTUP_MODE);
        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
        options.add(SINK_PARTITIONER);
        options.add(SINK_SEMANTIC);
        options.add(SINK_PARALLELISM);
        return options;
    }
    /**
     * Creates the DynamicTableSource.
     * <p>
     * Note: option validation and discovery of nested factories happen in this method.
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        //1. Create the helper that runs the validation logic provided by FactoryUtil
        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        //2. Fetch all table options
        final ReadableConfig tableOptions = helper.getOptions();
        //3. Discover a suitable decoding format for the key
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                getKeyDecodingFormat(helper);
        //4. Discover a suitable decoding format for the value
        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                getValueDecodingFormat(helper);
        //5. Validate the options (format, topic, startup mode, primary key, etc.; an exception is thrown on failure)
        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSourceOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
        //6. Read the remaining options and copy them into a Properties object
        final StartupOptions startupOptions = getStartupOptions(tableOptions);
        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
        properties.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                String.valueOf(
                        tableOptions
                                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                                .map(Duration::toMillis)
                                .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
        // 7. Finally create the DynamicTableSource instance and inject the collected parameters
        return createKafkaTableSource(
                physicalDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                KafkaOptions.getSourceTopics(tableOptions),
                KafkaOptions.getSourceTopicPattern(tableOptions),
                properties,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.startupTimestampMillis);
    }
    /**
     * Creates the DynamicTableSink (the flow is almost identical to createDynamicTableSource).
     * <p>
     * Note: option validation and discovery of nested factories happen in this method.
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        //1. Create the helper that runs the validation logic provided by FactoryUtil
        final TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        this, autoCompleteSchemaRegistrySubject(context));
        //2. Fetch all table options
        final ReadableConfig tableOptions = helper.getOptions();
        //3. Discover a suitable encoding format for the key
        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                getKeyEncodingFormat(helper);
        //4. Discover a suitable encoding format for the value
        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
                getValueEncodingFormat(helper);
        //5. Validate the options (format, topic, startup mode, primary key, etc.; an exception is thrown on failure)
        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSinkOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
        //6. Read the remaining options
        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
        final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
        // 7. Finally create the DynamicTableSink instance and inject the collected parameters
        return createKafkaTableSink(
                physicalDataType,
                keyEncodingFormat.orElse(null),
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                tableOptions.get(TOPIC).get(0),
                getKafkaProperties(context.getCatalogTable().getOptions()),
                getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
                getSinkSemantic(tableOptions),
                parallelism);
    }
    // ---The methods below are the validation/instantiation details used above; a quick skim is enough-----------------------------------
    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
            TableFactoryHelper helper) {
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                helper.discoverOptionalDecodingFormat(
                        DeserializationFormatFactory.class, KEY_FORMAT);
        keyDecodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyDecodingFormat;
    }
    private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
            TableFactoryHelper helper) {
        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
        keyEncodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyEncodingFormat;
    }
    private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalDecodingFormat(
                DeserializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverDecodingFormat(
                                        DeserializationFormatFactory.class, VALUE_FORMAT));
    }
    private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalEncodingFormat(
                SerializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverEncodingFormat(
                                        SerializationFormatFactory.class, VALUE_FORMAT));
    }
    private static void validatePKConstraints(
            ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
        if (catalogTable.getSchema().getPrimaryKey().isPresent()
                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
            Configuration options = Configuration.fromMap(catalogTable.getOptions());
            String formatName =
                    options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
            throw new ValidationException(
                    String.format(
                            "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
                                    + " on the table, because it can't guarantee the semantic of primary key.",
                            tableName.asSummaryString(),
                            formatName));
        }
    }
    // --------------------------------------------------------------------------------------------
    protected KafkaDynamicSource createKafkaTableSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        return new KafkaDynamicSource(
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                false);
    }
    protected KafkaDynamicSink createKafkaTableSink(
            DataType physicalDataType,
            @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            String topic,
            Properties properties,
            FlinkKafkaPartitioner<RowData> partitioner,
            KafkaSinkSemantic semantic,
            Integer parallelism) {
        return new KafkaDynamicSink(
                physicalDataType,
                physicalDataType,
                keyEncodingFormat,
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topic,
                properties,
                partitioner,
                semantic,
                false,
                SinkBufferFlushMode.DISABLED,
                parallelism);
    }
}

From the source code we can see that KafkaDynamicTableFactory is mainly responsible for:

  • defining the serialization and deserialization format factories for the key and the value;
  • validating the connector options;
  • injecting the options into the DynamicTableSource and DynamicTableSink instances.

Later sections look at how SerializationFormatFactory and DeserializationFormatFactory are implemented, and then at the DynamicTableSource and DynamicTableSink implementations. As a bridge back to the custom-connector flow, a minimal factory skeleton is sketched below.
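The sketch below is illustrative only: the identifier, the hostname option and MyDynamicTableSource are made-up names, and MyDynamicTableSource stands for a ScanTableSource implementation that is not shown here.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

/** Hypothetical factory skeleton mirroring the structure of KafkaDynamicTableFactory. */
public class MyDynamicTableFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "my-connector"; // matches 'connector' = 'my-connector' in the DDL
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Same pattern as KafkaDynamicTableFactory: validate the options first,
        // then hand everything over to the DynamicTableSource implementation.
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        String hostname = helper.getOptions().get(HOSTNAME);
        return new MyDynamicTableSource(hostname); // hypothetical ScanTableSource, not shown
    }
}

Such a factory would also need to be registered in META-INF/services/org.apache.flink.table.factories.Factory, just like the Kafka factories above.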

2.2.2 UpsertKafkaDynamicTableFactory

UpsertKafkaDynamicTableFactory (registered in the same services file) is not analyzed in detail here; its code flow is essentially the same as KafkaDynamicTableFactory, the main difference being its upsert semantics.

2.2.3 KafkaTableSourceSinkFactory

The Table API mode works almost the same way as the declarative Table SQL mode: both serve as inputs and outputs for queries and differ only in how they are written. It is therefore enough to study the factory used by the Table SQL mode, so KafkaTableSourceSinkFactory is not covered here; skipping it does not affect the overall understanding.

2.3 Serialization and Deserialization Format Factories

2.3.1 DeserializationFormatFactory

From section 2.2.1 (KafkaDynamicTableFactory) we know that the factory relies on SerializationFormatFactory and DeserializationFormatFactory, as well as on the DynamicTableSource and DynamicTableSink implementations. Let's go through them in order, starting with DeserializationFormatFactory.

Pressing Ctrl+T in the IDE shows which implementations of DeserializationFormatFactory exist.

Let's take the typical JsonFormatFactory as an example. The full code is shown below, with inline comments to make it easier to follow:

/**
 * JsonFormatFactory converts JSON strings to and from RowData rows.
 *
 * @author : YangLinWei
 * @createTime: 2022/4/12 11:57 AM
 * @version: 1.0.0
 */
public class JsonFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
    /**
     * Unique identifier (this is what 'format' = 'json' in Flink SQL maps to).
     */
    public static final String IDENTIFIER = "json";
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
    /**
     * Creates the decoding format, whose main job is to create the runtime decoder, i.e. the DeserializationSchema.
     */
    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateDecodingFormatOptions(formatOptions);
        final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context context, DataType producedDataType) {
                final RowType rowType = (RowType) producedDataType.getLogicalType();
                final TypeInformation<RowData> rowDataTypeInfo =
                        context.createTypeInformation(producedDataType);
                return new JsonRowDataDeserializationSchema(
                        rowType,
                        rowDataTypeInfo,
                        failOnMissingField,
                        ignoreParseErrors,
                        timestampOption);
            }
            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }
    /**
     * Creates the encoding format, whose main job is to create the runtime encoder, i.e. the SerializationSchema.
     */
    @Override
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateEncodingFormatOptions(formatOptions);
        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
        JsonOptions.MapNullKeyMode mapNullKeyMode = JsonOptions.getMapNullKeyMode(formatOptions);
        String mapNullKeyLiteral = formatOptions.get(MAP_NULL_KEY_LITERAL);
        final boolean encodeDecimalAsPlainNumber =
                formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
        return new EncodingFormat<SerializationSchema<RowData>>() {
            @Override
            public SerializationSchema<RowData> createRuntimeEncoder(
                    DynamicTableSink.Context context, DataType consumedDataType) {
                final RowType rowType = (RowType) consumedDataType.getLogicalType();
                return new JsonRowDataSerializationSchema(
                        rowType,
                        timestampOption,
                        mapNullKeyMode,
                        mapNullKeyLiteral,
                        encodeDecimalAsPlainNumber);
            }
            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FAIL_ON_MISSING_FIELD);
        options.add(IGNORE_PARSE_ERRORS);
        options.add(TIMESTAMP_FORMAT);
        options.add(MAP_NULL_KEY_MODE);
        options.add(MAP_NULL_KEY_LITERAL);
        options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
        return options;
    }
}

As noted earlier, the idea behind a factory is to define the rules and inject them into a newly created instance that is returned to the caller.

From the comments in the code above, JsonFormatFactory implements both DeserializationFormatFactory and SerializationFormatFactory, providing createDecodingFormat and createEncodingFormat respectively. The core purpose of these two methods is to produce a DeserializationSchema and a SerializationSchema.

In other words, the actual serialization and deserialization logic of JsonFormatFactory lives in the implementations of the DeserializationSchema and SerializationSchema interfaces, namely JsonRowDataDeserializationSchema and JsonRowDataSerializationSchema.

Let's look at JsonRowDataDeserializationSchema first.

2.3.1.1 JsonRowDataDeserializationSchema

JsonRowDataDeserializationSchema deserializes JSON into Flink Table/SQL's internal data structures. The code with comments is as follows:

/**
 * Deserializes JSON into Flink Table/SQL's internal data structures.
 * <p>
 * Deserializes a byte[] message into a JSON object and reads the specified fields.
 */
@Internal
public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    private static final long serialVersionUID = 1L;
    /** Flag indicating whether to fail if a field is missing. */
    private final boolean failOnMissingField;
    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
    private final boolean ignoreParseErrors;
    /** Type information of the produced type. */
    private final TypeInformation<RowData> resultTypeInfo;
    /**
     * Runtime converter that converts a JsonNode into objects of Flink SQL's internal data structures.
     */
    private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter;
    /** Object mapper used to parse the JSON. */
    private final ObjectMapper objectMapper = new ObjectMapper();
    /** Timestamp format specification, used to parse timestamps. */
    private final TimestampFormat timestampFormat;
    /** Constructor. */
    public JsonRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> resultTypeInfo,
            boolean failOnMissingField,
            boolean ignoreParseErrors,
            TimestampFormat timestampFormat) {
        if (ignoreParseErrors && failOnMissingField) {
            throw new IllegalArgumentException(
                    "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
        }
        this.resultTypeInfo = checkNotNull(resultTypeInfo);
        this.failOnMissingField = failOnMissingField;
        this.ignoreParseErrors = ignoreParseErrors;
        this.runtimeConverter =
                new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
                        .createConverter(checkNotNull(rowType));
        this.timestampFormat = timestampFormat;
        boolean hasDecimalType =
                LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
        if (hasDecimalType) {
            objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
        }
        objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
    }
    /**
     * The actual deserialization logic.
     */
    @Override
    public RowData deserialize(@Nullable byte[] message) throws IOException {
        if (message == null) {
            return null;
        }
        try {
            return convertToRowData(deserializeToJsonNode(message));
        } catch (Throwable t) {
            if (ignoreParseErrors) {
                return null;
            }
            throw new IOException(
                    format("Failed to deserialize JSON '%s'.", new String(message)), t);
        }
    }
    /** Deserializes the message bytes into a JsonNode. */
    public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
        return objectMapper.readTree(message);
    }
    /** Converts a JsonNode into Flink Table/SQL's internal data structure. */
    public RowData convertToRowData(JsonNode message) {
        return (RowData) runtimeConverter.convert(message);
    }
    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }
    @Override
    public TypeInformation<RowData> getProducedType() {
        return resultTypeInfo;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        JsonRowDataDeserializationSchema that = (JsonRowDataDeserializationSchema) o;
        return failOnMissingField == that.failOnMissingField
                && ignoreParseErrors == that.ignoreParseErrors
                && resultTypeInfo.equals(that.resultTypeInfo)
                && timestampFormat.equals(that.timestampFormat);
    }
    @Override
    public int hashCode() {
        return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
    }
}

Internally it uses the helper class JsonToRowDataConverters, whose job is to convert a JsonNode into Flink Table/SQL's internal data structures. For space reasons its code is not reproduced here, but the simplified sketch below gives the idea.
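As a rough intuition only (the real converter covers the full Flink type system, nested rows, nullability and so on), a flat-row converter for STRING/INT/BIGINT fields could look like this:

import com.fasterxml.jackson.databind.JsonNode;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;

/** Simplified stand-in for JsonToRowDataConverters: flat rows, STRING/INT/BIGINT fields only. */
public class SimpleJsonToRowData {

    public static RowData convert(RowType rowType, JsonNode root) {
        GenericRowData row = new GenericRowData(rowType.getFieldCount());
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            String fieldName = rowType.getFieldNames().get(i);
            JsonNode field = root.get(fieldName);
            if (field == null || field.isNull()) {
                row.setField(i, null);
                continue;
            }
            switch (rowType.getTypeAt(i).getTypeRoot()) {
                case CHAR:
                case VARCHAR:
                    // Flink SQL strings are represented internally as StringData
                    row.setField(i, StringData.fromString(field.asText()));
                    break;
                case INTEGER:
                    row.setField(i, field.asInt());
                    break;
                case BIGINT:
                    row.setField(i, field.asLong());
                    break;
                default:
                    throw new UnsupportedOperationException(
                            "Type not covered by this sketch: " + rowType.getTypeAt(i));
            }
        }
        return row;
    }
}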

2.3.2 SerializationFormatFactory

This works much like DeserializationFormatFactory, so it is not covered in detail here.

2.3.2.1 JsonRowDataSerializationSchema

JsonRowDataSerializationSchema mirrors JsonRowDataDeserializationSchema: its main job is to serialize Flink Table/SQL's internal data structures into JSON. It is not reproduced here, but a simplified sketch of the idea follows.
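Again a sketch only, limited to flat STRING/INT rows rather than the real implementation:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

/** Simplified stand-in for the RowData -> JSON direction: flat STRING/INT rows only. */
public class SimpleRowDataToJson {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static byte[] serialize(RowType rowType, RowData row) throws Exception {
        ObjectNode node = MAPPER.createObjectNode();
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            String fieldName = rowType.getFieldNames().get(i);
            if (row.isNullAt(i)) {
                node.putNull(fieldName);
                continue;
            }
            switch (rowType.getTypeAt(i).getTypeRoot()) {
                case CHAR:
                case VARCHAR:
                    node.put(fieldName, row.getString(i).toString());
                    break;
                case INTEGER:
                    node.put(fieldName, row.getInt(i));
                    break;
                default:
                    throw new UnsupportedOperationException("Type not covered by this sketch");
            }
        }
        return MAPPER.writeValueAsBytes(node);
    }
}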

2.4 Dynamic Tables

From section 2.2.1 (KafkaDynamicTableFactory) we know that, besides the SerializationFormatFactory and DeserializationFormatFactory, the factory also creates the DynamicTableSource and DynamicTableSink instances.

Next up is DynamicTableSource, whose implementation class is KafkaDynamicSource. Let's look at its code.

2.4.1 KafkaDynamicSource

KafkaDynamicSource handles the source side of the table; for example, this is where the Kafka consumer is created. The code with comments is as follows:

/** KafkaDynamicSource. */
@Internal
public class KafkaDynamicSource
        implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
    // --------------------------------------------------------------------------------------------
    // Mutable attributes
    // --------------------------------------------------------------------------------------------
    /** Data type that describes the final output of the source. */
    protected DataType producedDataType;
    /** Metadata that is appended at the end of a physical source row. */
    protected List<String> metadataKeys;
    /** Watermark strategy that is used to generate per-partition watermarks. */
    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
    // --------------------------------------------------------------------------------------------
    // Format attributes
    // --------------------------------------------------------------------------------------------
    private static final String VALUE_METADATA_PREFIX = "value.";
    /** Data type used to configure the formats. */
    protected final DataType physicalDataType;
    /** Optional format for decoding keys from Kafka. */
    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
    /** Format for decoding values from Kafka. */
    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
    /** Indices that determine the key fields and their target positions in the produced row. */
    protected final int[] keyProjection;
    /** Indices that determine the value fields and their target positions in the produced row. */
    protected final int[] valueProjection;
    /** Prefix that needs to be removed from fields when constructing the physical data type. */
    protected final @Nullable String keyPrefix;
    // --------------------------------------------------------------------------------------------
    // Kafka-specific attributes
    // --------------------------------------------------------------------------------------------
    /** The Kafka topics to consume. */
    protected final List<String> topics;
    /** The Kafka topic pattern to consume. */
    protected final Pattern topicPattern;
    /** Properties for the Kafka consumer. */
    protected final Properties properties;
    /**
     * Startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
     */
    protected final StartupMode startupMode;
    /**
     * Specific startup offsets; only relevant when the startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
     */
    protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
    /**
     * Start timestamp for locating partition offsets; only relevant when the startup mode is {@link StartupMode#TIMESTAMP}.
     */
    protected final long startupTimestampMillis;
    /** Flag to determine the source mode. In upsert mode, tombstone messages are kept. */
    protected final boolean upsertMode;
    /** Constructor; initializes the fields above. */
    public KafkaDynamicSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis,
            boolean upsertMode) {
        // Format attributes
        this.physicalDataType =
                Preconditions.checkNotNull(
                        physicalDataType, "Physical data type must not be null.");
        this.keyDecodingFormat = keyDecodingFormat;
        this.valueDecodingFormat =
                Preconditions.checkNotNull(
                        valueDecodingFormat, "Value decoding format must not be null.");
        this.keyProjection =
                Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
        this.valueProjection =
                Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
        this.keyPrefix = keyPrefix;
        // Mutable attributes
        this.producedDataType = physicalDataType;
        this.metadataKeys = Collections.emptyList();
        this.watermarkStrategy = null;
        // Kafka-specific attributes
        Preconditions.checkArgument(
                (topics != null && topicPattern == null)
                        || (topics == null && topicPattern != null),
                "Either Topic or Topic Pattern must be set for source.");
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.startupMode =
                Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
        this.specificStartupOffsets =
                Preconditions.checkNotNull(
                        specificStartupOffsets, "Specific offsets must not be null.");
        this.startupTimestampMillis = startupTimestampMillis;
        this.upsertMode = upsertMode;
    }
    @Override
    public ChangelogMode getChangelogMode() {
        return valueDecodingFormat.getChangelogMode();
    }
    /**
     * Returns a provider for the runtime implementation used to read the data; this is where the main work happens.
     */
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final DeserializationSchema<RowData> keyDeserialization =
                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
        final DeserializationSchema<RowData> valueDeserialization =
                createDeserialization(context, valueDecodingFormat, valueProjection, null);
        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);
        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
        return SourceFunctionProvider.of(kafkaConsumer, false);
    }
    /**
     * Returns a map of metadata keys and the corresponding data types this source can produce.
     */
    @Override
    public Map<String, DataType> listReadableMetadata() {
        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
        // according to convention, the order of the final row must be
        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
        // where the format metadata has highest precedence
        // add value format metadata with prefix
        valueDecodingFormat
                .listReadableMetadata()
                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
        // add connector metadata
        Stream.of(ReadableMetadata.values())
                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
        return metadataMap;
    }
    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // separate connector and format metadata
        final List<String> formatMetadataKeys =
                metadataKeys.stream()
                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
                        .collect(Collectors.toList());
        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
        connectorMetadataKeys.removeAll(formatMetadataKeys);
        // push down format metadata
        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
        if (formatMetadata.size() > 0) {
            final List<String> requestedFormatMetadataKeys =
                    formatMetadataKeys.stream()
                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
                            .collect(Collectors.toList());
            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
        }
        this.metadataKeys = connectorMetadataKeys;
        this.producedDataType = producedDataType;
    }
    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }
    @Override
    public DynamicTableSource copy() {
        final KafkaDynamicSource copy =
                new KafkaDynamicSource(
                        physicalDataType,
                        keyDecodingFormat,
                        valueDecodingFormat,
                        keyProjection,
                        valueProjection,
                        keyPrefix,
                        topics,
                        topicPattern,
                        properties,
                        startupMode,
                        specificStartupOffsets,
                        startupTimestampMillis,
                        upsertMode);
        copy.producedDataType = producedDataType;
        copy.metadataKeys = metadataKeys;
        copy.watermarkStrategy = watermarkStrategy;
        return copy;
    }
    @Override
    public String asSummaryString() {
        return "Kafka table source";
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final KafkaDynamicSource that = (KafkaDynamicSource) o;
        return Objects.equals(producedDataType, that.producedDataType)
                && Objects.equals(metadataKeys, that.metadataKeys)
                && Objects.equals(physicalDataType, that.physicalDataType)
                && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
                && Arrays.equals(keyProjection, that.keyProjection)
                && Arrays.equals(valueProjection, that.valueProjection)
                && Objects.equals(keyPrefix, that.keyPrefix)
                && Objects.equals(topics, that.topics)
                && Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern))
                && Objects.equals(properties, that.properties)
                && startupMode == that.startupMode
                && Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
                && startupTimestampMillis == that.startupTimestampMillis
                && Objects.equals(upsertMode, that.upsertMode)
                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
    }
    @Override
    public int hashCode() {
        return Objects.hash(
                producedDataType,
                metadataKeys,
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                upsertMode,
                watermarkStrategy);
    }
    // ---Concrete runtime construction------------------------------------------------------------------------
    /** Creates the Kafka consumer. **/
    protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
            DeserializationSchema<RowData> keyDeserialization,
            DeserializationSchema<RowData> valueDeserialization,
            TypeInformation<RowData> producedTypeInfo) {
        final MetadataConverter[] metadataConverters =
                metadataKeys.stream()
                        .map(
                                k ->
                                        Stream.of(ReadableMetadata.values())
                                                .filter(rm -> rm.key.equals(k))
                                                .findFirst()
                                                .orElseThrow(IllegalStateException::new))
                        .map(m -> m.converter)
                        .toArray(MetadataConverter[]::new);
        // check if connector metadata is used at all
        final boolean hasMetadata = metadataKeys.size() > 0;
        // adjust physical arity with value format's metadata
        final int adjustedPhysicalArity =
                producedDataType.getChildren().size() - metadataKeys.size();
        // adjust value format projection to include value format's metadata columns at the end
        final int[] adjustedValueProjection =
                IntStream.concat(
                        IntStream.of(valueProjection),
                        IntStream.range(
                                keyProjection.length + valueProjection.length,
                                adjustedPhysicalArity))
                        .toArray();
        final KafkaDeserializationSchema<RowData> kafkaDeserializer =
                new DynamicKafkaDeserializationSchema(
                        adjustedPhysicalArity,
                        keyDeserialization,
                        keyProjection,
                        valueDeserialization,
                        adjustedValueProjection,
                        hasMetadata,
                        metadataConverters,
                        producedTypeInfo,
                        upsertMode);
        final FlinkKafkaConsumer<RowData> kafkaConsumer;
        if (topics != null) {
            kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
        } else {
            kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
        }
        switch (startupMode) {
            case EARLIEST:
                kafkaConsumer.setStartFromEarliest();
                break;
            case LATEST:
                kafkaConsumer.setStartFromLatest();
                break;
            case GROUP_OFFSETS:
                kafkaConsumer.setStartFromGroupOffsets();
                break;
            case SPECIFIC_OFFSETS:
                kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
                break;
            case TIMESTAMP:
                kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
                break;
        }
        kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
        if (watermarkStrategy != null) {
            kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
        }
        return kafkaConsumer;
    }
    /** Creates the deserialization schema for either the key or the value. **/
    private @Nullable
    DeserializationSchema<RowData> createDeserialization(
            DynamicTableSource.Context context,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
            int[] projection,
            @Nullable String prefix) {
        if (format == null) {
            return null;
        }
        DataType physicalFormatDataType =
                DataTypeUtils.projectRow(this.physicalDataType, projection);
        if (prefix != null) {
            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
        }
        return format.createRuntimeDecoder(context, physicalFormatDataType);
    }
    // --------------------------------------------------------------------------------------------
    // Metadata handling
    // --------------------------------------------------------------------------------------------
    enum ReadableMetadata {
        TOPIC(
                "topic",
                DataTypes.STRING().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return StringData.fromString(record.topic());
                    }
                }),
        PARTITION(
                "partition",
                DataTypes.INT().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return record.partition();
                    }
                }),
        HEADERS(
                "headers",
                // key and value of the map are nullable to make handling easier in queries
                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
                        .notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        final Map<StringData, byte[]> map = new HashMap<>();
                        for (Header header : record.headers()) {
                            map.put(StringData.fromString(header.key()), header.value());
                        }
                        return new GenericMapData(map);
                    }
                }),
        LEADER_EPOCH(
                "leader-epoch",
                DataTypes.INT().nullable(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return record.leaderEpoch().orElse(null);
                    }
                }),
        OFFSET(
                "offset",
                DataTypes.BIGINT().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return record.offset();
                    }
                }),
        TIMESTAMP(
                "timestamp",
                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return TimestampData.fromEpochMillis(record.timestamp());
                    }
                }),
        TIMESTAMP_TYPE(
                "timestamp-type",
                DataTypes.STRING().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object read(ConsumerRecord<?, ?> record) {
                        return StringData.fromString(record.timestampType().toString());
                    }
                });
        final String key;
        final DataType dataType;
        final MetadataConverter converter;
        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.converter = converter;
        }
    }
}

Of course, the class that matters most underneath is FlinkKafkaConsumer. Its code is not dissected line by line here; the key point is that it is a RichParallelSourceFunction, which you can read on your own since it rarely needs to be changed. A minimal usage sketch follows.
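For readers who have only used the connector through SQL: the same FlinkKafkaConsumer is what the DataStream API exposes directly. A minimal usage sketch, with topic, group id and broker address as placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");

        // The SQL connector wraps a DynamicKafkaDeserializationSchema around this consumer;
        // a plain SimpleStringSchema is enough here to show the wiring.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("user_behavior", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();

        env.addSource(consumer).print();
        env.execute("kafka-consumer-sketch");
    }
}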

2.4.2 KafkaDynamicSink

The principle is much the same as KafkaDynamicSource, so it is not detailed here; a minimal sketch of the sink side follows.
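For completeness, the sink side ultimately boils down to a FlinkKafkaProducer. A minimal DataStream-level sketch, with topic and broker address as placeholders (KafkaDynamicSink builds its serialization schema from the table's encoding formats rather than the SimpleStringSchema used here):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaProducerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Writes each element as a UTF-8 string to the given topic (at-least-once by default).
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props);

        env.fromElements("a", "b", "c").addSink(producer);
        env.execute("kafka-producer-sketch");
    }
}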

03 Summary

This article contains a lot of code and takes some patience to read. To recap the overall flow: the factory registered via SPI validates the options, builds the key/value encoding and decoding formats, and then creates the DynamicTableSource/DynamicTableSink, whose runtime providers wrap the Kafka consumer and producer.

That wraps up the walkthrough of the Flink Kafka connector source code. Hopefully it helps, and thanks for reading!
