客户说|科脉 x AnalyticDB,Serverless Spark替换CDH助力运维降本80%

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 使用AnalyticDB Spark替换自建CDHSpark,助力科脉降本增效

原创 汤金源

1.业务背景

1.1 公司概述


深圳市科脉技术股份有限公司(以下简称“科脉”),成立于1999年,专注于泛零售产业的数字科技服务商,是国家级专精特新“小巨人”企业、国家高新技术企业、中国软件行业信用等级AAA企业。秉承“用数字科技陪伴企业持续成长”的理念,提供金融场景的收银系统、SaaS ERP、聚合支付、资金分帐、私域营销增长、线上线下一体化SaaS方案等全链路数字化产品与解决方案。


科脉大数据部定位为企业挖掘数据金矿,为零售商家的成长服务。除了支撑科脉核心营销云业务与管理数字化外,还支撑早期的ERP,线上线下一体化的有数商城,聚合支付享钱平台等,数据资产的实现需要数据治理的工作繁多,为各个业务赋能需要综合考虑业务线的需求差异,科脉统一数据中台盖亚应用而生,实现了血缘关系自动化依赖,接口由配置化SQL生成,助力科脉降本增效。


1.2 业务挑战


随着业务的快速发展,数据量不断增加,对系统的性能和稳定性提出了更高的要求,每日跑批的1300个离线数据作业从9点延长至10点才能跑完,业务报表产出时间晚于客户需求时间,如果扩容机器,白天又会出现很长时间的资源未被使用,导致资源浪费。


通过Spark离线作业为每个商户的消费者打上业务标签,形成用户画像帮助十万级商家进行智能营销,需要运算每天千万级的历史数据,在运算1年到现在的数据统计,即在数据加工初始化跑批时,会导致晚上的日常任务不能跑,另外由于StarRocks计算节点和Yarn混合部署,所有跑初始化作业也不能完全占满服务器资源,否则影响在线服务。如果临时加节点处理,资源审批和运维部署的时间又过长,因此只能让业务长时间等待后才有数据可用。


挑战总结如下:


由于资源限制,每日跑批作业数据不能按时完成,影响客户报表产出。


数据加工初始化,资源不能快速扩容,导致业务长时间等待,制约业务快速发展。


经过充分的调研,发现云原生数据库能够匹配以上场景和解决痛点,即:按需付费,用多少弹多少,并且能够快速弹升。


2. 解决方案

2.1 老架构:CDH


大数据平台目前是存算分离架构,数据存储使用阿里云OSS,不用考虑添加机器等运维工作,而且整个CDH集群运维也非常轻量,不会存在随着数量增长而不停的加磁盘需求。

大数据计算以 Spark Jar 和 SQL 为主:



架构中调度系统和元数据中心都采用自建方案,存储使用OSS,计算采用自建 CDH Spark 集群,但面临自建CDH节点扩缩容周期长,天维度扩/缩容,无法处理突发的业务高峰问题。因此需要将计算上云切换为全托管方式,在和阿里云架构师沟通后,发现AnalyticDB Spark(以下简称ADB Spark)可以满足技术要求。


2.2 新架构:CDH + ADB Spark


优化后新架构:




新架构相比老架构核心点是使用ADB Spark替换自建CDH Spark,其他模块如调度系统和元数据服务HMS均无变化。


新旧流程对比:



新作业流程描述如下:


1. 任务调度仍然由科脉调度发起,通过ADB Spark OpenAPI接口启动Spark作业运行。


2. ADB Spark跨VPC读取 CDH HiveMetastore 元数据,进行语法解析和运算,结果仍然和之前一样写到阿里云OSS,ADB Serverless Spark 全托管,按量收费,如果没有提交任务到服务中,不计算费用。


2.3 ADB Spark 优化


2.3.1 写OSS优化


在Hadoop生态中使用OutputCommitter接口保证写入过程的数据一致性,原理类似于二阶段提交协议。


开源Hadoop提供了Hadoop FileSystem来实现读写OSS文件,它默认使用OutputCommitter的实现是FileOutputCommitter。为保证数据一致性,不让用户看到中间结果,在执行Task时先把结果输出到一个临时工作目录,所有Task都确认输出完成时,再由Driver统一将临时工作目录Rename到生产数据路径,整体流程如下图所示:



由于OSS相比 HDFS Rename 操作十分昂贵,Rename操作是Copy&Delete操作,而HDFS则是NameNode上的一个元数据操作。在ADB Spark中继续使用开源Hadoop的FileOutputCommitter性能很差,为解决这个问题,ADB Spark引入了OSS Multipart Upload特性来优化写入性能。


阿里云OSS支持Multipart Upload功能,原理是把一个文件分割成多个数据片并发上传,上传完成后,让用户自己选择一个时机调用Multipart Upload的完成接口,将这些数据片合并成原来的文件,以此来提高文件写入OSS的吞吐。由于Multipart Upload可以控制文件对用户可见的时机,所以我们可以利用它代替 Rename 操作来优化 ADB Spark OutputCommitter 场景写 OSS 时的性能。


基于Multipart Upload实现的OutputCommitter,整个算法流程如下图:


利用OSS Multipart Upload,有以下几个好处:


写入文件不需要多次拷贝。可以看到本来昂贵的 Rename 操作已经不需要了,写入文件不需要copy&delete。另外相比于Rename,OSS的completeMultipartUpload接口是一个非常轻量的操作。


出现数据不一致的几率更小。虽然如果一次要写入多个文件,此时进行completeMultipartUpload仍然不是原子性操作,但是相比于原先的rename会copy数据,他的时间窗口会缩短很多,出现数据不一致的几率会小很多,可以满足绝大部分场景。


Rename中的文件元信息相关操作不再需要。经过我们的统计,算法1中一个文件的元数据操作可以从13次下降到6次,算法2则可以从8次下降到4次。


OSS Multipart Upload中控制用户可见性的接口是和,这种接口的语义类似于。Hadoop FileSystem标准接口没有提供commit/abort这样的语义。


前面有提到过 OutputCommitter 类似于一个二阶段提交协议,因此我们可以把这个过程抽象为一个分布式事务,可以理解为Driver开启一个全局事务,各个Executor开启各自的本地事务,当Driver收到所有本地事务完成的信息之后,会提交这个全局事务。


基于这个抽象,我们引入了一个Semi-Transaction层(没有实现所有的事务语义),其中定义了Transaction等接口。在这个抽象之下我们把适配OSS Multipart Upload特性的一致性保证机制封装进来。另外我们还实现了OSSTransactionalOutputCommitter,它实现了OutputCommitter接口,上层的计算引擎比如Spark通过它和 Semi-Transaction 层交互,结构如下图:



下面以ADB Serverless Spark的使用来说明 OSSTransactionalOutputCommitter 的大体流程:


1. setupJob


Driver开启一个GlobalTransaction,GlobalTransaction在初始化的时候会在OSS上新建一个隐藏的属于这个GlobalTransaction的工作目录,用来存放本job的文件元数据。


2. setupTask


Executor使用Driver序列化过来的GlobalTransaction生成LocalTransaction。并监听文件的写入完成状态。Executor写文件。文件的元数据信息会被LocalTransaction监听到,并储存到本地的RocksDB里面,OSS远程调用比较耗时,我们把元数据存储到本地RocksDB上等到后续一次提交能够减少远程调用耗时。


3. commitTask


当Executor调用LocalTransaction commit操作时,LocalTransaction会上传这个Task它所相关的元数据到OSS对应的工作目录中去,不再监听文件完成状态。


4. commitJob


Driver会调用GlobalTransaction的commit操作,全局事务会读取工作目录中的所有元数据中的待提交文件列表,调用OSS completeMultipartUpload接口,让所有文件对用户可见。


引入 Semi-Transaction 有两个好处:


它不对任何计算引擎的接口有依赖,因此后面可以比较方便的移植到另外的一个计算引擎,通过适配可以将它所提供的实现给Presto或者其它计算引擎使用。


可以在Transaction的语义下添加更多的实现。例如对于分区合并的这种场景,可以加入MVCC的特性,在合并数据的同时不影响线上对数据的使用。


针对 Spark 典型 ETL Benchmark Terasort,在1TB输入数据量的情况下,ADB Spark FileOutputFormat 执行时间缩短62%,性能提升163%。而针对动态分区场景,社区算法1运行失败,算法2可以执行成功,ADB Spark FileOutputFormat 相比算法2性能还要进一步提升124%。


2.3.2 免AK/SK的OSS访问


开源使用AK/SK的方式访问OSS数据,而明文AK/SK配置泄露可能导致发生信息安全风险,为为满足企业安全以及精细化访问控制要求,ADB Spark基于RAM系统实现对OSS的访问控制,用户只需快速一键授权https://help.aliyun.com/document_detail/455073.html 即可授权子账号/其他账号访问权限,而后便可在ADB Spark上免AK/SK访问OSS数据,避免AK/SK泄露风险。


可通过如下示例访问OSS上的表数据,而不需要填写AK/SK信息





2.3.3 多租UI服务


UI服务对于开发者来说至关重要,开发人员依赖UI服务进行作业调试以及生产作业问题排查。好的UI服务可以很好地加速研发效率。


Spark社区提供HistoryServer提供对Spark历史作业的UI和日志服务,在实际应用中遇到诸多痛点,典型如下:


Eventlog空间开销大:HistoryServer依赖Spark引擎将运行中的Event信息全部记录FileSystem中,然后后台回放并绘出UI页面。对于复杂作业和长作业Eventlog量较大,可以达到百GB甚至TB级别。


复杂作业和长作业不支持:复杂作业或者长作业的Eventlog很大,HistoryServer会解析失败,甚至OOM。再加上空间开销大的原因,用户一般都只能关闭Eventlog。


 Replay效率差,延迟高:HistoryServer采用后台Replay Eventlog的方式还原Spark UI,相当于把Spark引擎的事件全部重放一遍,开销大,会有延迟。特别是作业较多或者较复杂的情况下,延迟可达分钟甚至十分钟级别。


针对上述痛点,ADB Spark重新自研了一套多租户UI方案,具体方案如下



UI 方案针对社区方案做了深度优化:


高效渲染:ADB Spark减少Eventlog依赖,作业运行时,Spark Driver 实时流式采集 Spark Event Meta 到OSS,保存作业结束时的页面元信息。为了加速复杂作业 UI 渲染速度,ADB Spark 优化反序列算法并采用 Rotation 算法自动过滤非必要事件,GB 级 Event 渲染提升 47%。


UIServer水平扩展:UIServer主要负责解析历史UI Meta和提供 stderr 和 stdout 日志服务,是轻量化无状态的,可以实现水平扩展,进而支持万级别客户同时在线服务。UIServer URL采用加密token作为参数,token代表了用户身份、应用id,UIServer据此实现多租户服务化。


本地日志自动滚动:对于长作业而言,stderr 或者 stdout 信息会随着时间增加累积,最终甚至可能打爆磁盘。ADB Spark安全容器内置后台进程,实现日志滚动,保存最有价值的最近一段时间的日志。


2.3.4 OpenAPI应用管理


为方便其他系统对接ADB Spark服务,充分享受云上Serverless Spark优势,ADB Spark提供多种方式提交作业,包括:OpenAPI 、Spark-Submit 和 Airflow调度


OpenAPI包括:提交作业、获取作业状态、获取作业日志、停止作业等典型API,方便集成到业务系统中。

更详细的API参考文档:https://help.aliyun.com/document_detail/612446.html


Spark-submit方式与社区提交Spark作业方式类似,通过spark-submit脚本即可提交作业至ADB Spark。

详细参考文档:https://help.aliyun.com/document_detail/471602.html典型作业提交脚本如下:


./bin/spark-submit  \
--class com.aliyun.adb.spark.oss.SparkReadOss \--verbose \--name <your_job_name> \--jars oss://<bucket-name>/jars/xxx.jar,oss://<bucket-name>/jars/xxx.jar--conf spark.driver.resourceSpec=medium \--conf spark.executor.instances=1 \--conf spark.executor.resourceSpec=medium \oss://<bucket-name>/jars/xxx.jar args0 args1


Airflow调度ADB Spark作业可以方便与客户内部调度系统集成,通过客户熟知的Airflow调度系统管理ADB Spark作业的生命周期。

详细参考文档:https://help.aliyun.com/document_detail/470947.html


2.4 迁移问题梳理


2.4.1 ADB Spark和CDH其他组件的兼容性问题


科脉将CDH Spark分析的业务迁移到 ADB Spark,使用的其他组件如Hive,StarRocks等都仍然在CDH集群中。而Spark等业务是分析、写入这些组件,就会引入一些兼容性问题,主要概括如下:


1. ADB Spark到各组件的网络打通。之前CDH Spark和这些组件在同一个VPC中,无需进行网络打通即可访问。ADB Spark访问外部组件进行网络打通的方法是pod挂载弹性网卡,同时也需要对一些具有白名单的服务,如rds等添加弹性网卡网段的白名单,详细参考https://help.aliyun.com/document_detail/471203.html跨VPC访问和连接数据源;


2. CDH是Spark on Yarn的形态,而 ADB Spark 是Spark on K8S,需要对jar作业内容和一些配置进行修改,如external shuffle service配置、SparkSession参数等做改造;


3. CDH中使用 Spark 2.2 + Hive 2.1.1,而ADB目前内置的版本是Spark 3.2/3.3 + Hive 2.3.9。Spark读写低版本的Hive需要依赖HiveShim模块做适配,相关配置如下:


set spark.sql.hive.metastore.version={hive_version};set spark.sql.hive.metastore.jars= path;set spark.sql.hive.metastore.jars.path=/tmp/*/*.jar;add jar "oss://path/to/hive_jars/*";


和传统自建的spark集群不同,这里首先要add jar将需要的hive版本的jar包上传到spark pod,然后再做hive shim相关的配置。但是HIVE_HOME/lib下jar包较多,加载耗时较长,可以筛选出十几个hive client端依赖的jar,无需将所有的hive jar都做上传。


4. CDH Hive中将部分方法的签名做了改造,导致和开源Spark中的Hive shim 2.1不一致。如hive-exec.jar中的方法签名就不一致,需要将这个jar替换为开源的Hive 2.1的jar


5. ……


2.4.2 CDH Spark2和开源Spark3的兼容性问题


因为Spark3的一些默认行为较Spark2有所改变, CDH Spark 2中正常运行作业迁移到Spark 3也会有一些兼容性问题,如:


1. Spark 3和低版本 Hive/Spark 不兼容


a)Spark3写的parquet,用户使用原CDH中的Hue进行分析的时候会报错格式错误,无法读取。添加配置加配置 set spark.sql.parquet.writeLegacyFormat=true; 适配解决;


b)强制类型转换报错,如string转int报错"Cannot safely cast 'km_mch_status': string to int",配置spark.sql.storeAssignmentPolicy LEGACY解决。spark2中默认的配置是这个,允许强制类型转换,spark3中默认配置是ANSI,会有更严格的类型校验;


c)Datatime格式转换错误,配置 set spark.sql.legacy.timeParserPolicy=LEGACY 解决。


2. insert overwrite静态分区报错。如下的操作在Spark 3中是不支持的


insert overwrite table tmp.spark3_snap partition(dt='2020-09-10')select id from tmp.spark3_snapwhere dt='2020-09-09';


需要将静态分区改造成动态分区,如上述SQL可以改造为


insert overwrite table tmp.spark3_snap partition(dt)select id,'2020-09-10'as dt from tmp.spark3_snapwhere dt='2020-09-09';


Spark社区中相关的讨论:https://issues.apache.org/jira/browse/SPARK-32838


3. Spark3不支持select结果作为原表来引用,如执行如下


select xxx
FROM(SELECT*FROM tmp_ys_item WHERE pt_d BETWEEN DATE_ADD('2023-07-11',-1)AND'2023-07-11')INNER JOIN(SELECT*FROM tmp_ys_order WHERE pt_d BETWEEN DATE_ADD('2023-07-11',-1)AND'2023-07-11')ON tmp_ys_order.sheet_no= tmp_ys_item.sheet_noAND tmp_ys_order.pt_d= tmp_ys_item.pt_d


会报错不存在tmp_ys_order.sheet_no这个字段,这是因为spark3中对select结果的字段由统一的前缀__auto_generated_subquery_name,实际上字段是__auto_generated_subquery_name.sheet_no。解决方案是给select结果加上别名,如上SQL可以改造为:


select xxx
FROM (SELECT * FROM tmp_ys_item WHERE pt_d BETWEEN DATE_ADD('2023-07-11',-1) AND '2023-07-11') as t1
INNER JOIN (SELECT * FROM tmp_ys_order WHERE pt_d BETWEEN DATE_ADD('2023-07-11',-1) AND '2023-07-11') as t2
ON t2.sheet_no = t1.sheet_no
AND t2.pt_d = t1.pt_d


4. ……


3. 方案价值

 降低成本:云服务只需在界面修改最大资源量参数,就可获得足够的资源满足业务计算需求,非常灵活,且按量付费成本更低(公式、比例),按照最新的方案可以每年节省21%


自主可控:使用标准的开源技术,元数据、调度都自主研发可控,不存在和云厂商的绑定;


灵活取用:该方案业务增长不需要添加机器,也没有机器需要运维,至少减少80%的运维成本。



4. 未来展望

使用ThriftServer Server提前启动运行服务,加速作业启动,减少资源占用时间。


安全增强,在CDH中大数据组件之间通讯都是通过Kerberos认证,ADB Spark暂时不支持该能力。


数据湖,业务涉及退款等业务,需要长尾更新能力,需要数据湖能力,需要现在的方案满足以后数据湖要求。


作业的资源配置优化诊断建议,解决配置资源不足和资源过配的情况。




云原生数据仓库AnalyticDB MySQL免费试用


阿里云AnalyticDB MySQL升级为湖仓一体架构

支持高吞吐离线处理和高性能在线分析


无缝替换CDH/TDH/Databricks/Presto/Spark/Hive等。

免费试用活动(5000ACU时+100GB存储)正在火热申请中


点击文末「前往体验」即可开启免费试


   前往体验


相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
5月前
|
运维 监控 Kubernetes
实时数仓Hologres运维问题之时效性如何解决
Hologres达成分钟级问题发现与解决,确保监控高效准确。
51 0
|
2月前
|
人工智能 自然语言处理 关系型数据库
客户说|宝宝树选用AnalyticDB RAG引擎,共创智能母婴生活新范式
宝宝树与阿里云深度合作,利用大数据和AI技术,推出了一系列智能化产品,如AI解读B超单、AI起名等,覆盖备孕、孕期、产后等场景,提升了用户体验,推动了商业化进程。通过技术架构的优化,宝宝树在内容生产和搜索精度上取得了显著成效,未来将继续深化“AI+母婴”战略,为用户提供更全面、个性化的服务。
|
6月前
|
数据采集 运维 Cloud Native
Flink+Paimon在阿里云大数据云原生运维数仓的实践
构建实时云原生运维数仓以提升大数据集群的运维能力,采用 Flink+Paimon 方案,解决资源审计、拓扑及趋势分析需求。
18540 54
Flink+Paimon在阿里云大数据云原生运维数仓的实践
|
5月前
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
296 3
|
8月前
|
SQL 运维 安全
客户说|享道出行 x DMS,构建一站式高效、安全的数据运维管理平台
享道出行通过引入一站式运维管理系统DMS,有效地降低数据运维的复杂性,提升运维团队的工作效率,同时保障IT系统的稳定性和安全性,最终实现业务的持续性和稳定发展。
|
SQL 分布式计算 数据可视化
基于Spark技术的银行客户数据分析
基于Spark技术的银行客户数据分析
|
SQL 运维 监控
《阿里云认证的解析与实战-数据仓库ACP认证》——云原生数据仓库AnalyticDB PostgreSQL版功能演示(上)——七、常用运维SQL
《阿里云认证的解析与实战-数据仓库ACP认证》——云原生数据仓库AnalyticDB PostgreSQL版功能演示(上)——七、常用运维SQL
|
弹性计算 监控 数据挖掘
《阿里云认证的解析与实战-数据仓库ACP认证》——云上数据仓库的架构方案——一、AnalyticDB助力客户行为日志实时分析
《阿里云认证的解析与实战-数据仓库ACP认证》——云上数据仓库的架构方案——一、AnalyticDB助力客户行为日志实时分析
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
168 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
80 0

相关产品

  • 云原生数据仓库AnalyticDB MySQL版