使用 Bucket Index 加速Apache Hudi 写入

简介: 使用 Bucket Index 加速Apache Hudi 写入

Apache Hudi 在写入路径上使用索引[1]来检测更新与插入,并将更新确定性地路由到同一文件组。Hudi 支持开箱即用的不同索引选项,如 bloom、simple、hbase、bucket、global_bloom、global_simple 等。我们将讨论 Apache Hudi 中的 bucket 索引以及它与其他索引的不同之处。

写入流程

这是一批数据摄取到 Hudi 的关键写入步骤。

关键阶段之一是上述所有阶段中的索引阶段。大多数情况下,索引查找将决定写入延迟,因为每个其他阶段都是合理限制或确定的。但是索引延迟取决于很多因素,例如表中的总数据、正在摄取的数据、分区与非分区、常规索引与全局索引、更新传播、记录关键时间特征等。所以,我们经常看到工程师/开发人员花时间尝试减少索引查找时间。

桶索引(Bucket Index)

与 Hudi 支持的所有其他索引相比,桶索引非常特殊。每个其他索引都有某种索引方式,索引查找涉及查找索引元数据和推断记录位置。然而在桶索引的情况下,它的记录键的哈希值或基于 Hudi 确定记录所在位置的某些列。其实我们可以只命名这个 StaticHashIndex 而不是 BucketIndex。无论如何计算哈希是 O(1) 并没有任何 IO操作,因此节省了写入期间索引所需的时间。

桶索引的唯一缺点是每个分区的桶数必须预先为给定的表定义。例如当启动一个新表时可以为每个分区定义 16 个桶,Hudi 将为表中的每个分区分配 16 个文件组。因此传入的记录通过 16 散列到 mod,然后路由到相应的文件组。每个文件组的写句柄将推断出插入/更新,并将基于此合并记录。

性能对比

进行了一个非常小规模的实验测试 Bloom索引和桶索引的区别。数据集特征:

• 总大小为 7GB,约 1300 万条记录,平均分布 10 个分区。

• 更新插入:抽取总批次的 50% 并尝试更新插入。

• upsert(第二次提交)比较 bloom 索引和桶索引的总写入延迟。

如下是两者的 Spark UI。使用桶索引可以明显地看到索引查找不涉及任何阶段,而对于 Bloom 索引可以看到其中有几个阶段/作业用于索引标记。Bloom索引 Spark UI

桶索引 Spark UI

Bloom索引代码

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_bloom_index"
val basePath = $TARGET_LOCATION
val inputPath = $INPUT_LOCATION // with parquet dataset as input. 
val df = spark.read.format("parquet").load(inputPath)
df.cache
df.write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.datasource.write.operation","insert").
mode(Overwrite).
save(basePath)
// upsert 50% of same batch. 
df.sample(0.5).write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
mode(Append).
save(basePath)
注意:在 EMR 中 默认使用 Bloom 索引。本地测试默认索引是SIMPLE索引。
桶索引代码
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_bucket_index"
val basePath = $TARGET_LOCATION
val inputPath = $INPUT_LOCATION // with parquet dataset as input. 
val df = spark.read.format("parquet").load(inputPath)
df.cache
df.write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.index.type","BUCKET").
option("hoodie.index.bucket.engine","SIMPLE").
option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").
option("hoodie.bucket.index.num.buckets","12").
option("hoodie.datasource.write.operation","insert").
mode(Overwrite).
save(basePath)
Upsert 50% of records:
df.sample(0.5).write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.index.type","BUCKET").
option("hoodie.index.bucket.engine","SIMPLE").
option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").
option("hoodie.bucket.index.num.buckets","12").
mode(Append).
save(basePath)
如果更喜欢使用桶索引,这些是要设置的配置
option("hoodie.index.type","BUCKET").
option("hoodie.index.bucket.engine","SIMPLE").
option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").
option("hoodie.bucket.index.num.buckets","12")

注意:如果更喜欢对表使用桶索引,则必须重新开始。不能变更索引,如从 Bloom 切换到桶索引。

结论

如果用例非常适合存储桶索引,则可以大大加快写入延迟,可以选择任何列进行散列,如果没有,主键将用于散列。

目录
相关文章
|
4月前
|
存储 Apache
Apache Hudi Savepoint实现分析
Apache Hudi Savepoint实现分析
92 0
|
4月前
|
存储 机器学习/深度学习 Apache
如何将Apache Hudi应用于机器学习
如何将Apache Hudi应用于机器学习
43 0
|
4月前
|
Apache 索引
精进Hudi系列|Apache Hudi索引实现分析(五)之基于List的IndexFileFilter
精进Hudi系列|Apache Hudi索引实现分析(五)之基于List的IndexFileFilter
53 0
|
4月前
|
存储 SQL Apache
Apache Hudi与Delta Lake对比
Apache Hudi与Delta Lake对比
85 0
|
4月前
|
Apache
Apache Hudi Rollback实现分析
Apache Hudi Rollback实现分析
67 0
|
2月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
3月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之从hudi读数据,报错NoSuchMethodError:org.apache.hudi.format.cow.vector.reader.PaequetColumnarRowSplit.getRecord(),该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
存储 SQL 分布式计算
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据
204 0
|
4月前
|
存储 分布式计算 Hadoop
一文了解Apache Hudi架构、工具和最佳实践
一文了解Apache Hudi架构、工具和最佳实践
793 0
|
4月前
|
SQL 分布式计算 NoSQL
使用Apache Hudi和Debezium构建健壮的CDC管道
使用Apache Hudi和Debezium构建健壮的CDC管道
56 0