文/李少锋
阿里云瑶池旗下的云原生数据仓库AnalyticDB MySQL版是基于湖仓一体架构打造的实时湖仓。本文将分享AnalyticDB MySQL Spark助力构建低成本数据湖分析的最佳实践。
全文目录:
- AnalyticDB MySQL介绍
- AnalyticDB MySQL Serverless Spark核心优化
- 基于AnalyticDB MySQL湖仓版的最佳实践
*文章转载自DataFunTalk
AnalyticDB MySQL介绍
首先介绍下AnalyticDB产品架构, AnalyticDB湖仓版产品架构包含自研和开源两部分。AnalyticDB湖仓版在数据全链路的「采存算管用」5 大方面都进行了全面升级和建设。
在「采集」方面,我们推出了数据管道 APS 功能,可以一键低成本接入数据库、日志、大数据中的数据,解决数据入湖仓的问题。
在「存储」方面,我们除了内置Hudi/Delta格式的外表数据湖格式能力,也对内部存储进行了升级改造。通过只存一份数据,同时满足离线、在线 2 类场景。
在「计算」方面,我们对自研的 XIHE BSP SQL 引擎进行容错性、运维能力等方面的提升,同时引入开源 Spark 引擎满足更复杂的离线处理场景和机器学习场景。
在「管理」方面,我们推出了统一的元数据管理服务,统一湖仓元数据及权限,让湖仓数据的流通更顺畅。
在「应用」方面,除了通过SQL方式的BI分析应用外,还支持基于Spark的 AI 应用。
我们希望通过在做深自研的同时,也充分拥抱开源技术,来满足不同客户的不同业务场景,帮助客户实现降本增效。
拥抱开源不仅仅只是简单集成Spark/Hudi/Delta等开源引擎,还包括湖仓库表元数据管理,以便多引擎共享,为此AnalyticDB提供了统一元数据服务管理湖仓库表元数据,湖仓中的元数据/权限可互通,不同引擎可自由访问湖仓数据而无需重复创建元数据。对于湖仓数据,为屏蔽底层数据存储格式的差异,便于第三方引擎集成,AnalyticDB提供了面向内存列存格式Arrow的Lakehouse API服务,提供统一的读写能力,满足业务对仓存储有大吞吐的诉求,对于仓存储已经通过Arrow格式完成Spark引擎对接。
可以看到在AnalyticDB拥抱开源技术方面,Spark扮演着非常重要的角色,而在引入Spark引擎后AnalyticDB团队基于Spark引擎也做了非常多的优化,让其更符合云原生的场景。
AnalyticDB MySQL Serverless Spark核心优化
接下来分享AnalyticDB Spark的核心优化,下图是AnalyticDB Spark整体架构。
- 最上层是面向用户的使用入口,包括AnalyticDB SQL/Jar控制台、AnalyticDB作业调度控制台,以及阿里云DMS、Dataworks调度系统,以及贴合开源Spark用户使用习惯的SparkSubmit脚本方式。
- 支持控制台和调度系统提交Spark作业的是OpenAPI模块,该模块提供了规范的API能力,对下对接管控服务,对上支持各类系统集成AnalyticDB Spark服务。
- Spark管控服务负责管理Spark作业,该服务以多租形态部署,负责Spark作业资源校验,元数据管理,状态管理,安全校验等方面。
- 下一层由Driver和多个Executor组成的Spark集群,该集群归属于AnalyticDB实例的资源组,不同资源组之间的资源相互隔离,互不影响,Driver、Executor都会通过统一元数据服务请求库表元信息,通过统一管控底座申请弹性资源。
- 最底层是AnalyticDB Spark支持的各类数据源,大体分为两类,一类是OSS/MaxCompute等通过AnyTunnel/STS Token进行授权访问的数据源,另一类是用户VPC下如AnalyticDB/RDS/HBase等需要通过ENI弹性网卡技术打通不同VPC网络才能访问的数据源。
为让客户更便捷使用AnalyticDB Spark,享受云上Serverless Spark带来的弹性、性价比优势,低门槛被集成能力是关键的一环,因此AnalyticDB Spark基于阿里云OpenAPI规范开发了30个API来管理Spark作业,覆盖全生命周期管理,包括提交作业、停止作业、获取作业状态、获取作业日志等。
基于OpenAPI能力支持了阿里云DMS、Dataworks调度系统,同时为了满足客户自建调度系统如自建Airflow场景,AnalyticDB Spark也支持Airflow调度并且将此特性贡献到了Airflow官方开源仓库,便于社区用户更方便使用AnalyticDB Spark。
Spark典型ETL作业会先访问OSS、RDS、ES、Kafka等数据源,分析计算后写出。数据源访问是第一步,不同于传统线下机房的部署形态,AnalyticDB Spark提供Serverless形态部署在AnalyticDB平台VPC内,和用户数据源所在的VPC不在同一个VPC内,网络是相互隔离的,因此需要打通平台VPC和用户VPC的网络。AnalyticDB Spark使用ENI弹性网卡技术打通不同VPC之间的网络,用户只需要配置交换机和安全组,AnalyticDB Spark会自动创建托管的弹性网卡来打通Driver/Executor到数据源的网络,弹性网卡的生命周期与作业生命周期对应,弹性网卡在作业提交后被创建,停止后被释放。
Spark UI是分析Spark作业的一种非常重要的工具,该服务对于开发者至关重要,开发人员依赖UI服务进行作业调试、调优,以及生产作业的问题排查。好的UI服务可以很好地加速研发效率。而开源Spark社区的HistoryServer提供对Spark历史作业的UI和日志服务,但在实际应用中遇到诸多痛点,典型如下:
- Eventlog空间开销大:HistoryServer依赖Spark引擎将运行中的Event信息全部记录到存储系统中,然后后台回放并绘出UI页面。对于复杂作业和长作业Eventlog量较大,可以达到百GB甚至TB级别。
- 复杂作业和长作业不支持:复杂作业或者长作业的Eventlog很大,HistoryServer会解析失败,甚至OOM。再加上空间开销大的原因,用户一般都只能关闭Eventlog。
- Replay效率差,延迟高:HistoryServer采用后台Replay Eventlog的方式还原Spark UI,相当于把Spark引擎的事件全部重放一遍,开销大并且延迟高。特别是作业较多或者较复杂的情况下,延迟可达分钟甚至十分钟级别。
由于开源HistoryServer方案存在诸多痛点问题,AnalyticDB Spark自研了一套多租户UI服务来满足云上场景,该UI服务有如下特点:
- 高效渲染:作业运行时,Spark Driver 实时流式采集 Spark Event Meta 到OSS,保存作业结束时的页面元信息。为了加速复杂作业 UI 渲染速度,AnalyticDB Spark 优化反序列算法并采用 Rotation 算法自动过滤非必要事件,GB 级 Event 渲染提升47%;
- UIServer水平扩展:UIServer主要负责解析历史UI Meta和提供 stderr 和 stdout 日志服务,轻量化无状态,可以实现水平扩展,满足万级客户在线;
- 多租户:Spark UI URL采用加密token作为参数,token代表用户身份、应用ID、UI过期时间等,并据此实现多租户服务化;
- 本地日志自动滚动:对于长作业而言,stderr 或者 stdout 日志随时间增加累积,最终可能打爆磁盘,引起稳定性问题。AnalyticDB Spark安全容器内置后台进程,实现日志滚动,保存最有价值的最近一段时间的日志。
当作业运行出现异常情况时,对作业进行快速诊断调优的能力非常重要,开源Spark用户通过Spark UI查看各Task执行情况以及配合日志分析一些诸如长尾Task,Join计划不合理等问题,然后调整代码逻辑重新提交作业进行调试,整个分析步骤非常耗费时间和精力,调优效率低下并且很多时候效果不佳。为解决此类问题, AnalyticDB Spark针对作业提供了诊断和调优服务,运行作业时便可在控制台查看作业是否有异常诊断结果,为更方便客户处理异常作业,AnalyticDB Spark除了输出异常作业诊断根因外,还会给出调优建议,包括多表Join后数据膨胀、资源利用率过载、过低等调优建议,让客户以更合理的配置完成作业,解决盲目调优的难题。
除了提供易于数据工程师开发调试作业的调度系统/控制台外,AnalyticDB Spark还提供了方便数据分析师使用的Notebook,借助Notebook和底层Spark强大的分布式能力,方便数据分析师进行数据分析,洞察数据价值。Notebook服务完全免费,用户通过AnalyticDB控制台便可开箱使用Notebook,Notebook当前支持SQL/Python/Scala语言来满足不同工程师需求。
在生态建设方面,为降低用户使用湖格式门槛,AnalyticDB Spark内置了Hudi/Delta湖格式,开箱即用且完全兼容开源语法,同时AnalyticDB Spark也支持客户自定义开发的私有格式。除了内置支持湖存储外,AnalyticDB Spark还支持直接访问内部仓存储格式,通过Lakehouse API(SDK方式)对接内部仓存储,显著提高访问仓的吞吐。
AnalyticDB Spark基于Spark CatalogPlugin机制构建了统一的Catalog管理不同的表格式Catalog,如支持Hudi表的HoodieCatalog、Delta表的DeltaCatalog以及AnalyticDB内表的ADBCatalog,基于统一Catalog屏蔽Hudi/Delta开源社区中繁琐的catalog和extension等参数配置,开箱即用,在保持易用性的同时,也兼容Hudi/Delta开源社区标准用法以及支持客户自定义Catalog管理其他表格式。
对于访问仓存储,通过传统JDBC协议访问效率低,无法支撑机器学习、数据挖掘等大吞吐访问场景。为让一份数据同时支持离线和在线场景,AnalyticDB Spark支持通过Lakehouse API(SDK协议)对接仓存储,以Arrow格式进行数据交换,相比传统JDBC方式,访问效率提升6x,客户可以借助Spark和仓存储构建Zero-ETL解决方案。
如客户有对内表进行挖掘分析的诉求,但受限于JDBC吞吐能力,需要先将数据以Parquet格式导出至OSS,然后使用Spark进行分析挖掘,引入ETL操作,数据一致性差、时效性低。
而通过Lakehouse API(SDK协议)方式吞吐高,满足分析挖掘需求,无需ETL,数据一致性好、时效性高。
为满足客户对于安全方面的诉求,AnalyticDB Spark团队联合达摩院团队面向隐私计算领域携手打造的全密态密云原生大数据计算引擎,让可信安全的一站式数据交换迈上了平台化的新阶梯,满足对安全有诉求的场景,全自研的TEE引擎也通过了信通院最高级别的安全认证。客户通过简单配置即可开启全密态计算,使用成本极低。
访问OSS数据源是Spark数据湖分析中非常典型的一类场景,开源hadoop-oss也支持直接访问OSS,但开源方案使用AK/SK明文方式访问,而明文AK/SK配置泄露可能导致发生信息安全风险。为规避此类安全风险,AnalyticDB Spark基于阿里云RAM系统自研了RAM&STS方案,该方案可以满足企业安全以及精细化访问控制要求,ADB Spark基于RAM系统实现对OSS的访问控制,用户只需快速一键授权 https://help.aliyun.com/document_detail/455073.html,即可授权子账号/跨账号访问权限,免AK/SK访问OSS数据,规避账密泄露风险。
Spark Driver/Executor会周期性的请求元数据服务中心刷新STS Token避免Token过期,元数据服务中心会将请求代理给RAM系统生成新的Token返回,后续Driver/Executor使用新的新的凭证访问OSS文件,避免访问凭证过期影响作业稳定性。
除了对OSS易用性进行改造外,AnalyticDB Spark结合OSS对象存储的特点对OSS写性能也进行了深度优化。
阿里云OSS支持Multipart Upload功能,原理是把一个文件切分成多个数据片并发上传,上传完成后调用Multipart Upload完成接口将这些数据片合并成原来的文件,以此来提高文件写入OSS的吞吐。由于Multipart Upload可以控制文件对用户可见的时机,所以可以利用它代替rename操作来优化写OSS时的性能,利用Multipart Upload方式有如下优势:
- 写入文件不需要多次拷贝。不需要昂贵的 Rename 操作,写入文件不需要copy&delete。另外相比于Rename,OSS 的 completeMultipartUpload 接口非常轻量;
- 出现数据不一致几率更小。虽然如果一次要写入多个文件,此时进行completeMultipartUpload仍然不是原子性操作,但是相比于原先的rename会copy数据,时间窗口缩短很多,出现数据不一致的几率会小很多,可以满足绝大部分场景;
- Rename中的文件元信息相关操作不再需要。经过统计,算法1中一个文件的元数据操作可以从13次下降到6次,算法2则可以从8次下降到4次。
OSS Multipart Upload中控制用户可见性的接口是CompleteMultipartUpload和abortMultipartUpload,这种接口的语义类似于commit/abort。Hadoop FileSystem标准接口没有提供commit/abort这样的语义,因此我们引入了一套独立的Semi-Transaction层提供类似语义,大致流程如下:
- setupJob。Driver开启一个GlobalTransaction,GlobalTransaction在初始化的时候会在OSS上新建一个隐藏的属于这个GlobalTransaction的工作目录,用来存放本job的文件元数据。
- setupTask。Executor使用Driver序列化过来的GlobalTransaction生成LocalTransaction。并监听文件的写入完成状态。
Executor写文件。文件的元数据信息会被LocalTransaction监听到,并储存到本地的RocksDB里面,OSS远程调用比较耗时,我们把元数据存储到本地RocksDB上等到后续一次提交能够减少远程调用耗时。 - commitTask。当Executor调用LocalTransaction commit操作时,LocalTransaction会上传这个Task它所相关的元数据到OSS对应的工作目录中去,不再监听文件完成状态。
- commitJob。Driver会调用GlobalTransaction的commit操作,全局事务会读取工作目录中的所有元数据中的待提交文件列表,调用OSS completeMultipartUpload接口,让所有文件对用户可见。
该事务层有如下特点:
- 通用性强:不依赖任何计算引擎的接口,可以比较方便移植到另外其他计算引擎,通过适配可以将它所提供的实现给Presto或者其它计算引擎使用;
- 扩展性好:可以在Transaction的语义下添加更多的实现。例如对于分区合并的这种场景,可以加入MVCC的特性,在合并数据的同时不影响线上对数据的使用。
除了对OSS访问进行深度优化外,AnalyticDB Spark还引入了Native Engine来加速CPU计算。Spark 1.6 版本以来引入了诸如钨丝计划、向量化 Parquet Reader 等一系列优化后,整体的计算性能有两倍左右的提升。
但在 3.0 版本以后,整体计算性能的提升有所减缓,很难达到倍数提升。随着磁盘/网络带宽技术的不断发展,CPU计算能力成为新的瓶颈,而Spark 基于 JVM 体系只能利用到一些比较基础的 CPU 指令集,虽然有 JIT 的加持,但相比目前市面上很多的 Native 向量化计算引擎而言,性能差距较大。
因此考虑如何将具有高性能计算能力的 Native 向量引擎引用到 Spark 里来,从而提升 Spark 的计算性能,突破 CPU 瓶颈,向量引擎包括闭源的Databricks Photon引擎,开源社区的Gluten + Velox/Clickhouse引擎等,都可以更好的实现CPU加速,开源Gluten + Velox方案性能平均提升2x,单查询性能最高提升8x。
向量引擎整体思路是在不破坏Spark兼容性的同时,将执行计划算子尽可能地卸载到Native Engine执行来加速spark作业性能,阿里云AnalyticDB团队与英特尔Gluten团队展开了深度合作,在AnalyticDB Spark中支持了Native Engine的落地上线,客户无需修改任何代码便可开启Native优化,在内部测试中相较于开源的Spark,Native Engine有1.3x到2.8x倍的性能提升。
除针对CPU瓶颈作业进行优化外,AnalyticDB Spark也针对重IO作业进行了优化,构建了分布式缓存服务LakeCache。LakeCache利用高性能Local SSD实现可线性扩展的高效Cache服务,为计算引擎提供10倍以上IO加速。通过多租户实现低成本Cache,每计算单元(ACU)成本增加3%,TPCH E2E性能提升2.7倍。通过分布式、大容量Cache共享服务,实现Cache命中率接近100%,基于Native Engine的LakeCache Client也在进一步支持中。
以上分享了AnalyticDB Spark相较于开源Spark在各方面的增强,各个维度总结表格如下,整体而言,AnalyticDB Spark相较开源版本性能提升2.7x,成本比自建下降29%。
基于AnalyticDB MySQL湖仓版的最佳实践
接下来分享几个基于AnalyticDB MySQL的最佳客户实践。
1. 第一个案例
这个案例是利用AnalyticDB湖仓一体能力,借助AnalyticDB Spark弹性能力加速湖上数据写入湖仓中。
整体数据流如下:
- 使用AnalyticDB APS入湖服务消费SLS数据并以每秒4G/s速率写入OSS Hudi表;
- 根据不同的业务场景,通过Spark弹性作业将湖中数据写入仓中加速查询或者进行ETL写入新CSV表供客户下载分析。
基于AnalyticDB湖仓一体架构收益如下:
- 通过新发布的高速数据通道,湖仓一体存储,统一元数据服务,Spark/Xihe离在线混合引擎核心技术,支撑双11数据库自治服务(DAS),总写入吞吐达到8GB/s,存储总量达到20PB+。
- AnalyticDB内建数据通道支持SLS/Kafka等数据源高吞吐低延迟入湖入仓,通过横向扩容,负载均衡,热点打散,保障面对大促,提供平时2倍以上的吞吐能力。
- 通过统一的元数据服务打通湖仓边界,并提供统一的存储API接口,离线和在线计算引擎均可直接访问湖和仓的数据,一套体系同时支撑了SQL模板趋势分析离线场景和数据导出,日志查询在线场景,为DAS业务持续发展提供丰富想象力。
2. 第二个案例
这个案例是基于AnalyticDB Spark + AnalyticDB Hudi + OSS构建低成本LakeHouse。整体数据流如下:
- 客户通过平台自研数据采集模块从门户网站采集信息至RDS,日增百万条记录;
- RDS数据通过数据增量抽取以parquet格式写入OSS;
- 通过 Spark 对 parquet表进行清洗并写入Hudi表,清洗逻辑涉及分词、分句、实体关键词的抽取(车型)、统计等;
- 通过 Spark 对Hudi表进行清洗聚合后再写入Hudi表;
- 根据业务诉求生成Parquet离线文件供数据分析师下载使用或将数据导入仓进行在线分析。
基于AnalyticDB平台构建Lakehouse方案收益如下:
- 计算耗时:下降3倍。使用传统自建Hadoop集群的方式,对于小公司,由于成本原因,集群的固定资源一般是不够大的,这会导致计算任务耗时很长,尤其是任务多了之后只能串行处理不能并行化,导致时间会更长。使用可灵活弹性升级的AnalyticDB数据湖分析平台后,我们可以并行化启动多个任务流,每个任务流根据我们预计的完成时间分配合理的计算资源ACU数量,可以做到不增加总成本的基础上,让计算时间显著缩短。目前我们每天的计算任务可以控制在30分钟内完成,一周的计算任务可以控制在3小时内完成。最快的一次,我们需要重算历史一年的数据,通过指定使用更多的ACU数量,在1天之内就全部计算完成。同时引入Hudi后作业耗时从10min下降到3min。
- 计算费用:下降30%~50%。AnalyticDB数据湖分析的整体费用由两部分构成:OSS存储和接口费+AnalyticDB Spark按量计算费用。OSS存储和接口费,按照数据量10TB左右估算,每个月费用应该在2000元以内;AnalyticDB Spark按量计算费用是按ACU数量*计算时长收费,100核400G的集群算1个小时大概35元,性价比非常高。AnalyticDB Spark + OSS组合方案中 Spark 计算 + OSS存储成本每个月5000左右,一年约6万,搭建传统集群50个CU 估计1年成本9万多,整体成本下降30%,如果业务数据量大,计算复杂,计算频率不是很高,整体成本下降更高,使用AnalyticDB Spark数据湖分析性价比非常高。
3.第三个案例
这个案例是从CDH迁移到AnalyticDB Spark实现降本增效的案例,客户调度系统和元数据中心都采用自建方案,存储使用OSS,计算采用自建CDH Spark集群,但面临自建CDH节点扩缩容周期长,天维度扩/缩容,无法处理突发的业务高峰等问题。因此需要将计算上云切换为全托管弹性方式。
使用AnalyticDB Spark方案收益如下:
- 云资源调整量只要在界面修改最大资源就可以获得足够的资源满足业务运算需求,非常灵活,且按量付费成本更低,降本20%;
- 业务增长不需要添加机器,也没有机器需要运维,至少减少80%降低了运维成本;
- 使用标准的开源技术,在业务发展后,若自建机房更低成本,该放方案轻松进行下云操作,不会Vendor lock-in,因为元数据、调度都自主研发可控。
作者简介
李少锋,阿里云数据库OLAP AnalyticDB MySQL Hudi & Spark团队技术负责人,Apache Hudi Committer & PMC 成员。目前主要负责阿里云数据库AnalyticDB MySQL数据湖存储与计算Hudi & Spark的研发工作。
阿里云AnalyticDB MySQL升级为湖仓一体架构,支持高吞吐离线处理和高性能在线分析,可无缝替换CDH/TDH/Databricks/Presto/Spark/Hive等。免费试用活动(5000ACU时+100GB存储)正在火热申请中,点击链接即可开启免费试用。
钉钉交流群号:33600023146