一文聊透Apache Hudi的索引设计与应用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 一文聊透Apache Hudi的索引设计与应用

Apache Hudi索引在数据读和写的过程中都有应用。读的过程主要是查询引擎利用MetaDataTable使用索引进行Data Skipping以提高查找速度;写的过程主要应用在upsert写上,即利用索引查找该纪录是新增(I)还是更新(U),以提高写入过程中纪录的打标(tag)速度。

MetaDataTable

目前使能了"hoodie.metadata.enable"后,会在.hoodie目录下生成一张名为metadata的mor表,利用该表可以显著提升源表的读写性能。

该表目前包含三个分区:files, column_stats, bloom_filters,分区下文件的格式为hfile,采用该格式的目的主要是为了提高metadata表的点查能力。

其中files分区纪录了源表各个分区内的所有文件列表,这样hudi在生成源表的文件系统视图时就不必再依赖文件系统的list files操作(在云存储场景list files操作更有可能是性能瓶颈);TimeLine Server和上述设计类似,也是通过时间线服务器来避免对提交元数据进行list以生成hudi active timeline。

其中column_stats分区纪录了源表中各个分区内所有文件的统计信息,主要是每个文件中各个列的最大值,最小值,纪录数据,空值数量等。只有在开启了"hoodie.metadata.index.column.stats.enable"参数后才会使能column_stats分区,默认源表中所有列的统计信息都会纪录,也可以通过"hoodie.metadata.index.column.stats.column.list"参数单独设置。Hudi表每次提交时都会更新column_stats分区内各文件统计信息(这部分统计信息在提交前的文件写入阶段便已经统计好)。

其中bloom_filters分区纪录了源表中各个分区内所有文件的bloom_filter信息,只有在开启了"hoodie.metadata.index.bloom.filter.enable"参数后才会使能bloom_filters分区,默认纪录源表中record key的bloomfilter, 也可以通过"hoodie.metadata.index.bloom.filter.column.list"参数单独设置。

需要注意bloom_filter信息不仅仅存储在metadata表中(存在该表中是为了读取加速,减少从各个base文件中提取bloomfilter的IO开销)。Hudi表在开启了"hoodie.populate.meta.fields"参数后(默认开启),在完成一个parquet文件写入时,会在parquet文件的footerMetadata中填充bloomfilter相关参数, 其中"hoodie_bloom_filter_type_code"参数为过滤器类型,设置为默认的DYNAMIC_V0(可根据record key数量动态扩容);"org.apache.hudi.bloomfilter"参数为过滤器bitmap序列化结果;"hoodie_min_record_key"参数为当前文件record_key最小值;"hoodie_max_record_key"参数为当前文件record_key最大值。Hudi表提交时其Metadata表bloom_filters分区内的bloom_filter信息便提取自parquet文件footerMetadata的"org.apache.hudi.bloomfilter".

写入

Flink

对flink写入而言就是通过bucket_idx进行打标(仅支持分区内去重打标)或者bucket_assigner算子使用flink state进行打标(支持分区内以及全局去重打标,可通过参数控制,如果要进行全局去重需要使能index.global.enabled且不使能changelog.enabled),目前flink仅支持这两种方式,具体可参考hoodieStreamWrite方法。

对于upsert写入场景,flink state会随着写入数据量的增大而线性增大,导致越写越慢(打标过程变慢)的现象;而bucket_idx由于没有数据查找过程(通过纪录的record key直接哈希得到对应的filegroup进行打标),因此写入速度不会随数据量增大而线性增大。

如果应用场景需要对分区表进行全局去重,则只能使用flink state。如果上层业务允许,我们也可以通过变更表结构,将分区键加入到主键中作为主键的一部分来实现分区间的天然去重。

图2. 1 flink写入打标过程

对于metadata表而言,flink可以通过使能参数"hoodie.metadata.index.column.stats.enable"生成column_stats,flink可以在读优化查询时使用到列统计信息进行data skipping。

对于metadata表而言,flink可以通过使能参数"hoodie.metadata.index.bloom.filter.enable"生成bloom_filters,但是flink目前不支持在读时使用bloomfilter进行data skipping,也不支持在写时通过bloomfilter进行打标。

Spark

对spark写入而言就是对每条纪录调用index.tagLocation进行打标的过程。Spark目前支持SimpleIndex, GlobalSimpleIndex, BloomIndex, BucketIndex, HbaseIndex进行写入打标。

SimpleIndex通过在每个分区内进行InputRecordRdd left outer join ExistingRecordRdd的方式判断输入纪录是否已经存储在当前分区内;GlobalSimpleIndex和SimpleIndex类似,只不过left outer join该表内所有已存在数据而不是当前分区已存在数据。

BloomIndex通过column_stat_idx和bloom_filter_idx进行数据打标过滤:首先通过column_stat_idx(可以从metadata表中获取,也可从parquet footer metadata中获取,通过"hoodie.bloom.index.use.metadata"参数控制)的min,max值过滤掉纪录肯定不存在的文件(在record key递增且数据经过clustering的情况下可以过滤出大量文件)以获得纪录可能存在的文件。然后在纪录可能存在的文件中依次使用每个文件对应的bloomfilter(可以从metadata表中获取,也可从parquet footer metadata中获取,通过"hoodie.bloom.index.use.metadata"参数控制)判断该纪录是否一定不存在。最后得到每个文件可能包含的纪录列表,由于bloomfilter的误判特性,需要将这些纪录在文件中进行精准匹配查找以得到实际需要更新的纪录及其对应的location.

图2. 2 spark写入使用BloomIndex打标过程

BucketIndex和flink的bucket打标类似,通过hash(record_key) mod bucket_num的方式得到纪录实际应该插入的文件位置,如果该文件不存在则为插入,存在则为更新。

HbaseIndex通过外部hbase服务存储record key,因此打标过程需要和hbase服务进行交互,由于使用hbase存储,因此该索引天然是全局的。

读取

Flink

flink读取目前支持使用column_stats进行data skipping.建表时需要使能"metadata.enabled","hoodie.metadata.index.column.stats.enable","read.data.skipping.enabled",这三个参数。

Spark

spark读取目前支持使用column_stats进行data skipping.建表时需要使能" hoodie.metadata.enabled","hoodie.metadata.index.column.stats.enable","read.data.skipping.enabled",这三个参数。

总结

写入打标:


column_stat_idx bloom_filter_idx bucket_idx flink_state Simple Hbase_idx
Spark Y Y Y N flink only Y Y
Flink N N Y Y N spark only N

MetaDataTable表索引分区构建:


file_idx column_stat_idx bloom_filter_idx
Spark Y Y Y
Flink Y Y Y

读取data skipping:


column_stat_idx bloom_filter_idx bucket_idx
Spark Y N N
Flink Y N N

社区进展/规划

Column Stats Index

RFC-27 Data skipping(column_stats) index to improve query performance

状态:COMPLETED

简述:列统计索引的rfc设计

原理:列统计索引存储在metadata table中,使用hfile存储索引数据

图5. 1 hfile layout

HFile最大的优势是数据按照key进行了排序,因此点查速度很快。

图5. 2 column stats index storage format

由于HFile的前缀搜索速度很快,因此上述布局(一个列的统计信息在相邻的data block中)可以快速拿到一个列在各个文件中的统计信息。

https://cwiki.apache.org/confluence/display/HUDI/RFC-27+Data+skipping+index+to+improve+query+performance

https://github.com/apache/hudi/blob/master/rfc/rfc-27/rfc-27.md

RFC-58 Integrate column stats index with all query engines

状态:UNDER REVIEW

简述:集成列统计索引到presto/trino/hive

原理:基于RFC-27 metadata table中的column_stats index来实现上述引擎的data skipping,当前有两种可能的实现:基于列域(column domain, 域是一个列可能包含值的一个集合)的实现和基于hudiExpression的实现。

图5. 3 HudiExpression sketch

https://github.com/apache/hudi/pull/6345/files?short_path=e681037#diff-e6810379013465743bdbdab398ba78e45381edfd64399c125c16f21752f36728

https://github.com/apache/hudi/pull/6345

Bucket Index

RFC-29 Hash(bucket) Index

状态:COMPLETED

简述:bucket index的rfc设计

原理:对主键做hash后取桶个数的模(hash(pk) mod bucket_num), 即数据在写入时就按照主键进行了clustering,后续upsert可以直接通过hash找到对应的桶。

图5. 4 hash process

图5. 5 bucket-filegroup mapping

目前一个桶和一个filegroup一一对应,数据文件的前缀会加上bucketId。

https://cwiki.apache.org/confluence/display/HUDI/RFC+-+29%3A+Hash+Index

RFC-42 Consistent hashing index for dynamic bucket numbers

状态:ONGOING

简述:bucket index一致性哈希实现的rfc设计

原理:RFC-29实现的bucket index不支持动态修改桶个数,由此导致数据倾斜和一个file group size过大,采用一致性哈希可以在不改变大多数桶的情况下完成桶的分裂/合并,以尽可能小的减小动态调整桶数量时对读写的影响。

图5. 6 一致性哈希算法

通过Hash(v) % 0xFFFF得到一个范围hash值,然后通过一个range mapping layer将哈希值和桶关联起来,可以看到如果bucket#2过大,可以将其对应的范围0x5000-0xA000进行split分成两个桶,仅需要在这个范围内进行重新分桶/文件重写即可。

图5. 7 算法复杂度对比

https://github.com/apache/hudi/blob/master/rfc/rfc-42/rfc-42.md

https://github.com/apache/hudi/pull/4958

https://github.com/apache/hudi/pull/6737

Bloom Index

RFC-37 Metadata based Bloom Index

状态:COMPLETED

简述:bloom index的rfc设计

原理:将base文件内的bloom filter提取到metadata table中以减少IO,提升查找速度

图5. 8 bloom filter location in metadata table

图5. 9 bloom index storage format

Key的生成方式为:

key = base64_encode(concat(hash64(partition name), hash128(file name)))

因此一个分区内的文件天然的在HFile的相邻data block中,采用base64编码可以减少key的磁盘存储空间。

https://github.com/apache/hudi/blob/master/rfc/rfc-37/rfc-37.md

Record-level Index

RFC-08 Record-level index to speed up UUID-based upserts and deletes

状态:ONGOING

简述:记录级(主键)索引的rfc设计

原理:为每条记录生成recordKey <-> partition, fileId的映射索引,以加速upsert的打标过程。

图5. 10 行级索引实现

每条记录被哈希到对应的bucket中,每一个bucket中包含多个HFile文件,每个HFile文件的data block中包含recordKey <-> partition, fileId的映射。

https://cwiki.apache.org/confluence/display/HUDI/RFC-08++Record+level+indexing+mechanisms+for+Hudi+datasets

https://issues.apache.org/jira/browse/HUDI-53

https://github.com/apache/hudi/pull/5581

Secondary Index

RFC-52 Secondary index to improve query performance

状态:UNDER REVIEW

简述:二级(非主键/辅助)索引的rfc设计

原理:二级索引可以精确匹配数据行(记录级别索引只能定位到fileGroup),即提供一个column value -> row 的映射,如果查询谓词包含二级索引列就可以根据上述映射关系快速定位目标行。

图5. 11 二级索引架构

图5. 12 使用Lucene index进行谓词过滤

如上图所示:先通过row group统计信息进行首次过滤以加载指定page页,然后通过lucene索引文件(倒排索引,key为列值,value为row id集合)过滤出指定的行(以row id标识),合并各谓词的row id,加载各个列的page页并进行row id对齐,取出目标行。Lucene index只是二级索引框架下的一种可能实现。

https://github.com/apache/hudi/pull/5370

Function Index

RFC-63 Index on Function

状态:UNDER REVIEW

简述:函数索引的rfc设计

原理:通过sql或者hudi配置定义一个在某列上的函数作为函数索引,将其记录到表属性中,在数据写入时索引函数可以作为排序域,由此每个数据文件对应于索引函数值都有一个较小的min-max以进行有效的文件过滤,同时metadata table中也会维护文件级别的索引函数值对应的列统计信息。数据文件中不会新增索引函数值对应的列。

图5. 13 带timestamp的hudi表

如上图所示,一个场景需要过滤出每天1点到2点的数据,由于把timestamp直接转成小时将不会保序,就没法直接使用timestamp的min,max进行文件过滤,如果我们对timestamp列做一个HOUR(timestamp)的函数索引,然后将每个文件对应的函数索引min,max值记录到metadata table中,就可以快速的使用上述索引值进行文件过滤。

https://github.com/apache/hudi/pull/5370

Support data skipping for MOR

Hudi当前还不支持针对MOR表中log文件的索引,社区目前正在讨论中:

https://issues.apache.org/jira/browse/HUDI-3866

https://cwiki.apache.org/confluence/display/HUDI/RFC+-+06+%3A+Add+indexing+support+to+the+log+file

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
105 5
|
4月前
|
安全 网络协议 应用服务中间件
AJP Connector:深入解析及在Apache HTTP Server中的应用
【9月更文挑战第6天】在Java Web应用开发中,Tomcat作为广泛使用的Servlet容器,经常与Apache HTTP Server结合使用,以提供高效、稳定的Web服务。而AJP Connector(Apache JServ Protocol Connector)作为连接Tomcat和Apache HTTP Server的重要桥梁,扮演着至关重要的角色
111 2
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
59 1
|
3月前
|
存储 分布式计算 druid
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
82 1
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
|
3月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
80 3
|
4月前
|
Apache
多应用模式下,忽略项目的入口文件,重写Apache规则
本文介绍了在多应用模式下,如何通过编辑Apache的.htaccess文件来重写URL规则,从而实现忽略项目入口文件index.php进行访问的方法。
|
5月前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
6月前
|
存储 缓存 Apache
Apache Paimon 在蚂蚁的应用
本文整理自 Apache Paimon Committer 闵文俊老师在5月16日 Streaming Lakehouse Meetup · Online 上的分享。Apache Paimon 是一种实时数据湖格式,设计用于流批一体处理,支持实时更新和OLAP查询。它采用LSM Tree结构,提供多种Changelog Producer和Merge Engine,支持高效的数据合并。Paimon适用于流读、批读及时间旅行查询,与多种查询引擎兼容。在蚂蚁集团的应用中,Paimon降低了资源开销,提升了查询性能,简化了研发流程,特别是在去重、核对场景和离线查询加速方面表现突出。
716 7
Apache Paimon 在蚂蚁的应用
|
5月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
89 0
|
5月前
|
存储 Ubuntu 应用服务中间件
如何在 Ubuntu 14.04 上使用 Passenger 和 Apache 部署 Rails 应用
如何在 Ubuntu 14.04 上使用 Passenger 和 Apache 部署 Rails 应用
32 0

推荐镜像

更多