详解ApacheHudi如何节约宝贵的存储空间

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 详解ApacheHudi如何节约宝贵的存储空间

1. 介绍

对于旧版本的数据 Hudi需要将其删除以节约宝贵的存储空间, Clean操作有两种策略:KEEP_LATEST_FILE_VERSIONS(保留最新的文件版本)和 KEEP_LATEST_COMMITS(保留最新的提交),不同的策略会有不同的行为, Clean阶段被分为生成 HoodieCleanerPlan和执行 HoodieCleanerPlan,下面分析 Clean的具体实现。

2. 分析

在每次 commit时,会判断是否开启自动 clean来执行 clean操作,或者由用户手动触发 clean操作。

2.1 生成HoodieCleanerPlan

对于在 commit时执行的 clean操作,其核心入口为 HoodieCleanClient#clean,核心代码如下

protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
    final HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
      // 先处理未完成的clean操作
      runClean(table, hoodieInstant.getTimestamp());
    });
    // 生成cleanerPlan
    Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
    if (cleanerPlanOpt.isPresent()) {
      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
      if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
          && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
        final HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        // 开始执行clean操作
        return runClean(hoodieTable, startCleanTime);
      }
    }
    return null;
  }

需要先处理未完成的 clean,然后再通过 scheduleClean方法生成 HoodieCleanerPlan,其核心代码如下

protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    // 生成Plan
    HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
      // 生成REQUESTED状态的instant
      HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
      // Save to both aux and timeline folder
      try {
        // 将Plan序列化后保存至aux文件夹(Plan序列化)和.hooidie文件夹(空内容)
        table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
      } catch (IOException e) {
        LOG.error("Got exception when saving cleaner requested file", e);
        throw new HoodieIOException(e.getMessage(), e);
      }
      return Option.of(cleanerPlan);
    }
    return Option.empty();
  }

会继续调用 HoodieCopyOnWriteTable#scheduleClean生成 HoodieCleanerPlan,核心代码如下

public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
    try {
      HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
      // 找出保留的最早的instant
      Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
      // 获取需要处理的partition,对于增量clean而言,会找到上
      List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);
      if (partitionsToClean.isEmpty()) {
        // 空的Plan
        return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
      }
      Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
          .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
          .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
      return new HoodieCleanerPlan(earliestInstant
          .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
          config.getCleanerPolicy().name(), cleanOps, 1);
    } catch (IOException e) {
      throw new HoodieIOException("Failed to schedule clean operation", e);
    }
  }

对于 HoodieCleanerPlan的生成,首先会找出最早需要保留的 instant,然后获取对应所有的分区路径。

2.1.1 获取待Clean的所有分区

获取所有待clean分区路径的核心代码如下

public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
    if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
        && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
      Option<HoodieInstant> lastClean =
          hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
      if (lastClean.isPresent()) {
        HoodieCleanMetadata cleanMetadata = AvroUtils
            .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
        if ((cleanMetadata.getEarliestCommitToRetain() != null)
            && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
          // 过滤出大于上次clean的instant时间并且小于当前clean中最早保留的instant时间
          return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> {
            return HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
                HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
                newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER);
          }).flatMap(instant -> {
            try {
              // 获取对应的partition
              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                  hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
              return commitMetadata.getPartitionToWriteStats().keySet().stream();
            } catch (IOException e) {
              throw new HoodieIOException(e.getMessage(), e);
            }
          }).distinct().collect(Collectors.toList());
        }
      }
    }
    // Otherwise go to brute force mode of scanning all partitions
    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
        hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
  }

可以看到,如果开启了增量 clean并且当前 instant不为空,同时策略为 KEEP_LATEST_COMMITS时,则会先取上次已完成的 clean,然后过滤出从上次已完成 cleaninstant时间到当前clean中需要保留的最早的 instant时间,并获取对应的分区路径;否则直接返回所有的分区路径。

在获取了所有待 clean的分区路径后,还需要调用 HoodieCleanHelper#getDeletePaths获取所有待删除的文件路径。

2.1.2 获取待删除文件

获取待删除文件的核心代码如下

public List<String> getDeletePaths(String partitionPath) throws IOException {
    // 获取策略
    HoodieCleaningPolicy policy = config.getCleanerPolicy();
    List<String> deletePaths;
    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
      // 获取待删除的文件
      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
      // 获取待删除的文件
      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
    } else {
      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
    }
    return deletePaths;
  }

可以看到,根据策略的不同调用不同方法来获取分区下待删除的所有文件。

对于 KEEP_LATEST_COMMITS策略而言, getFilesToCleanKeepingLatestCommits核心代码如下

private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) throws IOException {
    int commitsRetained = config.getCleanerCommitsRetained();
    List<String> deletePaths = new ArrayList<>();
    // 获取所有完成savepoint的文件
    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
        .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
    // 是否达到指定大小
    if (commitTimeline.countInstants() > commitsRetained) {
      // 找到最早需要保留的instant
      HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
      // 获取指定分区路径下的所有HoodieFileGroup
      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
      for (HoodieFileGroup fileGroup : fileGroups) {
        // 获取所有的FileSlice
        List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
        if (fileSliceList.isEmpty()) {
          continue;
        }
        // 获取最大的instant时间
        String lastVersion = fileSliceList.get(0).getBaseInstantTime();
        // 获取小于最早需要保留的instant的最新版本
        String lastVersionBeforeEarliestCommitToRetain =
            getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
        // 遍历FileSlice
        for (FileSlice aSlice : fileSliceList) {
          // 获取数据文件
          Option<HoodieDataFile> aFile = aSlice.getDataFile();
          String fileCommitTime = aSlice.getBaseInstantTime();
          if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
            // 数据文件存在并且包含在savepoint文件里面,则略过
            continue;
          }
          if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
            // 文件时间与FileSlice时间相等,或者文件时间与小于最早需要保留的instant的最新版本相等,则略过
            continue;
          }
          // 文件不用于压缩并且最早需要保留时间大于文件时间
          if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
              .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
            // 需要被clean
            aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
              // 如果是MOR,则将所有的日志也可一并clean
              deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
            }
          }
        }
      }
    }
    return deletePaths;
  }

可以看到,找出待删除的文件需要会经过一系列判断,如不能删除 savepoint文件,不能删除小于最早需要保留的 instant的时间文件(因为该文件可能还是会被使用),不能删除待压缩的文件。仅删除那些小于最早需要保留的 instant的文件,并且如果是 MOR类型,那么可以将日志文件一并删除。

对于 KEEP_LATEST_FILE_VERSIONS策略而言, getFilesToCleanKeepingLatestVersions获取待删除文件的核心代码如下

private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) throws IOException {
       // 获取所有的文件组
    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
    List<String> deletePaths = new ArrayList<>();
    // 获取所有完成savepoint的文件
    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
        .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
    for (HoodieFileGroup fileGroup : fileGroups) {
      // 需要保留的版本
      int keepVersions = config.getCleanerFileVersionsRetained();
      // 过滤需要被压缩的文件
      Iterator<FileSlice> fileSliceIterator =
          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
      if (isFileGroupInPendingCompaction(fileGroup)) { // 正进行compaction
        keepVersions--;
      }
      while (fileSliceIterator.hasNext() && keepVersions > 0) {
        FileSlice nextSlice = fileSliceIterator.next();
        Option<HoodieDataFile> dataFile = nextSlice.getDataFile();
        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
          // 跳过savepoint文件处理
          continue;
        }
        keepVersions--;
      }
      while (fileSliceIterator.hasNext()) { // 还剩余FileSlice
        FileSlice nextSlice = fileSliceIterator.next();
        if (nextSlice.getDataFile().isPresent()) {
          HoodieDataFile dataFile = nextSlice.getDataFile().get();
          deletePaths.add(dataFile.getFileName());
        }
        if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
          // 如果是MOR,则将所有的日志也可一并clean
          deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
        }
      }
    }
    return deletePaths;
  }

可以看到,对于 savepoint的文件也不能删除,也不能删除待压缩的文件。仅删除那些不需要继续保留的版本的文件,如果是 MOR类型,那么可以将日志文件一并删除。

2.2 执行HoodieCleanerPlan

在生成 HoodieCleanerPlan后,会将其序列化并保存至元数据目录,然后开始执行,其核心在 HoodieCleanClient#runClean,其核心代码如下

public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant) {
    try {
      // 从元数据目录下反序列化HoodieCleanerPlan
      HoodieCleanerPlan cleanerPlan = AvroUtils.deserializeCleanerPlan(getActiveTimeline()
          .getInstantAuxiliaryDetails(HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get());
      // 计算并行度
      int cleanerParallelism = Math.min(
          (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()),
          config.getCleanerParallelism());
      List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
          .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
              .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<String, String>(x.getKey(), y)))
              .collect(Collectors.toList()), cleanerParallelism)
          .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect();
      Map<String, PartitionCleanStat> partitionCleanStatsMap =
          partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
      // 返回统计信息
      return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
        PartitionCleanStat partitionCleanStat =
            (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
                : new PartitionCleanStat(partitionPath);
        HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
        return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
            .withEarliestCommitRetained(Option.ofNullable(
                actionInstant != null
                    ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                        actionInstant.getAction(), actionInstant.getTimestamp())
                    : null))
            .withDeletePathPattern(partitionCleanStat.deletePathPatterns)
            .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
            .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
      }).collect(Collectors.toList());
    } catch (IOException e) {
      throw new HoodieIOException("Failed to clean up after commit", e);
    }
  }

这块代码的核心逻辑非常简单,首先反序列出 HoodieCleanerPlan,然后再开始执行删除操作,实际删除文件操作由 deleteFilesFunc处理,然后返回 Clean的统计信息。

3. 总结

对于 Clean操作, Hudi提供了两种策略:基于文件版本和基于提交保留数。并且将 Clean分为生成 HoodieCleanerPlan和执行 HoodieCleanerPlan两个阶段,两阶段并不直接关联.在生成 HoodieCleanerPlan时会找出所有符合指定策略的待删除文件,并且为了避免每次全分区处理,Hudi还提供了增量 Clean配置项,即仅仅只处理从上次 Clean后影响的分区,然后将 HoodieCleanerPlan序列化至元数据(.aux)目录,在执行阶段会从元数据目录中反序列化后执行删除文件操作。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
存储 开发工具 git
【软考学习16】用位示图法,轻松解决空闲存储空间的管理难题
【软考学习16】用位示图法,轻松解决空闲存储空间的管理难题
405 0
|
3月前
|
存储 算法 NoSQL
11)面对千万级别的 key 应该如何节省内存
11)面对千万级别的 key 应该如何节省内存
37 0
11)面对千万级别的 key 应该如何节省内存
|
7月前
|
消息中间件 存储 容灾
AutoMQ 云上十倍成本节约的奥秘: SPOT 实例
AutoMQ Kafka 优化设计,充分利用云基础设施,尤其是成本低廉的Spot实例,实现公有云成本节约。尽管Spot实例的不确定性可能导致服务中断,AutoMQ通过Broker无状态化、快速弹性扩展和Serverless支持,以及应对Spot实例回收的优雅停机和容灾机制,确保了可靠的Kafka服务。混合使用按需实例以保证关键服务稳定,同时在面临Spot实例库存不足时,具备回退到按需实例的能力。AutoMQ Kafka通过创新技术在稳定性与成本之间找到了平衡,为用户提供灵活且经济高效的解决方案。
126 0
AutoMQ 云上十倍成本节约的奥秘: SPOT 实例
|
7月前
|
存储 安全 C++
联合:节省内存利器
联合:节省内存利器
|
存储 监控 对象存储
阿里云OSS预留空间全新发布 存储成本最高降低70%
阿里云OSS预留空间全新发布 存储成本最高降低70%,对象存储OSS预留空间是什么?预留空间是指定地域的,仅可抵扣该地域“标准存储 - 本地冗余”的OSS存储费用,不支持非存储容量费用抵扣,付费周期一年,阿里云对象存储推出全新预留空间产品(Reserved Capacity),客户购买一年的预留空间,较按量付费,最高可节省70%的费用。还有无地域属性预留空间
264 0
|
编解码 运维 监控
轻松处理高于平常10倍的视频需求,还能节省60%的IT成本,蓝墨做对了什么?
如果说Serverless到底解决了什么问题,核心就是节约成本、节省精力。
3341 13
轻松处理高于平常10倍的视频需求,还能节省60%的IT成本,蓝墨做对了什么?
|
存储 开发者
UPYUN 又拍云进行大幅度降价:数据量持续高速增长致成本降低
今天我们刚刚得到了SegmentFault 与开发者的好伙伴又拍云的官方消息,UPYUN(又拍云)进行了大幅度的价格调整。本次价格调整主要表现在存储空间和流量价格的全面下调,存储空间最高降价67%,流量最高降价40%。据了解,UPYUN本次进行价格调整的根本原因是过去一年UPYUN平台数据量持续高速增长令整体成本降低所致。
176 0
|
编解码 监控 Cloud Native
视频需求超平常数 10 倍,却节省了 60% 的 IT 成本投入是一种什么样的体验?
2020 年初,疫情期间,在线教育迎来需求爆发。为了应对高流量,蓝墨加大了整合业界优质课程资源的力度,不断拓展自身的业务边界,在赢得机遇的同时,技术团队也面临了前所未有的挑战。
视频需求超平常数 10 倍,却节省了 60% 的 IT 成本投入是一种什么样的体验?