1. Introduction
Compaction, for the MergeOnRead storage type, converts row-based log files into columnar parquet data files to speed up record lookups. Users can trigger compaction explicitly through the command line provided by hudi-cli, or configure it when using HoodieDeltaStreamer to write upstream (Kafka/DFS) data into a hudi dataset, in which case the system performs compaction automatically.
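As a minimal sketch of the automatic path, inline compaction can be enabled via the write config, using the builder API of the same Hudi version the code below comes from; the path, schema, and threshold here are illustrative, not recommendations:

```java
// Hedged sketch: enable inline compaction so a compaction is scheduled automatically
// once enough deltacommits have accumulated (threshold value is illustrative).
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/path/to/dataset")                    // hypothetical base path
    .withSchema(schemaStr)                           // assumed Avro schema string
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withInlineCompaction(true)                  // compact inline after ingestion
        .withMaxNumDeltaCommitsBeforeCompaction(5)   // feeds getInlineCompactDeltaCommitMax() below
        .build())
    .build();
```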
2. Analysis
Hudi splits a compaction operation into two phases: generating a HoodieCompactionPlan and executing that HoodieCompactionPlan.
2.1 Generating the HoodieCompactionPlan
The main entry point for generating a HoodieCompactionPlan is HoodieWriteClient#scheduleCompaction. Its core code is as follows:
```java
public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
  // Create a new commitTime (monotonically increasing)
  String instantTime = HoodieActiveTimeline.createNewCommitTime();
  // Schedule the compaction
  boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata);
  return notEmpty ? Option.of(instantTime) : Option.empty();
}
```
It first obtains a new commitTime (monotonically increasing) and then calls scheduleCompactionAtInstant to generate the HoodieCompactionPlan. Its core code is as follows:
```java
public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata)
    throws IOException {
  HoodieTableMetaClient metaClient = createMetaClient(true);
  // Run some checks first: 1. if there is an inflight write, the earliest such instant must be later
  // than the compaction instant; 2. instants of type commit, deltacommit, compaction must all be
  // earlier than the compaction instant
  metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
    Preconditions.checkArgument(
        HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
        "Earliest write inflight instant time must be later " + "than compaction time. Earliest :"
            + earliestInflight + ", Compaction scheduled at " + instantTime);
  });
  List<HoodieInstant> conflictingInstants = metaClient
      .getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant -> HoodieTimeline
          .compareTimestamps(instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
      .collect(Collectors.toList());
  Preconditions.checkArgument(conflictingInstants.isEmpty(),
      "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
          + conflictingInstants);
  HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
  // Start generating the compaction plan
  HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
  if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) {
    extraMetadata.ifPresent(workload::setExtraMetadata);
    HoodieInstant compactionInstant =
        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
    // Serialize the plan and save it under the metadata (.aux) directory
    metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
        AvroUtils.serializeCompactionPlan(workload));
    return true;
  }
  return false;
}
```
The method first validates two invariants: if any instant is in inflight state, the earliest such instant must be later than the compaction instant time (i.e., no non-compaction instant may still be inflight behind the compaction being scheduled), and every existing instant of type commit, deltacommit, or compaction must be earlier than the compaction instant time (i.e., all completed, inflight, and requested compactions must precede the one being scheduled).
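Since Hudi instant times are yyyyMMddHHmmss timestamps, both checks reduce to lexicographic string comparison, which is what HoodieTimeline.compareTimestamps does. A small illustration with made-up values:

```java
// Instant times like "20200401102000" sort lexicographically in chronological order.
// Values below are made up for illustration.
String compactionInstant = "20200401102000"; // instant being scheduled
String earliestInflight  = "20200401103000"; // earliest inflight (non-compaction) write
String lastDeltaCommit   = "20200401101500"; // latest existing deltacommit

// Check 1: any inflight write must be strictly later than the compaction instant.
boolean check1 = earliestInflight.compareTo(compactionInstant) > 0; // true
// Check 2: every existing commit/deltacommit/compaction must be strictly earlier.
boolean check2 = lastDeltaCommit.compareTo(compactionInstant) < 0;  // true
```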
The scheduleCompaction method on the table, which schedules and produces the CompactionPlan, has the following core code:
```java
public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
  // Find the instant of the last completed compaction
  Option<HoodieInstant> lastCompaction =
      getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
  String deltaCommitsSinceTs = "0";
  if (lastCompaction.isPresent()) { // A previous compaction exists
    deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
  }
  // Count how many deltacommit instants have happened since then
  int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline()
      .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
  if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
    // Not enough deltacommits yet: generate an empty plan
    return new HoodieCompactionPlan();
  }
  HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
  try {
    // Generate the CompactionPlan
    return compactor.generateCompactionPlan(jsc, this, config, instantTime,
        ((SyncableFileSystemView) getRTFileSystemView()).getPendingCompactionOperations()
            .map(instantTimeCompactionopPair -> instantTimeCompactionopPair.getValue().getFileGroupId())
            .collect(Collectors.toSet()));
  } catch (IOException e) {
    throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
  }
}
```
As shown, it first checks whether the compaction condition has been met since the last compact (i.e., whether enough deltacommits have accumulated), and only then calls generateCompactionPlan to build the plan. Its core code is as follows:
```java
public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieTable hoodieTable,
    HoodieWriteConfig config, String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactions)
    throws IOException {
  HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
  // Find all partition paths
  List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
      config.shouldAssumeDatePartitioning());
  // Filter partition paths according to the compaction strategy
  partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
  if (partitionPaths.isEmpty()) {
    // No partition paths, return null
    return null;
  }
  RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
  List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size())
      .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
          .getLatestFileSlices(partitionPath)
          .filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId())).map(s -> {
            // Collect the delta log files
            List<HoodieLogFile> logFiles =
                s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
            // Get the data file
            Option<HoodieDataFile> dataFile = s.getDataFile();
            // Build a CompactionOperation
            return new CompactionOperation(dataFile, partitionPath, logFiles,
                config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
          }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator())
      .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
  // Generate a new HoodieCompactionPlan based on the strategy and the pending CompactionPlans
  HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
      CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
  Preconditions.checkArgument(
      compactionPlan.getOperations().stream().noneMatch(
          op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
      "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
          + "Please fix your strategy implementation." + "FileIdsWithPendingCompactions :"
          + fgIdsInPendingCompactions + ", Selected workload :" + compactionPlan);
  if (compactionPlan.getOperations().isEmpty()) {
    log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
  }
  return compactionPlan;
}
```
As shown, it first lists all partitions. For each partition it takes the latest FileSlices that are not part of a pending compaction; for each such FileSlice it fetches the data file and log files, computes the strategy's metrics, and builds a CompactionOperation. CompactionOperations with no delta log files are filtered out, the rest are converted into HoodieCompactionOperations, and finally the HoodieCompactionPlan is generated from them (the strategy sorts and filters this batch of HoodieCompactionOperations against the operations of pending HoodieCompactionPlans), guaranteeing that the same file never appears in more than one HoodieCompactionPlan.
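The CompactionStrategy hooks used above (filterPartitionPaths, captureMetrics, generateCompactionPlan) are the customization points. As a rough sketch of the partition-filtering idea, which Hudi's shipped DayBasedCompactionStrategy implements for date-layout partitions, consider the following; the class and limit are purely illustrative, not Hudi's actual implementation:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical filter: only consider the N most recent partitions, assuming a
// sortable yyyy/MM/dd partition layout. Names here are illustrative only.
public class RecentPartitionsFilter {
  private final int targetPartitions; // hypothetical limit, not a real Hudi config

  public RecentPartitionsFilter(int targetPartitions) {
    this.targetPartitions = targetPartitions;
  }

  public List<String> filterPartitionPaths(List<String> partitionPaths) {
    return partitionPaths.stream()
        .sorted(Comparator.reverseOrder()) // newest date partitions first
        .limit(targetPartitions)
        .collect(Collectors.toList());
  }
}
```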
After the HoodieCompactionPlan is generated, it is serialized and saved under the .hoodie/.aux metadata directory in requested state. This completes the generation and persistence of the HoodieCompactionPlan.
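Putting the scheduling phase together, a minimal driver sketch (assuming an already built HoodieWriteClient writeClient and HoodieTableMetaClient metaClient for this API version) could look like:

```java
// Schedule a compaction; a non-empty Option means a plan was generated and persisted to .hoodie/.aux.
Option<String> scheduled = writeClient.scheduleCompaction(Option.empty());
if (scheduled.isPresent()) {
  // Deserialize the plan back from the .aux directory, just as runCompaction does later.
  HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, scheduled.get());
  System.out.println("Scheduled " + plan.getOperations().size()
      + " compaction operations at " + scheduled.get());
}
```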
2.2 Executing the HoodieCompactionPlan
With the HoodieCompactionPlan generated and persisted to a file, executing the compaction ultimately calls HoodieWriteClient#compact. Its core code is as follows:
```java
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
  HoodieTableMetaClient metaClient = createMetaClient(true);
  HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
  // Get the compaction timeline (requested and inflight states)
  HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
  // Build the inflight-state instant
  HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
  if (pendingCompactionTimeline.containsInstant(inflightInstant)) { // The timeline contains this instant
    // Roll back the inflight compaction
    rollbackInflightCompaction(inflightInstant, table);
    metaClient = createMetaClient(true);
    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    // Re-fetch the compaction timeline (requested and inflight states)
    pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
  }
  // Build the requested-state instant
  HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
  if (pendingCompactionTimeline.containsInstant(instant)) { // Contained in the timeline
    return runCompaction(instant, metaClient.getActiveTimeline(), autoCommit);
  } else {
    throw new IllegalStateException(
        "No Compaction request available at " + compactionInstantTime + " to run compaction");
  }
}
```
The method first checks for an inflight instant of this compaction; if one exists it is rolled back (this run takes precedence), and then runCompaction is invoked to execute the compaction. Its core code is as follows:
```java
private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline,
    boolean autoCommit) throws IOException {
  HoodieTableMetaClient metaClient = createMetaClient(true);
  // Deserialize the plan previously saved under .hoodie/.aux
  HoodieCompactionPlan compactionPlan =
      CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp());
  // Transition the state from requested to inflight, renaming the earlier requested file
  activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
  compactionTimer = metrics.getCompactionCtx();
  // Create a Hoodie table which encapsulated the commits and files visible
  HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
  // Start the actual compact operation
  JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
  // Force compaction action
  statuses.persist(config.getWriteStatusStorageLevel());
  // Commit the compaction
  commitCompaction(statuses, table, compactionInstant.getTimestamp(), autoCommit,
      Option.ofNullable(compactionPlan.getExtraMetadata()));
  return statuses;
}
```
As shown, it first deserializes the HoodieCompactionPlan from the previously written file, transitions its state, and then calls compact to do the work, which eventually lands in HoodieRealtimeTableCompactor#compact. Its core code is as follows:
```java
public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieCompactionPlan compactionPlan,
    HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
  // Validate
  if (compactionPlan == null || (compactionPlan.getOperations() == null)
      || (compactionPlan.getOperations().isEmpty())) {
    return jsc.emptyRDD();
  }
  HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
  // Create a CopyOnWriteTable; everything up to this point operated on a MergeOnReadTable
  HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
  // Convert each HoodieCompactionOperation into a CompactionOperation
  List<CompactionOperation> operations = compactionPlan.getOperations().stream()
      .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
  return jsc.parallelize(operations, operations.size())
      .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator);
}
```
As shown, the key step is creating a new HoodieCopyOnWriteTable, converting each HoodieCompactionOperation back into a CompactionOperation, and then calling compact on every operation to perform the actual compaction. Its core code is as follows:
```java
private List<WriteStatus> compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
    HoodieWriteConfig config, CompactionOperation operation, String commitTime) throws IOException {
  FileSystem fs = metaClient.getFs();
  // Get the schema
  Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
  // Get the time of the latest completed instant of type commit, rollback, or deltacommit
  String maxInstantTime = metaClient
      .getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
          HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
      .filterCompletedInstants().lastInstant().get().getTimestamp();
  // Resolve the log file paths
  List<String> logFiles = operation.getDeltaFileNames().stream().map(
      p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
      .collect(toList());
  // Scanner used to iterate over the records in the log files
  HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
      readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
      config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
      config.getSpillableMapBasePath());
  if (!scanner.iterator().hasNext()) {
    return Lists.<WriteStatus>newArrayList();
  }
  // Get the data file
  Option<HoodieDataFile> oldDataFileOpt =
      operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
  // Compacting is very similar to applying updates to existing file
  Iterator<List<WriteStatus>> result;
  if (oldDataFileOpt.isPresent()) { // The data file exists
    // Handle as a COW-style update
    result = hoodieCopyOnWriteTable.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(),
        oldDataFileOpt.get());
  } else { // No data file yet
    // Handle as a COW-style insert
    result = hoodieCopyOnWriteTable.handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(),
        scanner.iterator());
  }
  // Collect metrics
  Iterable<List<WriteStatus>> resultIterable = () -> result;
  return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
    s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
    s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
    s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
    s.getStat().setPartitionPath(operation.getPartitionPath());
    s.getStat()
        .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
    s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
    s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
    s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
    RuntimeStats runtimeStats = new RuntimeStats();
    runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
    s.getStat().setRuntimeStats(runtimeStats);
  }).collect(toList());
}
```
The core flow builds an iterator over the records in the delta log files (analyzed separately later), then checks whether the operation has a base data file. If not, all records are written into a new parquet data file; if it does, the delta log records are applied to it, i.e., merged with the old data file's records and written out as a new parquet file. After the data file is written, the write metrics are recorded, the compaction is marked completed, and it becomes a commit on the timeline (a file named commitTime.commit).
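End to end, both phases can be driven from client code roughly as follows. This is a hedged sketch against this API version; the public compact(String) wrapper is assumed to delegate to the private compact(instantTime, autoCommit) shown earlier:

```java
// Hedged sketch: schedule, then execute, a compaction on a MergeOnRead dataset.
Option<String> instantTime = writeClient.scheduleCompaction(Option.empty());
if (instantTime.isPresent()) {
  // Assumed public wrapper around the private compact(compactionInstantTime, autoCommit) above.
  JavaRDD<WriteStatus> statuses = writeClient.compact(instantTime.get());
  long errors = statuses.filter(WriteStatus::hasErrors).count();
  System.out.println("Compaction " + instantTime.get() + " finished with " + errors + " errored writes");
}
```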
3. Summary
Compaction is an operation that exists only for the MergeOnRead storage type. It first walks each partition's latest parquet data files and their corresponding log files (FileSlices) and generates a HoodieCompactionPlan (one HoodieCompactionOperation per FileSlice), which is serialized to a file. When the compaction is executed, the plan is deserialized from that file and each HoodieCompactionOperation in the HoodieCompactionPlan is compacted: an iterator over the delta log files is built, and its records are either merged with the old parquet data file or written into a new parquet data file. On completion the statistics are written out, and a completed compaction appears on the timeline as a commit.