1. 介绍
为了加快数据的upsert,Hudi提供了索引机制,现在Hudi内置支持四种索引:HoodieBloomIndex、HoodieGlobalBloomIndex、InMemoryHashIndex和HBaseIndex,下面对Hudi基于BloomFilter索引机制进行分析。
2. 分析
对于所有索引类型的基类HoodieIndex,其包含了如下核心的抽象方法
// 给输入记录RDD打位置标签 public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) throws HoodieIndexException;
对于Hudi默认实现HoodieBloomIndex,在给输入记录打位置标签时,会有如下步骤
1.根据配置缓存输入记录JavaRDD,避免重复加载开销。
2.将输入记录JavaRDD转化为JavaPairRDD。
3.根据索引查看位置信息,获取JavaPairRDD。
4.缓存第三步结果。
5.将位置信息推回给输入记录后返回。
2.1 LookupIndex分析
其中第三步的主要逻辑在 HoodieBloomIndex#lookupIndex
方法中,其核心代码如下
private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex( JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc, final HoodieTable hoodieTable) { // 1. 按照分区路径进行一次count Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); // 2. 加载分区下所有最新的数据文件 List<Tuple2<String, BloomIndexFileInfo>> fileInfoList = loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable); // 按照分区路径先进行分组 final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo = fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList()))); // 3. 先计算合适的并行度,然后继续查找包含记录的文件 // 会根据之前的最大和最小recordKey过滤不需要进行比较的文件 Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD); int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup); int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism); return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable, comparisonsPerFileGroup); }
查看索引方法的逻辑非常简单,主要分为三步(1. 根据分区路径进行count、2. 加载分区下所有最新的文件、3. 查找包含记录的文件)。
第二步中加载分区下所有最新的文件的逻辑在 HoodieBloomIndex#loadInvolvedFiles
方法中,其核心代码如下
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc, final HoodieTable hoodieTable) { // 获取所有分区下最新的文件 List<Pair<String, String>> partitionPathFileIDList = jsc.parallelize(partitions, Math.max(partitions.size(), 1)).flatMap(partitionPath -> { // 最新完成的commit Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant(); // Pair<分区路径, 文件ID> List<Pair<String, String>> filteredFiles = new ArrayList<>(); if (latestCommitTime.isPresent()) { // 存在上一次commit // 获取指定分区下小于指定时间的所有数据文件 filteredFiles = hoodieTable.getBaseFileOnlyView() .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp()) .map(f -> Pair.of(partitionPath, f.getFileId())).collect(toList()); } return filteredFiles.iterator(); }).collect(); // hoodie.bloom.index.prune.by.ranges配置项为true if (config.getBloomIndexPruneByRanges()) { return jsc.parallelize(partitionPathFileIDList, Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> { try { HoodieRangeInfoHandle<T> rangeInfoHandle = new HoodieRangeInfoHandle<T>(config, hoodieTable, pf); // 从指定文件获取对应的最大和最小recordKey String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys(); return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1])); } catch (MetadataNotFoundException me) { // 出错时默认最大和最小recordKey为null return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue())); } }).collect(); } else { // 配置项未开启则默认最大和最小recordKey为null return partitionPathFileIDList.stream() .map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList()); } }
该方法最核心的逻辑便是获取分区下最新的数据文件。若文件ID相同,但是commitTime不同,那么会返回小于指定commitTime,最新提交的文件;若文件ID不同,那么返回小于指定commitTime的最新提交文件即可,总结而言就是如果同一文件ID对应多个文件,则选取最新的文件。然后根据配置决定是否从文件读取最大最小的recordKey,最大最小recordKey可用于后续过滤不相关的文件,否则会比较分区下所有的文件。
第三步中查找包含记录的文件在 HoodieBloomIndex#findMatchingFilesForRecordKeys
中,其核心代码如下
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys( final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo, JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable, Map<String, Long> fileGroupToComparisons) { // 1. 查找需要比对的文件Tuple2 JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD = explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD); // hoodie.bloom.index.bucketized.checking = true,默认为true if (config.useBloomIndexBucketizedChecking()) { // 2. 使用Partitioner重新分区 Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons, config.getBloomIndexKeysPerBucket()); fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t)) .repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2); } else { fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism); } // 3. 使用分区CheckFunction进行处理,然后将处理的结果转化为对应的HoodieKey return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true) .flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0) .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream() .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()), new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()))) .collect(Collectors.toList()).iterator()); }
该方法可分为三步,首先查找对应的文件(通过最大和最小recordKey过滤),然后进行重新分区或者排序,最后处理分区。其中查找记录对应的文件 explodeRecordRDDWithFileComparisons
方法核心逻辑如下
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons( final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo, JavaPairRDD<String, String> partitionRecordKeyPairRDD) { // 使用索引过滤器,根据之前读取的最大和最小recordKey进行初始化 IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo) : new ListBasedIndexFileFilter(partitionToFileIndexInfo); return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> { String recordKey = partitionRecordKeyPair._2(); String partitionPath = partitionRecordKeyPair._1(); // 获取匹配的文件,recordKey是否大于最小recordKey和小于最大recordKey return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream() .map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(), new HoodieKey(recordKey, partitionPath))) .collect(Collectors.toList()); }).flatMap(List::iterator); }
可以看到,该方法最核心的逻辑就是根据之前从文件中读取的最大和最小的recordKey来过滤无需比较的文件。
使用 HoodieBloomIndexCheckFunction
处理分区记录核心逻辑如下
protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() { List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>(); try { // 一个个文件处理 while (inputItr.hasNext()) { Tuple2<String, HoodieKey> currentTuple = inputItr.next(); String fileId = currentTuple._1; String partitionPath = currentTuple._2.getPartitionPath(); String recordKey = currentTuple._2.getRecordKey(); Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId); if (keyLookupHandle == null) { // 初始化Handle keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair); } // 处理当前文件 if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) { // 将recordkey添加 keyLookupHandle.addKey(recordKey); } else { // 获取结果 ret.add(keyLookupHandle.getLookupResult()); // 重新生成Handle keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair); keyLookupHandle.addKey(recordKey); break; } } if (!inputItr.hasNext()) { ret.add(keyLookupHandle.getLookupResult()); } } catch (Throwable e) { if (e instanceof HoodieException) { throw e; } throw new HoodieIndexException("Error checking bloom filter index. ", e); } return ret; }
当遍历分区上所有记录时,会按照文件进行处理(通过keyLookupHandle控制处理),通过 HoodieKeyLookupHandle#addKey
方法将recordKey添加至keyLookupHandle,其核心代码如下
public void addKey(String recordKey) { // 布隆过滤器中是否包含该recordKey,布隆过滤器会从文件中反序列化 if (bloomFilter.mightContain(recordKey)) { // 如果包含则加入候选列表,待进一步确认 candidateRecordKeys.add(recordKey); } totalKeysChecked++; }
通过 HoodieKeyLookupHandle#getLookupResult
获取结果,核心代码如下
public KeyLookupResult getLookupResult() { // 获取文件ID对应的最新的数据文件 HoodieBaseFile dataFile = getLatestDataFile(); // 对比文件中所有记录和候选列表,找出实际存在的recordKey List<String> matchingKeys = checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath())); return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(), dataFile.getCommitTime(), matchingKeys); }
可以看到获取结果时会执行实际的查找逻辑,即对比文件中所有记录和候选的列表,找出实际存在的recordKey列表。
2.2 tagLocationBacktoRecords分析
在查找完位置信息后,便可将位置信息推回给原始记录,其核心代码如下
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords( JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) { // 将原始记录进行转化 JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD = recordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record)); // 以HoodieKey进行一次左外连接,确定位置信息 return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values() .map(v1 -> getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull()))); }
可以看到通过左外连接便将之前的位置信息推回至原始记录中,这样便完成了对原始记录打位置标签过程。
3. 总结
Hudi默认采用的HoodieBloomIndex索引,其依赖布隆过滤器来判断记录存在与否,当记录存在时,会读取实际文件进行二次判断,以便修正布隆过滤器带来的误差。同时还在每个文件元数据中添加了该文件保存的最大和最小的recordKey,借助该值可过滤出无需对比的文件。