详解ApacheHudi如何节约宝贵的存储空间

简介: 详解ApacheHudi如何节约宝贵的存储空间

1. 介绍

对于旧版本的数据 Hudi需要将其删除以节约宝贵的存储空间, Clean操作有两种策略:KEEP_LATEST_FILE_VERSIONS(保留最新的文件版本)和 KEEP_LATEST_COMMITS(保留最新的提交),不同的策略会有不同的行为, Clean阶段被分为生成 HoodieCleanerPlan和执行 HoodieCleanerPlan,下面分析 Clean的具体实现。

2. 分析

在每次 commit时,会判断是否开启自动 clean来执行 clean操作,或者由用户手动触发 clean操作。

2.1 生成HoodieCleanerPlan

对于在 commit时执行的 clean操作,其核心入口为 HoodieCleanClient#clean,核心代码如下

protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
    final HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
      // 先处理未完成的clean操作
      runClean(table, hoodieInstant.getTimestamp());
    });
    // 生成cleanerPlan
    Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
    if (cleanerPlanOpt.isPresent()) {
      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
      if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
          && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
        final HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        // 开始执行clean操作
        return runClean(hoodieTable, startCleanTime);
      }
    }
    return null;
  }

需要先处理未完成的 clean,然后再通过 scheduleClean方法生成 HoodieCleanerPlan,其核心代码如下

protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    // 生成Plan
    HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
      // 生成REQUESTED状态的instant
      HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
      // Save to both aux and timeline folder
      try {
        // 将Plan序列化后保存至aux文件夹(Plan序列化)和.hooidie文件夹(空内容)
        table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
      } catch (IOException e) {
        LOG.error("Got exception when saving cleaner requested file", e);
        throw new HoodieIOException(e.getMessage(), e);
      }
      return Option.of(cleanerPlan);
    }
    return Option.empty();
  }

会继续调用 HoodieCopyOnWriteTable#scheduleClean生成 HoodieCleanerPlan,核心代码如下

public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
    try {
      HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
      // 找出保留的最早的instant
      Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
      // 获取需要处理的partition,对于增量clean而言,会找到上
      List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);
      if (partitionsToClean.isEmpty()) {
        // 空的Plan
        return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
      }
      Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
          .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
          .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
      return new HoodieCleanerPlan(earliestInstant
          .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
          config.getCleanerPolicy().name(), cleanOps, 1);
    } catch (IOException e) {
      throw new HoodieIOException("Failed to schedule clean operation", e);
    }
  }

对于 HoodieCleanerPlan的生成,首先会找出最早需要保留的 instant,然后获取对应所有的分区路径。

2.1.1 获取待Clean的所有分区

获取所有待clean分区路径的核心代码如下

public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
    if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
        && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
      Option<HoodieInstant> lastClean =
          hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
      if (lastClean.isPresent()) {
        HoodieCleanMetadata cleanMetadata = AvroUtils
            .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
        if ((cleanMetadata.getEarliestCommitToRetain() != null)
            && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
          // 过滤出大于上次clean的instant时间并且小于当前clean中最早保留的instant时间
          return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> {
            return HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
                HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
                newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER);
          }).flatMap(instant -> {
            try {
              // 获取对应的partition
              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                  hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
              return commitMetadata.getPartitionToWriteStats().keySet().stream();
            } catch (IOException e) {
              throw new HoodieIOException(e.getMessage(), e);
            }
          }).distinct().collect(Collectors.toList());
        }
      }
    }
    // Otherwise go to brute force mode of scanning all partitions
    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
        hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
  }

可以看到,如果开启了增量 clean并且当前 instant不为空,同时策略为 KEEP_LATEST_COMMITS时,则会先取上次已完成的 clean,然后过滤出从上次已完成 cleaninstant时间到当前clean中需要保留的最早的 instant时间,并获取对应的分区路径;否则直接返回所有的分区路径。

在获取了所有待 clean的分区路径后,还需要调用 HoodieCleanHelper#getDeletePaths获取所有待删除的文件路径。

2.1.2 获取待删除文件

获取待删除文件的核心代码如下

public List<String> getDeletePaths(String partitionPath) throws IOException {
    // 获取策略
    HoodieCleaningPolicy policy = config.getCleanerPolicy();
    List<String> deletePaths;
    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
      // 获取待删除的文件
      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
      // 获取待删除的文件
      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
    } else {
      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
    }
    return deletePaths;
  }

可以看到,根据策略的不同调用不同方法来获取分区下待删除的所有文件。

对于 KEEP_LATEST_COMMITS策略而言, getFilesToCleanKeepingLatestCommits核心代码如下

private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) throws IOException {
    int commitsRetained = config.getCleanerCommitsRetained();
    List<String> deletePaths = new ArrayList<>();
    // 获取所有完成savepoint的文件
    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
        .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
    // 是否达到指定大小
    if (commitTimeline.countInstants() > commitsRetained) {
      // 找到最早需要保留的instant
      HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
      // 获取指定分区路径下的所有HoodieFileGroup
      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
      for (HoodieFileGroup fileGroup : fileGroups) {
        // 获取所有的FileSlice
        List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
        if (fileSliceList.isEmpty()) {
          continue;
        }
        // 获取最大的instant时间
        String lastVersion = fileSliceList.get(0).getBaseInstantTime();
        // 获取小于最早需要保留的instant的最新版本
        String lastVersionBeforeEarliestCommitToRetain =
            getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
        // 遍历FileSlice
        for (FileSlice aSlice : fileSliceList) {
          // 获取数据文件
          Option<HoodieDataFile> aFile = aSlice.getDataFile();
          String fileCommitTime = aSlice.getBaseInstantTime();
          if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
            // 数据文件存在并且包含在savepoint文件里面,则略过
            continue;
          }
          if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
            // 文件时间与FileSlice时间相等,或者文件时间与小于最早需要保留的instant的最新版本相等,则略过
            continue;
          }
          // 文件不用于压缩并且最早需要保留时间大于文件时间
          if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
              .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
            // 需要被clean
            aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
              // 如果是MOR,则将所有的日志也可一并clean
              deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
            }
          }
        }
      }
    }
    return deletePaths;
  }

可以看到,找出待删除的文件需要会经过一系列判断,如不能删除 savepoint文件,不能删除小于最早需要保留的 instant的时间文件(因为该文件可能还是会被使用),不能删除待压缩的文件。仅删除那些小于最早需要保留的 instant的文件,并且如果是 MOR类型,那么可以将日志文件一并删除。

对于 KEEP_LATEST_FILE_VERSIONS策略而言, getFilesToCleanKeepingLatestVersions获取待删除文件的核心代码如下

private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) throws IOException {
       // 获取所有的文件组
    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
    List<String> deletePaths = new ArrayList<>();
    // 获取所有完成savepoint的文件
    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
        .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
    for (HoodieFileGroup fileGroup : fileGroups) {
      // 需要保留的版本
      int keepVersions = config.getCleanerFileVersionsRetained();
      // 过滤需要被压缩的文件
      Iterator<FileSlice> fileSliceIterator =
          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
      if (isFileGroupInPendingCompaction(fileGroup)) { // 正进行compaction
        keepVersions--;
      }
      while (fileSliceIterator.hasNext() && keepVersions > 0) {
        FileSlice nextSlice = fileSliceIterator.next();
        Option<HoodieDataFile> dataFile = nextSlice.getDataFile();
        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
          // 跳过savepoint文件处理
          continue;
        }
        keepVersions--;
      }
      while (fileSliceIterator.hasNext()) { // 还剩余FileSlice
        FileSlice nextSlice = fileSliceIterator.next();
        if (nextSlice.getDataFile().isPresent()) {
          HoodieDataFile dataFile = nextSlice.getDataFile().get();
          deletePaths.add(dataFile.getFileName());
        }
        if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
          // 如果是MOR,则将所有的日志也可一并clean
          deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
        }
      }
    }
    return deletePaths;
  }

可以看到,对于 savepoint的文件也不能删除,也不能删除待压缩的文件。仅删除那些不需要继续保留的版本的文件,如果是 MOR类型,那么可以将日志文件一并删除。

2.2 执行HoodieCleanerPlan

在生成 HoodieCleanerPlan后,会将其序列化并保存至元数据目录,然后开始执行,其核心在 HoodieCleanClient#runClean,其核心代码如下

public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant) {
    try {
      // 从元数据目录下反序列化HoodieCleanerPlan
      HoodieCleanerPlan cleanerPlan = AvroUtils.deserializeCleanerPlan(getActiveTimeline()
          .getInstantAuxiliaryDetails(HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get());
      // 计算并行度
      int cleanerParallelism = Math.min(
          (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()),
          config.getCleanerParallelism());
      List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
          .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
              .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<String, String>(x.getKey(), y)))
              .collect(Collectors.toList()), cleanerParallelism)
          .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect();
      Map<String, PartitionCleanStat> partitionCleanStatsMap =
          partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
      // 返回统计信息
      return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
        PartitionCleanStat partitionCleanStat =
            (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
                : new PartitionCleanStat(partitionPath);
        HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
        return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
            .withEarliestCommitRetained(Option.ofNullable(
                actionInstant != null
                    ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                        actionInstant.getAction(), actionInstant.getTimestamp())
                    : null))
            .withDeletePathPattern(partitionCleanStat.deletePathPatterns)
            .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
            .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
      }).collect(Collectors.toList());
    } catch (IOException e) {
      throw new HoodieIOException("Failed to clean up after commit", e);
    }
  }

这块代码的核心逻辑非常简单,首先反序列出 HoodieCleanerPlan,然后再开始执行删除操作,实际删除文件操作由 deleteFilesFunc处理,然后返回 Clean的统计信息。

3. 总结

对于 Clean操作, Hudi提供了两种策略:基于文件版本和基于提交保留数。并且将 Clean分为生成 HoodieCleanerPlan和执行 HoodieCleanerPlan两个阶段,两阶段并不直接关联.在生成 HoodieCleanerPlan时会找出所有符合指定策略的待删除文件,并且为了避免每次全分区处理,Hudi还提供了增量 Clean配置项,即仅仅只处理从上次 Clean后影响的分区,然后将 HoodieCleanerPlan序列化至元数据(.aux)目录,在执行阶段会从元数据目录中反序列化后执行删除文件操作。

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
目录
相关文章
|
SQL 机器学习/深度学习 分布式计算
Apache Spark 3.0.0重磅发布 —— 重要特性全面解析
开发了近两年(自2018年10月份至今)的Apache SparkTM 3.0.0正式发布! Apache SparkTM 3.0.0版本包含3400多个补丁,是开源社区做出巨大贡献的结晶,在Python和SQL功能方面带来了重大进展并且将重点聚焦在了开发和生产的易用性上。同时,今年也是Spark开源10周年,这些举措反映了Spark自开源以来,是如何不断的满足更广泛的受众需求以及更多的应用场景
Apache Spark 3.0.0重磅发布 —— 重要特性全面解析
|
10月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1233 43
|
存储 消息中间件 缓存
腾讯看点基于 Flink 的实时数仓及多维实时数据分析实践
当业务发展到一定规模,实时数据仓库是一个必要的基础服务。从数据驱动方面考虑,多维实时数据分析系统的重要性也不言而喻。但是当数据量巨大的情况下,拿腾讯看点来说,一天上报的数据量达到万亿级的规模,要实现极低延迟的实时计算和亚秒级的多维实时查询是有技术挑战的。
腾讯看点基于 Flink 的实时数仓及多维实时数据分析实践
|
9月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
3107 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
6月前
|
缓存 开发工具 git
QLExpress使用及源码分析
QLExpress是阿里开源的轻量级规则引擎,支持通过注解与YAML配置实现业务逻辑与代码解耦。结合实体别名、接口规则定义及脚本化表达式,实现动态计算与判断,如用户年龄判断、BMI计算等。支持AST语法树解析与上下文绑定,提供灵活的二次扩展能力,适用于复杂业务场景的延迟执行与缓存优化。
|
人工智能 Linux API
119K star!无需GPU轻松本地部署多款大模型,DeepSeek支持!这个开源神器绝了
"只需一行命令就能在本地运行Llama 3、DeepSeek-R1等前沿大模型,支持Windows/Mac/Linux全平台,这个开源项目让AI开发从未如此简单!"
884 0
|
Java Linux 测试技术
JMeter的运行
JMeter是一款基于Java的压力测试工具,适用于Windows、Mac及Linux系统。运行JMeter需Java 8及以上版本,建议至少1GB内存。用户可通过双击bin目录下的jmeter.bat/.sh文件或命令行启动。其主界面包括文件、编辑、查找、运行、选项与帮助等菜单,支持测试计划的创建、编辑与执行,并提供详细的帮助文档。正确配置环境变量可简化启动流程。
|
安全 Java
【JAVA】在 Queue 中 poll()和 remove()有什么区别
【JAVA】在 Queue 中 poll()和 remove()有什么区别
揭秘ApacheHudi数据湖的文件管理
揭秘ApacheHudi数据湖的文件管理
420 0
|
SQL 分布式计算 HIVE
sparksql 参数调优
sparksql 参数调优