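Background
Everything discussed so far assumed 'hoodie.datasource.write.operation': 'bulk_insert'. In that mode no JSON files are produced, and the partition ends up containing files like the following: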
    /dt=1/.hoodie_partition_metadata
    /dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
    /dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
    /dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
    /dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
    /dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
    /dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
    /dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet
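The parquet file names listed above appear to follow Hudi's base file layout, <fileId>_<writeToken>_<instantTime>.<extension>. As a minimal sketch of how to read them (BaseFileNameSketch and parse are hypothetical helpers written for this post, not Hudi APIs), the following Java snippet splits one of the listed names into those three parts:

```java
import java.util.Arrays;

// Illustrative helper, not part of Hudi: splits a base file name into its parts.
final class BaseFileNameSketch {

    // Returns {fileId, writeToken, instantTime}, assuming the layout
    // "<fileId>_<writeToken>_<instantTime>.<extension>".
    static String[] parse(String fileName) {
        int dot = fileName.lastIndexOf('.');
        String stem = dot >= 0 ? fileName.substring(0, dot) : fileName;
        String[] parts = stem.split("_");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Unexpected base file name: " + fileName);
        }
        return parts;
    }

    public static void main(String[] args) {
        String name =
            "2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet";
        // Prints the three parts: fileId, writeToken, instantTime.
        System.out.println(Arrays.toString(parse(name)));
    }
}
```

Read this way, the four files ending in 20230528231643200 share one instant time, which suggests they were written by the same commit.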
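Because this is a bulk insert, there is no need for deduplication, so the data is written directly through Spark's native write path.
Below we look at the path that does not go through Spark's native writer.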
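Miscellaneous Notes
This continues from A First Look at Apache Hudi (8) (Integration with Spark), now covering the non-bulk_insert mode.
The remaining code is: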
    val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
    val (writeSuccessful, compactionInstant, clusteringInstant) =
      commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
        writeResult, parameters, writeClient, tableConfig, jsc,
        TableInstantInfo(basePath, instantTime, commitActionType, operation))
doWriteOperation ultimately calls the corresponding method on SparkRDDWriteClient, such as bulkInsert, insert, upsert, or insertOverwrite. Here we take upsert as the example:
    public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
      HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
          initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
      table.validateUpsertSchema();
      preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
      HoodieWriteMetadata<HoodieData<WriteStatus>> result =
          table.upsert(context, instantTime, HoodieJavaRDD.of(records));
      HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
          result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
      if (result.getIndexLookupDuration().isPresent()) {
        metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
      }
      return postWrite(resultRDD, instantTime, table);
    }
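- initTable creates (or obtains) a HoodieSparkMergeOnReadTable.
- validateUpsertSchema checks that the schema is compatible.
- preWrite runs the pre-write steps. These were covered earlier; see A First Look at Apache Hudi (5) (Integration with Spark).
- table.upsert is where the data is actually written.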
  - It ends up calling SparkUpsertDeltaCommitActionExecutor<>().execute(), which in turn calls HoodieWriteHelper.write:
    public HoodieWriteMetadata<O> write(String instantTime,
                                        I inputRecords,
                                        HoodieEngineContext context,
                                        HoodieTable<T, I, K, O> table,
                                        boolean shouldCombine,
                                        int shuffleParallelism,
                                        BaseCommitActionExecutor<T, I, K, O, R> executor,
                                        WriteOperationType operationType) {
      try {
        // De-dupe/merge if needed
        I dedupedRecords = combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

        Instant lookupBegin = Instant.now();
        I taggedRecords = dedupedRecords;
        if (table.getIndex().requiresTagging(operationType)) {
          // perform index loop up to get existing location of records
          context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
          taggedRecords = tag(dedupedRecords, context, table);
        }
        Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

        HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
        result.setIndexLookupDuration(indexLookupDuration);
        return result;
      } catch (Throwable e) {
        if (e instanceof HoodieUpsertException) {
          throw (HoodieUpsertException) e;
        }
        throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
      }
    }
combineOnCondition deduplicates the incoming records.
It ends up calling HoodieRecordPayload.preCombine (by default, OverwriteWithLatestAvroPayload.preCombine).
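To make the deduplication step concrete, here is a minimal single-machine sketch of "keep the latest record per key". It is not Hudi's implementation: Hudi performs this as a distributed reduce over the RDD, and the names PreCombineSketch, Rec, and dedupe are hypothetical. The tie-breaking rule (the incoming record wins unless the stored one has a strictly larger ordering value) is my reading of OverwriteWithLatestAvroPayload.preCombine and should be treated as an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch, not Hudi code: deduplicate records by key.
final class PreCombineSketch {

    // Hypothetical stand-in for a Hudi record: key, ordering value, payload.
    static final class Rec {
        final String key;
        final long orderingVal;
        final String value;

        Rec(String key, long orderingVal, String value) {
            this.key = key;
            this.orderingVal = orderingVal;
            this.value = value;
        }
    }

    // Assumed rule: keep the stored record only if its ordering value is
    // strictly larger; otherwise the incoming record replaces it.
    static Rec preCombine(Rec incoming, Rec stored) {
        return stored.orderingVal > incoming.orderingVal ? stored : incoming;
    }

    // Collapses the input to one record per key, preserving first-seen key order.
    static List<Rec> dedupe(List<Rec> input) {
        Map<String, Rec> byKey = new LinkedHashMap<>();
        for (Rec r : input) {
            // Map.merge passes (existing value, new value) to the function.
            byKey.merge(r.key, r, (stored, incoming) -> preCombine(incoming, stored));
        }
        return new ArrayList<>(byKey.values());
    }
}
```

For example, feeding in two records for key "a" with ordering values 1 and 3 leaves only the one with ordering value 3.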
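taggedRecords = tag(dedupedRecords, context, table): because the default index is HoodieSimpleIndex, the tagging step is invoked at this point.
It ends up in HoodieSimpleIndex's tagLocationInternal, which returns the records with their location attached (if a record is not found in the index, its location is null).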
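The tagging step can be pictured as a lookup from record key to the file that currently holds that key. The sketch below is a simplified, in-memory illustration under stated assumptions: TagLocationSketch, Rec, and Location are hypothetical types, the key is a plain string rather than Hudi's HoodieKey, and the existing map stands in for the key-to-location pairs that the index reads from the existing base files.

```java
import java.util.List;
import java.util.Map;

// Illustrative sketch, not Hudi code: attach a location to each incoming record.
final class TagLocationSketch {

    // Hypothetical location: which file group holds the key, and since which instant.
    static final class Location {
        final String instantTime;
        final String fileId;

        Location(String instantTime, String fileId) {
            this.instantTime = instantTime;
            this.fileId = fileId;
        }
    }

    // Hypothetical record; location stays null until tagged.
    static final class Rec {
        final String key;
        Location location;

        Rec(String key) {
            this.key = key;
        }
    }

    // Sets the location for keys that already exist; unknown keys keep a
    // null location and will later be treated as inserts.
    static void tag(List<Rec> records, Map<String, Location> existing) {
        for (Rec r : records) {
            r.location = existing.get(r.key);
        }
    }
}
```

After tagging, a record with a non-null location is an update to that file group, and a record with a null location is a new insert.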
executor.execute(taggedRecords) ends up in BaseSparkCommitActionExecutor.execute:
    public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
      // Cache the tagged records, so we don't end up computing both
      // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
      JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
      if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
        inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
      } else {
        LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
      }

      WorkloadProfile workloadProfile = null;
      if (isWorkloadProfileNeeded()) {
        context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile: " + config.getTableName());
        workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
        LOG.info("Input workload profile :" + workloadProfile);
      }

      // partition using the insert partitioner
      final Partitioner partitioner = getPartitioner(workloadProfile);
      if (isWorkloadProfileNeeded()) {
        saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
      }

      // handle records update with clustering
      Set<HoodieFileGroupId> fileGroupsInPendingClustering =
          table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
      HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty()
          ? inputRecords
          : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);

      context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
      HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
      HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
      updateIndexAndCommitIfNeeded(writeStatuses, result);
      return result;
    }
inputRDD.persist caches the current RDD. The RDD is used several times afterwards, so persisting it avoids recomputation.
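workloadProfile = new WorkloadProfile(buildProfile(inputRecords), ...)
This builds a summary of the workload, mainly the number of records to insert and the number of records to update. Its core is a Map keyed by fileId,
with Pair<instantTime, count> as the value.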
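As a rough illustration of what such a profile contains, the following sketch counts inserts and per-file updates from already tagged records. It is a simplification under stated assumptions: WorkloadProfileSketch, Rec, Profile, and InstantCount are hypothetical names, the per-partition breakdown is omitted, and a record whose fileId is null is treated as an insert.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch, not Hudi code: summarize tagged records into a profile.
final class WorkloadProfileSketch {

    // Hypothetical tagged record: fileId and instantTime are null for new keys.
    static final class Rec {
        final String key;
        final String fileId;
        final String instantTime;

        Rec(String key, String fileId, String instantTime) {
            this.key = key;
            this.fileId = fileId;
            this.instantTime = instantTime;
        }
    }

    // Plays the role of Pair<instantTime, count>.
    static final class InstantCount {
        final String instantTime;
        long count;

        InstantCount(String instantTime, long count) {
            this.instantTime = instantTime;
            this.count = count;
        }
    }

    static final class Profile {
        long numInserts;
        long numUpdates;
        // fileId -> (instantTime, number of updates aimed at that file)
        final Map<String, InstantCount> updatesByFileId = new HashMap<>();
    }

    static Profile build(List<Rec> tagged) {
        Profile profile = new Profile();
        for (Rec r : tagged) {
            if (r.fileId == null) {
                profile.numInserts++;          // no known location: an insert
                continue;
            }
            profile.numUpdates++;              // known location: an update
            InstantCount ic = profile.updatesByFileId.get(r.fileId);
            if (ic == null) {
                profile.updatesByFileId.put(r.fileId, new InstantCount(r.instantTime, 1));
            } else {
                ic.count++;
            }
        }
        return profile;
    }
}
```

A partitioner can then use these counts to decide how many records go to each existing file and how many new files the inserts need.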
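final Partitioner partitioner = getPartitioner(workloadProfile): for an upsert this returns an UpsertPartitioner (because hoodie.storage.layout.type defaults to DEFAULT).
The UpsertPartitioner constructor does some extra work, assignUpdates and assignInserts (skipped for now). These mainly assign the records to partitions and involve small-file handling.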
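mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner)
The behaviour here depends on hoodie.table.base.file.format (default parquet): if it is hfile, the records must be sorted; otherwise they are only repartitioned according to the partitioner.
The data is then inserted or updated. The method that does this is handleUpsertPartition, which inserts or updates based on the partitioner information built earlier (the details are fairly involved).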
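updateIndexAndCommitIfNeeded(writeStatuses, result)
This first updates the index. For HoodieSimpleIndex it does nothing, because that index is rebuilt from the parquet files on every lookup.
Then, if hoodie.auto.commit is true (the default), it commits the metadata. The commit steps are similar to those in A First Look at Apache Hudi (6) (Integration with Spark) and involve compaction, which can be analysed in a later post.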
- postWrite: does the postCommit here duplicate work already done in table.upsert?
- commitAndPerformPostOperations
This mainly covers asynchronous compaction and clustering, plus syncing metadata to Hive, similar to A First Look at Apache Hudi (7) (Integration with Spark).