Apache Hudi Savepoint Implementation Analysis

Summary: An analysis of how Apache Hudi implements savepoints.

1. Introduction

Hudi provides a savepoint mechanism for backing up a given instant: if a later commit turns out to be bad, the table can be rolled back to the specified savepoint, which is vital for production systems. Savepoints are triggered manually through hudi-CLI. The rest of this post analyzes how savepoints are implemented.
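
Savepoints are normally created and rolled back through hudi-CLI. A rough interactive session might look like the following; the command names come from the CLI's savepoint commands, but the exact option names can vary between Hudi versions, so treat this as an illustrative sketch rather than an exact transcript.

connect --path /path/to/hoodie/table
savepoint create --commit 20191201120000
savepoints show
savepoint rollback --savepoint 20191201120000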

2. Analysis

2.1 Creating a savepoint

The entry point for creating a savepoint is HoodieWriteClient#savepoint, whose core code is shown below.

public boolean savepoint(String commitTime, String user, String comment) {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
      // Savepoints are not supported on MERGE_ON_READ tables
      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
    }
    // Get the last completed clean instant
    Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
      // The commit to savepoint does not exist in the completed timeline
      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
    }
    try {
      String lastCommitRetained;
      if (cleanInstant.isPresent()) { // If a completed clean instant exists
        // Deserialize the clean metadata
        HoodieCleanMetadata cleanMetadata = AvroUtils
            .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
        // Earliest commit retained by the cleaner
        lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
      } else {
        // Otherwise use the earliest completed commit
        lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
      }
      // The savepoint time must be greater than or equal to the earliest retained commit
      Preconditions.checkArgument(
          HoodieTimeline.compareTimestamps(commitTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
          "Could not savepoint commit " + commitTime + " as this is beyond the lookup window " + lastCommitRetained);
      Map<String, List<String>> latestFilesMap = jsc
          .parallelize(FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(),
              config.shouldAssumeDatePartitioning()))
          .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
            ReadOptimizedView view = table.getROFileSystemView();
            // Latest data files in this partition with commit time on or before commitTime
            List<String> latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
                .map(HoodieDataFile::getFileName).collect(Collectors.toList());
            return new Tuple2<>(partitionPath, latestFiles);
          }).collectAsMap();
      // Convert to savepoint metadata
      HoodieSavepointMetadata metadata = AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap);
      // Create a savepoint instant; an inflight file is created under the metadata directory
      table.getActiveTimeline().createNewInstant(
          new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime));
      // Mark it as complete and persist the savepoint metadata to the file
      table.getActiveTimeline()
          .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
              AvroUtils.serializeSavepointMetadata(metadata));
      return true;
    } catch (IOException e) {
      throw new HoodieSavepointException("Failed to savepoint " + commitTime, e);
    }
  }

As shown above, the method first checks whether a completed clean instant exists. If one does, it deserializes the corresponding HoodieCleanMetadata and reads the earliest commit retained by the cleaner; otherwise it falls back to the earliest completed commit. After validating that the savepoint time is not older than that retained commit, it collects, for every partition path, the latest data files on or before the savepoint time, converts them into a HoodieSavepointMetadata, and persists it to a file in the metadata directory. The net result of creating a savepoint is a *.savepoint file under the metadata directory.
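
To put the API above in context, here is a minimal sketch of creating a savepoint from application code. It assumes Hudi 0.5.x-era class and package names, a local Spark master, and an already-written COPY_ON_WRITE table; the base path, commit time, user and comment are illustrative values, not taken from the source.

import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SavepointExample {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("savepoint-example").setMaster("local[2]"));
    // Hypothetical base path of an existing COPY_ON_WRITE table
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")
        .forTable("sample_table")
        .build();
    HoodieWriteClient<HoodieAvroPayload> client = new HoodieWriteClient<>(jsc, config);
    // Timestamp of a commit that has already completed (assumed to exist)
    String commitTime = "20191201120000";
    // On success this creates <commitTime>.savepoint under the .hoodie directory
    boolean created = client.savepoint(commitTime, "ops", "backup before backfill");
    System.out.println("savepoint created: " + created);
    jsc.stop();
  }
}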

2.2 Rolling back to a savepoint

Once a savepoint has been created, the table can be rolled back to it. The entry point is HoodieWriteClient#rollbackToSavepoint, whose core code is shown below.

public boolean rollbackToSavepoint(String savepointTime) {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
    // Rollback to savepoint is a manual operation with no concurrent writes or compactions running;
    // it also removes any compaction that is still pending after the savepoint time
    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
    HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
    boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
    if (!isSavepointPresent) { // Check that the savepoint instant exists
      throw new HoodieRollbackException("No savepoint for commitTime " + savepointTime);
    }
    // Restore the table to the savepoint's instant
    restoreToInstant(savepointTime);
    Option<HoodieInstant> lastInstant =
        activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
    Preconditions.checkArgument(lastInstant.isPresent());
    Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
        savepointTime + "is not the last commit after rolling back " + commitsToRollback + ", last commit was "
            + lastInstant.get().getTimestamp());
    return true;
  }

As you can see, rolling back to a savepoint is expected to be a manual operation: the method first verifies that the savepoint instant exists and then calls restoreToInstant to restore the table to that instant. The core code of restoreToInstant is shown below.

public void restoreToInstant(final String instantTime) throws HoodieRollbackException {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    // Filter out the instants whose timestamp is greater than the given instant time
    List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
        .getReverseOrderedInstants()
        .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
        .collect(Collectors.toList());
    // Create a new instant time for the restore action
    String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime();
    ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
    // Create a restore instant; an inflight file is created under the metadata directory
    table.getActiveTimeline().createNewInstant(
        new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
    instantsToRollback.stream().forEach(instant -> {
      try {
        switch (instant.getAction()) {
          case HoodieTimeline.COMMIT_ACTION:
          case HoodieTimeline.DELTA_COMMIT_ACTION:
            // Roll back commits, from the highest timestamp to the lowest
            List<HoodieRollbackStat> statsForInstant = doRollbackAndGetStats(instant);
            instantsToStats.put(instant.getTimestamp(), statsForInstant);
            break;
          case HoodieTimeline.COMPACTION_ACTION:
            // Roll back compactions, from the highest timestamp to the lowest
            List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant);
            instantsToStats.put(instant.getTimestamp(), statsForCompaction);
            break;
          default:
            throw new IllegalArgumentException("invalid action name " + instant.getAction());
        }
      } catch (IOException io) {
        throw new HoodieRollbackException("unable to rollback instant " + instant, io);
      }
    });
    try {
      // Finish the restore and persist the rollback statistics
      finishRestore(context, instantsToStats.build(),
          instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
          startRollbackInstant, instantTime);
    } catch (IOException io) {
      throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
    }
  }

As shown above, the method filters out all instants greater than the given instant time (both commit and compaction instants that need to be rolled back) and then iterates over them from the newest to the oldest. The core code of doRollbackAndGetStats is shown below.

private List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws
      IOException {
    // Timestamp of the commit to roll back
    final String commitToRollback = instantToRollback.getTimestamp();
    HoodieTable<T> table = HoodieTable.getHoodieTable(
        createMetaClient(true), config, jsc);
    HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
    // Check if any of the commits is a savepoint - do not allow rollback on those commits
    // Collect the timestamps of all completed savepoints
    List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());
    savepoints.stream().forEach(s -> {
      if (s.contains(commitToRollback)) { // A savepointed commit must not be rolled back; delete the savepoint first
        throw new HoodieRollbackException(
            "Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s);
      }
    });
    String lastCommit = commitToRollback;
    // Enforce strict rollback order, from the highest timestamp to the lowest
    if ((lastCommit != null) && !commitTimeline.empty()
        && !commitTimeline.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
      throw new HoodieRollbackException(
          "Found commits after time :" + lastCommit + ", please rollback greater commits first");
    }
    // Timestamps of commits in inflight or requested state
    List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());
    if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) {
      throw new HoodieRollbackException(
          "Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first");
    }
    // Perform the actual rollback
    List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
    // Roll back the index changes
    if (!index.rollbackCommit(commitToRollback)) {
      throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback);
    }
    return stats;
  }

As you can see, rollback is strictly validated: commits must be rolled back in order from the highest timestamp to the lowest, and a savepointed commit is never allowed to be rolled back. Once the checks pass, rollback is called to perform the actual rollback, which deletes the corresponding instants and files; its details will be analyzed separately in a later post.
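
The "roll back greater commits first" rule can also be seen from the client side. Below is a minimal sketch of a hypothetical helper (not part of Hudi) that rolls back every completed commit newer than a target time, newest first, using HoodieWriteClient#rollback together with the timeline APIs already shown above; package names assume Hudi 0.5.x and may differ in other versions.

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;

public class RollbackHelper {
  // Hypothetical helper: roll back all completed commits newer than targetTime,
  // starting from the newest one, which is the order doRollbackAndGetStats enforces.
  public static void rollbackCommitsAfter(HoodieWriteClient<?> client,
      HoodieTableMetaClient metaClient, String targetTime) {
    List<String> commitsToRollback = metaClient.getActiveTimeline()
        .getCommitsTimeline().filterCompletedInstants()
        .getReverseOrderedInstants() // newest -> oldest
        .filter(instant -> HoodieTimeline.compareTimestamps(
            instant.getTimestamp(), targetTime, HoodieTimeline.GREATER))
        .map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());
    for (String commitTime : commitsToRollback) {
      // Rolls back one commit at a time; rolling back an older commit while a
      // newer one still exists would fail the checks shown above
      client.rollback(commitTime);
    }
  }
}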

After the rollback completes, finishRestore is called to finish the restore and persist some statistics. Its core code is shown below.

private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
      List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    Option<Long> durationInMs = Option.empty();
    Long numFilesDeleted = 0L;
    for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
      // Count the number of files successfully deleted
      List<HoodieRollbackStat> stats = commitToStat.getValue();
      numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
    }
    // Convert to restore metadata
    HoodieRestoreMetadata restoreMetadata =
        AvroUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats);
    // Persist to the metadata directory
    table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime),
        AvroUtils.serializeRestoreMetadata(restoreMetadata));
    if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
      // Clean up older rollback metadata files
      FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
          table.getActiveTimeline().getRestoreTimeline().getInstants());
    }
  }

As shown above, statistics are collected after the rollback and then persisted, as restore metadata, to the metadata directory.
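
As a rough illustration only (not captured from a real table), after creating a savepoint on commit 20191201120000 and later restoring to it, the .hoodie metadata directory could contain entries along these lines, with the .savepoint file holding the HoodieSavepointMetadata and the .restore file holding the HoodieRestoreMetadata written by finishRestore:

.hoodie/
  20191201120000.commit      <- the savepointed commit, still present after the restore
  20191201120000.savepoint   <- serialized HoodieSavepointMetadata
  20191203110500.restore     <- serialized HoodieRestoreMetadata (rollback statistics)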

3. Summary

Hudi's savepoint mechanism backs up a given instant, and the table can later be rolled back to that savepoint. Note that rollbacks must start from the larger savepoints: when multiple savepoints exist, you cannot roll back directly to a smaller one without first deleting the larger ones. Creating a savepoint persists the savepoint metadata (the latest data files to retain for each partition) into the metadata directory, while rolling back to a savepoint rolls back instants starting from the largest one and finally writes the rollback statistics to the metadata directory.
