Flink进行Paimon写入源码分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文主要解析了Flink写入Paimon的核心流程。

1. 前言

Paimon的前身是Flink-Table-Store,希望提供流批一体的存储,提供一定的OLAP查询能力(基于列式存储),做到毫秒级别的实时流式读取。Flink-Table-Store希望能够支持Flink SQL的全部概念,能够结合Flink SQL提供DB级别体验,并且支持大规模的更新。Flink-Table-Store希望能够结合Flink,实现完整的流批一体体验(计算+存储),同时拓展Flink-Table-Store的生态,升级为Paimon,来支持更多大数据引擎的查询/写入。如果我们希望深度使用Paimon,并充分利用Paimon的特性,那么了解Flilnk写入Paimon的过程十分重要,本文希望通过源码分析的方式带大家充分了解Flink写入Paimon的完整过程。

2. 源码阅读demo

阅读源码最有效的方式就是跟读最简单的代码,由于Paimon的定位是打造类似Database的使用体验,与SQL结合很紧密,通常可以使用Flink SQL进行写入,但这里为了方便跟读代码,我们采用调api的方式进行写入

// 构建表
TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().build());
String tableSql = String.format(
    "create table %s (uuid string, name string, age int, ts int, part string, primary key (part,uuid) NOT ENFORCED)  partitioned by (part) with ('connector' = 'paimon', 'bucket' = '2', 'bucket-key' = 'uuid', 'path' = '%s')"
    , tableName
    , filePath
);
tableEnv.executeSql(tableSql);
Catalog catalog = tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get();
String database = catalog.getDefaultDatabase();
ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) catalog.getTable(new ObjectPath(database, tableName));
ObjectIdentifier tablePath = ObjectIdentifier.of(catalogName, database, tableName);
FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(
    tablePath, 
    catalogTable,
    Collections.emptyMap(),
    Configuration.fromMap(catalogTable.getOptions()),
    Thread.currentThread().getContextClassLoader(), 
    false
);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(20000);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 数据源
DataStream<RowData> dataStream = ...
// 将数据源写入Paimon表
((DataStreamSinkProvider) new FlinkTableFactory()
    .createDynamicTableSink(context)
    .getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)))
    .consumeDataStream(n -> Optional.empty(), dataStream);

env.execute("PaimonWriteDemoWithFlink");

3. Flink写入Paimon的完整流程

Paimon表的写入是通过FlinkTableSink实现DynamicTableSink接口来写入数据,核心逻辑位于getSinkRuntimeProvider方法中,后面我们会重点跟读getSinkRuntimeProvider方法中的内容。

这里也顺带讲一下通过Flink SQL进行写入的场景,对于Flink SQL而言,首先会通过calcite进行解析优化后生成JobGraph,再提交集群运行。我们希望了解Flink写入Paimon的完整过程,可以通过sql转化成JobGraph的过程中得到的Transformation列表去了解中间具体进行了哪些操作。下图是一条简单Flink SQL转化后的Transformation序列,其中红框内的Transformation序列是Paimon数据写入的完整过程,本质上是通过执行getSinkRuntimeProvider方法生成的。

图3-1 Flink SQL中Paimon表写入转化成Transformation序列

接着,我们对Flink写入Paimon表的流程进行了梳理,整理出完整的类图,如下图所示

图3-2 Flink写入Paimon完整流程类图

其中,核心入口FlinkTableSink的getSinkRuntimeProvider方法逻辑如下所示,

public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
   
   
    ...
    LogSinkProvider logSinkProvider = null;
    if (logStoreTableFactory != null) {
   
   
        // 当配置了log.system,则会生成一个log sink的提供器,目前只有kafka的日志存储
        logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
    }

    Options conf = Options.fromMap(table.options());
    // 生成log sink函数,当overwrite模式时,不配置log sink
    final LogSinkFunction logSinkFunction =
            overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
    return new PaimonDataStreamSinkProvider(
            (dataStream) ->
                    // FileStoreTable时FileStore的抽象层,提供InternalRow的读取和写入
                    new FlinkSinkBuilder((FileStoreTable) table)
                            .withInput(
                                    new DataStream<>(dataStream.getExecutionEnvironment(),dataStream.getTransformation()))
                            .withLogSinkFunction(logSinkFunction)
                            .withOverwritePartition(overwrite ? staticPartitions : null)
                                            .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
                            .build());
}

继续跟到FlinkSinkBuilder的build方法,会根据Paimon表的bucket模式生成不同类型的Sink

public DataStreamSink<?> build() {
   
   
    BucketMode bucketMode = table.bucketMode();
    switch (bucketMode) {
   
   
        case FIXED:
            // 生成固定bucket个数的Sink,这种方式结构固定,本文着重分析这一路径
            return buildForFixedBucket();
        case DYNAMIC:
            // 生成动态变动Bucket个数的Sink,
            return buildDynamicBucketSink();
        case UNAWARE:
            return buildUnawareBucketSink();
        default:
            throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
    }
}

private DataStreamSink<?> buildForFixedBucket() {
   
   
    // 首先根据分区信息、bucket字段进行bucket分组
    DataStream<RowData> partitioned =
            partition(
                    input,
                    new RowDataChannelComputer(table.schema(), logSinkFunction != null),
                    parallelism);
    // FileStoreSink实现将记录写入Paimon,FileStoreSink提供了生成写入算子的方法
    FileStoreSink sink = new FileStoreSink(table, overwritePartition, logSinkFunction);
    return sink.sinkFrom(partitioned);
}

接着继续跟读到FileStoreSink的sinkFrom的方法

public DataStreamSink<?> sinkFrom(DataStream<T> input) {
   
   
    String initialCommitUser = UUID.randomUUID().toString();
    return sinkFrom(input, initialCommitUser);
}

public DataStreamSink<?> sinkFrom(DataStream<T> input, String initialCommitUser) {
   
   
    // 执行真正的写入操作,在这个阶段不会进行提交,相当于两阶段提交的第一阶段,进行数据写入,不会有snapshot生成
    SingleOutputStreamOperator<Committable> written =
                doWrite(input, initialCommitUser, input.getParallelism());
    // 执行提交操作,会生成snapshot,下游可见,如果日志配置
    return doCommit(written, initialCommitUser);
}

public SingleOutputStreamOperator<Committable> doWrite(
    DataStream<T> input, String commitUser, Integer parallelism) {
   
   
    ...
    // 是否只是writeOnly,如果是,则会忽略compact和snapshot过期,这个配置需要结合专门的compact任务执行,不然会造成小文件剧增,同时降低数据读的性能
    Boolean writeOnly = table.coreOptions().writeOnly();
    SingleOutputStreamOperator<Committable> written =
        input.transform(
           (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " -> " + table.name(),
            new CommittableTypeInfo(),
            // 生成写入算子,上游的数据会通过这个算子将数据写入到Paimon表
            createWriteOperator(
                createWriteProvider(env.getCheckpointConfig(), isStreaming), commitUser)
        ).setParallelism(parallelism == null ? input.getParallelism() : parallelism);
    ...
    Options options = Options.fromMap(table.options());
    if (options.get(SINK_USE_MANAGED_MEMORY)) {
   
   
        // Flink会创建一个独立的内存分配器用于merge tree的数据写入操作
        // 否则会使用TM的管理内存支持写入操作
        MemorySize memorySize = options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY);
        written.getTransformation()
            .declareManagedMemoryUseCaseAtOperatorScope(
                ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
    }
    return written;
}

protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
   
   
    ...
    SingleOutputStreamOperator<?> committed =
        written.transform(
                GLOBAL_COMMITTER_NAME + " -> " + table.name(),
                new CommittableTypeInfo(),
                // 执行提交操作的算子,该算子会生成snapshot,下游可见
                new CommitterOperator<>(
                    streamingCheckpointEnabled,
                    commitUser,
                    createCommitterFactory(streamingCheckpointEnabled),
                    createCommittableStateManager()))
            .setParallelism(1)
            .setMaxParallelism(1);
    return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

至此,我们可以发现,Flink写入Paimon的所有Transformation已经构建完成了,接下来,我们可以跟着写入算子的processElement方法了解数据是如何成为Paimon底层的LSM-tree数据结构的。

4. Flink写入数据

Flink写入Paimon的算子是RowDataStoreWriteOperator,算子是预提交算子,会将数据flush的磁盘,但不会执行commit操作,核心代码如下

public void processElement(StreamRecord<RowData> element) throws Exception {
   
   
    sinkContext.timestamp = element.hasTimestamp() ? element.getTimestamp() : null;

    SinkRecord record;
    try {
   
   
        // 将数据写入Paimon的文件系统
        record = write.write(new FlinkRowWrapper(element.getValue()));
    } catch (Exception e) {
   
   
        throw new IOException(e);
    }

    if (logSinkFunction != null) {
   
   
        // 将数据双写到日志存储层,目前只支持写入到kafka
        SinkRecord logRecord = write.toLogRecord(record);
        logSinkFunction.invoke(logRecord, sinkContext);
    }
}

上述算子中的write生成是通过FlieStoreTable.store().newWrite()的逻辑生成的

FileStoreTable是对Paimon表的文件存储的抽象层,提供底层数据的读写api,Paimon会根据WriteMode和是否包含主键来决定生成哪一种FileStoreTable:

  • 当WriteMode = APPEND_ONLY,则会生成AppendOnlyFileStoreTable,这种表的写入流程很简单,当写入的记录数达到阈值且缓存数据量达到单文件大小时就会进行flush操作;
  • 当WriteMode = CHANGE_LOG 且不包含主键,则会生成ChangelogValueCountFileStoreTable,当从内存池拉不到内存块的时候,会进行flush操作,在flush操作的时候,会记录flush的数据的统计值;
  • 当WriteMode = CHANGE_LOG 且包含主键,则会生成ChangelogWithKeyFileStoreTable,与ChangelogValueCountFileStoreTable的flush时机一致,只是在flush中会根据主键进行merge操作;
  • 当WriteMode = AUTO,则会根据是否有主键生成AppendOnlyFileStoreTable还是ChangelogWithKeyFileStoreTable。

接下来,我们会分析FileStoreTable为ChangelogWithKeyFileStoreTable的情况。根据图3-2的类图会发现,最终会调用到MergeTreeWriter的write()方法。该方法会先将数据缓存在内存缓存中(SortBuffer),该部分缓存数据为排序数据,当缓存满了之后,会将缓存中的数据flush到磁盘。

public void write(KeyValue kv) throws Exception {
   
   
    long sequenceNumber =
            kv.sequenceNumber() == KeyValue.UNKNOWN_SEQUENCE
                    ? newSequenceNumber()
                    : kv.sequenceNumber();
    // 将数据put到内存缓存中
    boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
    if (!success) {
   
   
        // 缓存满了,会flush缓存数据到磁盘
        flushWriteBuffer(false, false);
        success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
        if (!success) {
   
   
            throw new RuntimeException("Mem table is too small to hold a single element.");
        }
    }
}

flush的代码如下

private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception {
   
   
    if (writeBuffer.size() > 0) {
   
   
        if (compactManager.shouldWaitForLatestCompaction()) {
   
   
            // 判断是否需要进行等待上一次compact结束
            waitForLatestCompaction = true;
        }
        // 根据changelogProducer的类型生成changelog文件写入器
        final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
            changelogProducer == ChangelogProducer.INPUT ? writerFactory.createRollingChangelogFileWriter(0) : null;
        // 生成flush数据的MergeTree文件写入器,根据文件大小进行rolling
        final RollingFileWriter<KeyValue, DataFileMeta> dataWriter = writerFactory.createRollingMergeTreeFileWriter(0);
        try {
   
   
            // 将缓存中的所有数据flush到磁盘
            writeBuffer.forEach(
                keyComparator,
                mergeFunction,
                changelogWriter == null ? null : changelogWriter::write,
                dataWriter::write);
        } finally {
   
   
            if (changelogWriter != null) {
   
   
                changelogWriter.close();
            }
            dataWriter.close();
        }

        if (changelogWriter != null) {
   
   
            // 将changelog新增文件缓存在算子中,供算子在进行checkpoint的时候将所有的flush下发到下游算子(提交算子),下发是在prepareSnapshotPreBarrier()方法中进行的,所以会在下游算子进行checkpoint之前接收所有的flush信息
            newFilesChangelog.addAll(changelogWriter.result());
        }

        for (DataFileMeta fileMeta : dataWriter.result()) {
   
   
            // 将新增的文件缓存在算子中,供算子在进行checkpoint的时候将所有的提交下发到下游算子(提交算子)
            newFiles.add(fileMeta);
            compactManager.addNewFile(fileMeta);
        }

        writeBuffer.clear();
    }
    // 尝试同步上一次compact结果
    trySyncLatestCompaction(waitForLatestCompaction);
    // 尝试去触发一次新的compact
    compactManager.triggerCompaction(forcedFullCompaction);
}

在writeBuffer进行flush磁盘的时候,会对数据进行merge操作,当FileStoreTable为ChangelogWithKeyFileStoreTable时,会根据merge-engine配置来生成特定的merge引擎,也就是3-2类图中的merge function,具体代码如下,

...
// 获取merge-engine配置
CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
KeyValueFieldsExtractor extractor = ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;

MergeFunctionFactory<KeyValue> mfFactory;
switch (mergeEngine) {
   
   
    case DEDUPLICATE:
        // merge操作时,只会保留最新的一条
        mfFactory = DeduplicateMergeFunction.factory();
        break;
    case PARTIAL_UPDATE:
        // 可以更新部分非空字段,可以设置字段级的sequence来进行局部比较更新
        // 不支持retract类型的操作
        mfFactory = PartialUpdateMergeFunction.factory(conf, rowType);
        break;
    case AGGREGATE:
        // merge是对字段做聚合操作
        mfFactory =
            AggregateMergeFunction.factory(
                conf,
                tableSchema.fieldNames(),
                rowType.getFieldTypes(),
                tableSchema.primaryKeys());
        break;
    case FIRST_ROW:
        // 只保留第一条
        mfFactory =
            FirstRowMergeFunction.factory(
                new RowType(extractor.keyFields(tableSchema)), rowType);
            break;
    default:
        throw new UnsupportedOperationException(
            "Unsupported merge engine: " + mergeEngine);
}

if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
   
   
    // 当change log的生成方式为LOOKUP的时候,merge则为封装
    mfFactory =
        LookupMergeFunction.wrap(
            mfFactory,
            new RowType(extractor.keyFields(tableSchema)),
            rowType);
}

在写入后,会进行尝试同步上一次compact的结果,同时也会尝试去触发一次新的compact,Paimon的Compact原理争取通过下一篇文章来分析。

这里,与Flink写入Hudi的过程一样,Flink写入Paimon是如何保证Exactly-Once语义的呢?

5. Flink提交写入状态

提交算子CommitterOperator首先会接收上游算子下发的flush信息

public void processElement(StreamRecord<CommitT> element) {
   
   
    output.collect(element);
    // 缓存上游下发的flush信息在算子内部
    this.inputs.add(element.getValue());
}

public void notifyCheckpointComplete(long checkpointId) throws Exception {
   
   
    super.notifyCheckpointComplete(checkpointId);
    // 在算子进行checkpoint完成后进行commit操作
    commitUpToCheckpoint(endInput ? Long.MAX_VALUE : checkpointId);
}

private void commitUpToCheckpoint(long checkpointId) throws Exception {
   
   
    NavigableMap<Long, GlobalCommitT> headMap =
            committablesPerCheckpoint.headMap(checkpointId, true);
    // 将算子内部缓存的flush信息整合进行commit操作
    committer.commit(committables(headMap));
    headMap.clear();
}

commiter.commit的逻辑是将所有的flush信息进行一次commit操作,最终flush信息会被序列化成json信息保存在snapshot文件中,当commit成功后,Paimon表会新增一次snapshot。详细的commit过程的代码比较简单,可以直接跟读,这里就不再赘述。

6. 最后

本文通过跟读源码的方式对Flink写入Paimon的核心流程进行了解析,相信通过对Flink写入Paimon流程细节的梳理,对理解Paimon的特性及性能优化都是有极大的助力。由于本人能力有限,文章中出现的错误在所难免,希望大家发现后帮忙指正,万分感谢。

最后总结一下,本文主要解析了Flink写入Paimon的核心流程:

1. 介绍了Flink SQL/api的方式构建写入流程DAG的完整过程;
2. 介绍了写入算子RowDataStoreWriteOperator处理数据的完整流程;
3. 介绍了提交算子CommitterOperator进行snapshot提交的完整流程。

当然,本文由于篇幅有限,没有对Flink和Paimon架构和概念进行详细的介绍,同时对Flink写入Paimon的Compact过程及性能优化也没有涉及,后续会加上这些方面的解析。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
SQL 消息中间件 Kafka
流数据湖平台Apache Paimon(二)集成 Flink 引擎
流数据湖平台Apache Paimon(二)集成 Flink 引擎
420 0
|
4月前
|
SQL 存储 数据库
Flink + Paimon 数据 CDC 入湖最佳实践
Flink + Paimon 数据 CDC 入湖最佳实践
403 1
|
8天前
|
存储 消息中间件 运维
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
本文主要分享友盟+ U-App 整体的技术架构,以及在实时和离线计算上面的优化方案。
313 1
友盟+|如何通过阿里云Flink+Paimon实现流式湖仓落地方案
|
19天前
|
SQL 存储 JSON
Flink+Paimon+Hologres 构建实时湖仓数据分析
本文整理自阿里云高级专家喻良,在 Flink Forward Asia 2023 主会场的分享。
|
19天前
|
SQL 存储 JSON
Flink+Paimon+Hologres 构建实时湖仓数据分析
本文整理自阿里云高级专家喻良,在 Flink Forward Asia 2023 主会场的分享。
71501 2
Flink+Paimon+Hologres 构建实时湖仓数据分析
|
4月前
|
SQL 存储 Apache
Paimon 实践 | 基于 Flink SQL 和 Paimon 构建流式湖仓新方案
Paimon 实践 | 基于 Flink SQL 和 Paimon 构建流式湖仓新方案
421 1
|
4月前
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
682 1
|
4月前
|
SQL 关系型数据库 MySQL
Apache Flink 和 Paimon 在自如数据集成场景中的使用
Apache Flink 和 Paimon 在自如数据集成场景中的使用
314 0
|
4月前
|
SQL 存储 关系型数据库
Apache Flink 和 Paimon 在自如数据集成场景中的使用
自如目前线上有基于 Hive 的离线数仓和基于 Flink、Kafka 的实时数仓,随着业务发展,我们也在探索引入湖仓一体的架构更好的支持业务,我们对比了 Iceberg、Hudi、Paimon 后,最终选择 Paimon 作为我们湖仓一体的存储引擎,本文分享下自如在引入 Paimon 做数据集成的一些探索实践。
704 1
Apache Flink 和 Paimon 在自如数据集成场景中的使用
|
5月前
|
SQL 分布式计算 NoSQL
快速实践: 通过 Flink CDC 一键整库同步 MongoDB 到 Paimon
Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。
76563 4
快速实践: 通过 Flink CDC 一键整库同步 MongoDB 到 Paimon