Serverless Spark的弹性利器 - EMR Shuffle Service

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
函数计算FC,每月免费额度15元,12个月
简介: 在传统计算存储混合的架构中,为了兼顾计算和存储,CPU和存储设备都不能太差,因此牺牲了灵活性,提高了成本。在计算存储分离架构中,可以独立配置计算机型和存储机型,具有极大的灵活性,从而降低成本。

背景与动机

计算存储分离下的刚需

计算存储分离是云原生的重要特征。通常来讲,计算是CPU密集型,存储是IO密集型,他们对于硬件配置的需求是不同的。在传统计算存储混合的架构中,为了兼顾计算和存储,CPU和存储设备都不能太差,因此牺牲了灵活性,提高了成本。在计算存储分离架构中,可以独立配置计算机型和存储机型,具有极大的灵活性,从而降低成本。

存储计算分离是新型的硬件架构,但以往的系统是基于混合架构设计的,必须进行改造才能充分利用分离架构的优势,甚至不改造的话会报错,例如很多系统假设本地盘足够大,而计算节点本地盘很小;再例如有些系统在Locality上的优化在分离架构下不再适用。Spark Shuffle就是一个典型例子。众所周知,Shuffle的过程如下图所示。
1.png

每个mapper把全量shuffle数据按照partitionId排序后写本地文件,同时保存索引文件记录每个partition的offset和length。reduce task去所有的map节点拉取属于自己的shuffle数据。大数据场景T级别的shuffle数据量很常见,这就要求本地磁盘足够大,导致了跟计算存储分离架构的冲突。因此,需要重构传统的shuffle过程,把shuffle数据卸载到存储节点。

稳定性和性能

除了计算存储分离架构下的刚需,在传统的混合架构下,目前的shuffle实现也存在重要缺陷: 大量的随机读写和小数据量的网络传输。考虑1000 mapper * 2000 reducer的stage,每个mapper写128M shuffle数据,则属于每个reduce的数据量约为64k。从mapper的磁盘角度,每次磁盘IO请求随机读64K的数据; 从网络的角度,每次网络请求传输64k的数据:都是非常糟糕的pattern,导致大量不稳定和性能问题。因此,即使在混合架构下,重构shuffle也是很必要的工作。

EMR Shuffle Service设计

基于以上的动机,阿里云EMR团队设计开发了EMR Shuffle Service服务(以下称ESS),同时解决了计算存储分离和混合架构下的shuffle稳定性和性能问题。

整体设计

ESS包含三个主要角色: Master, Worker, Client。其中Master和Worker构成服务端,Client以不侵入方式集成到Spark里。Master的主要职责是资源分配和状态管理;Worker的主要职责是处理和存储shuffle数据;Client的主要职责是缓存和推送shuffle数据。整体流程如下所示(其中ResourceManager和MetaService是Master的组件):
2.png

ESS采用Push Style的shuffle模式,每个Mapper持有一个按Partition分界的缓存区,Shuffle数据首先写入缓存区,每当某个Partition的缓存满了即触发PushData。

在PushData触发之前Client会检查本地是否有PartitionLocation信息,该Location规定了每个Partition的数据应该推送的Worker地址。若不存在,则向Master发起getOrAllocateBuffers请求。Master收到后检查是否已分配,若未分配则根据当前资源情况选择互为主从的两个Worker并向他们发起AllocateBuffer指令。Worker收到后记录Meta并分配内存缓存。Master收到Worker的ack之后把主副本的Location信息返回给Client。

Client开始往主副本推送数据。主副本Worker收到请求后,把数据缓存到本地内存,同时把该请求以Pipeline的方式转发给从副本。从副本收到完整数据后立即向主副本发ack,主副本收到ack后立即向Client回复ack。

为了不block PushData的请求,Worker收到PushData请求后会先塞到一个queue里,由专有的线程池异步处理。根据该Data所属的Partition拷贝到事先分配的buffer里,若buffer满了则触发flush。ESS支持多种存储后端,包括DFS和local。若后端是DFS,则主从副本只有一方会flush,依靠DFS的双副本保证容错;若后端是Local,则主从双方都会flush。

在所有的Mapper都结束后,Master会触发StageEnd事件,向所有Worker发送CommitFiles请求,Worker收到后把属于该Stage的buffer里的数据flush到存储层,close文件,并释放buffer。Master收到所有ack后记录每个partition对应的文件列表。若CommitFiles请求失败,则Master标记此Stage为DataLost。

在Reduce阶段,reduce task首先向Master请求该Partition对应的文件列表,若返回码是DataLost,则触发Stage重算或直接abort作业。若返回正常,则直接读取文件数据。

ESS的设计要点,一是采用PushStyle的方式做shuffle,避免了本地存储,从而适应了计算存储分离架构;二是按照reduce做了聚合,避免了小文件随机读写和小数据量网络请求;三是做了两副本,提高了系统稳定性。

容错

除了双副本和DataLost检测,ESS在容错上做了很多事情保证正确性。

PushData失败

当PushData失败次数(Worker挂了,网络繁忙,CPU繁忙等)超过MaxRetry后,Client会给Master发消息请求新的Partition Location,此后本Client都会使用新的Location地址。

若Revive是因为Client端而非Worker的问题导致,则会产生同一个Partition数据分布在不同Worker上的情况,Master的Meta组件会正确处理这种情形。

若发生WorkerLost,则会导致大量PushData同时失败,此时会有大量同一Partition的Revive请求打到Master。为了避免给同一个Partition分配过多的Location,Master保证仅有一个Revive请求真正得到处理,其余的请求塞到pending queue里,待Revive处理结束后返回同一个Location。

WorkerLost

当发生WorkerLost时,对于该Worker上的副本数据,Master向其peer发送CommitFile的请求,然后清理peer上的buffer。若Commit Files失败,则记录该Stage为DataLost;若成功,则后续的PushData通过Revive机制重新申请Location。

数据冗余

Speculation task和task重算会导致数据重复。解决办法是每个PushData的数据片里encode了所属的mapId,attemptId和batchId,并且Master为每个map task记录成功commit的attemtpId。read端通过attemptId过滤不同的attempt数据,并通过batchId过滤同一个attempt的重复数据。

ReadPartition失败

在DFS模式下,ReadPartition失败会直接导致Stage重算或abort job。在Local模式,ReadPartition失败会触发从peer location读,若主从都失败则触发Stage重算或abort job。

多backend支持

ESS目前支持DFS和Local两种存储后端。

跟Spark集成

ESS以不侵入Spark代码的方式跟Spark集成,用户只需把我们提供的Shuffle Client jar包配置到driver和client的classpath里,并加入以下配置即可切换到ESS方式:

spark.shuffle.manager=org.apache.spark.shuffle.ess.EssShuffleManager

监控报警
我们对ESS服务端进行了较为详尽的监控报警并对接了Prometheus和Grafana,如下所示:
3.png

性能数字

TeraSort的性能数字如下(2T, 4T, 10T规模):

5.png

10T规模TPC-DS的性能数字如下:
6.png

后续

我们后续会持续投入,集中在产品化、极致性能、池化等方向,欢迎大家使用!


更多数据湖技术相关的文章请点击:阿里云重磅发布云原生数据湖体系


更多数据湖相关信息交流请加入阿里巴巴数据湖技术钉钉群
数据湖钉群.JPG

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
相关文章
|
21天前
|
分布式计算 Serverless Spark
【开发者评测】E-MapReduce Serverless Spark获奖名单
E-MapReduce Serverless Spark获奖名单正式公布!
122 1
|
1月前
|
分布式计算 监控 Serverless
E-MapReduce Serverless Spark 版测评
E-MapReduce Serverless Spark 版测评
11574 10
|
4天前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
31 0
|
1月前
|
弹性计算 分布式计算 运维
迟来的EMR Serverless Spark评测报告
本文是一篇关于阿里云EMR Serverless Spark产品评测的文章,作者分享了使用体验和理解。EMR Serverless Spark是阿里云提供的全托管、一站式的Spark数据计算平台,简化了大数据处理流程,让用户专注于数据分析。文章提到了产品的主要优势,如快速启动、弹性伸缩、高资源利用率和低成本。
182 8
|
23天前
|
运维 数据挖掘 Serverless
深度解析阿里云EMR Serverless StarRocks在OLAP数据分析中的应用场景
阿里云EMR Serverless StarRocks作为一款高性能、全场景覆盖、全托管免运维的OLAP分析引擎,在企业数据分析领域展现出了强大的竞争力和广泛的应用前景。通过其卓越的技术特点、丰富的应用场景以及完善的生态体系支持,EMR Serverless StarRocks正逐步成为企业数字化转型和智能化升级的重要推手。未来随着技术的不断进步和应用场景的不断拓展我们有理由相信EMR Serverless StarRocks将在更多领域发挥重要作用为企业创造更大的价值。
|
1月前
|
分布式计算 运维 Serverless
E-MapReduce Serverless Spark 评测
EMR Serverless Spark服务对比传统引擎和自建集群展现高稳定性和性能,自动化运维降低成本。其敏捷性、自动扩缩容和阿里云生态集成提升了开发效率。不过,监控预警、资源调度和工具集扩展是潜在改进点。该服务可与MaxCompute、DataWorks、Quick BI联动,实现数据处理、管理、可视化一站式解决方案。
56 0
|
1月前
|
存储 分布式计算 Serverless
|
5天前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
5天前
|
安全 对象存储
阿里云EMR数据湖文件系统问题之JindoFSOSS的单一prefix热点的问题如何解决
阿里云EMR数据湖文件系统问题之JindoFSOSS的单一prefix热点的问题如何解决
|
5天前
|
存储 安全 API
阿里云EMR数据湖文件系统问题之JindoFS元数据查询和修改请求的问题如何解决
阿里云EMR数据湖文件系统问题之JindoFS元数据查询和修改请求的问题如何解决

热门文章

最新文章

相关产品

  • 函数计算