精进Hudi系列|Apache Hudi索引实现分析(四)之基于Tree的IndexFileFilter

简介: 精进Hudi系列|Apache Hudi索引实现分析(四)之基于Tree的IndexFileFilter

1. 介绍

前面分析了基于BloomFilter实现的HoodieBloomIndex和HoodieGlobalBloomIndex,以及基于外部存储系统HBase的索引实现,基于BloomFilter的索引会借助IndexFileFilter来粗略过滤出需要比较的文件,Hudi默认使用HoodieBloomIndex和HoodieGlobalBloomIndex,下面分析其实现。

2. 分析

IndexFileFilter接口用于辅助定位recordKey所在的位置,其定义如下

public interface IndexFileFilter extends Serializable {
  // 获取所有匹配的文件和分区
  Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey);
}

其有基于Tree和List数据结构的实现,分别为IntervalTreeBasedIndexFileFilterIntervalTreeBasedGlobalIndexFileFilter,以及ListBasedIndexFileFilter`ListBasedGlobalIndexFileFilter

2.1 IntervalTreeBasedIndexFileFilter实现

基于树数据结构的索引过滤器的实现,IntervalTreeBasedIndexFileFilter主要用于HoodieBloomIndex。其构造函数如下

IntervalTreeBasedIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
    partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> {
      // 进行一次shuffle,尽量让树的各节点等高
      Collections.shuffle(bloomIndexFiles);
      // 进行recordKey查找的核心数据结构
      KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree();
      bloomIndexFiles.forEach(indexFileInfo -> {
        if (indexFileInfo.hasKeyRanges()) { // 有最大和最小的recordKey
          // 插入树中
          lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), indexFileInfo.getMaxRecordKey(),
              indexFileInfo.getFileId()));
        } else {
          if (!partitionToFilesWithNoRanges.containsKey(partition)) {
            partitionToFilesWithNoRanges.put(partition, new HashSet<>());
          }
          // 放入无最大和最小recordKey的集合
          partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId());
        }
      });
      // 保存该分区的Tree
      partitionToFileIndexLookUpTree.put(partition, lookUpTree);
    });
  }

可以看到,在构造函数内便会构造KeyRangeLookupTree,然后以分区路径为粒度将分区下所有存在最大和最小recordKey的节点插入树中。

KeyRangeLookupTree是一棵近似有序树,当插入节点(KeyRangeNode)时,首先当前节点比较(根从节点开始)待插入节点比较(根据最大和最小recordKey比较),若相等时,则将待插入节点的文件列表加入当前节点的文件列表中;若小于待插入节点,先判断当前节点的右节点是否存在,若存在则插入右子树中,否则直接设置待插入节点为当前节点的右子节点;若大于待插入节点,先判断当前节点的左节点是否存在,若存在则插入左子树中,否则直接设置待插入节点为当前节点的左子节点。每个节点还会保存其左右子节点中最大和最小的recordKey,在插入的时候会动态调整。

树构造完后,当查找所有匹配的文件和分区时,getMatchingFilesAndPartition核心代码如下

public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
    Set<Pair<String, String>> toReturn = new HashSet<>();
    // 当所有的文件都无最大和最小recordKey时,需要先进行如下的判断
    if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) {
 // 在该分区对应的Tree查找     partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file ->
          toReturn.add(Pair.of(partitionPath, file)));
    }
    // 无最大最小值分区中包含此分区  
    if (partitionToFilesWithNoRanges.containsKey(partitionPath)) {
      // 所有文件都会返回  
      partitionToFilesWithNoRanges.get(partitionPath).forEach(file ->
          toReturn.add(Pair.of(partitionPath, file)));
    }
    return toReturn;
  }

若分区和文件索引查找树(partitionToFileIndexLookUpTree)包含该分区,那么获取分区对应的树后查找匹配的文件;若分区和文件索引(partitionToFilesWithNoRanges)包含该分区,那么所有文件都会被返回。

其中getMatchingIndexFiles核心代码如下

Set<String> getMatchingIndexFiles(String lookupKey) {
    // 存放查找结果  
    Set<String> matchingFileNameSet = new HashSet<>();
    // 实际查找  
    getMatchingIndexFiles(getRoot(), lookupKey, matchingFileNameSet);
    return matchingFileNameSet;
  }

实际查找过程如下:比较当前节点(从根节点开始)的最小recordKey与lookupKey以及最大recordKey与lookupKey,若lookupKey大于最小recordKey并且小于最大recordKey,那么返回当前节点对应的文件列表;若当前节点左子节点存在并且lookupKey大于当前节点的左子节点最小recordKey并且小于当前节点的左子节点最大recordKey,则在左子树中继续查找;若当前节点右子节点存在并且lookupKey大于当前节点的右子节点最小recordKey并且小于当前节点的右子节点最大recordKey,则在右子树中继续查找。

2.2 IntervalTreeBasedGlobalIndexFileFilter实现

IntervalTreeBasedGlobalIndexFileFilter主要用于HoodieGlobalBloomIndex,其也是基于树数据结构的索引过滤器的实现,其构造函数如下

IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
    List<BloomIndexFileInfo> allIndexFiles = new ArrayList<>();
    partitionToFileIndexInfo.forEach((parition, bloomIndexFileInfoList) -> bloomIndexFileInfoList.forEach(file -> {
      // 保存所有文件和分区路径  
      fileIdToPartitionPathMap.put(file.getFileId(), parition);
      allIndexFiles.add(file);
    }));
    // 进行一次shuffle,尽量让树的各节点等高
    Collections.shuffle(allIndexFiles);
    allIndexFiles.forEach(indexFile -> {
      if (indexFile.hasKeyRanges()) { // 包含最大和最小recordKey
        // 插入树中  
        indexLookUpTree
            .insert(new KeyRangeNode(indexFile.getMinRecordKey(), indexFile.getMaxRecordKey(), indexFile.getFileId()));
      } else {
        // 保存无最大和最小recordKey的文件  
        filesWithNoRanges.add(indexFile.getFileId());
      }
    });
  }

可以看到其与IntervalTreeBasedIndexFileFilter构造函数的区别是其并没有为每个分区构造一个KeyRangeLookupTree,所有分区共享一个KeyRangeLookupTree,而对于无最大和最小recordKey的文件处理也类似,IntervalTreeBasedIndexFileFilter按照分区保存,IntervalTreeBasedGlobalIndexFileFilter则所有分区统一保存。

获取匹配文件和分区方法getMatchingFilesAndPartition核心代码如下

public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
    Set<String> matchingFiles = new HashSet<>();
    // 在Tree中查找有最大最小record的文件
    matchingFiles.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey));
    // 无最大最小recordKey的文件全部需要返回  
    matchingFiles.addAll(filesWithNoRanges);
    Set<Pair<String, String>> toReturn = new HashSet<>();
    matchingFiles.forEach(file -> toReturn.add(Pair.of(fileIdToPartitionPathMap.get(file), file)));
    return toReturn;
  }

会先在树中查找,将符合条件的文件返回,另外所有无最大最小值的文件也需要返回,两次查找重复的文件会被自动过滤掉。

3. 总结

IntervalTreeBasedIndexFileFilterIntervalTreeBasedGlobalIndexFileFilter的实现都基于树实现,前者主要用于HoodieBloomIndex,而后者主要用于HoodieGlobalBloomIndex;同时前者的实现是以分区为粒度划分(查找),而后者不以分区为粒度,是全局的。过滤器的主要作用是辅助过滤出待比较的文件和分区(是否对这些文件有更新),这也是Hudi的默认实现方式。

目录
相关文章
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
233 2
|
19天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
20天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
2月前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
1月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
54 3
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
33 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
55 1
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
76 0
|
2月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
145 11
|
3月前
|
存储 运维 数据处理
Apache Paimon:重塑阿里智能引擎数据处理新纪元,解锁高效存储与实时分析潜能!
【8月更文挑战第2天】探索 Apache Paimon 在阿里智能引擎的应用场景
211 2

推荐镜像

更多