Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

简介: Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式

背景


之前讨论的都是’hoodie.datasource.write.operation’:'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件:

/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet

因为是bulk insert操作,所以没有去重的需要,所以直接采用spark原生的方式,

以下我们讨论非spark原生的方式,


闲说杂谈


继续Apache Hudi初探(八)(与spark的结合)–非bulk_insert模式
剩下的代码:

   val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
   val (writeSuccessful, compactionInstant, clusteringInstant) =
        commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
          writeResult, parameters, writeClient, tableConfig, jsc,
          TableInstantInfo(basePath, instantTime, commitActionType, operation))

doWriteOperation 最终调用的是SparkRDDWriteClient对应的方法,如bulkInsert/insert/upsert/insertOverwrite,这里我们以upsert为例:

 public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
      initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
  table.validateUpsertSchema();
  preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
  HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
  HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(resultRDD, instantTime, table);
}
  • initTable 创建获取一个HoodieSparkMergeOnReadTable

  • validateSchema 校验Schema的兼容性

  • preWrite 写之前的操作,这个之前有说过,具体参考:Apache Hudi初探(五)(与spark的结合)

  • table.upsert 真正写入数据的操作

  • 最终调用的是 SparkInsertDeltaCommitActionExecutor<>().execute() 方法,最后最调用到HoodieWriteHelper.write
  public HoodieWriteMetadata<O> write(String instantTime,
                                  I inputRecords,
                                  HoodieEngineContext context,
                                  HoodieTable<T, I, K, O> table,
                                  boolean shouldCombine,
                                  int shuffleParallelism,
                                  BaseCommitActionExecutor<T, I, K, O, R> executor,
                                  WriteOperationType operationType) {
    try {
        // De-dupe/merge if needed
        I dedupedRecords =
            combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
        Instant lookupBegin = Instant.now();
        I taggedRecords = dedupedRecords;
        if (table.getIndex().requiresTagging(operationType)) {
          // perform index loop up to get existing location of records
          context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
          taggedRecords = tag(dedupedRecords, context, table);
        }
        Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
        HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
        result.setIndexLookupDuration(indexLookupDuration);
        return result;
      } catch (Throwable e) {
        if (e instanceof HoodieUpsertException) {
          throw (HoodieUpsertException) e;
        }
        throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
      }
    }

combineOnCondition 数据去重
最终是调用HoodieRecordPayload.preCombine(默认是OverwriteWithLatestAvroPayload.preCombine


taggedRecords = tag(dedupedRecords, context, table) 因为默认的indexHoodieSimpleIndex,所以这个时候会调用到打标记这个操作


最终调用到的是HoodieSimpleIndextagLocationInternal,此时获得的是带有location的记录(如果没有索引到,则 record中的locationnull


executor.execute(taggedRecords) 该方法最终调用到BaseSparkCommitActionExecutor.execute方法:

public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
   // Cache the tagged records, so we don't end up computing both
   // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
   JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
   if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
     inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
   } else {
     LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
   }
   WorkloadProfile workloadProfile = null;
   if (isWorkloadProfileNeeded()) {
     context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile: " + config.getTableName());
     workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
     LOG.info("Input workload profile :" + workloadProfile);
   }
   // partition using the insert partitioner
   final Partitioner partitioner = getPartitioner(workloadProfile);
   if (isWorkloadProfileNeeded()) {
     saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
   }
   // handle records update with clustering
   Set<HoodieFileGroupId> fileGroupsInPendingClustering =
       table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
   HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
   context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
   HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
   HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
   updateIndexAndCommitIfNeeded(writeStatuses, result);
   return result;
 }

inputRDD.persist持久化当前的RDD,因为该RDD会被使用多次,便于加速


workloadProfile = new WorkloadProfile(buildProfile(inputRecords)


构建一个状态信息,主要是记录一下插入的记录数量和更新的记录数量 其中主要形成了以filedId为key,


*Pair<instantTime,count>*为value的Map数据


final Partitioner partitioner = getPartitioner(workloadProfile) 这里针对于upsert操作会返回UpsertPartitioner(因为默认hoodie.storage.layout.typeDEFAULT),


其中该UpsertPartitioner实例的构造方法中会进行一些额外的操作 assignUpdatesassignInserts(这里暂时忽略),主要是对数据进行分区处理,设计到小文件的处理


mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner)


这里会根据hoodie.table.base.file.format的值(默认是parquet),如果是hfile,则会要求排序,如果没有则只是按照partitioner进行重分区,


之后再进行数据insert或者update,具体的方法为handleUpsertPartition,会根据之前的partitoner信息进行插入或者更新(里面的细节有点复杂)


updateIndexAndCommitIfNeeded(writeStatuses, result)

该操作会首先会更新索引信息,对于HoodieSimpleIndex来说,什么也不操作(因为该index每次都会从parquet文件中读取信息从而组装成index),


其次如果hoodie.auto.committrue(默认是true)会进行元数据的commit操作,这些commit的操作和之前Apache Hudi初探(六)(与spark的结合)相似,会涉及到Compcation操作,可以后续再做分析


  • postWrite 这里的postCommit 和之前的table.upsert有重复?




相关文章
|
2月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
195 6
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
83 2
|
2月前
|
存储 分布式计算 druid
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
45 1
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
|
2月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
65 1
|
2月前
|
分布式计算 大数据 分布式数据库
大数据-158 Apache Kylin 安装配置详解 集群模式启动(一)
大数据-158 Apache Kylin 安装配置详解 集群模式启动(一)
56 5
|
2月前
|
资源调度 大数据 分布式数据库
大数据-158 Apache Kylin 安装配置详解 集群模式启动(二)
大数据-158 Apache Kylin 安装配置详解 集群模式启动(二)
52 2
|
2月前
|
消息中间件 分布式计算 druid
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(二)
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(二)
46 2
|
2月前
|
存储 消息中间件 druid
大数据-151 Apache Druid 集群模式 配置启动【上篇】 超详细!
大数据-151 Apache Druid 集群模式 配置启动【上篇】 超详细!
94 1
|
3月前
|
Apache
多应用模式下,忽略项目的入口文件,重写Apache规则
本文介绍了在多应用模式下,如何通过编辑Apache的.htaccess文件来重写URL规则,从而实现忽略项目入口文件index.php进行访问的方法。
|
4月前
|
Linux Apache
在Linux中,apache有几种工作模式,分别介绍下其特点,并说明什么情况下采用不同的工作模式?
在Linux中,apache有几种工作模式,分别介绍下其特点,并说明什么情况下采用不同的工作模式?

热门文章

最新文章

推荐镜像

更多