Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

简介: Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

背景


之前讨论的都是’hoodie.datasource.write.operation’:'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件:

/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet

因为是bulk insert操作,所以没有去重的需要,所以直接采用spark原生的方式,

以下我们讨论非spark原生的方式,


闲说杂谈


继续Apache Hudi初探(八)(与spark的结合)–非bulk_insert模式
剩下的代码:

   val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
   val (writeSuccessful, compactionInstant, clusteringInstant) =
        commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
          writeResult, parameters, writeClient, tableConfig, jsc,
          TableInstantInfo(basePath, instantTime, commitActionType, operation))

doWriteOperation 最终调用的是SparkRDDWriteClient对应的方法,如bulkInsert/insert/upsert/insertOverwrite,这里我们以upsert为例:

 public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
      initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
  table.validateUpsertSchema();
  preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
  HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
  HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(resultRDD, instantTime, table);
}
  • initTable 创建获取一个HoodieSparkMergeOnReadTable

  • validateSchema 校验Schema的兼容性

  • preWrite 写之前的操作,这个之前有说过,具体参考:Apache Hudi初探(五)(与spark的结合)

  • table.upsert 真正写入数据的操作

  • 最终调用的是 SparkInsertDeltaCommitActionExecutor<>().execute() 方法,最后最调用到HoodieWriteHelper.write
  public HoodieWriteMetadata<O> write(String instantTime,
                                  I inputRecords,
                                  HoodieEngineContext context,
                                  HoodieTable<T, I, K, O> table,
                                  boolean shouldCombine,
                                  int shuffleParallelism,
                                  BaseCommitActionExecutor<T, I, K, O, R> executor,
                                  WriteOperationType operationType) {
    try {
        // De-dupe/merge if needed
        I dedupedRecords =
            combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
        Instant lookupBegin = Instant.now();
        I taggedRecords = dedupedRecords;
        if (table.getIndex().requiresTagging(operationType)) {
          // perform index loop up to get existing location of records
          context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
          taggedRecords = tag(dedupedRecords, context, table);
        }
        Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
        HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
        result.setIndexLookupDuration(indexLookupDuration);
        return result;
      } catch (Throwable e) {
        if (e instanceof HoodieUpsertException) {
          throw (HoodieUpsertException) e;
        }
        throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
      }
    }

combineOnCondition 数据去重
最终是调用HoodieRecordPayload.preCombine(默认是OverwriteWithLatestAvroPayload.preCombine


taggedRecords = tag(dedupedRecords, context, table) 因为默认的indexHoodieSimpleIndex,所以这个时候会调用到打标记这个操作


最终调用到的是HoodieSimpleIndextagLocationInternal,此时获得的是带有location的记录(如果没有索引到,则 record中的locationnull


executor.execute(taggedRecords) 该方法最终调用到BaseSparkCommitActionExecutor.execute方法:

public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
   // Cache the tagged records, so we don't end up computing both
   // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
   JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
   if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
     inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
   } else {
     LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
   }
   WorkloadProfile workloadProfile = null;
   if (isWorkloadProfileNeeded()) {
     context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile: " + config.getTableName());
     workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
     LOG.info("Input workload profile :" + workloadProfile);
   }
   // partition using the insert partitioner
   final Partitioner partitioner = getPartitioner(workloadProfile);
   if (isWorkloadProfileNeeded()) {
     saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
   }
   // handle records update with clustering
   Set<HoodieFileGroupId> fileGroupsInPendingClustering =
       table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
   HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
   context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
   HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
   HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
   updateIndexAndCommitIfNeeded(writeStatuses, result);
   return result;
 }

inputRDD.persist持久化当前的RDD,因为该RDD会被使用多次,便于加速


workloadProfile = new WorkloadProfile(buildProfile(inputRecords)


构建一个状态信息,主要是记录一下插入的记录数量和更新的记录数量 其中主要形成了以filedId为key,


*Pair<instantTime,count>*为value的Map数据


final Partitioner partitioner = getPartitioner(workloadProfile) 这里针对于upsert操作会返回UpsertPartitioner(因为默认hoodie.storage.layout.typeDEFAULT),


其中该UpsertPartitioner实例的构造方法中会进行一些额外的操作 assignUpdatesassignInserts(这里暂时忽略),主要是对数据进行分区处理,设计到小文件的处理


mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner)


这里会根据hoodie.table.base.file.format的值(默认是parquet),如果是hfile,则会要求排序,如果没有则只是按照partitioner进行重分区,


之后再进行数据insert或者update,具体的方法为handleUpsertPartition,会根据之前的partitoner信息进行插入或者更新(里面的细节有点复杂)


updateIndexAndCommitIfNeeded(writeStatuses, result)

该操作会首先会更新索引信息,对于HoodieSimpleIndex来说,什么也不操作(因为该index每次都会从parquet文件中读取信息从而组装成index),


其次如果hoodie.auto.committrue(默认是true)会进行元数据的commit操作,这些commit的操作和之前Apache Hudi初探(六)(与spark的结合)相似,会涉及到Compcation操作,可以后续再做分析


  • postWrite 这里的postCommit 和之前的table.upsert有重复?




相关文章
|
11月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
278 1
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
239 0
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
402 0
|
9月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
660 33
The Past, Present and Future of Apache Flink
|
11月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1459 13
Apache Flink 2.0-preview released
|
6月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
741 1
Apache Flink 2.0.0: 实时数据处理的新纪元
|
11月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
382 3
|
12月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
340 21

热门文章

最新文章

推荐镜像

更多