1. Introduction
Compaction is used with the MergeOnRead storage type to convert row-based log files into columnar parquet data files, speeding up record lookups. Users can trigger compaction explicitly through the hudi-cli command line, or configure it when using HoodieDeltaStreamer to write upstream (Kafka/DFS) data into a Hudi dataset, in which case the system performs compaction automatically.
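As a concrete illustration, inline compaction for a MergeOnRead writer is typically controlled by write-config properties like the following (a hedged sketch: these names follow the 0.5.x-era Hudi compaction configs, so verify them against your version):

```properties
# enable inline compaction after each write on a MergeOnRead table
hoodie.compact.inline=true
# number of delta commits to accumulate before a compaction is scheduled
hoodie.compact.inline.max.delta.commits=5
```

Alternatively, hudi-cli exposes compaction commands (e.g. `compaction schedule` and `compaction run`) for triggering the same two phases manually.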
2. Analysis
Hudi splits compaction into two phases: generating a HoodieCompactionPlan and executing that plan.
2.1 Generating the HoodieCompactionPlan
The main entry point for generating a HoodieCompactionPlan is HoodieWriteClient#scheduleCompaction. Its core code is as follows:
```java
public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
  // Create a new commitTime (monotonically increasing)
  String instantTime = HoodieActiveTimeline.createNewCommitTime();
  // Schedule the compaction
  boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata);
  return notEmpty ? Option.of(instantTime) : Option.empty();
}
```
It first obtains a new commitTime (monotonically increasing), then calls scheduleCompactionAtInstant to generate the HoodieCompactionPlan. Its core code is as follows:
```java
public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata)
    throws IOException {
  HoodieTableMetaClient metaClient = createMetaClient(true);
  // Validate first: 1. if there are inflight writes, the earliest such instant must be later than
  // the compaction instant; 2. every commit/deltacommit/compaction instant must be earlier than it
  metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
    Preconditions.checkArgument(
        HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
        "Earliest write inflight instant time must be later " + "than compaction time. Earliest :"
            + earliestInflight + ", Compaction scheduled at " + instantTime);
  });
  List<HoodieInstant> conflictingInstants = metaClient
      .getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant -> HoodieTimeline
          .compareTimestamps(instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
      .collect(Collectors.toList());
  Preconditions.checkArgument(conflictingInstants.isEmpty(),
      "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
          + conflictingInstants);
  HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
  // Generate the compaction plan
  HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime);
  if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) {
    extraMetadata.ifPresent(workload::setExtraMetadata);
    HoodieInstant compactionInstant =
        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
    // Serialize the plan and save it under the metadata (.aux) directory
    metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant,
        AvroUtils.serializeCompactionPlan(workload));
    return true;
  }
  return false;
}
```
This method first validates two invariants: if any non-compaction instant is in the inflight state, the earliest such instant must be later than the current compaction time (so compaction cannot be scheduled while a non-compaction instant is still inflight), and every commit, deltacommit, and compaction instant must be earlier than the current compaction time (so all completed, inflight, and requested compactions predate the one being scheduled).
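The two ordering checks can be sketched in isolation. This is a minimal illustration, not Hudi's actual implementation, under the assumption that instant times are fixed-width timestamp strings (e.g. yyyyMMddHHmmss), so lexicographic comparison matches chronological order:

```java
// Sketch of the scheduling invariants in scheduleCompactionAtInstant.
// Assumption: instant times are fixed-width timestamp strings, so
// String.compareTo orders them chronologically.
public class InstantTimeCheck {

    // true when instant a is strictly later than instant b
    public static boolean laterThan(String a, String b) {
        return a.compareTo(b) > 0;
    }

    // mirrors the two preconditions: inflight writes after the compaction
    // instant, commit-like instants before it
    public static void validate(String earliestInflight, String latestCommitLike, String compactionInstant) {
        if (earliestInflight != null && !laterThan(earliestInflight, compactionInstant)) {
            throw new IllegalArgumentException("Earliest inflight write must be later than the compaction instant");
        }
        if (latestCommitLike != null && !laterThan(compactionInstant, latestCommitLike)) {
            throw new IllegalArgumentException("Commit/deltacommit/compaction instants must be earlier");
        }
    }

    public static void main(String[] args) {
        // compaction at 10:00:00, completed commits up to 09:59:00,
        // earliest inflight write at 10:00:30 -> valid schedule
        validate("20200301100030", "20200301095900", "20200301100000");
    }
}
```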
The core of scheduleCompaction, which builds the CompactionPlan, is as follows:
```java
public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
  // Find the instant of the last completed compaction
  Option<HoodieInstant> lastCompaction =
      getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
  String deltaCommitsSinceTs = "0";
  if (lastCompaction.isPresent()) { // a previous compaction exists
    deltaCommitsSinceTs = lastCompaction.get().getTimestamp();
  }
  // Count the deltacommit instants since then
  int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline()
      .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
  if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
    // Not enough delta commits yet: return an empty plan
    return new HoodieCompactionPlan();
  }
  HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
  try {
    // Generate the CompactionPlan
    return compactor.generateCompactionPlan(jsc, this, config, instantTime,
        ((SyncableFileSystemView) getRTFileSystemView()).getPendingCompactionOperations()
            .map(instantTimeCompactionopPair -> instantTimeCompactionopPair.getValue().getFileGroupId())
            .collect(Collectors.toSet()));
  } catch (IOException e) {
    throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
  }
}
```
It first checks whether the conditions for another compaction have been met since the last one (i.e., whether enough deltacommits have accumulated), and only then calls generateCompactionPlan to build the plan. Its core code is as follows:
```java
public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieTable hoodieTable,
    HoodieWriteConfig config, String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactions)
    throws IOException {
  HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
  // Find all partition paths
  List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
      config.shouldAssumeDatePartitioning());
  // Filter partition paths according to the strategy
  partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
  if (partitionPaths.isEmpty()) {
    // No partition paths: return null
    return null;
  }
  RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
  List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size())
      .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
          .getLatestFileSlices(partitionPath)
          .filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId())).map(s -> {
            // Collect the incremental log files
            List<HoodieLogFile> logFiles =
                s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
            // Fetch the data file
            Option<HoodieDataFile> dataFile = s.getDataFile();
            // Build a CompactionOperation
            return new CompactionOperation(dataFile, partitionPath, logFiles,
                config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
          }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator())
      .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
  // Generate the new HoodieCompactionPlan from the strategy and the pending plans
  HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
      CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
  Preconditions.checkArgument(
      compactionPlan.getOperations().stream().noneMatch(
          op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
      "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
          + "Please fix your strategy implementation." + "FileIdsWithPendingCompactions :"
          + fgIdsInPendingCompactions + ", Selected workload :" + compactionPlan);
  if (compactionPlan.getOperations().isEmpty()) {
    log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
  }
  return compactionPlan;
}
```
As shown, it first fetches all partitions; for each partition it takes the latest FileSlices that are not already part of a pending compaction. For each such FileSlice it fetches the corresponding data file and log files, captures metrics, and builds a CompactionOperation, keeping only those whose incremental log files are non-empty. Each CompactionOperation is then converted into a HoodieCompactionOperation, and finally a HoodieCompactionPlan is generated from them (the strategy sorts and filters this batch of HoodieCompactionOperations against the operations in pending HoodieCompactionPlans), ensuring the same file group never appears in more than one HoodieCompactionPlan.
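The no-overlap guarantee boils down to a set-exclusion step applied before the strategy runs. The following is a hypothetical sketch with string ids; the real code works on HoodieFileGroupId and appears above:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: file groups already referenced by pending plans are
// excluded up front, so no file group ends up in two compaction plans.
public class PendingFilter {
    public static List<String> selectCandidates(List<String> latestFileGroups, Set<String> pendingFileGroups) {
        return latestFileGroups.stream()
                .filter(fg -> !pendingFileGroups.contains(fg))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // fg-1 is already in a pending plan, so only fg-2 is eligible
        System.out.println(selectCandidates(List.of("fg-1", "fg-2"), Set.of("fg-1")));
    }
}
```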
Once generated, the HoodieCompactionPlan is serialized and saved under the .hoodie/.aux metadata directory with state requested, completing the generation and persistence of the plan.
2.2 Executing the HoodieCompactionPlan
With the HoodieCompactionPlan generated and persisted to a file, executing a compaction ultimately invokes HoodieWriteClient#compact. Its core code is as follows:
```java
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException {
  HoodieTableMetaClient metaClient = createMetaClient(true);
  HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
  // Get the compaction timeline (requested and inflight states)
  HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
  // Build the inflight-state instant
  HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
  if (pendingCompactionTimeline.containsInstant(inflightInstant)) { // the timeline contains this instant
    // Roll back the inflight compaction
    rollbackInflightCompaction(inflightInstant, table);
    metaClient = createMetaClient(true);
    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    // Refresh the compaction timeline (requested and inflight states)
    pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
  }
  // Build the requested-state instant
  HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
  if (pendingCompactionTimeline.containsInstant(instant)) {
    return runCompaction(instant, metaClient.getActiveTimeline(), autoCommit);
  } else {
    throw new IllegalStateException(
        "No Compaction request available at " + compactionInstantTime + " to run compaction");
  }
}
```
The method first checks whether an inflight instant exists for this compaction; if so, it rolls it back (so the current run takes precedence), then calls runCompaction to execute the compaction. Its core code is as follows:
```java
private JavaRDD<WriteStatus> runCompaction(HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline,
    boolean autoCommit) throws IOException {
  HoodieTableMetaClient metaClient = createMetaClient(true);
  // Deserialize the plan saved earlier under the .hoodie/.aux directory
  HoodieCompactionPlan compactionPlan =
      CompactionUtils.getCompactionPlan(metaClient, compactionInstant.getTimestamp());
  // Transition state from requested to inflight; this renames the requested file
  activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
  compactionTimer = metrics.getCompactionCtx();
  // Create a Hoodie table which encapsulated the commits and files visible
  HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
  // Actually run the compaction
  JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan);
  // Force compaction action
  statuses.persist(config.getWriteStatusStorageLevel());
  // Commit the compaction
  commitCompaction(statuses, table, compactionInstant.getTimestamp(), autoCommit,
      Option.ofNullable(compactionPlan.getExtraMetadata()));
  return statuses;
}
```
As shown, it first deserializes the HoodieCompactionPlan from the previously written file, transitions the state, and then calls compact to perform the compaction. That call eventually reaches HoodieRealtimeTableCompactor#compact, whose core code is as follows:
```java
public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieCompactionPlan compactionPlan,
    HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
  // Validate
  if (compactionPlan == null || (compactionPlan.getOperations() == null)
      || (compactionPlan.getOperations().isEmpty())) {
    return jsc.emptyRDD();
  }
  HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
  // Create a CopyOnWriteTable; everything up to this point ran against the MergeOnReadTable
  HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
  // Convert each HoodieCompactionOperation into a CompactionOperation
  List<CompactionOperation> operations = compactionPlan.getOperations().stream()
      .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
  return jsc.parallelize(operations, operations.size())
      .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator);
}
```
The key point is that it creates a new HoodieCopyOnWriteTable, converts each HoodieCompactionOperation into a CompactionOperation, and then calls compact on every operation. Its core code is as follows:
```java
private List<WriteStatus> compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
    HoodieWriteConfig config, CompactionOperation operation, String commitTime) throws IOException {
  FileSystem fs = metaClient.getFs();
  // Get the schema
  Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
  // Get the latest completed instant among commit, rollback and deltacommit actions
  String maxInstantTime = metaClient
      .getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
          HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
      .filterCompletedInstants().lastInstant().get().getTimestamp();
  // Resolve the log file paths
  List<String> logFiles = operation.getDeltaFileNames().stream().map(
      p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
      .collect(toList());
  // Scanner used to iterate over the records in the log files
  HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
      readerSchema, maxInstantTime, config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
      config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
      config.getSpillableMapBasePath());
  if (!scanner.iterator().hasNext()) {
    return Lists.<WriteStatus>newArrayList();
  }
  // Fetch the base data file
  Option<HoodieDataFile> oldDataFileOpt =
      operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
  // Compacting is very similar to applying updates to existing file
  Iterator<List<WriteStatus>> result;
  if (oldDataFileOpt.isPresent()) {
    // Data file exists: handle it as a COW-style update
    result = hoodieCopyOnWriteTable.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords(),
        oldDataFileOpt.get());
  } else {
    // No data file: handle it as a COW-style insert
    result = hoodieCopyOnWriteTable.handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(),
        scanner.iterator());
  }
  // Collect the metrics
  Iterable<List<WriteStatus>> resultIterable = () -> result;
  return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
    s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
    s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
    s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
    s.getStat().setPartitionPath(operation.getPartitionPath());
    s.getStat()
        .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
    s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
    s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
    s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
    RuntimeStats runtimeStats = new RuntimeStats();
    runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
    s.getStat().setRuntimeStats(runtimeStats);
  }).collect(toList());
}
```
The core flow builds an iterator over the records in the incremental log files (analyzed separately in a later article), then checks whether the operation has a base data file: if absent, all records are written to a new parquet data file; if present, the log records are merged with the records of the old data file and written to a new parquet file. After the data file is written, the write metrics are recorded, the compaction is marked completed, and it appears on the timeline as a commit (file named commitTime.commit).
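The state transitions above can be summarized through the timeline file names a compaction moves through. This is a simplified sketch based on the states described in this article (requested, inflight, then a plain commit once completed); it is not taken from the Hudi source:

```java
// Sketch of the timeline file names across a compaction's lifecycle
// (assumption: names follow the <instantTime>.<action>.<state> pattern,
// with the completed state collapsing to <instantTime>.commit).
public class CompactionTimelineFiles {
    public static String requested(String instantTime) { return instantTime + ".compaction.requested"; }
    public static String inflight(String instantTime)  { return instantTime + ".compaction.inflight"; }
    public static String completed(String instantTime) { return instantTime + ".commit"; }

    public static void main(String[] args) {
        String t = "20200301100000";
        System.out.println(requested(t) + " -> " + inflight(t) + " -> " + completed(t));
    }
}
```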
3. Summary
Compaction is an operation that exists only for the MergeOnRead storage type. It first walks each partition's latest parquet data file and its corresponding log files (a FileSlice), generates a HoodieCompactionPlan (one HoodieCompactionOperation per FileSlice), and serializes it to a file. When the compaction is executed, the plan is deserialized, and each HoodieCompactionOperation in it is compacted: an iterator over the incremental log files is built, and its records are either merged with the old parquet data file or written to a new parquet data file. When finished, the statistics are written out, and a completed compaction appears on the timeline as a commit.