AnalyticDB基于Apache Hudi构建低成本Lakehouse实践

简介: 基于低成本对象存储OSS + Apache Hudi 构建Lakehouse的挑战与实践

1. AnalyticDB MySQL产品架构

首先介绍下 AnalyticDB MySQL(下简称ADB)产品架构 ADB湖仓版产品架构包含自研和开源两部分。ADB湖仓版在数据全链路的「采存算管用」5 大方面都进行了全面升级和建设。

在「采集」方面,我们推出了数据管道 APS 功能,可以一键低成本接入数据库、日志、大数据中的数据,解决数据入湖仓的问题。

在「存储」方面,我们除了内置Hudi /Delta格式的外表数据湖格式能力,也对内部存储进行了升级改造。通过只存一份数据,同时满足离线、在线 2 类场景。

在「计算」方面,我们对自研的 XIHE BSP SQL 引擎进行容错性、运维能力等方面的提升,同时引入开源 Spark 引擎满足更复杂的离线处理场景和机器学习场景。

在「管理」方面,我们推出了统一的元数据管理服务,统一湖仓元数据及权限,让湖仓数据的流通更顺畅。

在「应用」方面,除了通过SQL方式的BI分析应用外,还支持基于 Spark 的 AI 应用。

我们希望通过在做深自研的同时,也充分拥抱开源技术,来满足不同客户的不同业务场景,帮助客户实现降本增效。

 

2. 基于 Hudi 支持日增千亿数据入湖

ADB积极拥抱开源生态来满足已经在开源生态上构建数据平台的客户可以更平滑地使用湖仓版,如内置支持包括Spark、Hudi、Delta等开源计算引擎与存储引擎。在Parquet/ORC/JSON/CSV 等 Append 类型数据格式的基础上,为支持在对象存储OSS上构建低成本 LakeHouse,集成支持Apache Hudi,并对Hudi进行功能/性能/易用性增强。

通过ADB APS服务将源端数据近实时以Hudi格式写入OSS构建Lakehouse或通过Spark以离线方式以Hudi格式写入OSS构建Lakehouse,基于Apache Hudi提供近实时的更新、删除能力以及APS/Spark近实时/离线写入能力,满足客户对低成本构建Lakehouse的诉求。

接着介绍 ADB 基于 Apache Hudi 构建日增千亿数据入湖所面临的挑战及实践。

2.1 日志型数据入湖场景介绍

数据入湖典型的业务场景如下:业务源端数据通过数据采集进入阿里云 SLS,然后通过 ADB APS 数据管道服务进入ADB 湖仓版,基于 ADB 湖仓版之上构建日志查询、日志导出、日志分析等功能。

该业务场景有如下典型的特点:

  • 高吞吐,单链路最高 4GB/s 吞吐,日增数据量 350TB,总存储量超 20PB
  • 数据倾斜/热点严重:源端数据倾斜非常严重,从百万到几十条数据不等
  • 扫描量大:日志查询的扫描量 50GB ~ 500GB 不等,查询并发在 100+

如果使用数仓的话首先面临成本高的问题,另外数仓缺少热点打散能力,只能不断加资源,瓶颈明显;最后数仓系统资源固化,没有动态弹性能力或者弹性能力较弱,承载不同客户查询需求时,容易互相干扰,尤其是大客户的数据扫描。

2.2 日志型数据入湖架构

基于SLS的日志数据入湖的技术架构非常简单,整体来看就是 Flink 从 SLS 读取数据然后写入 Hudi,架构非常简单,其中对于 SLS 和 Flink 的状态存储说明如下:

  • SLS 多 Shard 存储,由 Flink 的多个 Source 算子并行消费
  • 消费后通过 Sink 算子调用 Hudi Worker/Writer 写出到 Hudi(实际链路还存在 Repartition,热点打散等逻辑)
  • Flink Checkpoint 后端存储以及 Hudi 数据存储在OSS

2.3 Flink Hudi写入流程

接着来看下 Flink 写入 Hudi 的主流程,首先明确 Flink 写 Hudi 时会存在两种角色,Coordinator 负责处理元数据,如对 Hudi Instant 的相关操作,Worker/Writer 负责处理数据,如写入数据为 parquet 格式,写入步骤如下

  1. Coordinator 开启一个 Hudi Instant
  2. Filnk Sink 发送数据给 Hudi Worker 写出
  3. 触发 Flink Checkpoint 时,则通过 Sink 算子通知 Worker Flush 数据,同时持久化operator-state
  4. 当 Flink 完成 Checkpoint 持久化时,通知  Coordinator 提交该 Instant,Coordinator 完成最终提交,写 commit 文件,数据对外可见
  5. 提交后会立即开启一个新的 Instant,继续上述循环

2.4 数据一致性挑战

把 Flink 写 Hudi 保证端到端一致性分成两部分,一部分是 Flink 框架内部的,另外一个部分是与 Hudi 交互的部分。

  • 其中步骤 1 到 3 之间是 Flink Checkpoint 逻辑,如果异常在这些步骤上发生,则认为 Checkpoint失败,触发 Job 重启,从上一次 Checkpoint 恢复,相当于两阶段提交的 Precommit 阶段失败,事务回滚,如果有 Hudi 的 inflight commit,等待 Hudi Rollback 即可,无数据不一致问题
  • 当 3 到 4 之间发生异常,则出现 Flink 和 Hudi 状态不一致。此时 Flink 认为 Checkpoint 已结束,而 Hudi 实际尚未提交。如果对此情况不做处理,则发生了数据丢失,因为Flink Checkpoint 完毕后,SLS 位点已经前移,而这部分数据在 Hudi 上并未完成提交,因此容错的重点是如何处理此阶段引起的数据一致性问题
  • 我们拿一个例子举例分析在步骤 3 和 4 之前发生异常时,如果保证数据一致性
  • 否则认为上一次 Instant 执行失败,等待 Rollback 即可,脏数据对用户不可见

我们举例分析下在步骤 3 和 4 之间发生异常时,是如何保证数据一致性的,可以看到对于1615 的 commit 在 Flink 完成 Checkpoint 时会将其 instant 信息持久化至 Flink 后端存储。

从 checkpoint 恢复时有如下步骤:

  1. Checkpoint 时 Sink 算子 Flush 数据及持久化 Instant 的 state;
  2. Worker 请求处于 pending 的 Instant,与从 state 恢复的 Instant 做对比并汇报给 Coordinator;
  3. Coordinator 从 Instant Timeline 中获取最新的 Instant 信息,并接收所有 Worker 的汇报;
  4. 如果 Worker 汇报 Instant 相同,并且不在 Timeline 中已完成的 Instant 中,则表示该 Instant 尚未提交,触发 Recommit。

经过上述步骤可以保证在 Flink 完成 Checkpoint 时,但对于 Hudi Commit 失败时的场景会进行 recommit,从而保证数据的一致性。

2.5 热点处理挑战

在处理 4GB/s 的高吞吐流量时面临一个非常大的挑战就是热点数据处理,我们统计了 5 分钟内各 Task 处理数据的大小,发现各 Task 处理数据从 200W 条到几十条不等,热点问题明显。

而在写入链路中会根据分区字段做 shuffle,同一个分区由一个 Task 写入,对于上述存在的热点问题会导致部分TM上的分区写入非常慢,导致数据延迟/作业挂掉。

面对写入热点问题,我们开发了热点打散功能,通过配置指定规则打散热点数据,同时可以根据数据流量自动更新热点打散规则,确保系统的健壮性,经过热点打散后个 TM 处理的数据量/CPU占用/内存占用基本相同并且比较平稳,作业稳定性也得到了提升。

2.6 作业OOM挑战

另外一个挑战是 OOM,其实和热点打散也有很大关系,我们发现作业运行时会出现OOM,导致作业挂掉,数据延迟上涨,因此我们对堆外/堆内内存的使用做了比较细致的梳理,使用内存的部分主要集中在:

  • 写 Parquet 文件占用堆外内存
  • OSS Flush 占用堆外内存
  • 单 TM 的 Slot 数、写并发都影响内存占用,如每个写并发处理 30-50 Handle,TM 16 并发,8M row group size 最多占用 6400 M 内存
  • 堆内内存负载过高导致频繁Full GC

我们针对上述内存使用做了优化,如:

  • row group size 配置为 4M,减少堆外内存占用,同时将堆外内存调大
  • close 时及时释放 compressor 占用的内存,这部分对 parquet 源码做了改造
  • 透出堆外内存指标,增加堆外内存监控,这部分也是对 parquet 源码做了改造
  • 源端 source 算子与 Shard 分配更均衡,以此保证各 TM 消费的 shard 数基本均等

2.7 OSS限流挑战

最后一个比较大的挑战就是 OSS 限流,云对象存储(如OSS)对 List 操作不友好,list objects 对 OSS 服务器压力较大,如在我们场景下,1500 写并发,会产生 1W QPS list object,OSS 侧目前限流 1K QPS,限流明显,限流会导致作业处理变慢,延迟变高。为解决该问题,我们详细梳理了写入链路对 OSS 的请求,在元数据链路对 OSS 的请求如下:

  • Timeline 构建需要 list .hoodie 目录
  • Flink CkpMetaData 基于 OSS 传递给 Worker
  • Hadoop-OSS SDK create/exists/mkdir 函数依赖 getStatus 接口,getStatus 接口现有实现导致大量 list 请求,其中 getStatus 接口对于不存在的文件,会额外进行一次 list objects 请求确认 Path 是不是目录,对 Marker File、partitionMetadata、数据文件都会产生大量的 list objects 请求

在数据链路对 OSS 请求如下:

  • 先临时写到本地磁盘,再上传至 OSS,导致本地磁盘写满。

针对上述对 OSS 的请求,我们做了如下优化,在元数据侧:

  1. Timeline Based CkpMetaData,将TM请求打到 JM,避免大量 TM 扫描 OSS 文件
  2. Hadoop-OSS SDK,透出直接创建文件的接口,不进行目录检查
  3. PartitionMetaData 缓存处理,在内存中对每个分区的元数据文件做了缓存处理,尽量减少与 OSS 的交互
  4. Create Marker File 异步处理,异步化处理不阻塞对 Handle 的创建,减少创建 Handle 的成本
  5. 开启 Timeline Based Marker File,这个是社区已经有的能力,直接开启即可

这里额外补充下可能有小伙伴比较好奇为什么开启 Hudi Metadata Table 来解决云对象存储的限流问题,我们内部做过测试,发现开启 Metadata Table 时,写入会越来越慢,无法满足高吞吐的场景。

以上就是我们在处理千亿日志数据入湖时面临的典型挑战以及我们如何克服这些挑战,接着讲讲我们在处理数据入湖时为满足业务要求做的关键特性开发。

2.8 高级特性-并发写支持

首先是支持并发写,业务侧要求链路有补数据能力,补数据场景涉及多 Flink Client 写不同分区,实时写链路,补数据链路,Table Service 链路并发操作表数据/元数据,这要求:

  • 表数据不错乱
  • 补数据/TableService 链路不影响实时写链路

因此我们对 Hudi 内核侧做了部分修改来支持并发写场景:

  • CkpMetadata 唯一标识,保证不同作业使用不同 ckp meta
  • ViewStorage 唯一标识,保证不同作业 Timeline Server 隔离
  • Lazy Table Service,保证并行作业不互相 rollback commit,避免数据错乱
  • Instant 生成重试策略,保证 Timeline Instant 的唯一性,避免数据错乱
  • 独立 Table Service 处理,使用单独的作业运行 Table Service,与实时写链路完全隔离

2.9 高级特性-分区TTL管理

另外一个关键特性是出于成本考虑,业务侧要求 Hudi 中数据不能无限地保存,需要按照用户设定的策略保留指定时间的数据,这要求:

  • Hudi 提供分区级别按照数据量,分区数和过期时间等不同维度进行生命周期管理的能力
  • Hudi 支持并发设置生命周期管理策略,因为面向多租户会涉及并发更新管理策略

针对业务对生命周期管理的需求,我们开发 Hudi 的生命周期管理功能,具体实现如下:

  • 对于生命周期管理使用,首先通过 call command 添加生命周期管理策略,并进行持久化,为支持并发更新,我们参考 Hudi MOR 表中 Base 文件和 Log 文件的设计,并发更新会写入不同的 Log 文件;
  • 对于生命周期管理的执行,在每一次 commit 结束后进行统计信息增量采集并更新至统计信息文件,然后按照分区策略进行过期分区的处理,对于过期分区会生成一个 replace commit,等待后续被 clean 即可,同时会合并前面的策略 Base 文件和 Log 文件,生成新的 Base 文件以及更新统计信息;

我们也提供了按照分区数、数据量、过期时间三种不同策略来管理 Hudi 表中的分区的生命周期,很好的满足业务侧的需求。

2.10 高级特性-独立TableService

最后一个比较大的关键特性是独立 TableService,业务侧要求保证实时写链路稳定,同时希望提高入湖数据的查询性能,这要求:

  • 在不影响主链路同步性能情况下,清理 Commits/文件版本,保证表状态大小可控
  • 为提高查询性能,提供异步 Clustering 能力,合并小文件,减少扫描量,提高查询性能

基于上述诉求我们开发了基于 ADB 湖仓版的独立 Table Service 服务,在入湖链路写入完成后会进行一次调度,然后将请求写入调度组件,供调度组件后续拉起弹性的 Flink/Spark TableService 作业,可以做到对实时写入链路无影响。

对于表状态管理以及数据布局优化均是采用的独立 TableService 作业执行,保证表的状态大小可控,同时通过异步 Clustering 操作,端到端查询性能提升 40% 以上。

2.11 指标监控与告警

在对日志入湖链路进行了深入打磨后,我们可以保证最高 4GB/s 的数据写入,延迟在 5min内,满足业务诉求。

同时也建设了指标监控大盘与异常链路告警功能,便于快速定位问题以及出现问题后快速响应。

3. 基于 AnalyticDB 构建 Lakehouse 实践

最后介绍下基于 ADB湖仓版 构建 Lakehouse 实践。

前面也提到 ADB 湖仓版拥抱开源技术,ADB 集成了流式处理引擎 Flink,并在此基础上推出了 APS 数据管道服务,APS 具备如下优势:

  • 低成本,低延迟:作业级别弹性资源,按量付费;按流量自由设定作业资源;充分享受 Flink 流式处理性能红利
  • 多数据源快速集成:得益于 Flink 成熟的 Connectors 机制,可以方便对接如 SLS、Kafka 等数据源,同时可以保证数据入湖的精确一致性
  • 低使用门槛:支持白屏化操作快速构建 Lakehouse,基于统一元数据服务,Lakehouse 数据可通过 Spark/ADB 引擎无缝访问

而为了满足客户对于批处理以及机器学习能力的诉求,ADB 集成了 Spark 引擎,并在此技术上推出了 Servlersss Spark,其具备如下优势:

  • 一份数据存储,在离线共享:无缝对接 ADB 已有元数据和数据;支持大吞吐读写 ADB 数据;Spark 批量写入的数据,在线分析查询可直接访问
  • 数据库体系&体验:使用 ADB 统一的账号、权限和鉴权体系;支持通过 ADB Workflow、DMS 以及 DataWorks 调度编排 SparkSQL 作业
  • 完全兼容 Spark 生态:基于最新的 Apache Spark 3.X 版本,充分享受开源社区红利;支持 SparkSQL、DataFrame API 主流编程接口以及 ThriftServer;支持 Spark UDF,支持 Hive UDF/UDTF/UDAF
  • 按量计费,秒级弹性:开箱即用,按量计费无任何持有成本;基于神龙/ECS/ECI 的管控底座以及资源池化,缓存加速等技术,支持 Spark Pod 秒级拉起

对于实时性有诉求的场景,可以基于 ADB APS 服务非常方便的构建准实时 Lakehouse,白屏化操作快速配置入湖通道,多种数据源支持,满足不同数据源接入诉求,更多数据源也在持续集成中。

而对于实时性没有诉求的场景,可以基于 Spark + Hudi + ADB 工作编排构建离线 Lakehouse,如对 RDS 数据构建离线Lakehouse进行分析,可使用ADB工作编排,利用 Spark 将 RDS 数据离线增量导入 Lakehouse,并做数据的清洗和加工,有需要最后可通过一条简单的 Spark SQL将数据从 Hudi 导入 ADB 做查询分析加速。

另外 ADB Spark 与 Hudi 和 ADB 表都做了深度集成,便于客户使用,如对于 Hudi 表的使用,免去了很多 Hudi 额外的配置,开箱即用;对于 ADB 表,可通过 Spark 创建、删除 ADB 表元数据,也支持读写 ADB 表数据。

4. 总结展望

借助 AnalyticDB MySQL湖仓版,客户可以很方便地基于OSS存储底座构建准实时/离线低成本Lakehouse,同时为进一步降低客户构建Lakehouse的门槛与成本,ADB 团队也在积极产品化 Hudi TableService,包括异步数据聚簇、索引构建等更高级特性,满足快速入湖的同时,加速查询分析。

阿里云AnalyticDB MySQL升级为湖仓一体架构,支持高吞吐离线处理和高性能在线分析,可无缝替换CDH/TDH/Databricks/Presto/Spark/Hive等。试用活动(5000ACU时+100GB存储)正在火热申请中,申请链接:https://free.aliyun.com/?searchKey=AnalyticDB%20MySQL

钉钉交流群号:33600023146

相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
存储 数据管理 物联网
328 0
存储 SQL 分布式计算
184 0
|
5月前
|
SQL 存储 运维
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
本文介绍了 Apache Doris 在菜鸟的大规模落地的实践经验,菜鸟为什么选择 Doris,以及 Doris 如何在菜鸟从 0 开始,一步步的验证、落地,到如今上万核的规模,服务于各个业务线,Doris 已然成为菜鸟 OLAP 数据分析的最优选型。
366 2
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
|
5月前
|
机器学习/深度学习 算法 大数据
构建数据中台,为什么“湖仓一体”成了大厂标配?
在大数据时代,数据湖与数据仓库各具优势,但单一架构难以应对复杂业务需求。湖仓一体通过融合数据湖的灵活性与数据仓的规范性,实现数据分层治理、统一调度,既能承载海量多源数据,又能支撑高效分析决策,成为企业构建数据中台、推动智能化转型的关键路径。
|
6月前
|
存储 SQL 分布式计算
MaxCompute x 聚水潭:基于近实时数仓解决方案构建统一增全量一体化数据链路
聚水潭作为中国领先的电商SaaS ERP服务商,致力于为88,400+客户提供全链路数字化解决方案。其核心ERP产品助力企业实现数据驱动的智能决策。为应对业务扩展带来的数据处理挑战,聚水潭采用MaxCompute近实时数仓Delta Table方案,有效提升数据新鲜度和计算效率,提效比例超200%,资源消耗显著降低。未来,聚水潭将进一步优化数据链路,结合MaxQA实现实时分析,赋能商家快速响应市场变化。
283 0
|
6月前
|
分布式计算 Serverless OLAP
实时数仓Hologres V3.1版本发布,Serverless型实例从零开始构建OLAP系统
Hologres推出Serverless型实例,支持按需计费、无需独享资源,适合新业务探索分析。高性能查询内表及MaxCompute/OSS外表,弹性扩展至512CU,性能媲美主流开源产品。新增Dynamic Table升级、直读架构优化及ChatBI解决方案,助力高效数据分析。
实时数仓Hologres V3.1版本发布,Serverless型实例从零开始构建OLAP系统
|
6月前
|
SQL 分布式计算 DataWorks
破界·融合·进化:解码DataWorks与Hologres的湖仓一体实践
基于阿里云DataWorks与实时数仓Hologres,提供统一的大数据开发治理平台与全链路实时分析能力。DataWorks支持多行业数据集成与管理,Hologres实现海量数据的实时写入与高性能查询分析,二者深度融合,助力企业构建高效、实时的数据驱动决策体系,加速数字化升级。
|
9月前
|
存储 SQL 运维
中国联通网络资源湖仓一体应用实践
本文分享了中国联通技术专家李晓昱在Flink Forward Asia 2024上的演讲,介绍如何借助Flink+Paimon湖仓一体架构解决传统数仓处理百亿级数据的瓶颈。内容涵盖网络资源中心概况、现有挑战、新架构设计及实施效果。新方案实现了数据一致性100%,同步延迟从3小时降至3分钟,存储成本降低50%,为通信行业提供了高效的数据管理范例。未来将深化流式数仓与智能运维融合,推动数字化升级。
425 0
中国联通网络资源湖仓一体应用实践
|
9月前
|
存储 消息中间件 Java
抖音集团电商流量实时数仓建设实践
本文基于抖音集团电商数据工程师姚遥在Flink Forward Asia 2024的分享,围绕电商流量数据处理展开。内容涵盖业务挑战、电商流量建模架构、流批一体实践、大流量任务调优及总结展望五个部分。通过数据建模与优化,实现效率、质量、成本和稳定性全面提升,数据质量达99%以上,任务性能提升70%。未来将聚焦自动化、低代码化与成本优化,探索更高效的流批一体化方案。
594 12
抖音集团电商流量实时数仓建设实践

相关产品

  • 云原生数据仓库AnalyticDB MySQL版
  • 推荐镜像

    更多