Apache Hudi索引实现分析(二)之HoodieGlobalBloomIndex

简介: Apache Hudi索引实现分析(二)之HoodieGlobalBloomIndex

1. 介绍

前面分析了Hudi默认的索引实现HoodieBloomIndex,其是基于分区记录所在文件,即分区路径+recordKey唯一即可,Hudi还提供了HoodieGlobalBloomIndex的实现,即全局索引实现,只需要recordKey唯一即可,下面分析其实现。

2. 分析

HoodieGlobalBloomIndex是HoodieBloomIndex的子类,其主要重写了父类的如下几个方法

// 加载分区下所有最新的文件
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc, final HoodieTable hoodieTable)
// 查找记录对应的文件
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo, JavaPairRDD<String, String> partitionRecordKeyPairRDD)
// 将位置信息推回至原始记录
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD)

对于加载分区下所有最新文件而言, loadInvolvedFiles核心代码如下

List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
                                                             final HoodieTable hoodieTable) {
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    try {
      // 获取所有分区
      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
          config.shouldAssumeDatePartitioning());
      // 调用父类方法加载所有分区下最新数据文件
      return super.loadInvolvedFiles(allPartitionPaths, jsc, hoodieTable);
    } catch (IOException e) {
      throw new HoodieIOException("Failed to load all partitions", e);
    }
  }

首先会获取所有的分区路径,然后调用父类方法获取分区下最新数据文件。

对于查找记录对应的文件而言, explodeRecordRDDWithFileComparisons核心代码如下

JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
      JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
    // 使用索引过滤器,根据之前读取的最大和最小recordKey进行初始化
    IndexFileFilter indexFileFilter =
        config.getBloomIndexPruneByRanges() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
            : new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo);
    return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
      String recordKey = partitionRecordKeyPair._2();
      String partitionPath = partitionRecordKeyPair._1();
      // 获取匹配的文件和partition
      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
          .map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
              new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
          .collect(Collectors.toList());
    }).flatMap(List::iterator);
  }

可以看到和 HoodieBloomIndex#explodeRecordRDDWithFileComparisons处理逻辑类似,在使用索引过滤器获取所有匹配的文件和分区路径时,此时比较的是所有分区下的文件,不再是指定的分区路径。

对于将位置信息推回至原始记录而言, tagLocationBacktoRecords核心代码如下

protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
      JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
    // 对原始记录进行转化
    JavaPairRDD<String, HoodieRecord<T>> incomingRowKeyRecordPairRDD =
        recordRDD.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
    // 对带有位置信息的记录也进行一次转化
    JavaPairRDD<String, Tuple2<HoodieRecordLocation, HoodieKey>> existingRecordKeyToRecordLocationHoodieKeyMap =
        keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));
    // 以recordKey进行一次左外连接,确定位置信息
    return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> {
      final HoodieRecord<T> hoodieRecord = record._1;
      final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
      if (recordLocationHoodieKeyPair.isPresent()) {
        // Record key matched to file
        return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
      } else {
        return getTaggedRecord(hoodieRecord, Option.empty());
      }
    });
  }

其处理逻辑与父类处理逻辑相同,也是使用一次左外连接将位置信息推回至原始记录。

3. 总结

对于 HoodieGlobalBloomIndex而言,其是全局的索引,即会在所有分区内查找指定的recordKey,而非像 HoodieBloomIndex只在指定的分区内查找,同时在加载分区下所有最新文件时,其会首先获取所有分区,然后再获取所有分区下的最新文件,而非使用从原始记录中解析出来的分区路径。

目录
相关文章
|
2月前
|
存储 SQL Apache
为什么 Apache Doris 是比 Elasticsearch 更好的实时分析替代方案?
本文将从技术选型的视角,从开放性、系统架构、实时写入、实时存储、实时查询等多方面,深入分析 Apache Doris 与 Elasticsearch 的能力差异及性能表现
为什么 Apache Doris 是比 Elasticsearch 更好的实时分析替代方案?
|
6月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
267 5
|
6月前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
6月前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
8月前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
7月前
|
存储 数据挖掘 数据处理
Apache Paimon 是一款高性能的数据湖框架,支持流式和批处理,适用于实时数据分析
【10月更文挑战第8天】随着数据湖技术的发展,越来越多企业开始利用这一技术优化数据处理。Apache Paimon 是一款高性能的数据湖框架,支持流式和批处理,适用于实时数据分析。本文分享了巴别时代在构建基于 Paimon 的 Streaming Lakehouse 的探索和实践经验,包括示例代码和实际应用中的优势与挑战。
247 1
|
7月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
152 3
|
7月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
107 2
|
7月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
119 1
|
8月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
297 11

推荐镜像

更多