Apache Hudi索引实现分析(一)之HoodieBloomIndex

简介: Apache Hudi索引实现分析(一)之HoodieBloomIndex

1. 介绍

为了加快数据的upsert,Hudi提供了索引机制,现在Hudi内置支持四种索引:HoodieBloomIndex、HoodieGlobalBloomIndex、InMemoryHashIndex和HBaseIndex,下面对Hudi基于BloomFilter索引机制进行分析。

2. 分析

对于所有索引类型的基类HoodieIndex,其包含了如下核心的抽象方法

// 给输入记录RDD打位置标签  
public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
      HoodieTable<T> hoodieTable) throws HoodieIndexException;

对于Hudi默认实现HoodieBloomIndex,在给输入记录打位置标签时,会有如下步骤

1.根据配置缓存输入记录JavaRDD,避免重复加载开销。

2.将输入记录JavaRDD转化为JavaPairRDD。

3.根据索引查看位置信息,获取JavaPairRDD。

4.缓存第三步结果。

5.将位置信息推回给输入记录后返回。

2.1 LookupIndex分析

其中第三步的主要逻辑在 HoodieBloomIndex#lookupIndex方法中,其核心代码如下

private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
      JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc,
      final HoodieTable hoodieTable) {
    // 1. 按照分区路径进行一次count
    Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
    List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
    // 2. 加载分区下所有最新的数据文件
    List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
        loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable);
    // 按照分区路径先进行分组
    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
        fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
    // 3. 先计算合适的并行度,然后继续查找包含记录的文件  
    // 会根据之前的最大和最小recordKey过滤不需要进行比较的文件  
    Map<String, Long> comparisonsPerFileGroup =
        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
    int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
    int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
    return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
        comparisonsPerFileGroup);
  }

查看索引方法的逻辑非常简单,主要分为三步(1. 根据分区路径进行count、2. 加载分区下所有最新的文件、3. 查找包含记录的文件)。

第二步中加载分区下所有最新的文件的逻辑在 HoodieBloomIndex#loadInvolvedFiles方法中,其核心代码如下

List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
      final HoodieTable hoodieTable) {
    // 获取所有分区下最新的文件
    List<Pair<String, String>> partitionPathFileIDList =
        jsc.parallelize(partitions, Math.max(partitions.size(), 1)).flatMap(partitionPath -> {
          // 最新完成的commit
          Option<HoodieInstant> latestCommitTime =
              hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
          // Pair<分区路径, 文件ID>
          List<Pair<String, String>> filteredFiles = new ArrayList<>();
          if (latestCommitTime.isPresent()) { // 存在上一次commit
            // 获取指定分区下小于指定时间的所有数据文件  
            filteredFiles = hoodieTable.getBaseFileOnlyView()
                .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
                .map(f -> Pair.of(partitionPath, f.getFileId())).collect(toList());
          }
          return filteredFiles.iterator();
        }).collect();
    // hoodie.bloom.index.prune.by.ranges配置项为true
    if (config.getBloomIndexPruneByRanges()) {
      return jsc.parallelize(partitionPathFileIDList, Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> {
        try {
          HoodieRangeInfoHandle<T> rangeInfoHandle = new HoodieRangeInfoHandle<T>(config, hoodieTable, pf);
          // 从指定文件获取对应的最大和最小recordKey  
          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
          return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
        } catch (MetadataNotFoundException me) {
          // 出错时默认最大和最小recordKey为null
          return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
        }
      }).collect();
    } else {
      // 配置项未开启则默认最大和最小recordKey为null  
      return partitionPathFileIDList.stream()
          .map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
    }
  }

该方法最核心的逻辑便是获取分区下最新的数据文件。若文件ID相同,但是commitTime不同,那么会返回小于指定commitTime,最新提交的文件;若文件ID不同,那么返回小于指定commitTime的最新提交文件即可,总结而言就是如果同一文件ID对应多个文件,则选取最新的文件。然后根据配置决定是否从文件读取最大最小的recordKey,最大最小recordKey可用于后续过滤不相关的文件,否则会比较分区下所有的文件。

第三步中查找包含记录的文件在 HoodieBloomIndex#findMatchingFilesForRecordKeys中,其核心代码如下

JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
      JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
      Map<String, Long> fileGroupToComparisons) {
    // 1. 查找需要比对的文件Tuple2  
    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
        explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
    // hoodie.bloom.index.bucketized.checking = true,默认为true
    if (config.useBloomIndexBucketizedChecking()) {
      // 2. 使用Partitioner重新分区
      Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
          config.getBloomIndexKeysPerBucket());
      fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
          .repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2);
    } else {
      fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
    }
    // 3. 使用分区CheckFunction进行处理,然后将处理的结果转化为对应的HoodieKey
    return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
        .flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
        .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
            .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
                new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
            .collect(Collectors.toList()).iterator());
  }

该方法可分为三步,首先查找对应的文件(通过最大和最小recordKey过滤),然后进行重新分区或者排序,最后处理分区。其中查找记录对应的文件 explodeRecordRDDWithFileComparisons方法核心逻辑如下

JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
      JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
    // 使用索引过滤器,根据之前读取的最大和最小recordKey进行初始化
    IndexFileFilter indexFileFilter =
        config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
            : new ListBasedIndexFileFilter(partitionToFileIndexInfo);
    return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
      String recordKey = partitionRecordKeyPair._2();
      String partitionPath = partitionRecordKeyPair._1();
      // 获取匹配的文件,recordKey是否大于最小recordKey和小于最大recordKey
      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
          .map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
              new HoodieKey(recordKey, partitionPath)))
          .collect(Collectors.toList());
    }).flatMap(List::iterator);
  }

可以看到,该方法最核心的逻辑就是根据之前从文件中读取的最大和最小的recordKey来过滤无需比较的文件。

使用 HoodieBloomIndexCheckFunction处理分区记录核心逻辑如下

protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
      List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
      try {
        // 一个个文件处理
        while (inputItr.hasNext()) {
          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
          String fileId = currentTuple._1;
          String partitionPath = currentTuple._2.getPartitionPath();
          String recordKey = currentTuple._2.getRecordKey();
          Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
          if (keyLookupHandle == null) {
            // 初始化Handle  
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
          }
          // 处理当前文件
          if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
            // 将recordkey添加  
            keyLookupHandle.addKey(recordKey);
          } else {
            // 获取结果
            ret.add(keyLookupHandle.getLookupResult());
            // 重新生成Handle  
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
            keyLookupHandle.addKey(recordKey);
            break;
          }
        }
        if (!inputItr.hasNext()) {
          ret.add(keyLookupHandle.getLookupResult());
        }
      } catch (Throwable e) {
        if (e instanceof HoodieException) {
          throw e;
        }
        throw new HoodieIndexException("Error checking bloom filter index. ", e);
      }
      return ret;
    }

当遍历分区上所有记录时,会按照文件进行处理(通过keyLookupHandle控制处理),通过 HoodieKeyLookupHandle#addKey方法将recordKey添加至keyLookupHandle,其核心代码如下

public void addKey(String recordKey) {
    // 布隆过滤器中是否包含该recordKey,布隆过滤器会从文件中反序列化
    if (bloomFilter.mightContain(recordKey)) {
      // 如果包含则加入候选列表,待进一步确认
      candidateRecordKeys.add(recordKey);
    }
    totalKeysChecked++;
  }

通过 HoodieKeyLookupHandle#getLookupResult获取结果,核心代码如下

public KeyLookupResult getLookupResult() {
    // 获取文件ID对应的最新的数据文件
    HoodieBaseFile dataFile = getLatestDataFile();
    // 对比文件中所有记录和候选列表,找出实际存在的recordKey  
    List<String> matchingKeys =
        checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
    return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
        dataFile.getCommitTime(), matchingKeys);
  }

可以看到获取结果时会执行实际的查找逻辑,即对比文件中所有记录和候选的列表,找出实际存在的recordKey列表。

2.2 tagLocationBacktoRecords分析

在查找完位置信息后,便可将位置信息推回给原始记录,其核心代码如下

protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
      JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
    // 将原始记录进行转化  
    JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD =
        recordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
    // 以HoodieKey进行一次左外连接,确定位置信息
    return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values()
        .map(v1 -> getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull())));
  }

可以看到通过左外连接便将之前的位置信息推回至原始记录中,这样便完成了对原始记录打位置标签过程。

3. 总结

Hudi默认采用的HoodieBloomIndex索引,其依赖布隆过滤器来判断记录存在与否,当记录存在时,会读取实际文件进行二次判断,以便修正布隆过滤器带来的误差。同时还在每个文件元数据中添加了该文件保存的最大和最小的recordKey,借助该值可过滤出无需对比的文件。

目录
相关文章
|
1月前
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
35 0
|
1月前
|
存储 SQL 分布式计算
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
116 0
|
1月前
|
存储 分布式计算 Hadoop
一文了解Apache Hudi架构、工具和最佳实践
一文了解Apache Hudi架构、工具和最佳实践
110 0
|
1月前
|
SQL 分布式计算 NoSQL
使用Apache Hudi和Debezium构建健壮的CDC管道
使用Apache Hudi和Debezium构建健壮的CDC管道
17 0
|
1月前
|
存储 SQL 消息中间件
Apache Hudi:统一批和近实时分析的存储和服务
Apache Hudi:统一批和近实时分析的存储和服务
32 0
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
484 5
|
1月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1421 1
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1358 1
官宣|Apache Flink 1.19 发布公告
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
145 3
|
1月前
|
Oracle 关系型数据库 流计算
flink cdc 同步问题之报错org.apache.flink.util.SerializedThrowable:如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

热门文章

最新文章

推荐镜像

更多