Halodoc使用Apache Hudi构建Lakehouse的关键经验

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: Halodoc使用Apache Hudi构建Lakehouse的关键经验

Halodoc 数据工程已经从传统的数据平台 1.0 发展到使用 LakeHouse 架构的现代数据平台 2.0 的改造。在我们之前的博客中,我们提到了我们如何在 Halodoc 实施 Lakehouse 架构来服务于大规模的分析工作负载。我们提到了平台 2.0 构建过程中的设计注意事项、最佳实践和学习。本博客中我们将详细介绍 Apache Hudi 以及它如何帮助我们构建事务数据湖。我们还将重点介绍在构建Lakehouse时面临的一些挑战,以及我们如何使用 Apache Hudi 克服这些挑战。

Apache Hudi

让我们从对 Apache Hudi 的基本了解开始。Hudi 是一个丰富的平台,用于在自我管理的数据库层上构建具有增量数据管道的流式数据湖,同时针对湖引擎和常规批处理进行了优化。Apache Hudi 将核心仓库和数据库功能直接引入数据湖。Hudi 提供表、事务、高效的 upserts/deletes、高级索引、流式摄取服务、数据Clustering/压缩优化和并发性,同时将数据保持为开源文件格式。Apache Hudi 可以轻松地在任何云存储平台上使用。Apache Hudi 的高级性能优化,使得使用任何流行的查询引擎(包括 Apache Spark、Flink、Presto、Trino、Hive 等)的分析工作负载更快。让我们看看在构建Lakehouse时遇到的一些关键挑战,以及我们如何使用 Hudi 和 AWS 云服务解决这些挑战。

在 LakeHouse 中执行增量 Upsert

每个人在构建事务数据湖时面临的主要挑战之一是确定正确的主键来更新数据湖中的记录。在大多数情况下都使用主键作为唯一标识符和时间戳字段来过滤传入批次中的重复记录。在 Halodoc,大多数微服务使用 RDS MySQL 作为数据存储。我们有 50 多个 MySQL 数据库需要迁移到数据湖,交易经历各种状态,并且在大多数情况下经常发生更新。

问题: MySQL RDS 以秒格式存储时间戳字段,这使得跟踪发生在毫秒甚至微秒内的事务变得困难,使用业务修改的时间戳字段识别传入批次中的最新交易对我们来说是一项挑战。我们尝试了多种方法来解决这个问题,通过使用 rank 函数或组合多个字段并选择正确的复合键。选择复合键在表中并不统一,并且可能需要不同的逻辑来识别最新的交易记录。

解决方案: AWS Data Migration Service 可以配置为具有可以添加具有自定义或预定义属性的附加标头的转换规则。

ar_h_change_seq:来自源数据库的唯一递增数字,由时间戳和自动递增数字组成。该值取决于源数据库系统。

标头帮助我们轻松过滤掉重复记录,并且我们能够更新数据湖中的最新记录。标头将仅应用于正在进行的更改。对于全量加载,我们默认为记录分配了 0,在增量记录中,我们为每条记录附加了一个唯一标识符。我们在 precombine 字段中配置 ar_h_change_seq 以从传入批次中删除重复记录。

Hudi配置:

precombine = ar_h_change_seq
hoodie.datasource.write.precombine.field: precombine
hoodie.datasource.write.payload.class: 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
hoodie.payload.ordering.field: precombine

数据湖中的小文件问题

在构建数据湖时,会发生频繁的更新/插入,从而导致每个分区中都有很多小文件。

问题: 让我们看看小文件在查询时是如何导致问题的。当触发查询以提取或转换数据集时,Driver节点必须收集每个文件的元数据,从而导致转换过程中的性能开销。

解决方案: 定期压缩小文件有助于保持正确的文件大小,从而提高查询性能。而Apache Hudi 支持同步和异步压缩。

• 同步压缩:这可以在写入过程本身期间启用,这将增加 ETL 执行时间以更新 Hudi 中的记录。

• 异步压缩:压缩可以通过不同的进程来实现,并且需要单独的内存来实现。这不会影响写入过程,也是一个可扩展的解决方案。

在 Halodoc,我们首先采用了同步压缩。慢慢地,我们计划采用基于表大小、增长和用例的混合压缩。

Hudi配置:

hoodie.datasource.clustering.inline.enable
hoodie.datasource.compaction.async.enable

保持存储大小以降低成本

数据湖很便宜,并不意味着我们应该存储业务分析不需要的数据。否则我们很快就会看到存储成本越来越高。Apache Hudi 会在每个 upsert 操作中维护文件的版本,以便为记录提供时间旅行查询。每次提交都会创建一个新版本的文件,从而创建大量版本化文件。

问题: 如果我们不启用清理策略,那么存储大小将呈指数增长,直接影响存储成本。如果没有业务价值,则必须清除较旧的提交。

解决方案: Hudi 有两种清理策略,基于文件版本和基于计数(要保留的提交数量)。在 Halodoc,我们计算了写入发生的频率以及 ETL 过程完成所需的时间,基于此我们提出了一些要保留在 Hudi 数据集中的提交。示例:如果每 5 分钟安排一次将数据摄取到 Hudi 的作业,并且运行时间最长的查询可能需要 1 小时才能完成,则平台应至少保留 60/5 = 12 次提交。

Hudi配置:

hoodie.cleaner.policy: KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained: 12

或者

hoodie.cleaner.policy: KEEP_LATEST_FILE_VERSIONS
hoodie.cleaner.fileversions.retained: 1

根据延迟和业务用例选择正确的存储类型

Apache Hudi 有两种存储类型,用于存储不同用例的数据集。一旦选择了一种存储类型,更改/更新到另外一种类型可能是一个繁琐的过程(CoW变更为MoR相对轻松,MoR变更为CoW较为麻烦)。因此在将数据迁移到 Hudi 数据集之前选择正确的存储类型非常重要。

问题: 选择不正确的存储类型可能会影响 ETL 执行时间和数据消费者的预期数据延迟。

解决方案: 在 Halodoc我们将这两种存储类型都用于我们的工作负载。MoR:MoR 代表读取时合并。我们为写入完成后需要即时读取访问的表选择了 MoR。它还减少了 upsert 时间,因为 Hudi 为增量更改日志维护 AVRO 文件,并且不必重写现有的 parquet 文件。MoR 提供数据集 _ro 和 _rt 的 2 个视图。

• _ro 用于读取优化表。

• _rt 用于实时表。

CoW:CoW 代表写时复制。存储类型 CoW 被选择用于数据延迟、更新成本和写入放大优先级较低但读取性能优先级较高的数据集。

type = COPY_ON_WRITE / MERGE_ON_READ
hoodie.datasource.write.table.type: type

文件列表很繁重,Hudi如何解决

一般来说分布式对象存储或文件系统上的 upsert 和更新是昂贵的,因为这些系统本质上是不可变的,它涉及跟踪和识别需要更新的文件子集,并用包含最新记录的新版本覆盖文件。Apache Hudi 存储每个文件切片和文件组的元数据,以跟踪更新插入操作的记录。

问题: 如前所述,在不同分区中有大量文件是Driver节点收集信息的开销,因此会导致内存/计算问题。

解决方案: 为了解决这个问题,Hudi 引入了元数据概念,这意味着所有文件信息都存储在一个单独的表中,并在源发生变化时进行同步。这将有助于 Spark 从一个位置读取或执行文件列表,从而实现最佳资源利用。这些可以通过以下配置轻松实现。Hudi配置

hoodie.metadata.enabled: true

为 Hudi 数据集选择正确的索引

在传统数据库中使用索引来有效地从表中检索数据。Apache Hudi 也有索引概念,但它的工作方式略有不同。Hudi 中的索引主要用于强制跨表的所有分区的键的唯一性。

问题: 想要构建事务数据湖时,维护/限制每个分区或全局分区中的重复记录始终至关重要

解决方案: Hudi 通过使用 Hudi 数据集中的索引解决了这个问题,它提供全局和非全局索引。默认情况下使用Bloom Index。目前Hudi支持:

• Bloom Index:使用由记录键构建的Bloom过滤器,还可以选择使用记录键范围修剪候选文件。

• Simple Index:对存储表中的记录和传入更新/删除记录进行连接操作。

• Hbase Index:管理外部 Apache HBase 表中的索引映射。

在 Halodoc,我们利用全局 Bloom 索引,以便记录在分区中是唯一的,使用索引时必须根据源行为或是否有人想要维护副本做出决定。

总结

在 Halodoc过去 6 个月我们一直在使用 Apache Hudi,它一直很好地服务于大规模数据工作负载。一开始为 Apache Hudi 选择正确的配置涉及一些学习曲线。在这篇博客中,我们分享了我们在构建 LakeHouse 时遇到的一些问题,以及在生产环境中使用 Apache Hudi 时正确配置参数/配置的最佳实践。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
23天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
67 5
|
23天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
57 4
|
1月前
|
消息中间件 分布式计算 大数据
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
63 5
|
1月前
|
存储 SQL 分布式计算
大数据-162 Apache Kylin 全量增量Cube的构建 Segment 超详细记录 多图
大数据-162 Apache Kylin 全量增量Cube的构建 Segment 超详细记录 多图
58 3
|
22天前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
101 61
|
1月前
|
Java 大数据 数据库连接
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
29 2
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
|
22天前
|
分布式计算 大数据 Apache
Apache Spark & Paimon Meetup · 北京站,助力 LakeHouse 架构生产落地
2024年11月15日13:30北京市朝阳区阿里中心-望京A座-05F,阿里云 EMR 技术团队联合 Apache Paimon 社区举办 Apache Spark & Paimon meetup,助力企业 LakeHouse 架构生产落地”线下 meetup,欢迎报名参加!
86 3
|
1月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
609 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
69 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

推荐镜像

更多
下一篇
无影云桌面