精进Hudi系列|Apache Hudi索引实现分析(四)之基于Tree的IndexFileFilter

简介: 精进Hudi系列|Apache Hudi索引实现分析(四)之基于Tree的IndexFileFilter

1. 介绍

前面分析了基于BloomFilter实现的HoodieBloomIndex和HoodieGlobalBloomIndex,以及基于外部存储系统HBase的索引实现,基于BloomFilter的索引会借助IndexFileFilter来粗略过滤出需要比较的文件,Hudi默认使用HoodieBloomIndex和HoodieGlobalBloomIndex,下面分析其实现。

2. 分析

IndexFileFilter接口用于辅助定位recordKey所在的位置,其定义如下

public interface IndexFileFilter extends Serializable {
  // 获取所有匹配的文件和分区
  Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey);
}

其有基于Tree和List数据结构的实现,分别为IntervalTreeBasedIndexFileFilterIntervalTreeBasedGlobalIndexFileFilter,以及ListBasedIndexFileFilter`ListBasedGlobalIndexFileFilter

2.1 IntervalTreeBasedIndexFileFilter实现

基于树数据结构的索引过滤器的实现,IntervalTreeBasedIndexFileFilter主要用于HoodieBloomIndex。其构造函数如下

IntervalTreeBasedIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
    partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> {
      // 进行一次shuffle,尽量让树的各节点等高
      Collections.shuffle(bloomIndexFiles);
      // 进行recordKey查找的核心数据结构
      KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree();
      bloomIndexFiles.forEach(indexFileInfo -> {
        if (indexFileInfo.hasKeyRanges()) { // 有最大和最小的recordKey
          // 插入树中
          lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), indexFileInfo.getMaxRecordKey(),
              indexFileInfo.getFileId()));
        } else {
          if (!partitionToFilesWithNoRanges.containsKey(partition)) {
            partitionToFilesWithNoRanges.put(partition, new HashSet<>());
          }
          // 放入无最大和最小recordKey的集合
          partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId());
        }
      });
      // 保存该分区的Tree
      partitionToFileIndexLookUpTree.put(partition, lookUpTree);
    });
  }

可以看到,在构造函数内便会构造KeyRangeLookupTree,然后以分区路径为粒度将分区下所有存在最大和最小recordKey的节点插入树中。

KeyRangeLookupTree是一棵近似有序树,当插入节点(KeyRangeNode)时,首先当前节点比较(根从节点开始)待插入节点比较(根据最大和最小recordKey比较),若相等时,则将待插入节点的文件列表加入当前节点的文件列表中;若小于待插入节点,先判断当前节点的右节点是否存在,若存在则插入右子树中,否则直接设置待插入节点为当前节点的右子节点;若大于待插入节点,先判断当前节点的左节点是否存在,若存在则插入左子树中,否则直接设置待插入节点为当前节点的左子节点。每个节点还会保存其左右子节点中最大和最小的recordKey,在插入的时候会动态调整。

树构造完后,当查找所有匹配的文件和分区时,getMatchingFilesAndPartition核心代码如下

public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
    Set<Pair<String, String>> toReturn = new HashSet<>();
    // 当所有的文件都无最大和最小recordKey时,需要先进行如下的判断
    if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) {
 // 在该分区对应的Tree查找     partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file ->
          toReturn.add(Pair.of(partitionPath, file)));
    }
    // 无最大最小值分区中包含此分区  
    if (partitionToFilesWithNoRanges.containsKey(partitionPath)) {
      // 所有文件都会返回  
      partitionToFilesWithNoRanges.get(partitionPath).forEach(file ->
          toReturn.add(Pair.of(partitionPath, file)));
    }
    return toReturn;
  }

若分区和文件索引查找树(partitionToFileIndexLookUpTree)包含该分区,那么获取分区对应的树后查找匹配的文件;若分区和文件索引(partitionToFilesWithNoRanges)包含该分区,那么所有文件都会被返回。

其中getMatchingIndexFiles核心代码如下

Set<String> getMatchingIndexFiles(String lookupKey) {
    // 存放查找结果  
    Set<String> matchingFileNameSet = new HashSet<>();
    // 实际查找  
    getMatchingIndexFiles(getRoot(), lookupKey, matchingFileNameSet);
    return matchingFileNameSet;
  }

实际查找过程如下:比较当前节点(从根节点开始)的最小recordKey与lookupKey以及最大recordKey与lookupKey,若lookupKey大于最小recordKey并且小于最大recordKey,那么返回当前节点对应的文件列表;若当前节点左子节点存在并且lookupKey大于当前节点的左子节点最小recordKey并且小于当前节点的左子节点最大recordKey,则在左子树中继续查找;若当前节点右子节点存在并且lookupKey大于当前节点的右子节点最小recordKey并且小于当前节点的右子节点最大recordKey,则在右子树中继续查找。

2.2 IntervalTreeBasedGlobalIndexFileFilter实现

IntervalTreeBasedGlobalIndexFileFilter主要用于HoodieGlobalBloomIndex,其也是基于树数据结构的索引过滤器的实现,其构造函数如下

IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
    List<BloomIndexFileInfo> allIndexFiles = new ArrayList<>();
    partitionToFileIndexInfo.forEach((parition, bloomIndexFileInfoList) -> bloomIndexFileInfoList.forEach(file -> {
      // 保存所有文件和分区路径  
      fileIdToPartitionPathMap.put(file.getFileId(), parition);
      allIndexFiles.add(file);
    }));
    // 进行一次shuffle,尽量让树的各节点等高
    Collections.shuffle(allIndexFiles);
    allIndexFiles.forEach(indexFile -> {
      if (indexFile.hasKeyRanges()) { // 包含最大和最小recordKey
        // 插入树中  
        indexLookUpTree
            .insert(new KeyRangeNode(indexFile.getMinRecordKey(), indexFile.getMaxRecordKey(), indexFile.getFileId()));
      } else {
        // 保存无最大和最小recordKey的文件  
        filesWithNoRanges.add(indexFile.getFileId());
      }
    });
  }

可以看到其与IntervalTreeBasedIndexFileFilter构造函数的区别是其并没有为每个分区构造一个KeyRangeLookupTree,所有分区共享一个KeyRangeLookupTree,而对于无最大和最小recordKey的文件处理也类似,IntervalTreeBasedIndexFileFilter按照分区保存,IntervalTreeBasedGlobalIndexFileFilter则所有分区统一保存。

获取匹配文件和分区方法getMatchingFilesAndPartition核心代码如下

public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
    Set<String> matchingFiles = new HashSet<>();
    // 在Tree中查找有最大最小record的文件
    matchingFiles.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey));
    // 无最大最小recordKey的文件全部需要返回  
    matchingFiles.addAll(filesWithNoRanges);
    Set<Pair<String, String>> toReturn = new HashSet<>();
    matchingFiles.forEach(file -> toReturn.add(Pair.of(fileIdToPartitionPathMap.get(file), file)));
    return toReturn;
  }

会先在树中查找,将符合条件的文件返回,另外所有无最大最小值的文件也需要返回,两次查找重复的文件会被自动过滤掉。

3. 总结

IntervalTreeBasedIndexFileFilterIntervalTreeBasedGlobalIndexFileFilter的实现都基于树实现,前者主要用于HoodieBloomIndex,而后者主要用于HoodieGlobalBloomIndex;同时前者的实现是以分区为粒度划分(查找),而后者不以分区为粒度,是全局的。过滤器的主要作用是辅助过滤出待比较的文件和分区(是否对这些文件有更新),这也是Hudi的默认实现方式。

目录
相关文章
|
3天前
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
40 0
|
3天前
|
Apache
Apache Hudi Rollback实现分析
Apache Hudi Rollback实现分析
27 0
|
3天前
|
存储 SQL 分布式计算
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
150 0
|
3天前
|
存储 分布式计算 Hadoop
一文了解Apache Hudi架构、工具和最佳实践
一文了解Apache Hudi架构、工具和最佳实践
155 0
|
3天前
|
SQL 分布式计算 NoSQL
使用Apache Hudi和Debezium构建健壮的CDC管道
使用Apache Hudi和Debezium构建健壮的CDC管道
22 0
|
3天前
|
存储 SQL 消息中间件
Apache Hudi:统一批和近实时分析的存储和服务
Apache Hudi:统一批和近实时分析的存储和服务
43 0
|
3天前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
722 5
|
3天前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1809 2
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
3天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1794 2
官宣|Apache Flink 1.19 发布公告
|
3天前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
348 3

热门文章

最新文章

推荐镜像

更多