Apache Hudi索引实现分析(二)之HoodieGlobalBloomIndex

简介: Apache Hudi索引实现分析(二)之HoodieGlobalBloomIndex

1. 介绍

前面分析了Hudi默认的索引实现HoodieBloomIndex,其是基于分区记录所在文件,即分区路径+recordKey唯一即可,Hudi还提供了HoodieGlobalBloomIndex的实现,即全局索引实现,只需要recordKey唯一即可,下面分析其实现。

2. 分析

HoodieGlobalBloomIndex是HoodieBloomIndex的子类,其主要重写了父类的如下几个方法

// 加载分区下所有最新的文件
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc, final HoodieTable hoodieTable)
// 查找记录对应的文件
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo, JavaPairRDD<String, String> partitionRecordKeyPairRDD)
// 将位置信息推回至原始记录
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD)

对于加载分区下所有最新文件而言, loadInvolvedFiles核心代码如下

List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
                                                             final HoodieTable hoodieTable) {
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    try {
      // 获取所有分区
      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
          config.shouldAssumeDatePartitioning());
      // 调用父类方法加载所有分区下最新数据文件
      return super.loadInvolvedFiles(allPartitionPaths, jsc, hoodieTable);
    } catch (IOException e) {
      throw new HoodieIOException("Failed to load all partitions", e);
    }
  }

首先会获取所有的分区路径,然后调用父类方法获取分区下最新数据文件。

对于查找记录对应的文件而言, explodeRecordRDDWithFileComparisons核心代码如下

JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
      JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
    // 使用索引过滤器,根据之前读取的最大和最小recordKey进行初始化
    IndexFileFilter indexFileFilter =
        config.getBloomIndexPruneByRanges() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
            : new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo);
    return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
      String recordKey = partitionRecordKeyPair._2();
      String partitionPath = partitionRecordKeyPair._1();
      // 获取匹配的文件和partition
      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
          .map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
              new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
          .collect(Collectors.toList());
    }).flatMap(List::iterator);
  }

可以看到和 HoodieBloomIndex#explodeRecordRDDWithFileComparisons处理逻辑类似,在使用索引过滤器获取所有匹配的文件和分区路径时,此时比较的是所有分区下的文件,不再是指定的分区路径。

对于将位置信息推回至原始记录而言, tagLocationBacktoRecords核心代码如下

protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
      JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
    // 对原始记录进行转化
    JavaPairRDD<String, HoodieRecord<T>> incomingRowKeyRecordPairRDD =
        recordRDD.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
    // 对带有位置信息的记录也进行一次转化
    JavaPairRDD<String, Tuple2<HoodieRecordLocation, HoodieKey>> existingRecordKeyToRecordLocationHoodieKeyMap =
        keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));
    // 以recordKey进行一次左外连接,确定位置信息
    return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> {
      final HoodieRecord<T> hoodieRecord = record._1;
      final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
      if (recordLocationHoodieKeyPair.isPresent()) {
        // Record key matched to file
        return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
      } else {
        return getTaggedRecord(hoodieRecord, Option.empty());
      }
    });
  }

其处理逻辑与父类处理逻辑相同,也是使用一次左外连接将位置信息推回至原始记录。

3. 总结

对于 HoodieGlobalBloomIndex而言,其是全局的索引,即会在所有分区内查找指定的recordKey,而非像 HoodieBloomIndex只在指定的分区内查找,同时在加载分区下所有最新文件时,其会首先获取所有分区,然后再获取所有分区下的最新文件,而非使用从原始记录中解析出来的分区路径。

目录
相关文章
|
1月前
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
34 0
|
1月前
|
存储 SQL 分布式计算
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
116 0
|
1月前
|
存储 分布式计算 Hadoop
一文了解Apache Hudi架构、工具和最佳实践
一文了解Apache Hudi架构、工具和最佳实践
103 0
|
1月前
|
SQL 分布式计算 NoSQL
使用Apache Hudi和Debezium构建健壮的CDC管道
使用Apache Hudi和Debezium构建健壮的CDC管道
17 0
|
1月前
|
存储 SQL 消息中间件
Apache Hudi:统一批和近实时分析的存储和服务
Apache Hudi:统一批和近实时分析的存储和服务
32 0
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
483 5
|
1月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1418 1
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1356 1
官宣|Apache Flink 1.19 发布公告
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
143 3
|
1月前
|
Oracle 关系型数据库 流计算
flink cdc 同步问题之报错org.apache.flink.util.SerializedThrowable:如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

热门文章

最新文章

推荐镜像

更多