Apache Hudi Rollback实现分析

简介: Apache Hudi Rollback实现分析

1. 介绍

在发现有些commit出错时,可使用Hudi提供的rollback回滚至指定的commit,这样可防止出现错误的结果,并且当一次commit失败时,也会进行rollback操作,保证一次commit的原子性。

2. 分析

rollback(回滚)的入口在 HoodieWriteClient#rollback,其依赖 HoodieWriteClient#rollbackInternal方法完成实际的回滚,其核心代码如下

protected void rollbackInternal(String commitToRollback) {
    // 生成新的rollback时间
    final String startRollbackTime = HoodieActiveTimeline.createNewInstantTime();
    try {
      HoodieTable<T> table = HoodieTable.getHoodieTable(
          createMetaClient(true), config, jsc);
      // 找出第一个与rollback commit相等的instant
      Option<HoodieInstant> rollbackInstantOpt =
          Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
              .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback))
              .findFirst());
      // 存在
      if (rollbackInstantOpt.isPresent()) {
        // 进行回滚
        List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get());
        // 结束回滚
        finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime);
      }
    } catch (IOException e) {
      throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback,
          e);
    }
  }

首先过滤出commit/delta_commit中是否存在待回滚instant的时间,如果存在,则进行回滚,回滚的核心方法为 doRollbackAndGetStats,该方法在前一篇讲解savepoint时已经分析过,该方法会调用 HoodieTable#rollback完成实际回滚动作,下面着重分析 HoodieTable#rollback方法,对于MOR和COW不同类型有不同实现,下面一一进行分析。

2.1 HoodieCopyOnWriteTable#rollback

对于COW类型而言, rollback核心代码如下

public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants)
      throws IOException {
    long startTime = System.currentTimeMillis();
    List<HoodieRollbackStat> stats = new ArrayList<>();
    HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
    if (instant.isCompleted()) { // instant状态为completed
      // 转变至inflight状态
      instant = activeTimeline.revertToInflight(instant);
    }
    if (!instant.isRequested()) { // 不为requested状态
      String commit = instant.getTimestamp();
      // 生成回滚的请求
      List<RollbackRequest> rollbackRequests = generateRollbackRequests(instant);
      // 进行回滚
      stats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests);
    }
    // 删除inflight和requested状态的instant
    deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant);
    return stats;
  }

可以看到,进行回滚总体分为四步:1. 对于处理completed状态的instant,首先会将其转变至inflight状态,而对于不处于requested状态的instant(compaction会存在requested状态);2. 生成回滚请求;3. 进行回滚;4. 删除instant。

2.1.1 转变instant状态

对于处于completed状态的instant,将其转变至 inflight状态,其核心代码如下

public HoodieInstant revertToInflight(HoodieInstant instant) {
    // 获取inflight状态的instant
    HoodieInstant inflight = HoodieTimeline.getInflightInstant(instant, metaClient.getTableType());
    // 转变至inflight,即文件名会变为.inflight
    revertCompleteToInflight(instant, inflight);
    return inflight;
  }

对于状态转变体现在文件名后缀的变化,即会变为 .inflght状态。

2.1.2 生成回滚请求

回滚请求由 generateRollbackRequests方法生成,其核心代码如下

private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
      throws IOException {
    // 获取所有的分区路径,对每个分区路径生成DELETE_DATA_AND_LOG_FILES类型的RollbackRequest
    return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
        config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))
            .collect(Collectors.toList());
  }

会根据不同的分区路径生成不同的RollbackRequest,该方法会生成会生成DELETEDATAANDLOGFILES类型,指定分区路径的RollbackRequest。

2.1.3 进行回滚

通过 RollbackExecutor#performRollback进行回滚,其核心代码如下

public List<HoodieRollbackStat> performRollback(JavaSparkContext jsc, HoodieInstant instantToRollback,
      List<RollbackRequest> rollbackRequests) {
    SerializablePathFilter filter = (path) -> {
      if (path.toString().contains(".parquet")) {
        // 获取parquet文件提交时间
        String fileCommitTime = FSUtils.getCommitTime(path.getName());
        // 是否等于指定回滚的时间
        return instantToRollback.getTimestamp().equals(fileCommitTime);
      } else if (path.toString().contains(".log")) {
        // 获取log文件提交时间
        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
        // 是否等于指定回滚的时间
        return instantToRollback.getTimestamp().equals(fileCommitTime);
      }
      return false;
    };
    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
    return jsc.parallelize(rollbackRequests, sparkPartitions).mapToPair(rollbackRequest -> {
      final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
      switch (rollbackRequest.getRollbackAction()) { // rollback类型
        case DELETE_DATA_FILES_ONLY: { // 仅仅删除数据文件
          // 根据分区路径来删除该路径下文件
          deleteCleanedFiles(metaClient, config, filesToDeletedStatus, instantToRollback.getTimestamp(),
              rollbackRequest.getPartitionPath());
          return new Tuple2<>(rollbackRequest.getPartitionPath(),
                  HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                          .withDeletedFileResults(filesToDeletedStatus).build());
        }
        case DELETE_DATA_AND_LOG_FILES: { // 删除数据文件和日志文件
          // 根据分区路径来删除该路径下文件
          deleteCleanedFiles(metaClient, config, filesToDeletedStatus, rollbackRequest.getPartitionPath(), filter);
          return new Tuple2<>(rollbackRequest.getPartitionPath(),
                  HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                          .withDeletedFileResults(filesToDeletedStatus).build());
        }
        case APPEND_ROLLBACK_BLOCK: { // 添加ROLLBACK块
          Writer writer = null;
          try {
            writer = HoodieLogFormat.newWriterBuilder()
                .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
                .withFileId(rollbackRequest.getFileId().get())
                .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs())
                .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
            // 生成元数据,如生成控制块(CommandBlock)
            Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
            writer = writer.appendBlock(new HoodieCommandBlock(header));
          } catch (IOException | InterruptedException io) {
            throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
          } finally {
            try {
              if (writer != null) {
                writer.close();
              }
            } catch (IOException io) {
              throw new UncheckedIOException(io);
            }
          }
          Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
          filesToNumBlocksRollback.put(metaClient.getFs().getFileStatus(Preconditions.checkNotNull(writer).getLogFile().getPath()), 1L);
          return new Tuple2<>(rollbackRequest.getPartitionPath(),
                  HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
                          .withRollbackBlockAppendResults(filesToNumBlocksRollback).build());
        }
        default:
          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
      }
    }).reduceByKey(this::mergeRollbackStat).map(Tuple2::_2).collect();
  }

对于DELETEDATAFILES_ONLY类型的rollback,会调用 deleteCleanedFiles来删除数据文件,其核心代码如下

private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
      Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
    FileSystem fs = metaClient.getFs();
    PathFilter filter = (path) -> {
      if (path.toString().contains(".parquet")) { // 数据文件
        String fileCommitTime = FSUtils.getCommitTime(path.getName());
        // 与rollback时间相等
        return commit.equals(fileCommitTime);
      }
      return false;
    };
    // 过滤出与rollback时间相等的所有parquet文件
    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
    for (FileStatus file : toBeDeleted) { // 逐一删除
      boolean success = fs.delete(file.getPath(), false);
      results.put(file, success);
    }
    return results;
  }

首先会过滤指定分区下所有与rollback时间相等的parquet文件,然后逐一删除。

对于DELETEDATAANDLOGFILES类型的rollback,会调用同名的 deleteCleanedFiles来删除文件,其核心代码如下

private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
      Map<FileStatus, Boolean> results, String partitionPath, PathFilter filter) throws IOException {
    FileSystem fs = metaClient.getFs();
    // 过滤出与rollback时间相等的所有parquet和log文件
    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
    for (FileStatus file : toBeDeleted) { // 逐一删除
      boolean success = fs.delete(file.getPath(), false);
      results.put(file, success);
    }
    return results;
  }

首先会过滤指定分区下所有与rollback时间相等的parquet/log文件,然后逐一删除。

对于APPENDROLLBACKBLOCK类型的rollback,会生成日志文件控制块并写入指定的文件中,在读取时,将不会读取该控制块的前一个块。

2.1.4 删除instant

在完成回滚后,还需要调用 deleteInflightAndRequestedInstant来删除instant,其核心代码如下

protected void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
      HoodieInstant instantToBeDeleted) {
    // 删除marker下的目录
    deleteMarkerDir(instantToBeDeleted.getTimestamp());
    if (deleteInstant) { // 删除instant
      // 删除处于pending状态的instant
      activeTimeline.deletePending(instantToBeDeleted);
      if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) {
        // 删除处于requested状态的instant
        instantToBeDeleted = new HoodieInstant(State.REQUESTED, instantToBeDeleted.getAction(),
            instantToBeDeleted.getTimestamp());
        activeTimeline.deletePending(instantToBeDeleted);
      }
    } 
  }

删除instant主要是删除处于inflight和requested状态的在元数据目录下的文件。

2.2 HoodieMergeOnReadTable#rollback

对于MOR而言, rollback核心代码如下

public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant,
      boolean deleteInstants) throws IOException {
    long startTime = System.currentTimeMillis();
    // rollback时间
    String commit = instant.getTimestamp();
    if (instant.isCompleted()) { // instant状态为completed
      // // 转变至inflight状态
      instant = this.getActiveTimeline().revertToInflight(instant);
    }
    List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
    // 不为requested状态
    if (!instant.isRequested()) {
      // 生成rollback请求
      List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instant);
      // 进行回滚
      allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests);
    }
    // 删除inflight和requested状态的instant
    deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant);
    return allRollbackStats;
  }

可以看到其流程与COW相同,不再赘述。

3. 总结

对于rollback而言,其主要分为四步:转变instant状态;2. 生成回滚请求;3. 进行回滚;4. 删除instant。而回滚时会分为三种情况,对于 DELETE_DATA_FILES_ONLYDELETE_DATA_AND_LOG_FILES类型的rollback,会直接删除对应commit的数据文件和日志文件,而对于 APPEND_ROLLBACK_BLOCK类型,则会写入控制块至文件中,在读取时不读取其前一个块。

目录
相关文章
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
225 2
|
15天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
16天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
2月前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
32 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
54 1
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
73 0
|
2月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
143 11
|
3月前
|
存储 运维 数据处理
Apache Paimon:重塑阿里智能引擎数据处理新纪元,解锁高效存储与实时分析潜能!
【8月更文挑战第2天】探索 Apache Paimon 在阿里智能引擎的应用场景
209 2
|
4月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)

推荐镜像

更多