客户说|科脉 x AnalyticDB,Serverless Spark替换CDH助力运维降本80%

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 使用AnalyticDB Spark替换自建CDHSpark,助力科脉降本增效

原创 汤金源

1.业务背景

1.1 公司概述


深圳市科脉技术股份有限公司(以下简称“科脉”),成立于1999年,专注于泛零售产业的数字科技服务商,是国家级专精特新“小巨人”企业、国家高新技术企业、中国软件行业信用等级AAA企业。秉承“用数字科技陪伴企业持续成长”的理念,提供金融场景的收银系统、SaaS ERP、聚合支付、资金分帐、私域营销增长、线上线下一体化SaaS方案等全链路数字化产品与解决方案。


科脉大数据部定位为企业挖掘数据金矿,为零售商家的成长服务。除了支撑科脉核心营销云业务与管理数字化外,还支撑早期的ERP,线上线下一体化的有数商城,聚合支付享钱平台等,数据资产的实现需要数据治理的工作繁多,为各个业务赋能需要综合考虑业务线的需求差异,科脉统一数据中台盖亚应用而生,实现了血缘关系自动化依赖,接口由配置化SQL生成,助力科脉降本增效。


1.2 业务挑战


随着业务的快速发展,数据量不断增加,对系统的性能和稳定性提出了更高的要求,每日跑批的1300个离线数据作业从9点延长至10点才能跑完,业务报表产出时间晚于客户需求时间,如果扩容机器,白天又会出现很长时间的资源未被使用,导致资源浪费。


通过Spark离线作业为每个商户的消费者打上业务标签,形成用户画像帮助十万级商家进行智能营销,需要运算每天千万级的历史数据,在运算1年到现在的数据统计,即在数据加工初始化跑批时,会导致晚上的日常任务不能跑,另外由于StarRocks计算节点和Yarn混合部署,所有跑初始化作业也不能完全占满服务器资源,否则影响在线服务。如果临时加节点处理,资源审批和运维部署的时间又过长,因此只能让业务长时间等待后才有数据可用。


挑战总结如下:


由于资源限制,每日跑批作业数据不能按时完成,影响客户报表产出。


数据加工初始化,资源不能快速扩容,导致业务长时间等待,制约业务快速发展。


经过充分的调研,发现云原生数据库能够匹配以上场景和解决痛点,即:按需付费,用多少弹多少,并且能够快速弹升。


2. 解决方案

2.1 老架构:CDH


大数据平台目前是存算分离架构,数据存储使用阿里云OSS,不用考虑添加机器等运维工作,而且整个CDH集群运维也非常轻量,不会存在随着数量增长而不停的加磁盘需求。

大数据计算以 Spark Jar 和 SQL 为主:



架构中调度系统和元数据中心都采用自建方案,存储使用OSS,计算采用自建 CDH Spark 集群,但面临自建CDH节点扩缩容周期长,天维度扩/缩容,无法处理突发的业务高峰问题。因此需要将计算上云切换为全托管方式,在和阿里云架构师沟通后,发现AnalyticDB Spark(以下简称ADB Spark)可以满足技术要求。


2.2 新架构:CDH + ADB Spark


优化后新架构:




新架构相比老架构核心点是使用ADB Spark替换自建CDH Spark,其他模块如调度系统和元数据服务HMS均无变化。


新旧流程对比:



新作业流程描述如下:


1. 任务调度仍然由科脉调度发起,通过ADB Spark OpenAPI接口启动Spark作业运行。


2. ADB Spark跨VPC读取 CDH HiveMetastore 元数据,进行语法解析和运算,结果仍然和之前一样写到阿里云OSS,ADB Serverless Spark 全托管,按量收费,如果没有提交任务到服务中,不计算费用。


2.3 ADB Spark 优化


2.3.1 写OSS优化


在Hadoop生态中使用OutputCommitter接口保证写入过程的数据一致性,原理类似于二阶段提交协议。


开源Hadoop提供了Hadoop FileSystem来实现读写OSS文件,它默认使用OutputCommitter的实现是FileOutputCommitter。为保证数据一致性,不让用户看到中间结果,在执行Task时先把结果输出到一个临时工作目录,所有Task都确认输出完成时,再由Driver统一将临时工作目录Rename到生产数据路径,整体流程如下图所示:



由于OSS相比 HDFS Rename 操作十分昂贵,Rename操作是Copy&Delete操作,而HDFS则是NameNode上的一个元数据操作。在ADB Spark中继续使用开源Hadoop的FileOutputCommitter性能很差,为解决这个问题,ADB Spark引入了OSS Multipart Upload特性来优化写入性能。


阿里云OSS支持Multipart Upload功能,原理是把一个文件分割成多个数据片并发上传,上传完成后,让用户自己选择一个时机调用Multipart Upload的完成接口,将这些数据片合并成原来的文件,以此来提高文件写入OSS的吞吐。由于Multipart Upload可以控制文件对用户可见的时机,所以我们可以利用它代替 Rename 操作来优化 ADB Spark OutputCommitter 场景写 OSS 时的性能。


基于Multipart Upload实现的OutputCommitter,整个算法流程如下图:


利用OSS Multipart Upload,有以下几个好处:


写入文件不需要多次拷贝。可以看到本来昂贵的 Rename 操作已经不需要了,写入文件不需要copy&delete。另外相比于Rename,OSS的completeMultipartUpload接口是一个非常轻量的操作。


出现数据不一致的几率更小。虽然如果一次要写入多个文件,此时进行completeMultipartUpload仍然不是原子性操作,但是相比于原先的rename会copy数据,他的时间窗口会缩短很多,出现数据不一致的几率会小很多,可以满足绝大部分场景。


Rename中的文件元信息相关操作不再需要。经过我们的统计,算法1中一个文件的元数据操作可以从13次下降到6次,算法2则可以从8次下降到4次。


OSS Multipart Upload中控制用户可见性的接口是和,这种接口的语义类似于。Hadoop FileSystem标准接口没有提供commit/abort这样的语义。


前面有提到过 OutputCommitter 类似于一个二阶段提交协议,因此我们可以把这个过程抽象为一个分布式事务,可以理解为Driver开启一个全局事务,各个Executor开启各自的本地事务,当Driver收到所有本地事务完成的信息之后,会提交这个全局事务。


基于这个抽象,我们引入了一个Semi-Transaction层(没有实现所有的事务语义),其中定义了Transaction等接口。在这个抽象之下我们把适配OSS Multipart Upload特性的一致性保证机制封装进来。另外我们还实现了OSSTransactionalOutputCommitter,它实现了OutputCommitter接口,上层的计算引擎比如Spark通过它和 Semi-Transaction 层交互,结构如下图:



下面以ADB Serverless Spark的使用来说明 OSSTransactionalOutputCommitter 的大体流程:


1. setupJob


Driver开启一个GlobalTransaction,GlobalTransaction在初始化的时候会在OSS上新建一个隐藏的属于这个GlobalTransaction的工作目录,用来存放本job的文件元数据。


2. setupTask


Executor使用Driver序列化过来的GlobalTransaction生成LocalTransaction。并监听文件的写入完成状态。Executor写文件。文件的元数据信息会被LocalTransaction监听到,并储存到本地的RocksDB里面,OSS远程调用比较耗时,我们把元数据存储到本地RocksDB上等到后续一次提交能够减少远程调用耗时。


3. commitTask


当Executor调用LocalTransaction commit操作时,LocalTransaction会上传这个Task它所相关的元数据到OSS对应的工作目录中去,不再监听文件完成状态。


4. commitJob


Driver会调用GlobalTransaction的commit操作,全局事务会读取工作目录中的所有元数据中的待提交文件列表,调用OSS completeMultipartUpload接口,让所有文件对用户可见。


引入 Semi-Transaction 有两个好处:


它不对任何计算引擎的接口有依赖,因此后面可以比较方便的移植到另外的一个计算引擎,通过适配可以将它所提供的实现给Presto或者其它计算引擎使用。


可以在Transaction的语义下添加更多的实现。例如对于分区合并的这种场景,可以加入MVCC的特性,在合并数据的同时不影响线上对数据的使用。


针对 Spark 典型 ETL Benchmark Terasort,在1TB输入数据量的情况下,ADB Spark FileOutputFormat 执行时间缩短62%,性能提升163%。而针对动态分区场景,社区算法1运行失败,算法2可以执行成功,ADB Spark FileOutputFormat 相比算法2性能还要进一步提升124%。


2.3.2 免AK/SK的OSS访问


开源使用AK/SK的方式访问OSS数据,而明文AK/SK配置泄露可能导致发生信息安全风险,为为满足企业安全以及精细化访问控制要求,ADB Spark基于RAM系统实现对OSS的访问控制,用户只需快速一键授权https://help.aliyun.com/document_detail/455073.html 即可授权子账号/其他账号访问权限,而后便可在ADB Spark上免AK/SK访问OSS数据,避免AK/SK泄露风险。


可通过如下示例访问OSS上的表数据,而不需要填写AK/SK信息





2.3.3 多租UI服务


UI服务对于开发者来说至关重要,开发人员依赖UI服务进行作业调试以及生产作业问题排查。好的UI服务可以很好地加速研发效率。


Spark社区提供HistoryServer提供对Spark历史作业的UI和日志服务,在实际应用中遇到诸多痛点,典型如下:


Eventlog空间开销大:HistoryServer依赖Spark引擎将运行中的Event信息全部记录FileSystem中,然后后台回放并绘出UI页面。对于复杂作业和长作业Eventlog量较大,可以达到百GB甚至TB级别。


复杂作业和长作业不支持:复杂作业或者长作业的Eventlog很大,HistoryServer会解析失败,甚至OOM。再加上空间开销大的原因,用户一般都只能关闭Eventlog。


 Replay效率差,延迟高:HistoryServer采用后台Replay Eventlog的方式还原Spark UI,相当于把Spark引擎的事件全部重放一遍,开销大,会有延迟。特别是作业较多或者较复杂的情况下,延迟可达分钟甚至十分钟级别。


针对上述痛点,ADB Spark重新自研了一套多租户UI方案,具体方案如下



UI 方案针对社区方案做了深度优化:


高效渲染:ADB Spark减少Eventlog依赖,作业运行时,Spark Driver 实时流式采集 Spark Event Meta 到OSS,保存作业结束时的页面元信息。为了加速复杂作业 UI 渲染速度,ADB Spark 优化反序列算法并采用 Rotation 算法自动过滤非必要事件,GB 级 Event 渲染提升 47%。


UIServer水平扩展:UIServer主要负责解析历史UI Meta和提供 stderr 和 stdout 日志服务,是轻量化无状态的,可以实现水平扩展,进而支持万级别客户同时在线服务。UIServer URL采用加密token作为参数,token代表了用户身份、应用id,UIServer据此实现多租户服务化。


本地日志自动滚动:对于长作业而言,stderr 或者 stdout 信息会随着时间增加累积,最终甚至可能打爆磁盘。ADB Spark安全容器内置后台进程,实现日志滚动,保存最有价值的最近一段时间的日志。


2.3.4 OpenAPI应用管理


为方便其他系统对接ADB Spark服务,充分享受云上Serverless Spark优势,ADB Spark提供多种方式提交作业,包括:OpenAPI 、Spark-Submit 和 Airflow调度


OpenAPI包括:提交作业、获取作业状态、获取作业日志、停止作业等典型API,方便集成到业务系统中。

更详细的API参考文档:https://help.aliyun.com/document_detail/612446.html


Spark-submit方式与社区提交Spark作业方式类似,通过spark-submit脚本即可提交作业至ADB Spark。

详细参考文档:https://help.aliyun.com/document_detail/471602.html典型作业提交脚本如下:


./bin/spark-submit  \
--class com.aliyun.adb.spark.oss.SparkReadOss \--verbose \--name <your_job_name> \--jars oss://<bucket-name>/jars/xxx.jar,oss://<bucket-name>/jars/xxx.jar--conf spark.driver.resourceSpec=medium \--conf spark.executor.instances=1 \--conf spark.executor.resourceSpec=medium \oss://<bucket-name>/jars/xxx.jar args0 args1


Airflow调度ADB Spark作业可以方便与客户内部调度系统集成,通过客户熟知的Airflow调度系统管理ADB Spark作业的生命周期。

详细参考文档:https://help.aliyun.com/document_detail/470947.html


2.4 迁移问题梳理


2.4.1 ADB Spark和CDH其他组件的兼容性问题


科脉将CDH Spark分析的业务迁移到 ADB Spark,使用的其他组件如Hive,StarRocks等都仍然在CDH集群中。而Spark等业务是分析、写入这些组件,就会引入一些兼容性问题,主要概括如下:


1. ADB Spark到各组件的网络打通。之前CDH Spark和这些组件在同一个VPC中,无需进行网络打通即可访问。ADB Spark访问外部组件进行网络打通的方法是pod挂载弹性网卡,同时也需要对一些具有白名单的服务,如rds等添加弹性网卡网段的白名单,详细参考https://help.aliyun.com/document_detail/471203.html跨VPC访问和连接数据源;


2. CDH是Spark on Yarn的形态,而 ADB Spark 是Spark on K8S,需要对jar作业内容和一些配置进行修改,如external shuffle service配置、SparkSession参数等做改造;


3. CDH中使用 Spark 2.2 + Hive 2.1.1,而ADB目前内置的版本是Spark 3.2/3.3 + Hive 2.3.9。Spark读写低版本的Hive需要依赖HiveShim模块做适配,相关配置如下:


set spark.sql.hive.metastore.version={hive_version};set spark.sql.hive.metastore.jars= path;set spark.sql.hive.metastore.jars.path=/tmp/*/*.jar;add jar "oss://path/to/hive_jars/*";


和传统自建的spark集群不同,这里首先要add jar将需要的hive版本的jar包上传到spark pod,然后再做hive shim相关的配置。但是HIVE_HOME/lib下jar包较多,加载耗时较长,可以筛选出十几个hive client端依赖的jar,无需将所有的hive jar都做上传。


4. CDH Hive中将部分方法的签名做了改造,导致和开源Spark中的Hive shim 2.1不一致。如hive-exec.jar中的方法签名就不一致,需要将这个jar替换为开源的Hive 2.1的jar


5. ……


2.4.2 CDH Spark2和开源Spark3的兼容性问题


因为Spark3的一些默认行为较Spark2有所改变, CDH Spark 2中正常运行作业迁移到Spark 3也会有一些兼容性问题,如:


1. Spark 3和低版本 Hive/Spark 不兼容


a)Spark3写的parquet,用户使用原CDH中的Hue进行分析的时候会报错格式错误,无法读取。添加配置加配置 set spark.sql.parquet.writeLegacyFormat=true; 适配解决;


b)强制类型转换报错,如string转int报错"Cannot safely cast 'km_mch_status': string to int",配置spark.sql.storeAssignmentPolicy LEGACY解决。spark2中默认的配置是这个,允许强制类型转换,spark3中默认配置是ANSI,会有更严格的类型校验;


c)Datatime格式转换错误,配置 set spark.sql.legacy.timeParserPolicy=LEGACY 解决。


2. insert overwrite静态分区报错。如下的操作在Spark 3中是不支持的


insert overwrite table tmp.spark3_snap partition(dt='2020-09-10')select id from tmp.spark3_snapwhere dt='2020-09-09';


需要将静态分区改造成动态分区,如上述SQL可以改造为


insert overwrite table tmp.spark3_snap partition(dt)select id,'2020-09-10'as dt from tmp.spark3_snapwhere dt='2020-09-09';


Spark社区中相关的讨论:https://issues.apache.org/jira/browse/SPARK-32838


3. Spark3不支持select结果作为原表来引用,如执行如下


select xxx
FROM(SELECT*FROM tmp_ys_item WHERE pt_d BETWEEN DATE_ADD('2023-07-11',-1)AND'2023-07-11')INNER JOIN(SELECT*FROM tmp_ys_order WHERE pt_d BETWEEN DATE_ADD('2023-07-11',-1)AND'2023-07-11')ON tmp_ys_order.sheet_no= tmp_ys_item.sheet_noAND tmp_ys_order.pt_d= tmp_ys_item.pt_d


会报错不存在tmp_ys_order.sheet_no这个字段,这是因为spark3中对select结果的字段由统一的前缀__auto_generated_subquery_name,实际上字段是__auto_generated_subquery_name.sheet_no。解决方案是给select结果加上别名,如上SQL可以改造为:


select xxx
FROM (SELECT * FROM tmp_ys_item WHERE pt_d BETWEEN DATE_ADD('2023-07-11',-1) AND '2023-07-11') as t1
INNER JOIN (SELECT * FROM tmp_ys_order WHERE pt_d BETWEEN DATE_ADD('2023-07-11',-1) AND '2023-07-11') as t2
ON t2.sheet_no = t1.sheet_no
AND t2.pt_d = t1.pt_d


4. ……


3. 方案价值

 降低成本:云服务只需在界面修改最大资源量参数,就可获得足够的资源满足业务计算需求,非常灵活,且按量付费成本更低(公式、比例),按照最新的方案可以每年节省21%


自主可控:使用标准的开源技术,元数据、调度都自主研发可控,不存在和云厂商的绑定;


灵活取用:该方案业务增长不需要添加机器,也没有机器需要运维,至少减少80%的运维成本。



4. 未来展望

使用ThriftServer Server提前启动运行服务,加速作业启动,减少资源占用时间。


安全增强,在CDH中大数据组件之间通讯都是通过Kerberos认证,ADB Spark暂时不支持该能力。


数据湖,业务涉及退款等业务,需要长尾更新能力,需要数据湖能力,需要现在的方案满足以后数据湖要求。


作业的资源配置优化诊断建议,解决配置资源不足和资源过配的情况。




云原生数据仓库AnalyticDB MySQL免费试用


阿里云AnalyticDB MySQL升级为湖仓一体架构

支持高吞吐离线处理和高性能在线分析


无缝替换CDH/TDH/Databricks/Presto/Spark/Hive等。

免费试用活动(5000ACU时+100GB存储)正在火热申请中


点击文末「前往体验」即可开启免费试


   前往体验


相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
4月前
|
运维 监控 Kubernetes
实时数仓Hologres运维问题之时效性如何解决
Hologres达成分钟级问题发现与解决,确保监控高效准确。
43 0
|
29天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
54 1
|
2月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
128 2
|
3月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
158 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
4月前
|
分布式计算 安全 OLAP
7倍性能提升|阿里云AnalyticDB Spark向量化能力解析
AnalyticDB Spark如何通过向量化引擎提升性能?
|
4月前
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
281 3
|
5月前
|
分布式计算 监控 Serverless
E-MapReduce Serverless Spark 版测评
E-MapReduce Serverless Spark 版测评
11603 10
|
5月前
|
分布式计算 Serverless Spark
【开发者评测】E-MapReduce Serverless Spark获奖名单
E-MapReduce Serverless Spark获奖名单正式公布!
181 1
|
5月前
|
弹性计算 分布式计算 运维
迟来的EMR Serverless Spark评测报告
本文是一篇关于阿里云EMR Serverless Spark产品评测的文章,作者分享了使用体验和理解。EMR Serverless Spark是阿里云提供的全托管、一站式的Spark数据计算平台,简化了大数据处理流程,让用户专注于数据分析。文章提到了产品的主要优势,如快速启动、弹性伸缩、高资源利用率和低成本。
245 8
|
4月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
216 0

热门文章

最新文章

相关产品

  • 云原生数据仓库AnalyticDB MySQL版