Apache Hudi Savepoint实现分析

简介: Apache Hudi Savepoint实现分析

1. 介绍

Hudi提供了savepoint机制,即可对instant进行备份,当后续出现提交错误时,便可rollback至指定savepoint,这对于线上系统至为重要,而savepoint由hudi-CLI手动触发,下面分析savepoint的实现机制。

2. 分析

2.1 创建savepoint

创建savepoint的入口为 HoodieWriteClient#savepoint,其核心代码如下

public boolean savepoint(String commitTime, String user, String comment) {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
      // MERGE_ON_READ类型表不支持savepoint
      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
    }
    // 获取已完成的clean的最后一个instant
    Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
      // instant不存在
      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
    }
    try {
      String lastCommitRetained;
      if (cleanInstant.isPresent()) { // 若clean instant存在
        // 反序列化出clean的相关信息
        HoodieCleanMetadata cleanMetadata = AvroUtils
            .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
        // 保存的最早的commit
        lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
      } else {
        // 获取最早已完成的instant
        lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
      }
      // savepoint的时间必须大于/等于最早保留的commit时间
      Preconditions.checkArgument(
          HoodieTimeline.compareTimestamps(commitTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
          "Could not savepoint commit " + commitTime + " as this is beyond the lookup window " + lastCommitRetained);
      Map<String, List<String>> latestFilesMap = jsc
          .parallelize(FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(),
              config.shouldAssumeDatePartitioning()))
          .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
            ReadOptimizedView view = table.getROFileSystemView();
            // 获取该partition下所有小于commitTime的文件
            List<String> latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
                .map(HoodieDataFile::getFileName).collect(Collectors.toList());
            return new Tuple2<>(partitionPath, latestFiles);
          }).collectAsMap();
      // 转化为savepoint metadata.
      HoodieSavepointMetadata metadata = AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap);
      // 创建savepoint类型的instant,在meta目录下会创建inflight类型文件
      table.getActiveTimeline().createNewInstant(
          new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime));
      // 标记为complete状态,并将savepoint metadata存入文件
      table.getActiveTimeline()
          .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
              AvroUtils.serializeSavepointMetadata(metadata));
      return true;
    } catch (IOException e) {
      throw new HoodieSavepointException("Failed to savepoint " + commitTime, e);
    }
  }

可以看到,首先会根据是否已经有完成的clean类型的instant,如果存在则会反序列化出对应的 HoodieCleanMetadata,并获取最早保留的commit的时间,然后获取所有分区路径下所有小于savepoint时间的文件,然后转化为 HoodieSavepointMetadata后保存至元数据目录下的文件中。创建savepoint的最终结果就是在元数据目录下创建了一个*.savepoint的文件。

2.2 回滚savepoint

在创建完savepoint之后,便可回滚至指定的savepoint,其入口为 HoodieWriteClient#rollbackToSavepoint,其核心代码如下

public boolean rollbackToSavepoint(String savepointTime) {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
    // rollback是需要手动操作,并且不支持并发写或compaction,rollback将会移除savepoint之后处于pending状态的compaction
    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
    HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
    boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
    if (!isSavepointPresent) { // 检查instant是否存在
      throw new HoodieRollbackException("No savepoint for commitTime " + savepointTime);
    }
    // 恢复至指定savepoint的instant
    restoreToInstant(savepointTime);
    Option<HoodieInstant> lastInstant =
        activeTimeline.reload().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants().lastInstant();
    Preconditions.checkArgument(lastInstant.isPresent());
    Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
        savepointTime + "is not the last commit after rolling back " + commitsToRollback + ", last commit was "
            + lastInstant.get().getTimestamp());
    return true;
  }

可以看到,回滚至指定savepoint时必须是手动执行,然后校验该instant是否存在,然后调用 restoreToInstant恢复至指定instant, restoreToInstant的核心代码如下

public void restoreToInstant(final String instantTime) throws HoodieRollbackException {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    // 过滤出大于指定commit time的instant
    List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
        .getReverseOrderedInstants()
        .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
        .collect(Collectors.toList());
    // 创建新的instant
    String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime();
       //
    ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats = ImmutableMap.builder();
    // 创建restore类型的instant,在元数据目录下创建inflight文件
    table.getActiveTimeline().createNewInstant(
        new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
    instantsToRollback.stream().forEach(instant -> {
      try {
        switch (instant.getAction()) {
          case HoodieTimeline.COMMIT_ACTION:
          case HoodieTimeline.DELTA_COMMIT_ACTION:
            // 回滚,从高到低
            List<HoodieRollbackStat> statsForInstant = doRollbackAndGetStats(instant);
            instantsToStats.put(instant.getTimestamp(), statsForInstant);
            break;
          case HoodieTimeline.COMPACTION_ACTION:
            // 回滚,从高到低
            List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant);
            instantsToStats.put(instant.getTimestamp(), statsForCompaction);
            break;
          default:
            throw new IllegalArgumentException("invalid action name " + instant.getAction());
        }
      } catch (IOException io) {
        throw new HoodieRollbackException("unable to rollback instant " + instant, io);
      }
    });
    try {
      // 结束恢复
      finishRestore(context, instantsToStats.build(),
          instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
          startRollbackInstant, instantTime);
    } catch (IOException io) {
      throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
    }
  }

可以看到会根据指定的instant过滤出大于该instant的所有instant(包括commit和compaction类型的待回滚的instant),然后依次遍历每个instant,其中 doRollbackAndGetStats核心代码如下

private List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws
      IOException {
    // 获取commit time
    final String commitToRollback = instantToRollback.getTimestamp();
    HoodieTable<T> table = HoodieTable.getHoodieTable(
        createMetaClient(true), config, jsc);
    HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
    // Check if any of the commits is a savepoint - do not allow rollback on those commits
    // 获取所有的已完成的savepoint的timeline
    List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());
    savepoints.stream().forEach(s -> {
      if (s.contains(commitToRollback)) { // 为savepoint,则抛出异常,不允许回滚savepoint
        throw new HoodieRollbackException(
            "Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s);
      }
    });
    String lastCommit = commitToRollback;
    // 保证严格按照从高到低顺序回滚
    if ((lastCommit != null) && !commitTimeline.empty()
        && !commitTimeline.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
      throw new HoodieRollbackException(
          "Found commits after time :" + lastCommit + ", please rollback greater commits first");
    }
    // 获取处于inflight和requested状态的timeline
    List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());
    if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) {
      throw new HoodieRollbackException(
          "Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first");
    }
    // 进行rollback
    List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
    // 回滚索引
    if (!index.rollbackCommit(commitToRollback)) {
      throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback);
    }
    return stats;
  }

可以看到,在进行回滚时会进行严格的检验,即从高的commit(timestamp大)到低的commit(timestamp小),在校验通过后会调用 rollback进行实际的回滚, rollback将会删除对应的instant和文件,具体细节后续会单独分析。

在回滚完后会调用 finishRestore表示结束恢复,把一些统计信息持久化,其核心代码如下

private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
      List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
    Option<Long> durationInMs = Option.empty();
    Long numFilesDeleted = 0L;
    for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
      // 统计删除的文件数
      List<HoodieRollbackStat> stats = commitToStat.getValue();
      numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
    }
    // 转化为metadata
    HoodieRestoreMetadata restoreMetadata =
        AvroUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats);
    // 保存至meta目录
    table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime),
        AvroUtils.serializeRestoreMetadata(restoreMetadata));
    if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
      // 清理老的meta文件
      FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
          table.getActiveTimeline().getRestoreTimeline().getInstants());
    }
  }

可以看到在rollback之后会生成一些统计信息,然后会将统计信息存储至元数据目录下。

3. 总结

Hudi提供了savepoint机制可对某一instant进行备份,然后可通过rollback回滚至指定的savepoint,但值得注意的是回滚只能从大的savepoint开始回滚,即存在多个savepoint的情况下,不能直接回退至较小的savepoint。而创建savepoint流程则会将待回滚的信息先存储至元数据目录,而回滚savepoint流程则会从最大的instant开始进行回滚,最后会将回滚的统计信息写入元数据目录。

目录
相关文章
|
2月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
108 2
|
23天前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
1月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
82 11
|
2月前
|
存储 运维 数据处理
Apache Paimon:重塑阿里智能引擎数据处理新纪元,解锁高效存储与实时分析潜能!
【8月更文挑战第2天】探索 Apache Paimon 在阿里智能引擎的应用场景
171 2
|
3月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
4月前
|
SQL 运维 druid
深度分析:Apache Doris及其在大数据处理中的应用
Apache Doris是一款开源的高性能实时分析数据库,设计用于低延迟SQL查询和实时数据处理,适合大规模实时分析场景。与Apache Druid、ClickHouse和Greenplum相比,Doris在易用性和实时性上有优势,但其他产品在特定领域如高吞吐、SQL支持或数据处理有特长。选型要考虑查询性能、实时性、SQL需求和运维成本。Doris适用于实时数据分析、BI报表、数据中台和物联网数据处理。使用时注意资源配置、数据模型设计、监控调优和导入策略。
|
4月前
|
easyexcel Java API
Apache POI与easyExcel:Excel文件导入导出的技术深度分析
Apache POI与easyExcel:Excel文件导入导出的技术深度分析
|
4月前
|
消息中间件 存储 大数据
深度分析:Apache Kafka及其在大数据处理中的应用
Apache Kafka是高吞吐、低延迟的分布式流处理平台,常用于实时数据流、日志收集和事件驱动架构。与RabbitMQ(吞吐量有限)、Pulsar(多租户支持但生态系统小)和Amazon Kinesis(托管服务,成本高)对比,Kafka在高吞吐和持久化上有优势。适用场景包括实时处理、数据集成、日志收集和消息传递。选型需考虑吞吐延迟、持久化、协议支持等因素,使用时注意资源配置、数据管理、监控及安全性。
|
4月前
|
消息中间件 分布式计算 Kafka
深度分析:Apache Flink及其在大数据处理中的应用
Apache Flink是低延迟、高吞吐量的流处理框架,以其状态管理和事件时间处理能力脱颖而出。与Apache Spark Streaming相比,Flink在实时性上更强,但Spark生态系统更丰富。Apache Storm在低延迟上有优势,而Kafka Streams适合轻量级流处理。选型考虑延迟、状态管理、生态系统和运维成本。Flink适用于实时数据分析、复杂事件处理等场景,使用时注意资源配置、状态管理和窗口操作的优化。
|
4月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

推荐镜像

更多
下一篇
无影云桌面