详解ApacheHudi如何节约宝贵的存储空间

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 详解ApacheHudi如何节约宝贵的存储空间

1. 介绍

对于旧版本的数据 Hudi需要将其删除以节约宝贵的存储空间, Clean操作有两种策略:KEEP_LATEST_FILE_VERSIONS(保留最新的文件版本)和 KEEP_LATEST_COMMITS(保留最新的提交),不同的策略会有不同的行为, Clean阶段被分为生成 HoodieCleanerPlan和执行 HoodieCleanerPlan,下面分析 Clean的具体实现。

2. 分析

在每次 commit时,会判断是否开启自动 clean来执行 clean操作,或者由用户手动触发 clean操作。

2.1 生成HoodieCleanerPlan

对于在 commit时执行的 clean操作,其核心入口为 HoodieCleanClient#clean,核心代码如下

protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
    final HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
  table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
      // 先处理未完成的clean操作
      runClean(table, hoodieInstant.getTimestamp());
    });
    // 生成cleanerPlan
    Option<HoodieCleanerPlan> cleanerPlanOpt = scheduleClean(startCleanTime);
    if (cleanerPlanOpt.isPresent()) {
      HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get();
      if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
          && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
        final HoodieTable<T> hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
        // 开始执行clean操作
        return runClean(hoodieTable, startCleanTime);
      }
    }
    return null;
  }

需要先处理未完成的 clean,然后再通过 scheduleClean方法生成 HoodieCleanerPlan,其核心代码如下

protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    // 生成Plan
    HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc);
    if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null)
        && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) {
      // 生成REQUESTED状态的instant
      HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
      // Save to both aux and timeline folder
      try {
        // 将Plan序列化后保存至aux文件夹(Plan序列化)和.hooidie文件夹(空内容)
        table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
      } catch (IOException e) {
        LOG.error("Got exception when saving cleaner requested file", e);
        throw new HoodieIOException(e.getMessage(), e);
      }
      return Option.of(cleanerPlan);
    }
    return Option.empty();
  }

会继续调用 HoodieCopyOnWriteTable#scheduleClean生成 HoodieCleanerPlan,核心代码如下

public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
    try {
      HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
      // 找出保留的最早的instant
      Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain();
      // 获取需要处理的partition,对于增量clean而言,会找到上
      List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant);
      if (partitionsToClean.isEmpty()) {
        // 空的Plan
        return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
      }
      Map<String, List<String>> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism)
          .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean)))
          .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
      return new HoodieCleanerPlan(earliestInstant
          .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
          config.getCleanerPolicy().name(), cleanOps, 1);
    } catch (IOException e) {
      throw new HoodieIOException("Failed to schedule clean operation", e);
    }
  }

对于 HoodieCleanerPlan的生成,首先会找出最早需要保留的 instant,然后获取对应所有的分区路径。

2.1.1 获取待Clean的所有分区

获取所有待clean分区路径的核心代码如下

public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
    if (config.incrementalCleanerModeEnabled() && newInstantToRetain.isPresent()
        && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
      Option<HoodieInstant> lastClean =
          hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
      if (lastClean.isPresent()) {
        HoodieCleanMetadata cleanMetadata = AvroUtils
            .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
        if ((cleanMetadata.getEarliestCommitToRetain() != null)
            && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
          // 过滤出大于上次clean的instant时间并且小于当前clean中最早保留的instant时间
          return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> {
            return HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
                HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
                newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER);
          }).flatMap(instant -> {
            try {
              // 获取对应的partition
              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                  hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
              return commitMetadata.getPartitionToWriteStats().keySet().stream();
            } catch (IOException e) {
              throw new HoodieIOException(e.getMessage(), e);
            }
          }).distinct().collect(Collectors.toList());
        }
      }
    }
    // Otherwise go to brute force mode of scanning all partitions
    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
        hoodieTable.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
  }

可以看到,如果开启了增量 clean并且当前 instant不为空,同时策略为 KEEP_LATEST_COMMITS时,则会先取上次已完成的 clean,然后过滤出从上次已完成 cleaninstant时间到当前clean中需要保留的最早的 instant时间,并获取对应的分区路径;否则直接返回所有的分区路径。

在获取了所有待 clean的分区路径后,还需要调用 HoodieCleanHelper#getDeletePaths获取所有待删除的文件路径。

2.1.2 获取待删除文件

获取待删除文件的核心代码如下

public List<String> getDeletePaths(String partitionPath) throws IOException {
    // 获取策略
    HoodieCleaningPolicy policy = config.getCleanerPolicy();
    List<String> deletePaths;
    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
      // 获取待删除的文件
      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
      // 获取待删除的文件
      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
    } else {
      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
    }
    return deletePaths;
  }

可以看到,根据策略的不同调用不同方法来获取分区下待删除的所有文件。

对于 KEEP_LATEST_COMMITS策略而言, getFilesToCleanKeepingLatestCommits核心代码如下

private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) throws IOException {
    int commitsRetained = config.getCleanerCommitsRetained();
    List<String> deletePaths = new ArrayList<>();
    // 获取所有完成savepoint的文件
    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
        .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
    // 是否达到指定大小
    if (commitTimeline.countInstants() > commitsRetained) {
      // 找到最早需要保留的instant
      HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
      // 获取指定分区路径下的所有HoodieFileGroup
      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
      for (HoodieFileGroup fileGroup : fileGroups) {
        // 获取所有的FileSlice
        List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
        if (fileSliceList.isEmpty()) {
          continue;
        }
        // 获取最大的instant时间
        String lastVersion = fileSliceList.get(0).getBaseInstantTime();
        // 获取小于最早需要保留的instant的最新版本
        String lastVersionBeforeEarliestCommitToRetain =
            getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
        // 遍历FileSlice
        for (FileSlice aSlice : fileSliceList) {
          // 获取数据文件
          Option<HoodieDataFile> aFile = aSlice.getDataFile();
          String fileCommitTime = aSlice.getBaseInstantTime();
          if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
            // 数据文件存在并且包含在savepoint文件里面,则略过
            continue;
          }
          if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
            // 文件时间与FileSlice时间相等,或者文件时间与小于最早需要保留的instant的最新版本相等,则略过
            continue;
          }
          // 文件不用于压缩并且最早需要保留时间大于文件时间
          if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
              .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
            // 需要被clean
            aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
              // 如果是MOR,则将所有的日志也可一并clean
              deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
            }
          }
        }
      }
    }
    return deletePaths;
  }

可以看到,找出待删除的文件需要会经过一系列判断,如不能删除 savepoint文件,不能删除小于最早需要保留的 instant的时间文件(因为该文件可能还是会被使用),不能删除待压缩的文件。仅删除那些小于最早需要保留的 instant的文件,并且如果是 MOR类型,那么可以将日志文件一并删除。

对于 KEEP_LATEST_FILE_VERSIONS策略而言, getFilesToCleanKeepingLatestVersions获取待删除文件的核心代码如下

private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) throws IOException {
       // 获取所有的文件组
    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
    List<String> deletePaths = new ArrayList<>();
    // 获取所有完成savepoint的文件
    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
        .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
    for (HoodieFileGroup fileGroup : fileGroups) {
      // 需要保留的版本
      int keepVersions = config.getCleanerFileVersionsRetained();
      // 过滤需要被压缩的文件
      Iterator<FileSlice> fileSliceIterator =
          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
      if (isFileGroupInPendingCompaction(fileGroup)) { // 正进行compaction
        keepVersions--;
      }
      while (fileSliceIterator.hasNext() && keepVersions > 0) {
        FileSlice nextSlice = fileSliceIterator.next();
        Option<HoodieDataFile> dataFile = nextSlice.getDataFile();
        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
          // 跳过savepoint文件处理
          continue;
        }
        keepVersions--;
      }
      while (fileSliceIterator.hasNext()) { // 还剩余FileSlice
        FileSlice nextSlice = fileSliceIterator.next();
        if (nextSlice.getDataFile().isPresent()) {
          HoodieDataFile dataFile = nextSlice.getDataFile().get();
          deletePaths.add(dataFile.getFileName());
        }
        if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
          // 如果是MOR,则将所有的日志也可一并clean
          deletePaths.addAll(nextSlice.getLogFiles().map(file -> file.getFileName()).collect(Collectors.toList()));
        }
      }
    }
    return deletePaths;
  }

可以看到,对于 savepoint的文件也不能删除,也不能删除待压缩的文件。仅删除那些不需要继续保留的版本的文件,如果是 MOR类型,那么可以将日志文件一并删除。

2.2 执行HoodieCleanerPlan

在生成 HoodieCleanerPlan后,会将其序列化并保存至元数据目录,然后开始执行,其核心在 HoodieCleanClient#runClean,其核心代码如下

public List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieInstant cleanInstant) {
    try {
      // 从元数据目录下反序列化HoodieCleanerPlan
      HoodieCleanerPlan cleanerPlan = AvroUtils.deserializeCleanerPlan(getActiveTimeline()
          .getInstantAuxiliaryDetails(HoodieTimeline.getCleanRequestedInstant(cleanInstant.getTimestamp())).get());
      // 计算并行度
      int cleanerParallelism = Math.min(
          (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(x -> x.size()).count()),
          config.getCleanerParallelism());
      List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
          .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
              .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<String, String>(x.getKey(), y)))
              .collect(Collectors.toList()), cleanerParallelism)
          .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey((e1, e2) -> e1.merge(e2)).collect();
      Map<String, PartitionCleanStat> partitionCleanStatsMap =
          partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
      // 返回统计信息
      return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
        PartitionCleanStat partitionCleanStat =
            (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath)
                : new PartitionCleanStat(partitionPath);
        HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
        return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
            .withEarliestCommitRetained(Option.ofNullable(
                actionInstant != null
                    ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                        actionInstant.getAction(), actionInstant.getTimestamp())
                    : null))
            .withDeletePathPattern(partitionCleanStat.deletePathPatterns)
            .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles)
            .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build();
      }).collect(Collectors.toList());
    } catch (IOException e) {
      throw new HoodieIOException("Failed to clean up after commit", e);
    }
  }

这块代码的核心逻辑非常简单,首先反序列出 HoodieCleanerPlan,然后再开始执行删除操作,实际删除文件操作由 deleteFilesFunc处理,然后返回 Clean的统计信息。

3. 总结

对于 Clean操作, Hudi提供了两种策略:基于文件版本和基于提交保留数。并且将 Clean分为生成 HoodieCleanerPlan和执行 HoodieCleanerPlan两个阶段,两阶段并不直接关联.在生成 HoodieCleanerPlan时会找出所有符合指定策略的待删除文件,并且为了避免每次全分区处理,Hudi还提供了增量 Clean配置项,即仅仅只处理从上次 Clean后影响的分区,然后将 HoodieCleanerPlan序列化至元数据(.aux)目录,在执行阶段会从元数据目录中反序列化后执行删除文件操作。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
2月前
|
存储 算法 NoSQL
11)面对千万级别的 key 应该如何节省内存
11)面对千万级别的 key 应该如何节省内存
31 0
11)面对千万级别的 key 应该如何节省内存
|
6月前
|
存储 安全 C++
联合:节省内存利器
联合:节省内存利器
|
存储 监控 对象存储
阿里云OSS预留空间全新发布 存储成本最高降低70%
阿里云OSS预留空间全新发布 存储成本最高降低70%,对象存储OSS预留空间是什么?预留空间是指定地域的,仅可抵扣该地域“标准存储 - 本地冗余”的OSS存储费用,不支持非存储容量费用抵扣,付费周期一年,阿里云对象存储推出全新预留空间产品(Reserved Capacity),客户购买一年的预留空间,较按量付费,最高可节省70%的费用。还有无地域属性预留空间
262 0
|
编解码 运维 监控
轻松处理高于平常10倍的视频需求,还能节省60%的IT成本,蓝墨做对了什么?
如果说Serverless到底解决了什么问题,核心就是节约成本、节省精力。
3339 11
轻松处理高于平常10倍的视频需求,还能节省60%的IT成本,蓝墨做对了什么?
|
存储 监控 架构师
避免云端浪费的5种方法
很多企业采用云计算的一个重要原因是为了避免建设和运营服务器机房或数据中心的费用以节省成本。但是,如果没有正确采用云计算技术,仍然会遭受资金损失。因此需要避免一些代价高昂的错误方法。
223 0
|
存储 弹性计算 NoSQL
突破内存应用瓶颈,让IT成本下降40%的秘诀
近两年5G、大数据、云计算一直为行业热点,数字化进程不断加速,全行业数据开始爆发式增长。面对数据的迅猛增长,企业一方面享受着数据化转型带来的红利,另一方面也承担着大内存运行实例的高额开支。传统内存面临挑战,持久内存方案开始受到了行业更多的关注。
突破内存应用瓶颈,让IT成本下降40%的秘诀
|
编解码 监控 Cloud Native
视频需求超平常数 10 倍,却节省了 60% 的 IT 成本投入是一种什么样的体验?
2020 年初,疫情期间,在线教育迎来需求爆发。为了应对高流量,蓝墨加大了整合业界优质课程资源的力度,不断拓展自身的业务边界,在赢得机遇的同时,技术团队也面临了前所未有的挑战。
视频需求超平常数 10 倍,却节省了 60% 的 IT 成本投入是一种什么样的体验?
|
Prometheus 运维 Kubernetes
了解这5大K8S管理服务,为你节省50%的部署时间!
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! Kubernetes已然成为IT世界的重要组成部分,并且仍在不断地发展壮大,现阶段,Kubernetes已经可以帮助企业进行微服务训练,加速企业数字化转型。
了解这5大K8S管理服务,为你节省50%的部署时间!