1. Introduction
When data in a Hudi table becomes outdated, the old file versions need to be deleted to reclaim valuable storage space. The Clean operation supports two policies: KEEP_LATEST_FILE_VERSIONS (keep the latest file versions) and KEEP_LATEST_COMMITS (keep the latest commits), and the two policies behave differently. Cleaning is split into two phases: generating a HoodieCleanerPlan and executing that HoodieCleanerPlan. The sections below walk through the concrete implementation of Clean.
2. Analysis
On every commit, Hudi checks whether automatic clean is enabled and, if so, runs the clean operation; alternatively, the user can trigger a clean manually.
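For reference, here is a minimal sketch of how these knobs are typically wired when building the write client. The builder methods below (withAutoClean, withCleanerPolicy, retainCommits) follow the Hudi release this analysis is based on and may differ in other versions; the table path is a placeholder.

import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// A minimal sketch: run clean automatically as part of every commit and
// retain the latest 10 commits (KEEP_LATEST_COMMITS policy).
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/path/to/hudi/table") // placeholder base path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withAutoClean(true) // clean as part of each commit
        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) // or KEEP_LATEST_FILE_VERSIONS
        .retainCommits(10) // N for KEEP_LATEST_COMMITS
        .build())
    .build();
// With auto clean disabled, a clean can instead be triggered manually through the write client.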
2.1 Generating the HoodieCleanerPlan
For the clean operation executed at commit time, the core entry point is HoodieCleanClient#clean, whose core code is as follows:
protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
  final HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
    // first finish any clean operation that is still in flight
    runClean(table, hoodieInstant.getTimestamp());
  });
  // generate the cleaner plan
  Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
  if (cleanerPlanOpt.isPresent()) {
    HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
      final HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
      // execute the clean operation
      return runClean(hoodieTable, startCleanTime);
    }
  }
  return null;
}
Any unfinished clean is handled first, and then scheduleClean is called to generate the HoodieCleanerPlan; its core code is as follows:
protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
  HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  // generate the plan
  HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
  if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
      && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
    // create an instant in REQUESTED state
    HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
    // Save to both aux and timeline folder
    try {
      // serialize the plan into the .aux folder and write an empty marker into the .hoodie folder
      table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
    } catch (IOException e) {
      LOG.error("Got exception when saving cleaner requested file", e);
      throw new HoodieIOException(e.getMessage(), e);
    }
    return Option.of(cleanerPlan);
  }
  return Option.empty();
}
This in turn calls HoodieCopyOnWriteTable#scheduleClean to build the HoodieCleanerPlan; the core code is as follows:
public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
  try {
    HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
    // find the earliest instant that must be retained
    Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
    // collect the partitions to process; with incremental clean, only the partitions
    // touched since the last completed clean are returned
    List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);
    if (partitionsToClean.isEmpty()) {
      // empty plan
      return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
    }
    Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
        .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
        .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    return new HoodieCleanerPlan(earliestInstant
        .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
        config.getCleanerPolicy().name(), cleanOps, 1);
  } catch (IOException e) {
    throw new HoodieIOException("Failed to schedule clean operation", e);
  }
}
To build the HoodieCleanerPlan, Hudi first finds the earliest instant that must be retained and then collects all partition paths that need processing.
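For reference, deriving that earliest instant under KEEP_LATEST_COMMITS is essentially a count back from the end of the completed commit timeline. The sketch below is modeled on HoodieCleanHelper#getEarliestCommitToRetain and is illustrative rather than a verbatim copy.

// Sketch: with N = hoodie.cleaner.commits.retained, the earliest instant to retain
// is the N-th instant counted back from the end of the completed commit timeline.
Option<HoodieInstant> getEarliestCommitToRetain() {
  int commitsRetained = config.getCleanerCommitsRetained();
  if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
      && commitTimeline.countInstants() > commitsRetained) {
    // every instant strictly older than this one is a cleaning candidate
    return commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
  }
  return Option.empty();
}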
2.1.1 Obtaining All Partitions to Clean
The core code for collecting all partition paths to clean is as follows:
public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
  if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
      && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
    Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
    if (lastClean.isPresent()) {
      HoodieCleanMetadata cleanMetadata = AvroUtils
          .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
      if ((cleanMetadata.getEarliestCommitToRetain() != null)
          && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
        // keep instants no earlier than the instant retained by the last clean and
        // strictly earlier than the earliest instant retained by the current clean
        return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> {
          return HoodieTimeline.compareTimestamps(instant.getTimestamp(),
              cleanMetadata.getEarliestCommitToRetain(), HoodieTimeline.GREATER_OR_EQUAL)
              && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
                  newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER);
        }).flatMap(instant -> {
          try {
            // extract the partitions written by this instant
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
            return commitMetadata.getPartitionToWriteStats().keySet().stream();
          } catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
          }
        }).distinct().collect(Collectors.toList());
      }
    }
  }
  // Otherwise go to brute force mode of scanning all partitions
  return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
      hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
}
As the code shows, when incremental clean is enabled, the new instant to retain is present, and the policy is KEEP_LATEST_COMMITS, the helper first looks up the last completed clean, then filters the completed commits whose timestamps fall between the instant retained by that last clean (inclusive) and the earliest instant the current clean will retain (exclusive), and collects the partition paths those commits wrote to. For example, if the previous clean retained commits from 20200101 onward and the current clean will retain commits from 20200105 onward, only the partitions written by commits in [20200101, 20200105) need to be examined. In all other cases it simply returns every partition path of the table.
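To make the incremental window concrete, here is a self-contained sketch over plain Java collections; all names and timestamps are hypothetical. Hudi instant timestamps compare lexicographically, which plain String#compareTo reproduces.

import java.util.*;
import java.util.stream.Collectors;

// Hypothetical demo of the incremental-clean window: collect partitions written by
// commits c with lastRetained <= c < newRetained; everything else is untouched.
class IncrementalCleanDemo {
  public static void main(String[] args) {
    // commit timestamp -> partitions it wrote to
    Map<String, List<String>> commits = new TreeMap<>();
    commits.put("20200101", Arrays.asList("2020/01/01"));
    commits.put("20200103", Arrays.asList("2020/01/03"));
    commits.put("20200105", Arrays.asList("2020/01/05"));

    String lastRetained = "20200101"; // earliestCommitToRetain of the previous clean
    String newRetained = "20200105";  // earliest instant the current clean will retain

    List<String> partitionsToClean = commits.entrySet().stream()
        .filter(e -> e.getKey().compareTo(lastRetained) >= 0
                  && e.getKey().compareTo(newRetained) < 0)
        .flatMap(e -> e.getValue().stream())
        .distinct()
        .collect(Collectors.toList());

    System.out.println(partitionsToClean); // [2020/01/01, 2020/01/03]
  }
}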
Once all partition paths to clean are known, HoodieCleanHelper#getDeletePaths is called to collect all file paths to delete.
2.1.2 Obtaining the Files to Delete
The core code for collecting the files to delete is as follows:
public List<String> getDeletePaths(String partitionPath) throws IOException {
  // look up the configured policy
  HoodieCleaningPolicy policy = config.getCleanerPolicy();
  List<String> deletePaths;
  if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
    // collect the files to delete
    deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
  } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
    // collect the files to delete
    deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
  } else {
    throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
  }
  return deletePaths;
}
As shown, a different method is dispatched depending on the policy to collect all files to delete under a partition.
For the KEEP_LATEST_COMMITS policy, the core code of getFilesToCleanKeepingLatestCommits is as follows:
private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) throws IOException {
  int commitsRetained = config.getCleanerCommitsRetained();
  List<String> deletePaths = new ArrayList<>();
  // collect all savepointed files
  List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
      .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
  // only clean once more commits exist than the retention count
  if (commitTimeline.countInstants() > commitsRetained) {
    // find the earliest instant that must be retained
    HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
    // fetch all HoodieFileGroups under the partition path
    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
    for (HoodieFileGroup fileGroup : fileGroups) {
      // fetch all FileSlices of the group
      List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
      if (fileSliceList.isEmpty()) {
        continue;
      }
      // the latest instant time (slices are ordered newest first)
      String lastVersion = fileSliceList.get(0).getBaseInstantTime();
      // the latest version older than the earliest instant to retain
      String lastVersionBeforeEarliestCommitToRetain =
          getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
      // walk the FileSlices
      for (FileSlice aSlice : fileSliceList) {
        // fetch the data file
        Option<HoodieDataFile> aFile = aSlice.getDataFile();
        String fileCommitTime = aSlice.getBaseInstantTime();
        if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
          // skip any data file referenced by a savepoint
          continue;
        }
        if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
          // skip the latest version, and the latest version before the retention cutoff
          continue;
        }
        // the slice is not needed for a pending compaction and is older than the
        // earliest instant to retain
        if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
            .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
          // the file should be cleaned
          aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
          if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
            // for MOR tables, clean all log files along with the base file
            deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
          }
        }
      }
    }
  }
  return deletePaths;
}
As the code shows, selecting the files to delete involves a series of checks: savepointed files must not be deleted; the latest version older than the earliest instant to retain must not be deleted (it may still be read by in-flight queries); and files needed for a pending compaction must not be deleted. Only files older than the earliest instant to retain are deleted, and for MERGE_ON_READ tables the log files are deleted along with the base files. For example, with file versions written at t1 < t2 < t3 < t4 and an earliest retained instant of t3, the versions at t4 (the latest), t3, and t2 (the latest one before t3) are kept, while t1 is deleted.
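The retention rule can be replayed on plain data. The following self-contained sketch (hypothetical names; savepoints and pending compaction omitted) shows the decision for one file group:

import java.util.*;

// Hypothetical demo of KEEP_LATEST_COMMITS on one file group: given version commit
// times (newest first) and the earliest instant to retain, decide what to delete.
class KeepLatestCommitsDemo {
  public static void main(String[] args) {
    List<String> versions = Arrays.asList("t4", "t3", "t2", "t1"); // newest first
    String earliestToRetain = "t3";

    String lastVersion = versions.get(0);
    // latest version strictly before earliestToRetain (may still serve readers)
    String lastBeforeRetain = versions.stream()
        .filter(v -> v.compareTo(earliestToRetain) < 0)
        .findFirst().orElse(null);

    List<String> deletes = new ArrayList<>();
    for (String v : versions) {
      if (v.equals(lastVersion) || v.equals(lastBeforeRetain)) {
        continue; // always keep the newest version and the last one before the cutoff
      }
      if (v.compareTo(earliestToRetain) < 0) {
        deletes.add(v); // older than the cutoff and not otherwise protected
      }
    }
    System.out.println(deletes); // [t1] -- t4, t3 and t2 are all kept
  }
}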
For the KEEP_LATEST_FILE_VERSIONS policy, the core code of getFilesToCleanKeepingLatestVersions is as follows:
private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) throws IOException {
  // fetch all file groups under the partition
  List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
  List<String> deletePaths = new ArrayList<>();
  // collect all savepointed files
  List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
      .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
  for (HoodieFileGroup fileGroup : fileGroups) {
    // number of versions to retain
    int keepVersions = config.getCleanerFileVersionsRetained();
    // skip slices needed for a pending compaction
    Iterator<FileSlice> fileSliceIterator =
        fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
    if (isFileGroupInPendingCompaction(fileGroup)) {
      // a compaction is in progress; it counts as one retained version
      keepVersions--;
    }
    while (fileSliceIterator.hasNext() && keepVersions > 0) {
      FileSlice nextSlice = fileSliceIterator.next();
      Option<HoodieDataFile> dataFile = nextSlice.getDataFile();
      if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
        // skip savepointed files
        continue;
      }
      keepVersions--;
    }
    while (fileSliceIterator.hasNext()) {
      // the remaining FileSlices are beyond the retained versions
      FileSlice nextSlice = fileSliceIterator.next();
      if (nextSlice.getDataFile().isPresent()) {
        HoodieDataFile dataFile = nextSlice.getDataFile().get();
        deletePaths.add(dataFile.getFileName());
      }
      if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
        // for MOR tables, clean all log files along with the base file
        deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
      }
    }
  }
  return deletePaths;
}
Here too, savepointed files and files needed for a pending compaction must not be deleted. Only the versions beyond the configured retention count are deleted, and for MERGE_ON_READ tables the log files are deleted along with the base files.
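Again as a self-contained, hypothetical sketch: this policy reduces to keeping the N newest versions of each file group and deleting the rest (savepoints and pending compaction omitted):

import java.util.*;

// Hypothetical demo of KEEP_LATEST_FILE_VERSIONS on one file group.
class KeepLatestVersionsDemo {
  public static void main(String[] args) {
    List<String> versions = Arrays.asList("t4", "t3", "t2", "t1"); // newest first
    int keepVersions = 2; // hoodie.cleaner.fileversions.retained

    // everything past the first keepVersions entries is a delete candidate
    List<String> deletes = versions.size() > keepVersions
        ? versions.subList(keepVersions, versions.size())
        : Collections.emptyList();

    System.out.println(deletes); // [t2, t1] -- only t4 and t3 are retained
  }
}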
2.2 Executing the HoodieCleanerPlan
After the HoodieCleanerPlan is generated, it is serialized and saved to the metadata directory, and then execution begins. The entry point is HoodieCleanClient#runClean, which delegates to the table's clean method; the core code is as follows:
public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant) {
  try {
    // deserialize the HoodieCleanerPlan from the metadata directory
    HoodieCleanerPlan cleanerPlan = AvroUtils.deserializeCleanerPlan(getActiveTimeline()
        .getInstantAuxiliaryDetails(HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get());
    // compute the parallelism
    int cleanerParallelism = Math.min(
        (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()),
        config.getCleanerParallelism());
    List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
        .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
            .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<String, String>(x.getKey(), y)))
            .collect(Collectors.toList()), cleanerParallelism)
        .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect();
    Map<String, PartitionCleanStat> partitionCleanStatsMap =
        partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
    // build and return the per-partition statistics
    return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
      PartitionCleanStat partitionCleanStat = (partitionCleanStatsMap.containsKey(partitionPath))
          ? partitionCleanStatsMap.get(partitionPath)
          : new PartitionCleanStat(partitionPath);
      HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
      return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
          .withEarliestCommitRetained(Option.ofNullable(
              actionInstant != null
                  ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                      actionInstant.getAction(), actionInstant.getTimestamp())
                  : null))
          .withDeletePathPattern(partitionCleanStat.deletePathPatterns)
          .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
          .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
    }).collect(Collectors.toList());
  } catch (IOException e) {
    throw new HoodieIOException("Failed to clean up after commit", e);
  }
}
The core logic here is straightforward: the HoodieCleanerPlan is deserialized, the delete operations are executed, with the actual file deletion handled by deleteFilesFunc, and the Clean statistics are returned.
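What deleteFilesFunc boils down to per partition can be sketched with the Hadoop FileSystem API. The helper below is illustrative only (the real function also folds the results into PartitionCleanStat objects):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch of the per-partition delete step: remove each planned file
// and record successes and failures for the clean statistics.
class CleanFileDeleter {
  static void deleteCleanedFiles(FileSystem fs, String basePath, String partitionPath,
      List<String> fileNames, List<String> succeeded, List<String> failed) throws IOException {
    for (String fileName : fileNames) {
      Path fullPath = new Path(String.format("%s/%s/%s", basePath, partitionPath, fileName));
      // non-recursive delete; a missing file counts as a failed delete here
      boolean deleted = fs.exists(fullPath) && fs.delete(fullPath, false);
      (deleted ? succeeded : failed).add(fullPath.toString());
    }
  }
}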
3. Summary
For the Clean operation, Hudi offers two policies: one based on file versions and one based on the number of retained commits. Cleaning is split into two phases, generating the HoodieCleanerPlan and executing it, and the two phases are not directly coupled. When generating the HoodieCleanerPlan, Hudi finds all files eligible for deletion under the configured policy; to avoid scanning every partition each time, it also offers an incremental Clean option that only processes the partitions affected since the last Clean. The HoodieCleanerPlan is then serialized into the metadata (.aux) directory, and during the execution phase it is deserialized from there and the file deletions are carried out.