Apache Hudi索引实现分析(三)之HBaseIndex

简介: Apache Hudi索引实现分析(三)之HBaseIndex

1. 介绍

前面分析了基于过滤器的索引,接着分析基于外部存储系统的索引实现:HBaseIndex。对于想自定义实现Index具有一定的借鉴作用。

2. 分析

HBaseIndex也是HoodieIndex的子类实现,其实现了父类的两个核心方法。

// 给输入记录RDD打位置标签 
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable);
// 更新位置信息    
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable);

在写入数据过程中,会调用tagLocation给输入记录打位置标签,其核心代码如下

public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
      HoodieTable<T> hoodieTable) {  
    return recordRDD.mapPartitionsWithIndex(locationTagFunction(hoodieTable.getMetaClient()), true);
  }

可以看到该方法主要使用了locationTagFunctionFunction来处理原始记录,其核心代码如下

private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>> locationTagFunction(
      HoodieTableMetaClient metaClient) {
    return (Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>) (partitionNum,
        hoodieRecordIterator) -> {
      // 每次取的批次大小   
      int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
      // 获取HBase连接
      synchronized (HBaseIndex.class) {
        if (hbaseConnection == null || hbaseConnection.isClosed()) {
          hbaseConnection = getHBaseConnection();
        }
      }
      List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
      HTable hTable = null;
      try {
        // 获取配置的表  
        hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
        List<Get> statements = new ArrayList<>();
        List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
        // 遍历该分区上的记录
        while (hoodieRecordIterator.hasNext()) {
          HoodieRecord rec = hoodieRecordIterator.next();
          // 根据recordKey生成Get  
          statements.add(generateStatement(rec.getRecordKey()));
          currentBatchOfRecords.add(rec);
          // 达到批量大小或者遍历完记录
          if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
            // 获取结果
            Result[] results = doGet(hTable, statements);
            // 清空便于GC回收
            statements.clear();
            for (Result result : results) {
              // 移除结果对应的的HoodieRecord
              HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
              if (result.getRow() != null) {
                // 取出key, commit时间,文件ID和分区路径  
                String keyFromResult = Bytes.toString(result.getRow());
                String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
                String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
                String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
                // 检查是否为合法的提交(包含在timeline或者小于最新的一次commit)
                if (checkIfValidCommit(metaClient, commitTs)) {
                  // 重新生成HoodieRecord  
                  currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
                      currentRecord.getData());
                  currentRecord.unseal();
                  // 设置位置信息  
                  currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
                  currentRecord.seal();
                  taggedRecords.add(currentRecord);
                  // the key from Result and the key being processed should be same
                  assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
                } else { // 非法提交,也标记为已打完标签
                  taggedRecords.add(currentRecord);
                }
              } else { // 标记为已打完标签
                taggedRecords.add(currentRecord);
              }
            }
          }
        }
      }
      return taggedRecords.iterator();
    };
  }

可以看到从HBase中取位置信息流程非常简单,即遍历指定分区上所有记录,然后批量生成recordKey从HBase索引表(表名自定义配置)取对应的信息,然后生成位置信息。

当写完数据后,需要调用updateLocation更新记录的位置信息,其核心代码如下

public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
      HoodieTable<T> hoodieTable) {
    // 根据配置(hoodie.index.hbase.qps.allocator.class)生成Allocator  
    final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
    // 根据Allocator进行初始化  
    setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
    // 使用Function处理  
    JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
    // 缓存状态RDD
    writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel());
    return writeStatusJavaRDD;
  }

其中updateLocationFunction核心代码如下

private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateLocationFunction() {
    return (Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>>) (partition, statusIterator) -> {
      List<WriteStatus> writeStatusList = new ArrayList<>();
      // 获取HBase连接
      synchronized (HBaseIndex.class) {
        if (hbaseConnection == null || hbaseConnection.isClosed()) {
          hbaseConnection = getHBaseConnection();
        }
      }
      try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
        // 遍历状态信息  
        while (statusIterator.hasNext()) {
          WriteStatus writeStatus = statusIterator.next();
          List<Mutation> mutations = new ArrayList<>();
          try {
            for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
              if (!writeStatus.isErrored(rec.getKey())) {
                // 获取新的位置信息  
                Option<HoodieRecordLocation> loc = rec.getNewLocation();
                if (loc.isPresent()) { // 新的位置信息存在
                  if (rec.getCurrentLocation() != null) { // 当前位置信息存在
                    // 表示更新,无需更新
                    continue;
                  }
                  // 根据HoodieRecord信息初始化Put  
                  Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
                  put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
                  put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
                  put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
                  mutations.add(put);
                } else { // 新的位置不存在
                  // 表示删除了该记录
                  Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
                  mutations.add(delete);
                }
              }
              if (mutations.size() < multiPutBatchSize) {
                continue;
              }
              // 更新  
              doMutations(mutator, mutations);
            }
            // 处理剩余的更新
            doMutations(mutator, mutations);
          } 
          writeStatusList.add(writeStatus);
        }
      } 
      return writeStatusList.iterator();
    };
  }

可以看到当写完数据后,会更新位置信息,通过WriteStatus中的HoodieRecord的位置信息判断是否需要更新位置信息,对于更新无需要更新,对于新插入需要更新,对于删除需要删除HBase中存储的信息。

3. 总结

Hudi内置了HBase外置存储系统索引的实现,用户可直接配置HBase索引,将记录索引信息存入HBase,当然用户也可自定义实现其他类型索引。

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
4月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
286 2
|
1月前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
1月前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
3月前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
2月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
69 3
|
2月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
41 2
|
2月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
60 1
|
2月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
92 0
|
3月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
162 11
|
4月前
|
存储 运维 数据处理
Apache Paimon:重塑阿里智能引擎数据处理新纪元,解锁高效存储与实时分析潜能!
【8月更文挑战第2天】探索 Apache Paimon 在阿里智能引擎的应用场景
221 2

推荐镜像

更多
下一篇
DataWorks