Hudi MergeOnRead存储类型时Upsert分析

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Hudi MergeOnRead存储类型时Upsert分析

1. 引入

Hudi提供了两种存储类型,即 CopyOnWriteCOWMergeOnReadMORCOW在数据插入时会直接写入parquet数据文件,对于更新时也会直接更新并写入新的parquet数据文件;而 MOR在数据插入时会写入parquet数据文件,对于更新时则一般会写入log增量日志文件,而后进行压缩合并。之前在Upsert在Hudi中的实现分析已经分析过在 COW类型下Hudi是如何处理 upsert,这篇文章主要分析在 MOR类型下Hudi是如何处理 upsert

2. 分析

COW类型时,对于记录的 upsert,其步骤如下:

  • 给记录打标签,即记录存在于哪些文件中,用于判断是进行更新还是插入操作。
  • 创建分区器用于重新分区。会创建多个 bucket,其对应分区总数,每个 bucket对应一个 FileId(已存在文件ID或新文件ID)和类型( INSERTUPDATE)。对于 INSERT操作,在查找分区录下所有的小文件后,优先将记录插入到这些小文件中,若还剩余记录,则插入新文件。
  • 重新进行分区,不同分区获取对应的 bucket后,则可知对该分区上的记录进行何种操作(由 bucket类型决定),对于 UPDATE操作,则合并老记录后写入新的parquet文件;对于 INSERT操作,则直接写入新的parquet文件。

MOR类型时,对于记录的 upsert,总体步骤与上述类似,只是创建的分区器类型为 HoodieMergeOnReadTable.MergeOnReadUpsertPartitioner,其为 HoodieCopyOnWriteTable.UpsertPartitioner子类,两者在查找小文件时的表现不同。

2.1. Insert

对于记录的 insert而言(分区对应的bucket类型为 INSERT),最终会调用 HoodieMergeOnReadTable#handleInsert方法来处理该操作,其核心代码如下

public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
      throws Exception {
    if (index.canIndexLogFiles()) { // 支持索引日志文件,则写入log日志文件
      return new MergeOnReadLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx);
    } else { // 不支持索引日志文件,则直接插入parquet数据文件
      return super.handleInsert(commitTime, idPfx, recordItr);
    }
  }

可以看到,其首先会判断所采用的索引是否支持索引日志文件,Hudi提供的三种类型的索引:HoodieBloomIndexHBaseIndexInMemoryHashIndex,其中 HoodieBloomIndex不支持索引日志文件,而其他两种均支持,支持索引表示可以对日志log文件进行插入操作,如只有log增量日志文件而无parquet数据文件(现在社区正打算对log增量日志文件支持索引,因此后续就可以直接写入log增量日志文件了)。

若支持索引日志文件,则会生成一个 MergeOnReadLazyInsertIterable对象,其是 CopyOnWriteLazyInsertIterable的子类,然后由其 consumeOneRecord提供写入,其核心代码如下

protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
      final HoodieRecord insertPayload = payload.record;
      List<WriteStatus> statuses = new ArrayList<>();
      if (handle == null) {
        handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
      }
      if (handle.canWrite(insertPayload)) { // 还可继续写入
        // 实际写入
        handle.write(insertPayload, payload.insertValue, payload.exception);
      } else { // 已经满了
        handle.close();
        statuses.add(handle.getWriteStatus());
        // 新生成对象继续处理写入
        handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
        // 实际写入
        handle.write(insertPayload, payload.insertValue, payload.exception);
      }
    }

可以看到其会借助 HoodieAppendHandle#write完成真正的写入,具体对于log文件格式及写入Hudi做了很多优化,后续专门分析。

若不支持索引日志文件,则会调用父类的方法处理插入,即会生成一个 CopyOnWriteLazyInsertIterable对象来处理写入,其会写入parquet数据文件,前面文章Upsert在Hudi中的实现分析已经分析过,不再赘述。

2.2. Update

对于记录的 update而言(分区对应的bucket类型为 UPDATE),最终会调用 HoodieMergeOnReadTable#handleUpdate方法来处理该操作,其核心代码如下

public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
      throws IOException {
    if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
      // 不支持索引日志文件并且小文件集合中包含该文件ID,则交由父类处理,会更新parquet文件
      return super.handleUpdate(commitTime, fileId, recordItr);
    } else {
      // 写入日志文件
      HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
      appendHandle.doAppend();
      appendHandle.close();
      return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())).iterator();
    }
  }

可以看到,首先判断是否支持索引日志文件并且小文件集合中是否包含了正在操作的文件。

若不支持索引日志文件并且操作的文件为小文件,则直接调用父类的 HoodieCopyOnWrite#handleUpdate方法将记录与老记录合并后写入新的parquet数据文件。

否则,则使用 HoodieAppendHandle将记录写入log增量日志文件。

下面分析对于 HoodieMergeOnReadTable.MergeOnReadUpsertPartitioner查找小文件的方法,这也与 HoodieCopyOnWriteTable.UpsertPartitioner区分器的主要不同点。方法核心代码如下

protected List<SmallFile> getSmallFiles(String partitionPath) {
      List<SmallFile> smallFileLocations = new ArrayList<>();
      // 获取commit、deltacommit的timeline
      HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
      if (!commitTimeline.empty()) {
        // 获取最后一个instant
        HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
        List<FileSlice> allSmallFileSlices = new ArrayList<>();
        if (!index.canIndexLogFiles()) { // 不支持索引日志文件
          // 过滤所有的FileSlice,然后进行排序后选出第一个(最小)的FileSlice
          Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getRTFileSystemView()
              .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
              .filter(fileSlice -> fileSlice.getLogFiles().count() < 1
                  && fileSlice.getDataFile().get().getFileSize() < config.getParquetSmallFileLimit())
              .sorted((FileSlice left,
                  FileSlice right) -> left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize()
                      ? -1
                      : 1)
              .findFirst());
          if (smallFileSlice.isPresent()) {
            allSmallFileSlices.add(smallFileSlice.get());
          }
        } else { // 支持索引日志文件
          // 获取最新的所有FileSlice
          List<FileSlice> allFileSlices =
              getRTFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
                  .collect(Collectors.toList());
          for (FileSlice fileSlice : allFileSlices) {
            if (isSmallFile(partitionPath, fileSlice)) { // 为小文件(结合数据文件和日志文件计算)
              allSmallFileSlices.add(fileSlice);
            }
          }
        }
        for (FileSlice smallFileSlice : allSmallFileSlices) {
          SmallFile sf = new SmallFile();
          if (smallFileSlice.getDataFile().isPresent()) { // 数据文件存在
            String filename = smallFileSlice.getDataFile().get().getFileName();
            // 基于数据文件构造属性信息
            sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
            sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice);
            smallFileLocations.add(sf);
            smallFiles.add(sf);
          } else { // 数据文件不存在
            HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
            // 基于第一个日志文件构造属性信息
            sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
                FSUtils.getFileIdFromLogPath(logFile.getPath()));
            sf.sizeBytes = getTotalFileSize(partitionPath, smallFileSlice);
            smallFileLocations.add(sf);
            smallFiles.add(sf);
          }
        }
      }
      return smallFileLocations;
    }

该方法首先会过滤 completed状态的 commitdeltacommit类型的 timeline,并找到最后一次 commit(可能是 commitdeltacommit),为 MOR类型时, timeline中的 commit表示已完成的 compact

若不支持索引日志文件,则查找最新的所有 FileSlice(由一个数据parquet数据文件和多个log增量日志文件组成)并且其数据文件大小小于配置的大小且无日志文件,然后排序后取最小的文件,该文件即为小文件(一个)。

若支持索引日志文件,则查找最新的所有 FileSlice进行遍历,并利用log增量日志文件信息然后生成小文件(多个)。

该方法获取的小文件用于在 handleUpdate时判断操作的文件是否为小文件,若为小文件并且不支持日志文件索引,则可直接更新该文件,否则生成新的log增量日志文件。

总结

对于 MOR类型存储而言,数据写入及更新流程与 COW大致相同;但对于 MOR类型而言,在 insert时,会根据是否支持索引日志文件来决定将记录写入log增量日志文件还是parquet数据文件(支持则写入log增量文件,否则写入parquet数据文件);在 update时,其也会根据是否支持直接写入日志文件和更新的文件是否为小文件来决定是否合并新老记录写入parquet数据或者将新记录写入log增量日志文件中(不支持并且为小文件,则直接更新旧的parquet文件记录并写入新的parquet数据文件,否则写入log增量文件中)。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
4月前
|
存储 测试技术 分布式数据库
提升 Apache Hudi Upsert 性能的三个建议
提升 Apache Hudi Upsert 性能的三个建议
85 1
|
4月前
|
SQL 关系型数据库 MySQL
使用CTAS 把mysql 表同步数据 到hologres ,Flink有什么参数可以使hologres 的字段都小写吗?
使用CTAS 把mysql 表同步数据 到hologres ,Flink有什么参数可以使hologres 的字段都小写吗?
341 0
|
26天前
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之CTAS特性只支持新增表,不支持删除表吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
存储 分布式计算 关系型数据库
实时数仓 Hologres产品使用合集之在行式存储的 Hologres 中新增一列,DB 会锁表吗,如果不会的话现在是怎么处理的呢
实时数仓Hologres的基本概念和特点:1.一站式实时数仓引擎:Hologres集成了数据仓库、在线分析处理(OLAP)和在线服务(Serving)能力于一体,适合实时数据分析和决策支持场景。2.兼容PostgreSQL协议:Hologres支持标准SQL(兼容PostgreSQL协议和语法),使得迁移和集成变得简单。3.海量数据处理能力:能够处理PB级数据的多维分析和即席查询,支持高并发低延迟查询。4.实时性:支持数据的实时写入、实时更新和实时分析,满足对数据新鲜度要求高的业务场景。5.与大数据生态集成:与MaxCompute、Flink、DataWorks等阿里云产品深度融合,提供离在线
|
3月前
|
存储 SQL 分布式计算
MaxCompute产品使用问题之upsert into语句如何在2.0事务表使用
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
40 10
|
4月前
|
消息中间件 分布式计算 Kafka
Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
,Flink 1.16.2 版本在流式读取 Iceberg upsert primary key 表方面存在一些限制
47 2
|
4月前
|
分布式计算 索引
Upsert在Hudi中的实现分析
Upsert在Hudi中的实现分析
71 0
|
4月前
|
SQL 存储 测试技术
提升50%+!Presto如何提升Hudi表查询性能?
提升50%+!Presto如何提升Hudi表查询性能?
113 0
|
10月前
|
SQL 存储
Hologres支持UPSERT操作
Hologres支持UPSERT操作
116 2