AnalyticDB基于Apache Hudi构建低成本Lakehouse实践

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 基于低成本对象存储OSS + Apache Hudi 构建Lakehouse的挑战与实践

1. AnalyticDB MySQL产品架构

首先介绍下 AnalyticDB MySQL(下简称ADB)产品架构 ADB湖仓版产品架构包含自研和开源两部分。ADB湖仓版在数据全链路的「采存算管用」5 大方面都进行了全面升级和建设。

在「采集」方面,我们推出了数据管道 APS 功能,可以一键低成本接入数据库、日志、大数据中的数据,解决数据入湖仓的问题。

在「存储」方面,我们除了内置Hudi /Delta格式的外表数据湖格式能力,也对内部存储进行了升级改造。通过只存一份数据,同时满足离线、在线 2 类场景。

在「计算」方面,我们对自研的 XIHE BSP SQL 引擎进行容错性、运维能力等方面的提升,同时引入开源 Spark 引擎满足更复杂的离线处理场景和机器学习场景。

在「管理」方面,我们推出了统一的元数据管理服务,统一湖仓元数据及权限,让湖仓数据的流通更顺畅。

在「应用」方面,除了通过SQL方式的BI分析应用外,还支持基于 Spark 的 AI 应用。

我们希望通过在做深自研的同时,也充分拥抱开源技术,来满足不同客户的不同业务场景,帮助客户实现降本增效。

 

2. 基于 Hudi 支持日增千亿数据入湖

ADB积极拥抱开源生态来满足已经在开源生态上构建数据平台的客户可以更平滑地使用湖仓版,如内置支持包括Spark、Hudi、Delta等开源计算引擎与存储引擎。在Parquet/ORC/JSON/CSV 等 Append 类型数据格式的基础上,为支持在对象存储OSS上构建低成本 LakeHouse,集成支持Apache Hudi,并对Hudi进行功能/性能/易用性增强。

通过ADB APS服务将源端数据近实时以Hudi格式写入OSS构建Lakehouse或通过Spark以离线方式以Hudi格式写入OSS构建Lakehouse,基于Apache Hudi提供近实时的更新、删除能力以及APS/Spark近实时/离线写入能力,满足客户对低成本构建Lakehouse的诉求。

接着介绍 ADB 基于 Apache Hudi 构建日增千亿数据入湖所面临的挑战及实践。

2.1 日志型数据入湖场景介绍

数据入湖典型的业务场景如下:业务源端数据通过数据采集进入阿里云 SLS,然后通过 ADB APS 数据管道服务进入ADB 湖仓版,基于 ADB 湖仓版之上构建日志查询、日志导出、日志分析等功能。

该业务场景有如下典型的特点:

  • 高吞吐,单链路最高 4GB/s 吞吐,日增数据量 350TB,总存储量超 20PB
  • 数据倾斜/热点严重:源端数据倾斜非常严重,从百万到几十条数据不等
  • 扫描量大:日志查询的扫描量 50GB ~ 500GB 不等,查询并发在 100+

如果使用数仓的话首先面临成本高的问题,另外数仓缺少热点打散能力,只能不断加资源,瓶颈明显;最后数仓系统资源固化,没有动态弹性能力或者弹性能力较弱,承载不同客户查询需求时,容易互相干扰,尤其是大客户的数据扫描。

2.2 日志型数据入湖架构

基于SLS的日志数据入湖的技术架构非常简单,整体来看就是 Flink 从 SLS 读取数据然后写入 Hudi,架构非常简单,其中对于 SLS 和 Flink 的状态存储说明如下:

  • SLS 多 Shard 存储,由 Flink 的多个 Source 算子并行消费
  • 消费后通过 Sink 算子调用 Hudi Worker/Writer 写出到 Hudi(实际链路还存在 Repartition,热点打散等逻辑)
  • Flink Checkpoint 后端存储以及 Hudi 数据存储在OSS

2.3 Flink Hudi写入流程

接着来看下 Flink 写入 Hudi 的主流程,首先明确 Flink 写 Hudi 时会存在两种角色,Coordinator 负责处理元数据,如对 Hudi Instant 的相关操作,Worker/Writer 负责处理数据,如写入数据为 parquet 格式,写入步骤如下

  1. Coordinator 开启一个 Hudi Instant
  2. Filnk Sink 发送数据给 Hudi Worker 写出
  3. 触发 Flink Checkpoint 时,则通过 Sink 算子通知 Worker Flush 数据,同时持久化operator-state
  4. 当 Flink 完成 Checkpoint 持久化时,通知  Coordinator 提交该 Instant,Coordinator 完成最终提交,写 commit 文件,数据对外可见
  5. 提交后会立即开启一个新的 Instant,继续上述循环

2.4 数据一致性挑战

把 Flink 写 Hudi 保证端到端一致性分成两部分,一部分是 Flink 框架内部的,另外一个部分是与 Hudi 交互的部分。

  • 其中步骤 1 到 3 之间是 Flink Checkpoint 逻辑,如果异常在这些步骤上发生,则认为 Checkpoint失败,触发 Job 重启,从上一次 Checkpoint 恢复,相当于两阶段提交的 Precommit 阶段失败,事务回滚,如果有 Hudi 的 inflight commit,等待 Hudi Rollback 即可,无数据不一致问题
  • 当 3 到 4 之间发生异常,则出现 Flink 和 Hudi 状态不一致。此时 Flink 认为 Checkpoint 已结束,而 Hudi 实际尚未提交。如果对此情况不做处理,则发生了数据丢失,因为Flink Checkpoint 完毕后,SLS 位点已经前移,而这部分数据在 Hudi 上并未完成提交,因此容错的重点是如何处理此阶段引起的数据一致性问题
  • 我们拿一个例子举例分析在步骤 3 和 4 之前发生异常时,如果保证数据一致性
  • 否则认为上一次 Instant 执行失败,等待 Rollback 即可,脏数据对用户不可见

我们举例分析下在步骤 3 和 4 之间发生异常时,是如何保证数据一致性的,可以看到对于1615 的 commit 在 Flink 完成 Checkpoint 时会将其 instant 信息持久化至 Flink 后端存储。

从 checkpoint 恢复时有如下步骤:

  1. Checkpoint 时 Sink 算子 Flush 数据及持久化 Instant 的 state;
  2. Worker 请求处于 pending 的 Instant,与从 state 恢复的 Instant 做对比并汇报给 Coordinator;
  3. Coordinator 从 Instant Timeline 中获取最新的 Instant 信息,并接收所有 Worker 的汇报;
  4. 如果 Worker 汇报 Instant 相同,并且不在 Timeline 中已完成的 Instant 中,则表示该 Instant 尚未提交,触发 Recommit。

经过上述步骤可以保证在 Flink 完成 Checkpoint 时,但对于 Hudi Commit 失败时的场景会进行 recommit,从而保证数据的一致性。

2.5 热点处理挑战

在处理 4GB/s 的高吞吐流量时面临一个非常大的挑战就是热点数据处理,我们统计了 5 分钟内各 Task 处理数据的大小,发现各 Task 处理数据从 200W 条到几十条不等,热点问题明显。

而在写入链路中会根据分区字段做 shuffle,同一个分区由一个 Task 写入,对于上述存在的热点问题会导致部分TM上的分区写入非常慢,导致数据延迟/作业挂掉。

面对写入热点问题,我们开发了热点打散功能,通过配置指定规则打散热点数据,同时可以根据数据流量自动更新热点打散规则,确保系统的健壮性,经过热点打散后个 TM 处理的数据量/CPU占用/内存占用基本相同并且比较平稳,作业稳定性也得到了提升。

2.6 作业OOM挑战

另外一个挑战是 OOM,其实和热点打散也有很大关系,我们发现作业运行时会出现OOM,导致作业挂掉,数据延迟上涨,因此我们对堆外/堆内内存的使用做了比较细致的梳理,使用内存的部分主要集中在:

  • 写 Parquet 文件占用堆外内存
  • OSS Flush 占用堆外内存
  • 单 TM 的 Slot 数、写并发都影响内存占用,如每个写并发处理 30-50 Handle,TM 16 并发,8M row group size 最多占用 6400 M 内存
  • 堆内内存负载过高导致频繁Full GC

我们针对上述内存使用做了优化,如:

  • row group size 配置为 4M,减少堆外内存占用,同时将堆外内存调大
  • close 时及时释放 compressor 占用的内存,这部分对 parquet 源码做了改造
  • 透出堆外内存指标,增加堆外内存监控,这部分也是对 parquet 源码做了改造
  • 源端 source 算子与 Shard 分配更均衡,以此保证各 TM 消费的 shard 数基本均等

2.7 OSS限流挑战

最后一个比较大的挑战就是 OSS 限流,云对象存储(如OSS)对 List 操作不友好,list objects 对 OSS 服务器压力较大,如在我们场景下,1500 写并发,会产生 1W QPS list object,OSS 侧目前限流 1K QPS,限流明显,限流会导致作业处理变慢,延迟变高。为解决该问题,我们详细梳理了写入链路对 OSS 的请求,在元数据链路对 OSS 的请求如下:

  • Timeline 构建需要 list .hoodie 目录
  • Flink CkpMetaData 基于 OSS 传递给 Worker
  • Hadoop-OSS SDK create/exists/mkdir 函数依赖 getStatus 接口,getStatus 接口现有实现导致大量 list 请求,其中 getStatus 接口对于不存在的文件,会额外进行一次 list objects 请求确认 Path 是不是目录,对 Marker File、partitionMetadata、数据文件都会产生大量的 list objects 请求

在数据链路对 OSS 请求如下:

  • 先临时写到本地磁盘,再上传至 OSS,导致本地磁盘写满。

针对上述对 OSS 的请求,我们做了如下优化,在元数据侧:

  1. Timeline Based CkpMetaData,将TM请求打到 JM,避免大量 TM 扫描 OSS 文件
  2. Hadoop-OSS SDK,透出直接创建文件的接口,不进行目录检查
  3. PartitionMetaData 缓存处理,在内存中对每个分区的元数据文件做了缓存处理,尽量减少与 OSS 的交互
  4. Create Marker File 异步处理,异步化处理不阻塞对 Handle 的创建,减少创建 Handle 的成本
  5. 开启 Timeline Based Marker File,这个是社区已经有的能力,直接开启即可

这里额外补充下可能有小伙伴比较好奇为什么开启 Hudi Metadata Table 来解决云对象存储的限流问题,我们内部做过测试,发现开启 Metadata Table 时,写入会越来越慢,无法满足高吞吐的场景。

以上就是我们在处理千亿日志数据入湖时面临的典型挑战以及我们如何克服这些挑战,接着讲讲我们在处理数据入湖时为满足业务要求做的关键特性开发。

2.8 高级特性-并发写支持

首先是支持并发写,业务侧要求链路有补数据能力,补数据场景涉及多 Flink Client 写不同分区,实时写链路,补数据链路,Table Service 链路并发操作表数据/元数据,这要求:

  • 表数据不错乱
  • 补数据/TableService 链路不影响实时写链路

因此我们对 Hudi 内核侧做了部分修改来支持并发写场景:

  • CkpMetadata 唯一标识,保证不同作业使用不同 ckp meta
  • ViewStorage 唯一标识,保证不同作业 Timeline Server 隔离
  • Lazy Table Service,保证并行作业不互相 rollback commit,避免数据错乱
  • Instant 生成重试策略,保证 Timeline Instant 的唯一性,避免数据错乱
  • 独立 Table Service 处理,使用单独的作业运行 Table Service,与实时写链路完全隔离

2.9 高级特性-分区TTL管理

另外一个关键特性是出于成本考虑,业务侧要求 Hudi 中数据不能无限地保存,需要按照用户设定的策略保留指定时间的数据,这要求:

  • Hudi 提供分区级别按照数据量,分区数和过期时间等不同维度进行生命周期管理的能力
  • Hudi 支持并发设置生命周期管理策略,因为面向多租户会涉及并发更新管理策略

针对业务对生命周期管理的需求,我们开发 Hudi 的生命周期管理功能,具体实现如下:

  • 对于生命周期管理使用,首先通过 call command 添加生命周期管理策略,并进行持久化,为支持并发更新,我们参考 Hudi MOR 表中 Base 文件和 Log 文件的设计,并发更新会写入不同的 Log 文件;
  • 对于生命周期管理的执行,在每一次 commit 结束后进行统计信息增量采集并更新至统计信息文件,然后按照分区策略进行过期分区的处理,对于过期分区会生成一个 replace commit,等待后续被 clean 即可,同时会合并前面的策略 Base 文件和 Log 文件,生成新的 Base 文件以及更新统计信息;

我们也提供了按照分区数、数据量、过期时间三种不同策略来管理 Hudi 表中的分区的生命周期,很好的满足业务侧的需求。

2.10 高级特性-独立TableService

最后一个比较大的关键特性是独立 TableService,业务侧要求保证实时写链路稳定,同时希望提高入湖数据的查询性能,这要求:

  • 在不影响主链路同步性能情况下,清理 Commits/文件版本,保证表状态大小可控
  • 为提高查询性能,提供异步 Clustering 能力,合并小文件,减少扫描量,提高查询性能

基于上述诉求我们开发了基于 ADB 湖仓版的独立 Table Service 服务,在入湖链路写入完成后会进行一次调度,然后将请求写入调度组件,供调度组件后续拉起弹性的 Flink/Spark TableService 作业,可以做到对实时写入链路无影响。

对于表状态管理以及数据布局优化均是采用的独立 TableService 作业执行,保证表的状态大小可控,同时通过异步 Clustering 操作,端到端查询性能提升 40% 以上。

2.11 指标监控与告警

在对日志入湖链路进行了深入打磨后,我们可以保证最高 4GB/s 的数据写入,延迟在 5min内,满足业务诉求。

同时也建设了指标监控大盘与异常链路告警功能,便于快速定位问题以及出现问题后快速响应。

3. 基于 AnalyticDB 构建 Lakehouse 实践

最后介绍下基于 ADB湖仓版 构建 Lakehouse 实践。

前面也提到 ADB 湖仓版拥抱开源技术,ADB 集成了流式处理引擎 Flink,并在此基础上推出了 APS 数据管道服务,APS 具备如下优势:

  • 低成本,低延迟:作业级别弹性资源,按量付费;按流量自由设定作业资源;充分享受 Flink 流式处理性能红利
  • 多数据源快速集成:得益于 Flink 成熟的 Connectors 机制,可以方便对接如 SLS、Kafka 等数据源,同时可以保证数据入湖的精确一致性
  • 低使用门槛:支持白屏化操作快速构建 Lakehouse,基于统一元数据服务,Lakehouse 数据可通过 Spark/ADB 引擎无缝访问

而为了满足客户对于批处理以及机器学习能力的诉求,ADB 集成了 Spark 引擎,并在此技术上推出了 Servlersss Spark,其具备如下优势:

  • 一份数据存储,在离线共享:无缝对接 ADB 已有元数据和数据;支持大吞吐读写 ADB 数据;Spark 批量写入的数据,在线分析查询可直接访问
  • 数据库体系&体验:使用 ADB 统一的账号、权限和鉴权体系;支持通过 ADB Workflow、DMS 以及 DataWorks 调度编排 SparkSQL 作业
  • 完全兼容 Spark 生态:基于最新的 Apache Spark 3.X 版本,充分享受开源社区红利;支持 SparkSQL、DataFrame API 主流编程接口以及 ThriftServer;支持 Spark UDF,支持 Hive UDF/UDTF/UDAF
  • 按量计费,秒级弹性:开箱即用,按量计费无任何持有成本;基于神龙/ECS/ECI 的管控底座以及资源池化,缓存加速等技术,支持 Spark Pod 秒级拉起

对于实时性有诉求的场景,可以基于 ADB APS 服务非常方便的构建准实时 Lakehouse,白屏化操作快速配置入湖通道,多种数据源支持,满足不同数据源接入诉求,更多数据源也在持续集成中。

而对于实时性没有诉求的场景,可以基于 Spark + Hudi + ADB 工作编排构建离线 Lakehouse,如对 RDS 数据构建离线Lakehouse进行分析,可使用ADB工作编排,利用 Spark 将 RDS 数据离线增量导入 Lakehouse,并做数据的清洗和加工,有需要最后可通过一条简单的 Spark SQL将数据从 Hudi 导入 ADB 做查询分析加速。

另外 ADB Spark 与 Hudi 和 ADB 表都做了深度集成,便于客户使用,如对于 Hudi 表的使用,免去了很多 Hudi 额外的配置,开箱即用;对于 ADB 表,可通过 Spark 创建、删除 ADB 表元数据,也支持读写 ADB 表数据。

4. 总结展望

借助 AnalyticDB MySQL湖仓版,客户可以很方便地基于OSS存储底座构建准实时/离线低成本Lakehouse,同时为进一步降低客户构建Lakehouse的门槛与成本,ADB 团队也在积极产品化 Hudi TableService,包括异步数据聚簇、索引构建等更高级特性,满足快速入湖的同时,加速查询分析。

阿里云AnalyticDB MySQL升级为湖仓一体架构,支持高吞吐离线处理和高性能在线分析,可无缝替换CDH/TDH/Databricks/Presto/Spark/Hive等。试用活动(5000ACU时+100GB存储)正在火热申请中,申请链接:https://free.aliyun.com/?searchKey=AnalyticDB%20MySQL

钉钉交流群号:33600023146

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
12天前
|
消息中间件 分布式计算 大数据
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
36 5
|
12天前
|
存储 SQL 分布式计算
大数据-162 Apache Kylin 全量增量Cube的构建 Segment 超详细记录 多图
大数据-162 Apache Kylin 全量增量Cube的构建 Segment 超详细记录 多图
30 3
|
12天前
|
Java 大数据 数据库连接
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
22 2
大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala
|
12天前
|
SQL 分布式计算 大数据
大数据-160 Apache Kylin 构建Cube 按照日期构建Cube 详细记录
大数据-160 Apache Kylin 构建Cube 按照日期构建Cube 详细记录
25 2
|
12天前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
31 1
|
6天前
|
存储 小程序 Apache
10月26日@杭州,飞轮科技 x 阿里云举办 Apache Doris Meetup,探索保险、游戏、制造及电信领域数据仓库建设实践
10月26日,由飞轮科技与阿里云联手发起的 Apache Doris 杭州站 Meetup 即将开启!
25 0
|
8天前
|
SQL 分布式计算 数据挖掘
加速数据分析:阿里云Hologres在实时数仓中的应用实践
【10月更文挑战第9天】随着大数据技术的发展,企业对于数据处理和分析的需求日益增长。特别是在面对海量数据时,如何快速、准确地进行数据查询和分析成为了关键问题。阿里云Hologres作为一个高性能的实时交互式分析服务,为解决这些问题提供了强大的支持。本文将深入探讨Hologres的特点及其在实时数仓中的应用,并通过具体的代码示例来展示其实际应用。
46 0
|
12天前
|
SQL 存储 监控
大数据-161 Apache Kylin 构建Cube 按照日期、区域、产品、渠道 与 Cube 优化
大数据-161 Apache Kylin 构建Cube 按照日期、区域、产品、渠道 与 Cube 优化
25 0
|
3天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
230 4
Apache Flink 2.0-preview released
|
8天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
26 3

热门文章

最新文章

相关产品

  • 云原生数据仓库AnalyticDB MySQL版