Apache Hudi索引实现分析(一)之HoodieBloomIndex

简介: Apache Hudi索引实现分析(一)之HoodieBloomIndex

1. 介绍

为了加快数据的upsert,Hudi提供了索引机制,现在Hudi内置支持四种索引:HoodieBloomIndex、HoodieGlobalBloomIndex、InMemoryHashIndex和HBaseIndex,下面对Hudi基于BloomFilter索引机制进行分析。

2. 分析

对于所有索引类型的基类HoodieIndex,其包含了如下核心的抽象方法

// 给输入记录RDD打位置标签  
public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
      HoodieTable<T> hoodieTable) throws HoodieIndexException;

对于Hudi默认实现HoodieBloomIndex,在给输入记录打位置标签时,会有如下步骤

1.根据配置缓存输入记录JavaRDD,避免重复加载开销。

2.将输入记录JavaRDD转化为JavaPairRDD。

3.根据索引查看位置信息,获取JavaPairRDD。

4.缓存第三步结果。

5.将位置信息推回给输入记录后返回。

2.1 LookupIndex分析

其中第三步的主要逻辑在 HoodieBloomIndex#lookupIndex方法中,其核心代码如下

private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
      JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc,
      final HoodieTable hoodieTable) {
    // 1. 按照分区路径进行一次count
    Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
    List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
    // 2. 加载分区下所有最新的数据文件
    List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
        loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable);
    // 按照分区路径先进行分组
    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
        fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
    // 3. 先计算合适的并行度,然后继续查找包含记录的文件  
    // 会根据之前的最大和最小recordKey过滤不需要进行比较的文件  
    Map<String, Long> comparisonsPerFileGroup =
        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
    int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
    int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
    return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
        comparisonsPerFileGroup);
  }

查看索引方法的逻辑非常简单,主要分为三步(1. 根据分区路径进行count、2. 加载分区下所有最新的文件、3. 查找包含记录的文件)。

第二步中加载分区下所有最新的文件的逻辑在 HoodieBloomIndex#loadInvolvedFiles方法中,其核心代码如下

List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
      final HoodieTable hoodieTable) {
    // 获取所有分区下最新的文件
    List<Pair<String, String>> partitionPathFileIDList =
        jsc.parallelize(partitions, Math.max(partitions.size(), 1)).flatMap(partitionPath -> {
          // 最新完成的commit
          Option<HoodieInstant> latestCommitTime =
              hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
          // Pair<分区路径, 文件ID>
          List<Pair<String, String>> filteredFiles = new ArrayList<>();
          if (latestCommitTime.isPresent()) { // 存在上一次commit
            // 获取指定分区下小于指定时间的所有数据文件  
            filteredFiles = hoodieTable.getBaseFileOnlyView()
                .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
                .map(f -> Pair.of(partitionPath, f.getFileId())).collect(toList());
          }
          return filteredFiles.iterator();
        }).collect();
    // hoodie.bloom.index.prune.by.ranges配置项为true
    if (config.getBloomIndexPruneByRanges()) {
      return jsc.parallelize(partitionPathFileIDList, Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> {
        try {
          HoodieRangeInfoHandle<T> rangeInfoHandle = new HoodieRangeInfoHandle<T>(config, hoodieTable, pf);
          // 从指定文件获取对应的最大和最小recordKey  
          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
          return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
        } catch (MetadataNotFoundException me) {
          // 出错时默认最大和最小recordKey为null
          return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
        }
      }).collect();
    } else {
      // 配置项未开启则默认最大和最小recordKey为null  
      return partitionPathFileIDList.stream()
          .map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
    }
  }

该方法最核心的逻辑便是获取分区下最新的数据文件。若文件ID相同,但是commitTime不同,那么会返回小于指定commitTime,最新提交的文件;若文件ID不同,那么返回小于指定commitTime的最新提交文件即可,总结而言就是如果同一文件ID对应多个文件,则选取最新的文件。然后根据配置决定是否从文件读取最大最小的recordKey,最大最小recordKey可用于后续过滤不相关的文件,否则会比较分区下所有的文件。

第三步中查找包含记录的文件在 HoodieBloomIndex#findMatchingFilesForRecordKeys中,其核心代码如下

JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
      JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
      Map<String, Long> fileGroupToComparisons) {
    // 1. 查找需要比对的文件Tuple2  
    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
        explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
    // hoodie.bloom.index.bucketized.checking = true,默认为true
    if (config.useBloomIndexBucketizedChecking()) {
      // 2. 使用Partitioner重新分区
      Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
          config.getBloomIndexKeysPerBucket());
      fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
          .repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2);
    } else {
      fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
    }
    // 3. 使用分区CheckFunction进行处理,然后将处理的结果转化为对应的HoodieKey
    return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
        .flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
        .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
            .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
                new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
            .collect(Collectors.toList()).iterator());
  }

该方法可分为三步,首先查找对应的文件(通过最大和最小recordKey过滤),然后进行重新分区或者排序,最后处理分区。其中查找记录对应的文件 explodeRecordRDDWithFileComparisons方法核心逻辑如下

JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
      JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
    // 使用索引过滤器,根据之前读取的最大和最小recordKey进行初始化
    IndexFileFilter indexFileFilter =
        config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
            : new ListBasedIndexFileFilter(partitionToFileIndexInfo);
    return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
      String recordKey = partitionRecordKeyPair._2();
      String partitionPath = partitionRecordKeyPair._1();
      // 获取匹配的文件,recordKey是否大于最小recordKey和小于最大recordKey
      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
          .map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
              new HoodieKey(recordKey, partitionPath)))
          .collect(Collectors.toList());
    }).flatMap(List::iterator);
  }

可以看到,该方法最核心的逻辑就是根据之前从文件中读取的最大和最小的recordKey来过滤无需比较的文件。

使用 HoodieBloomIndexCheckFunction处理分区记录核心逻辑如下

protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
      List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
      try {
        // 一个个文件处理
        while (inputItr.hasNext()) {
          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
          String fileId = currentTuple._1;
          String partitionPath = currentTuple._2.getPartitionPath();
          String recordKey = currentTuple._2.getRecordKey();
          Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
          if (keyLookupHandle == null) {
            // 初始化Handle  
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
          }
          // 处理当前文件
          if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
            // 将recordkey添加  
            keyLookupHandle.addKey(recordKey);
          } else {
            // 获取结果
            ret.add(keyLookupHandle.getLookupResult());
            // 重新生成Handle  
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
            keyLookupHandle.addKey(recordKey);
            break;
          }
        }
        if (!inputItr.hasNext()) {
          ret.add(keyLookupHandle.getLookupResult());
        }
      } catch (Throwable e) {
        if (e instanceof HoodieException) {
          throw e;
        }
        throw new HoodieIndexException("Error checking bloom filter index. ", e);
      }
      return ret;
    }

当遍历分区上所有记录时,会按照文件进行处理(通过keyLookupHandle控制处理),通过 HoodieKeyLookupHandle#addKey方法将recordKey添加至keyLookupHandle,其核心代码如下

public void addKey(String recordKey) {
    // 布隆过滤器中是否包含该recordKey,布隆过滤器会从文件中反序列化
    if (bloomFilter.mightContain(recordKey)) {
      // 如果包含则加入候选列表,待进一步确认
      candidateRecordKeys.add(recordKey);
    }
    totalKeysChecked++;
  }

通过 HoodieKeyLookupHandle#getLookupResult获取结果,核心代码如下

public KeyLookupResult getLookupResult() {
    // 获取文件ID对应的最新的数据文件
    HoodieBaseFile dataFile = getLatestDataFile();
    // 对比文件中所有记录和候选列表,找出实际存在的recordKey  
    List<String> matchingKeys =
        checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
    return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
        dataFile.getCommitTime(), matchingKeys);
  }

可以看到获取结果时会执行实际的查找逻辑,即对比文件中所有记录和候选的列表,找出实际存在的recordKey列表。

2.2 tagLocationBacktoRecords分析

在查找完位置信息后,便可将位置信息推回给原始记录,其核心代码如下

protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
      JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
    // 将原始记录进行转化  
    JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD =
        recordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
    // 以HoodieKey进行一次左外连接,确定位置信息
    return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values()
        .map(v1 -> getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull())));
  }

可以看到通过左外连接便将之前的位置信息推回至原始记录中,这样便完成了对原始记录打位置标签过程。

3. 总结

Hudi默认采用的HoodieBloomIndex索引,其依赖布隆过滤器来判断记录存在与否,当记录存在时,会读取实际文件进行二次判断,以便修正布隆过滤器带来的误差。同时还在每个文件元数据中添加了该文件保存的最大和最小的recordKey,借助该值可过滤出无需对比的文件。

目录
相关文章
|
2月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
108 2
|
20天前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
28天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
78 11
|
2月前
|
存储 运维 数据处理
Apache Paimon:重塑阿里智能引擎数据处理新纪元,解锁高效存储与实时分析潜能!
【8月更文挑战第2天】探索 Apache Paimon 在阿里智能引擎的应用场景
171 2
|
3月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
4月前
|
SQL 运维 druid
深度分析:Apache Doris及其在大数据处理中的应用
Apache Doris是一款开源的高性能实时分析数据库,设计用于低延迟SQL查询和实时数据处理,适合大规模实时分析场景。与Apache Druid、ClickHouse和Greenplum相比,Doris在易用性和实时性上有优势,但其他产品在特定领域如高吞吐、SQL支持或数据处理有特长。选型要考虑查询性能、实时性、SQL需求和运维成本。Doris适用于实时数据分析、BI报表、数据中台和物联网数据处理。使用时注意资源配置、数据模型设计、监控调优和导入策略。
|
4月前
|
easyexcel Java API
Apache POI与easyExcel:Excel文件导入导出的技术深度分析
Apache POI与easyExcel:Excel文件导入导出的技术深度分析
|
4月前
|
消息中间件 存储 大数据
深度分析:Apache Kafka及其在大数据处理中的应用
Apache Kafka是高吞吐、低延迟的分布式流处理平台,常用于实时数据流、日志收集和事件驱动架构。与RabbitMQ(吞吐量有限)、Pulsar(多租户支持但生态系统小)和Amazon Kinesis(托管服务,成本高)对比,Kafka在高吞吐和持久化上有优势。适用场景包括实时处理、数据集成、日志收集和消息传递。选型需考虑吞吐延迟、持久化、协议支持等因素,使用时注意资源配置、数据管理、监控及安全性。
|
4月前
|
消息中间件 分布式计算 Kafka
深度分析:Apache Flink及其在大数据处理中的应用
Apache Flink是低延迟、高吞吐量的流处理框架,以其状态管理和事件时间处理能力脱颖而出。与Apache Spark Streaming相比,Flink在实时性上更强,但Spark生态系统更丰富。Apache Storm在低延迟上有优势,而Kafka Streams适合轻量级流处理。选型考虑延迟、状态管理、生态系统和运维成本。Flink适用于实时数据分析、复杂事件处理等场景,使用时注意资源配置、状态管理和窗口操作的优化。
|
2月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
34 1

推荐镜像

更多
下一篇
无影云桌面