Flink进行Hudi写入源码分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 本文主要解析了Flink将DataStream写入到Hudi表的核心流程

1.前言

Flink是流计算领域的佼佼者,Hudi在数据湖领域的热度一直很高,两者的结合也是很多公司的选择。如果要深入了解Flink + Hudi技术的应用或者性能调优,那么了解源码中的原理会对我们有很大的帮助,本文主要围绕着Flink对Hudi的写入流程进行分析,从而去理解Hudi中的各种核心概念,像Copy-on-Write(COW)、Merge-on-Read(MOR)、File Layouts(文件布局)、Timeline(时间线)等,本文默认大家对这些概念有所了解,在文章中的代码分析时会涉及到这些概念将不会进行详细的讲解。

2.源码阅读demo

源码阅读最重要的是可以进行断点跟读,而一般最简单的demo代码能够提高源码阅读的效果,可以通过下面的demo代码来观察Flink写入Hudi的完整过程

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
// 为了更方便观察源码过程,这里将Bucket的缓存大小设置为0.5kb
options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.0005");
// 为了更方便观察源码过程,这里将完整流程中各任务的并行度设置为1
options.put(FlinkOptions.INDEX_BOOTSTRAP_TASKS.key(), "1");
options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), "1");
options.put(FlinkOptions.WRITE_TASKS.key(), "1");
options.put(FlinkOptions.COMPACTION_TASKS.key(), "1");
// 为了更方便观察源码过程,将compact的触发机制设置为间隔2次delta_commit
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), FlinkOptions.NUM_COMMITS);
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");

HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts INT")
        .column("`partition` VARCHAR(20)")
        .pk("uuid")
        .partition("partition")
        .options(options);

builder.sink(dataStream, false);

env.execute("HudiWriteDemoWithFlink");

3.Flink写入过程的完整流程介绍

Flink写入外部存储的接口是DynamicTableSink,Hudi通过HoodieTableSink来实现Flink的写入接口,核心的写入逻辑位于getSinkRuntimeProvider,由于篇幅的影响,本文只要会在每个环节中选择一条链路进行讲解。

public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
   
   
    return (DataStreamSinkProviderAdapter) dataStream -> {
   
   
      long ckpTimeout = dataStream.getExecutionEnvironment()
          .getCheckpointConfig().getCheckpointTimeout();
      // 设置Hudi的instant commit的超时时间为Flink的checkpoint超时时间,因为Flink将instant的commit过程放在checkpoint的完成时执行
      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());

      RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

      // 当写入操作类型为bulk_insert的时候会进行到这个分支,本文主要介绍Flink的流写过程,我们采用的写入操作类型upsert
      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
   
   
        return Pipelines.bulkInsert(conf, rowType, dataStream);
      }

      // 当操作类型为insert的时候,对于mor类型的表,数据的写入为append的方式,对于cow的类型的表,只有采用write.insert.cluster=flase的时候才能采用append的方式,当write.insert.cluster=true的时候,在写入数据的时候会对小文件进行merge的操作,这个分支也不是本次讲解的路径
      if (OptionsResolver.isAppendMode(conf)) {
   
   
        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
        if (OptionsResolver.needsAsyncClustering(conf)) {
   
   
          return Pipelines.cluster(conf, rowType, pipeline);
        } else {
   
   
          return Pipelines.dummySink(pipeline);
        }
      }

      DataStream<Object> pipeline;
      // 从这里开始是本文分析等核心起点,这里是将数据源生成HoodieRecord DataStream的过程,默认会进行key和fileid索引的加载过程,Flink在这个写入过程中没有去加载文件的index,默认会将文件中的key和fileid对读取出来下发到下游,利用Flink的状态机制进行保存
      final DataStream<HoodieRecord> hoodieRecordDataStream =
          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
      // Flink将生成的HoodieRecord DataStream持久化,在持久化的过程中,涉及到instant的commit过程,当写入任务的所有并行度完成写持久化操作后,会将写入结果上报到OperatorCoordinator,最后在checkpoint完成的时候,会通知OperatorCoordinator进行instant的commit过程和开启下一阶段instant的requested过程
      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
      // 当表类型为mor时,同时设置了异步compact,则会在Flink的DAG中增加任务来处理compact的过程
      if (OptionsResolver.needsAsyncCompaction(conf)) {
   
   
        // use synchronous compaction for bounded source.
        if (context.isBounded()) {
   
   
          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        }
        return Pipelines.compact(conf, pipeline);
      } else {
   
   
        return Pipelines.clean(conf, pipeline);
      }
};

完整的主要流程可以通过下图来表示,其中各环节中的具体步骤将在后面环节进行讲解

图3-1 Flink写入Hudi完整流程

4.HoodieRecord DataStream生成过程详解

HoodieRecord DataStream生成过程的详细代码为

public static DataStream<HoodieRecord> bootstrap(
      Configuration conf,
      RowType rowType,
      DataStream<RowData> dataStream,
      boolean bounded,
      boolean overwrite) {
   
   
    // 是否更新旧分区路径的索引
    final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
    if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
   
   
      // 当overwrite或者索引类型为bucket时,这一步这一步只是做DataStream转化操作
      return rowDataToHoodieRecord(conf, rowType, dataStream);
    } else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
   
   
      // 当满足批任务、允许多分区存在相同key、分区表时,
      return boundedBootstrap(conf, rowType, dataStream);
    } else {
   
   
      // 对于其他类型,flink利用flink的state特性加载旧分区的索引信息,这一部份是本文的核心分析,详细分析看4.2部分
      return streamBootstrap(conf, rowType, dataStream, bounded);
    }
}

4.1 rowDataToHoodieRecord

在RowDataToHoodieFunction算子中实现,核心是将RowData转化成HoodieAvroRecord,HoodieAvroRecord的主要成员如下:

//主要包括数据的recordKey及分区信息partitionPath,根据建表信息从原始数据中提取
HoodieKey key; 
//原始数据,包含输入的所有字段
T data; 
//数据在Hudi中的操作类型,描述在changelog
HoodieOperation operation; 
//数据对应在Hudi中的文件,这一步该值为null,在进行2.2步骤后会进行填充
HoodieRecordLocation currentLocation;

4.2 BootstrapOperator

核心代码如下:

private static DataStream<HoodieRecord> streamBootstrap(
      Configuration conf,
      RowType rowType,
      DataStream<RowData> dataStream,
      boolean bounded) {
   
   
    // 对DataStream进行转化成HoodieRecord封装类
    DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);

    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
   
   
      // 当设置启动加载状态或者处理批任务时,会将历史数据的索引信息加载到flink下游算子的状态中
      dataStream1 = dataStream1
          .transform(
              "index_bootstrap",
              TypeInformation.of(HoodieRecord.class),
              new BootstrapOperator<>(conf))
          .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
          .uid(opUID("index_bootstrap", conf));
    }

    return dataStream1;
}

BootstrapOperator的作用是将Hudi表中已经持久化的HoodieKey和对应的文件FileSlice信息加载到下游算子(BucketAssignFunction)的状态中,供新增数据查找bucket使用。核心实现逻辑在initializeState方法中,首先拉取到Hudi表目录下的所有分区路径,然后根据分区路径进行正则匹配,匹配上的就加载该分区下的所有的HoodieKey,加载的时候没有使用文件的Index,而是通过读取分区下的所有文件的key和partitionPath,具体实现可以对loadRecords方法进行跟读。

Flink天然支持状态,所以能够将Hudi的HoodieKey和FileSlice关系对保存在状态中供新增数据进行查找,而不是像Spark那样通过加载表的Index来进行查找,这样可以复用Flink的状态管理机制及Checkpoint机制,不用用户再进行Index加载和由此产生的内存管理等复杂操作了,同时,Hudi采用的Index是有可能存在假阳性的可能,而通过状态的方式,可以保证下游算子中的状态都是准确的。

5.HoodieRecord DataStream流写过程详解

写入的完整过程的代码如下:

public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
   
   
    if (OptionsResolver.isBucketIndexType(conf)) {
   
   
      // 当Hudi的索引类型采用bucket的时候,Flink任务会缓存分区/bucket/fileId映射关系,然后根据数据通过内存中的映射关系找到对应文件
      WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
      int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
      String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
      BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
      return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
          .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
          .uid(opUID("bucket_write", conf))
          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
    } else {
   
   
      // 其他类型将会进行这一步
      WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
      return dataStream
          // 按key进行分组,避免多任务在同一时间对一个桶进行写入操作
          .keyBy(HoodieRecord::getRecordKey)
          // 再对数据进行bucket分配操作,这里会使用到启动时加载的状态(如果设置了启动加载)以及增量数据更新的状态信息
          .transform(
              "bucket_assigner",
              TypeInformation.of(HoodieRecord.class),
              new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
          .uid(opUID("bucket_assigner", conf))
          .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
          // 根据fileId进行分组,保证同一个文件的写入在同一个任务里
          .keyBy(record -> record.getCurrentLocation().getFileId())
          // 执行数据写入文件操作,这一步将是后面核心分析的重点
          .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
          .uid(opUID("stream_write", conf))
          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
    }
  }

5.1 分区下数据的bucket分配

核心类为 BucketAssignFunction,核心处理代码如下所示:

private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
   
   
    ...
    // 获取算子状态中的索引
    HoodieRecordGlobalLocation oldLoc = indexState.value();
    if (isChangingRecords && oldLoc != null) {
   
   
      // 当操作类型是UPSERT/UPSERT_PREPPED/DELETE,同时状态有定义的时候,会从状态中查找location(fileId),并且将instant time标记为‘U’,表明是一个更新bucket操作
      if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
   
   
        // 当与旧分区不同,则表示key跨分区了,则会根据配置进行后面的操作
        if (globalIndex) {
   
   
          // 当配置需要更新旧分区索引的时候,则在旧分区需要新增一条删除的记录
          HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
              payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));

          deleteRecord.unseal();
          deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
          deleteRecord.seal();

          out.collect((O) deleteRecord);
        }
        // 在新分区新一条插入记录
        location = getNewRecordLocation(partitionPath);
      } else {
   
   
        // 没有跨分区,则直接返回状态中的索引
        location = oldLoc.toLocal("U");
        this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
      }
    } else {
   
   
      // 新增一条索引信息
      location = getNewRecordLocation(partitionPath);
    }
    if (isChangingRecords) {
   
   
      // 更新状态里的索引
      updateIndexState(partitionPath, location);
    }

    record.unseal();
    record.setCurrentLocation(location);
    record.seal();

    out.collect((O) record);
}

Hudi对于相同key的跨分区行为,当配置了需要更新旧分区的索引时,会对旧分区中的数据采用删除操作,在新分区中保留最新的数据,从而做到全局唯一。

5.2 执行hudi的写入操作

Flink进行Hudi的写入func是StreamWriteFunction,由于运行到这一步已经知道了数据需要写入到哪个fileId了,所以这一步只需要做到常规的持久化操作,本地内存缓存 -> flush磁盘,核心代码如下所示

protected void bufferRecord(HoodieRecord<?> value) {
   
   
    final String bucketID = getBucketID(value);
    // 从缓存中找到对应的bucket,如果缓存中没有则新建一个bucket
    DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
    final DataItem item = DataItem.fromHoodieRecord(value);

    bucket.records.add(item);
    // 判断当前bucket是否达到写入的最小size
    boolean flushBucket = bucket.detector.detect(item);
    // 判断当前算子中所有bucket的数据量是否达到了写入的size
    boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
    if (flushBucket) {
   
   
      // flush当前bucket
      if (flushBucket(bucket)) {
   
   
        this.tracer.countDown(bucket.detector.totalSize);
        bucket.reset();
      }
    } else if (flushBuffer) {
   
   
      // 找到算子中最大的bucket进行flush
      DataBucket bucketToFlush = this.buckets.values().stream()
          .max(Comparator.comparingLong(b -> b.detector.totalSize))
          .orElseThrow(NoSuchElementException::new);
      if (flushBucket(bucketToFlush)) {
   
   
        this.tracer.countDown(bucketToFlush.detector.totalSize);
        bucketToFlush.reset();
      } else {
   
   
        LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
      }
    }
}

这里的bucket大小评估采用的抽样1%进行,当流量大或者数据大小不稳定的情况下,可以调抽样比例。

执行刷盘的过程核心代码

private boolean flushBucket(DataBucket bucket) {
   
   
    String instant = instantToWrite(true);

    if (instant == null) {
   
   
      LOG.info("No inflight instant when flushing data, skip.");
      return false;
    }

    List<HoodieRecord> records = bucket.writeBuffer();
    ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
    if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
   
   
      // 在insert/upsert前执行去重操作
      records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
          .deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
    }
    bucket.preWrite(records);
    // 将bucket执行flush磁盘操作,这一部份属于Hudi的写入过程,后期将新开一节进行分析
    final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
    records.clear();
    // 记录bucket刷写磁盘结果
    final WriteMetadataEvent event = WriteMetadataEvent.builder()
        .taskID(taskID)
        .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
        .writeStatus(writeStatus)
        .lastBatch(false)
        .endInput(false)
        .build();
    // 将bucket刷写磁盘的结果同步到算子的coordinator
    this.eventGateway.sendEventToCoordinator(event);
    writeStatuses.addAll(writeStatus);
    return true;
}

至此,Flink对Hudi的写入已经分析完了,数据已经flush到了磁盘了,那下游对数据的可见性、Hudi的写入事务性是在哪里体现的呢?

首先,Flink对于Hudi刷写磁盘的结果会同步到算子的coordinator,后续会通过coordinator来进行commit操作,这一步类似于kafka的两阶段提交,数据会先刷到存储磁盘,之后再统一提交commit操作,核心代码如下:

private void handleWriteMetaEvent(WriteMetadataEvent event) {
   
    
    // 存在task在完成ckp后,再flush缓存中的数据,所以此时的instant会小于当前的instant              
    ValidationUtils.checkState(
        HoodieTimeline.compareTimestamps(this.instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
        String.format("Receive an unexpected event for instant %s from task %d",
            event.getInstantTime(), event.getTaskID()));

    addEventToBuffer(event);
}

private void addEventToBuffer(WriteMetadataEvent event) {
   
   
    // 来自算子各task的flush结果,都会缓存在coordinator的eventBuffer中
    if (this.eventBuffer[event.getTaskID()] != null) {
   
   
      this.eventBuffer[event.getTaskID()].mergeWith(event);
    } else {
   
   
      this.eventBuffer[event.getTaskID()] = event;
    }
}

上面的步骤会将这一次ckp中间各任务提交的flush结果发送给到coordinator,coodinator会将结果缓存在本地。

当所有任务完成了ckp后,coodinator会接收到ckp完成的通知,则会进行flush后的commit操作,核心代码如下:

public void notifyCheckpointComplete(long checkpointId) {
   
   
    executor.execute(
        () -> {
   
   
    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
          // 开始进行commit操作
          final boolean committed = commitInstant(this.instant, checkpointId);

          if (tableState.scheduleCompaction) {
   
   
            // 如果设置异步compaction操作,则会在这儿进行plan
            CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
          }

          if (tableState.scheduleClustering) {
   
   
            // 如果设置异步cluster操作,则会在这儿进行调度
            ClusteringUtil.scheduleClustering(conf, writeClient, committed);
          }

          if (committed) {
   
   
            // 开启新一轮instant
            startInstant();
            // 同步hive元数据
            syncHiveAsync();
          }
        }, "commits the instant %s", this.instant
    );
}

详细的提交逻辑在commitInstant方法中,核心逻辑是将当前instant中发生变更的文件写入到对应的commit文件中,并重置eventBuffer,至此,数据已经完成提交,下游对数据已经可见了。

完整的写入及提交逻辑如下图所示:

图5-1 Flink写入流程详解

这个写入过程很长,对于Flink而言,我们一般会要求具有Exactly-once语义,那么上述过程是是否能做到Exactly-once语义的?对于Hudi而言,在CAP上的考量又是怎样的?

6.Hudi文件compact过程详解

6.1 Compact plan的生成

Hudi的compact过程分为同步和异步,同步对性能影响很大,本节将重点对Hudi的异步compact进行解析,在上一章节,当配置了异步compact的时候,Flink会在完成commit后生成compact的plan,核心代码逻辑如下

public static void scheduleCompaction(
      HoodieTableMetaClient metaClient,
      HoodieFlinkWriteClient<?> writeClient,
      boolean deltaTimeCompaction,
      boolean committed) {
   
   
    if (committed) {
   
   
      // 当完成了提交后,会生成compact的plan
      writeClient.scheduleCompaction(Option.empty());
    } else if (deltaTimeCompaction) {
   
   
      // 当配置了基于时间间隔调度compact的话,不管是否完成commit也会进行plan的生成
      metaClient.reloadActiveTimeline();
      Option<String> compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
      if (compactionInstantTime.isPresent()) {
   
   
        writeClient.scheduleCompactionAtInstant(compactionInstantTime.get(), Option.empty());
      }
    }
}

我们核心跟scheduleCompaction方法,最终会进入到Table的scheduleCompaction方法,我们这里选择HoodieFlinkMergeOnReadTable进行跟读,首先会生成产出plan的执行器ScheduleCompactionActionExecutor,这个执行器会初始化一个Compact plan的生成器,也就是BaseHoodieCompactionPlanGenerator,最终根据执行器根据compact策略判断是否需要进行compact后,然后调用plan生成器,产出Compact的执行plan,核心代码如下

public Option<HoodieCompactionPlan> execute() {
   
   
    ...
    // 
    HoodieCompactionPlan plan = scheduleCompaction();
    Option<HoodieCompactionPlan> option = Option.empty();
    if (plan != null && nonEmpty(plan.getOperations())) {
   
   
      ...
      try {
   
   
        if (operationType.equals(WriteOperationType.COMPACT)) {
   
   
          // 持久化Hudi的数据文件的compact plan
          HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
              HoodieTimeline.COMPACTION_ACTION, instantTime);
       // 将生成的plan序列化并持久化到Hudi表目录下
    table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
              TimelineMetadataUtils.serializeCompactionPlan(plan));
        } else {
   
   
          // 持久化Hudi的log文件的compact plan
          HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
              HoodieTimeline.LOG_COMPACTION_ACTION, instantTime);
          table.getActiveTimeline().saveToLogCompactionRequested(logCompactionInstant,
              TimelineMetadataUtils.serializeCompactionPlan(plan));
        }
      } catch (IOException ioe) {
   
   
          ...
      }
      option = Option.of(plan);
    }

    return option;
}

继续跟着scheduleCompaction方法解析会发现,首先会去拉取最近的DeltaCommit信息,然后根据compact执行策略进行比较,是否需要进行Compact操作,如果需要操作,则使用BaseHoodieCompactionPlanGenerator对DeltaCommit信息进行分组生成HoodieCompactionPlan,然后将HoodieCompactionPlan进行Avro编码,然后序列化到文件系统。

private HoodieCompactionPlan scheduleCompaction() {
   
   
    // 根据最近的增量commit信息和compact策略进行判断是否需要compact
    boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
    if (compactable) {
   
   
      try {
   
   
        context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan");
        // 生成compact plan,可进行avro编码,可以序列化
        return planGenerator.generateCompactionPlan();
      } catch (IOException e) {
   
   
        ...      }
    }
    return new HoodieCompactionPlan();
}

6.2 Compact的执行与提交

compact的执行和提交过程的核心代码如下,

public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
   
   
    return dataStream.transform("compact_plan_generate",
            TypeInformation.of(CompactionPlanEvent.class),
            // 算子本身不执行,当checkpoint完成时,会去加载上一个算子持久化的compact plan,并进行下发,本算子必须设置为单并行度
            new CompactionPlanOperator(conf))
        .setParallelism(1) 
        // 按照fileId进行分区
        .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
        .transform("compact_task",
            TypeInformation.of(CompactionCommitEvent.class),
            // 根据接收到的compact plan进行merge操作,生成新的文件持久化到文件系统,由于篇幅限制,这一步不做详细解析,可以参考源码继续跟读
            new CompactOperator(conf))
        .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
        // 接收到上游完成compact后下发的结果,进行commit操作
        .addSink(new CompactionCommitSink(conf))
        .name("compact_commit")
        .setParallelism(1); 
}

完整的compact过程如下图所示

图6-1 Flink Compact与提交流程

7.最后

本文通过跟读源码的方式对Flink写入Hudi的核心流程进行了解析,相信只有通过对写入流程的充分认识后,才能充分理解Flink和Hudi的各方面特性,才能充分提升线上任务的性能,由于本人能力有限,文章中出现的错误在所难免,希望大家发现后帮忙指正,万分感谢。

最后总结一下,本文主要解析了Flink将DataStream写入到Hudi表中的核心过程:

  1. HoodieRecord DataStream的生成过程,这个过程会进行索引的加载过程;
  2. HoodieRecord DataStream的写入与提交过程,Flink首先会根据索引进行分桶,找到数据对应的文件地址,然后将数据flush到磁盘,然后通过算子的coordinator根据checkpoint的完成进行commit操作,完整数据的写入过程;
  3. Hudi文件的compact过程,对于异步compact过程,Flink会在提交后去生成compact的执行plan,然后将plan序列化并持久化到Hudi表目录下,最后通过新的算子去读取Hudi表目录下的plan并执行compact,最后完成compact后进行commit操作,至此Flink完整的写入流程已经梳理完成了。

当然,本文由于篇幅有限,没有对Flink和Hudi架构和概念进行详细的介绍,同时对Flink写入Hudi的性能优化也没有涉及,后续会加上Flink写入Hudi的性能分析。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何将MySQL的CDC实时数据写入到Hudi
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
关系型数据库 MySQL OLAP
实时计算 Flink版产品使用合集之可以支持 MySQL 数据源的增量同步到 Hudi 吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
存储 JSON Kubernetes
实时计算 Flink版操作报错合集之 写入hudi时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之同步tidb到hudi报错,一般是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL JSON 数据库
实时计算 Flink版操作报错合集之写入Hudi时,遇到从 COW(Copy-On-Write)表类型转换为 MOR(Merge-On-Read)表类型时报字段错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL Java 关系型数据库
实时计算 Flink版操作报错合集之通过flink sql形式同步数据到hudi中,本地启动mian方法报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
172 8
|
5月前
|
Java 流计算
Flink中异步AsyncIO的实现 (源码分析)
Flink中异步AsyncIO的实现 (源码分析)
|
5月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
122 0