Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

简介: Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

背景


之前讨论的都是’hoodie.datasource.write.operation’:'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件:

/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet

因为是bulk insert操作,所以没有去重的需要,所以直接采用spark原生的方式,

以下我们讨论非spark原生的方式,


闲说杂谈


继续Apache Hudi初探(八)(与spark的结合)–非bulk_insert模式
剩下的代码:

   val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
   val (writeSuccessful, compactionInstant, clusteringInstant) =
        commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
          writeResult, parameters, writeClient, tableConfig, jsc,
          TableInstantInfo(basePath, instantTime, commitActionType, operation))

doWriteOperation 最终调用的是SparkRDDWriteClient对应的方法,如bulkInsert/insert/upsert/insertOverwrite,这里我们以upsert为例:

 public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
      initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
  table.validateUpsertSchema();
  preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
  HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
  HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(resultRDD, instantTime, table);
}
  • initTable 创建获取一个HoodieSparkMergeOnReadTable

  • validateSchema 校验Schema的兼容性

  • preWrite 写之前的操作,这个之前有说过,具体参考:Apache Hudi初探(五)(与spark的结合)

  • table.upsert 真正写入数据的操作

  • 最终调用的是 SparkInsertDeltaCommitActionExecutor<>().execute() 方法,最后最调用到HoodieWriteHelper.write
  public HoodieWriteMetadata<O> write(String instantTime,
                                  I inputRecords,
                                  HoodieEngineContext context,
                                  HoodieTable<T, I, K, O> table,
                                  boolean shouldCombine,
                                  int shuffleParallelism,
                                  BaseCommitActionExecutor<T, I, K, O, R> executor,
                                  WriteOperationType operationType) {
    try {
        // De-dupe/merge if needed
        I dedupedRecords =
            combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
        Instant lookupBegin = Instant.now();
        I taggedRecords = dedupedRecords;
        if (table.getIndex().requiresTagging(operationType)) {
          // perform index loop up to get existing location of records
          context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
          taggedRecords = tag(dedupedRecords, context, table);
        }
        Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
        HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
        result.setIndexLookupDuration(indexLookupDuration);
        return result;
      } catch (Throwable e) {
        if (e instanceof HoodieUpsertException) {
          throw (HoodieUpsertException) e;
        }
        throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
      }
    }

combineOnCondition 数据去重
最终是调用HoodieRecordPayload.preCombine(默认是OverwriteWithLatestAvroPayload.preCombine


taggedRecords = tag(dedupedRecords, context, table) 因为默认的indexHoodieSimpleIndex,所以这个时候会调用到打标记这个操作


最终调用到的是HoodieSimpleIndextagLocationInternal,此时获得的是带有location的记录(如果没有索引到,则 record中的locationnull


executor.execute(taggedRecords) 该方法最终调用到BaseSparkCommitActionExecutor.execute方法:

public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
   // Cache the tagged records, so we don't end up computing both
   // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
   JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
   if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
     inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
   } else {
     LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
   }
   WorkloadProfile workloadProfile = null;
   if (isWorkloadProfileNeeded()) {
     context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile: " + config.getTableName());
     workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
     LOG.info("Input workload profile :" + workloadProfile);
   }
   // partition using the insert partitioner
   final Partitioner partitioner = getPartitioner(workloadProfile);
   if (isWorkloadProfileNeeded()) {
     saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
   }
   // handle records update with clustering
   Set<HoodieFileGroupId> fileGroupsInPendingClustering =
       table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
   HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
   context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
   HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
   HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
   updateIndexAndCommitIfNeeded(writeStatuses, result);
   return result;
 }

inputRDD.persist持久化当前的RDD,因为该RDD会被使用多次,便于加速


workloadProfile = new WorkloadProfile(buildProfile(inputRecords)


构建一个状态信息,主要是记录一下插入的记录数量和更新的记录数量 其中主要形成了以filedId为key,


*Pair<instantTime,count>*为value的Map数据


final Partitioner partitioner = getPartitioner(workloadProfile) 这里针对于upsert操作会返回UpsertPartitioner(因为默认hoodie.storage.layout.typeDEFAULT),


其中该UpsertPartitioner实例的构造方法中会进行一些额外的操作 assignUpdatesassignInserts(这里暂时忽略),主要是对数据进行分区处理,设计到小文件的处理


mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner)


这里会根据hoodie.table.base.file.format的值(默认是parquet),如果是hfile,则会要求排序,如果没有则只是按照partitioner进行重分区,


之后再进行数据insert或者update,具体的方法为handleUpsertPartition,会根据之前的partitoner信息进行插入或者更新(里面的细节有点复杂)


updateIndexAndCommitIfNeeded(writeStatuses, result)

该操作会首先会更新索引信息,对于HoodieSimpleIndex来说,什么也不操作(因为该index每次都会从parquet文件中读取信息从而组装成index),


其次如果hoodie.auto.committrue(默认是true)会进行元数据的commit操作,这些commit的操作和之前Apache Hudi初探(六)(与spark的结合)相似,会涉及到Compcation操作,可以后续再做分析


  • postWrite 这里的postCommit 和之前的table.upsert有重复?




相关文章
|
13天前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
56 3
|
20天前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
3天前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
22 0
|
1月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
76 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
1月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
1月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
30天前
|
分布式计算 Apache Spark
|
1月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
84 1
Spark快速大数据分析PDF下载读书分享推荐
|
13天前
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
37 2
|
2月前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
152 59

推荐镜像

更多