1. 介绍
前面分析了基于BloomFilter实现的HoodieBloomIndex和HoodieGlobalBloomIndex,以及基于外部存储系统HBase的索引实现,基于BloomFilter的索引会借助IndexFileFilter来粗略过滤出需要比较的文件,Hudi默认使用HoodieBloomIndex和HoodieGlobalBloomIndex,下面分析其实现。
2. 分析
IndexFileFilter接口用于辅助定位recordKey所在的位置,其定义如下
public interface IndexFileFilter extends Serializable { // 获取所有匹配的文件和分区 Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey); }
其有基于Tree和List数据结构的实现,分别为IntervalTreeBasedIndexFileFilter
和IntervalTreeBasedGlobalIndexFileFilter
,以及ListBasedIndexFileFilter
和`ListBasedGlobalIndexFileFilter
。
2.1 IntervalTreeBasedIndexFileFilter实现
基于树数据结构的索引过滤器的实现,IntervalTreeBasedIndexFileFilter
主要用于HoodieBloomIndex
。其构造函数如下
IntervalTreeBasedIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) { partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> { // 进行一次shuffle,尽量让树的各节点等高 Collections.shuffle(bloomIndexFiles); // 进行recordKey查找的核心数据结构 KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree(); bloomIndexFiles.forEach(indexFileInfo -> { if (indexFileInfo.hasKeyRanges()) { // 有最大和最小的recordKey // 插入树中 lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileId())); } else { if (!partitionToFilesWithNoRanges.containsKey(partition)) { partitionToFilesWithNoRanges.put(partition, new HashSet<>()); } // 放入无最大和最小recordKey的集合 partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId()); } }); // 保存该分区的Tree partitionToFileIndexLookUpTree.put(partition, lookUpTree); }); }
可以看到,在构造函数内便会构造KeyRangeLookupTree
,然后以分区路径为粒度将分区下所有存在最大和最小recordKey的节点插入树中。
KeyRangeLookupTree
是一棵近似有序树,当插入节点(KeyRangeNode)时,首先当前节点比较(根从节点开始)待插入节点比较(根据最大和最小recordKey比较),若相等时,则将待插入节点的文件列表加入当前节点的文件列表中;若小于待插入节点,先判断当前节点的右节点是否存在,若存在则插入右子树中,否则直接设置待插入节点为当前节点的右子节点;若大于待插入节点,先判断当前节点的左节点是否存在,若存在则插入左子树中,否则直接设置待插入节点为当前节点的左子节点。每个节点还会保存其左右子节点中最大和最小的recordKey,在插入的时候会动态调整。
树构造完后,当查找所有匹配的文件和分区时,getMatchingFilesAndPartition
核心代码如下
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) { Set<Pair<String, String>> toReturn = new HashSet<>(); // 当所有的文件都无最大和最小recordKey时,需要先进行如下的判断 if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) { // 在该分区对应的Tree查找 partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file -> toReturn.add(Pair.of(partitionPath, file))); } // 无最大最小值分区中包含此分区 if (partitionToFilesWithNoRanges.containsKey(partitionPath)) { // 所有文件都会返回 partitionToFilesWithNoRanges.get(partitionPath).forEach(file -> toReturn.add(Pair.of(partitionPath, file))); } return toReturn; }
若分区和文件索引查找树(partitionToFileIndexLookUpTree)包含该分区,那么获取分区对应的树后查找匹配的文件;若分区和文件索引(partitionToFilesWithNoRanges)包含该分区,那么所有文件都会被返回。
其中getMatchingIndexFiles
核心代码如下
Set<String> getMatchingIndexFiles(String lookupKey) { // 存放查找结果 Set<String> matchingFileNameSet = new HashSet<>(); // 实际查找 getMatchingIndexFiles(getRoot(), lookupKey, matchingFileNameSet); return matchingFileNameSet; }
实际查找过程如下:比较当前节点(从根节点开始)的最小recordKey与lookupKey以及最大recordKey与lookupKey,若lookupKey大于最小recordKey并且小于最大recordKey,那么返回当前节点对应的文件列表;若当前节点左子节点存在并且lookupKey大于当前节点的左子节点最小recordKey并且小于当前节点的左子节点最大recordKey,则在左子树中继续查找;若当前节点右子节点存在并且lookupKey大于当前节点的右子节点最小recordKey并且小于当前节点的右子节点最大recordKey,则在右子树中继续查找。
2.2 IntervalTreeBasedGlobalIndexFileFilter实现
IntervalTreeBasedGlobalIndexFileFilter
主要用于HoodieGlobalBloomIndex
,其也是基于树数据结构的索引过滤器的实现,其构造函数如下
IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) { List<BloomIndexFileInfo> allIndexFiles = new ArrayList<>(); partitionToFileIndexInfo.forEach((parition, bloomIndexFileInfoList) -> bloomIndexFileInfoList.forEach(file -> { // 保存所有文件和分区路径 fileIdToPartitionPathMap.put(file.getFileId(), parition); allIndexFiles.add(file); })); // 进行一次shuffle,尽量让树的各节点等高 Collections.shuffle(allIndexFiles); allIndexFiles.forEach(indexFile -> { if (indexFile.hasKeyRanges()) { // 包含最大和最小recordKey // 插入树中 indexLookUpTree .insert(new KeyRangeNode(indexFile.getMinRecordKey(), indexFile.getMaxRecordKey(), indexFile.getFileId())); } else { // 保存无最大和最小recordKey的文件 filesWithNoRanges.add(indexFile.getFileId()); } }); }
可以看到其与IntervalTreeBasedIndexFileFilter
构造函数的区别是其并没有为每个分区构造一个KeyRangeLookupTree
,所有分区共享一个KeyRangeLookupTree
,而对于无最大和最小recordKey的文件处理也类似,IntervalTreeBasedIndexFileFilter
按照分区保存,IntervalTreeBasedGlobalIndexFileFilter
则所有分区统一保存。
获取匹配文件和分区方法getMatchingFilesAndPartition
核心代码如下
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) { Set<String> matchingFiles = new HashSet<>(); // 在Tree中查找有最大最小record的文件 matchingFiles.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey)); // 无最大最小recordKey的文件全部需要返回 matchingFiles.addAll(filesWithNoRanges); Set<Pair<String, String>> toReturn = new HashSet<>(); matchingFiles.forEach(file -> toReturn.add(Pair.of(fileIdToPartitionPathMap.get(file), file))); return toReturn; }
会先在树中查找,将符合条件的文件返回,另外所有无最大最小值的文件也需要返回,两次查找重复的文件会被自动过滤掉。
3. 总结
IntervalTreeBasedIndexFileFilter
和IntervalTreeBasedGlobalIndexFileFilter
的实现都基于树实现,前者主要用于HoodieBloomIndex
,而后者主要用于HoodieGlobalBloomIndex
;同时前者的实现是以分区为粒度划分(查找),而后者不以分区为粒度,是全局的。过滤器的主要作用是辅助过滤出待比较的文件和分区(是否对这些文件有更新),这也是Hudi的默认实现方式。