Apache Spark 将支持 Stage 级别的资源控制和调度

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 我们需要对不同 Stage 设置不同的资源。但是目前的 Spark 不支持这种细粒度的资源配置,导致我们不得不在作业启动的时候设置大量的资源,从而导致资源可能浪费,特别是在机器学习的场景下。

背景

熟悉 Spark 的同学都知道,Spark 作业启动的时候我们需要指定 Exectuor 的个数以及内存、CPU 等信息。但是在 Spark 作业运行的时候,里面可能包含很多个 Stages,这些不同的 Stage 需要的资源可能不一样,由于目前 Spark 的设计,我们无法对每个 Stage 进行细粒度的资源设置。而且即使是一个资深的工程师也很难准确的预估一个比较合适的配置,使得作业启动时设置的参数适合 Spark 每个 Stage。

我们来考虑这个这样的场景:我们有个 Spark 作业,它总共有两个 Stages。第一个 Stage 主要对我们输入的数据进行基本的 ETL 操作。这个阶段一般会启动大量的 Task,但是每个 Task 仅仅需要少量的内存以及少数的核(比如1个core)。第一个 Stage 处理完之后,我们将 ETL 处理的结果作为 ML 算法的输入,这个 Stage 只需要少数的 Task,但是每个 Task 需要大量的内存、GPUs 以及 CPU。

像上面这种业务场景大家应该经常遇到过,我们需要对不同 Stage 设置不同的资源。但是目前的 Spark 不支持这种细粒度的资源配置,导致我们不得不在作业启动的时候设置大量的资源,从而导致资源可能浪费,特别是在机器学习的场景下。

不过值得高兴的是,来自英伟达的首席系统软件工程师 Thomas Graves 给社区提了个 ISSUE,也就是 SPIP: Support Stage level resource configuration and scheduling,旨在让 Spark 支持 Stage 级别的资源配置和调度。大家从名字还可以看出,这是个 SPIP(Spark Project Improvement Proposals 的简称),SPIP 主要是标记重大的面向用户或跨领域的更改,而不是小的增量改进。所以可以看出,这个功能对 Spark 的修改很大,会对用户产生比较大的影响。

作者提完这个 SPIP 之后给社区发了一份邮件,说明这个 SPIP 的目的,解决的问题等,然后让大家进行投票决定这个 SPIP 要不要开发下去。值得高兴的是,已经社区的一轮投票,得到6票赞成1票反对的结果。那就说明这个 SPIP 通过了,将进入开发状态。

image.png

设计

前面扯了一堆,下面让我们来看看这个方案是如何设计的。为了实现这个功能,需要在现有的 RDD 类里面加上一些新的 API,用于指定这个 RDD 计算需要用到的资源,比如添加以下两个方法:

def withResources(resources: ResourceProfile): this.type
def getResourceProfile(): Option[ResourceProfile]

上面的 withResources 方法主要用于设置当前 RDD 的 resourceProfile,并返回当前 RDD 实例。ResourceProfile 里面指定的资源包括 cpu、内存和额外的资源(GPU/FPGA/等)。我们还可以利用它实现其他功能,比如限制每个 stage 的 task 数量,为 shuffle 指定一些参数。不过为了设计实现的简单,目前只考虑支持 Spark 目前支持的资源,针对 Task 可以设置 cpu 和额外的资源(GPU/FPGA/等);针对 Executor 可以设置 cpu、内存和额外的资源(GPU/FPGA/等) 。执行器资源包括cpu、内存和额外资源(GPU、FPGA等)。通过给现有 RDD 类添加上面的方法,这使得所有继承自 RDD 的演变 RDD 都支持设置资源,当然包括了输入文件生成的 RDD。

用户在编程的时候,可以通过 withResources 方法来设置 ResourceProfile 的,当然肯定不可以设置无限的资源。可以通过 ResourceProfile.require 同时设置 Executor 和 task 需要的资源。具体的接口如下所示:

def require(request: TaskResourceRequest): this.type
def require(request: ExecutorResourceRequest): this.type

class ExecutorResourceRequest(
 val resourceName: String,
 val amount: Int, // potentially make this handle fractional resources
 val units: String, // units required for memory resources
 val discoveryScript: Option[String] = None,
 val vendor: Option[String] = None)

class TaskResourceRequest(
 val resourceName: String,
 val amount: Int) // potentially make this handle fractional resources

之所以用 ResourceProfile 包装 ExecutorResourceRequest 或 TaskResourceRequest 是因为后面如果我们需要添加新功能可以很容易的实现。比如我们可以在 ResourceProfile 里面添加 ResourceProfile.prefer 方法,来实现程序申请到足够就运行这个作业,如果没有申请到足够资源就使作业失败。

当然,这个功能的实现需要依赖 Spark 的 Dynamic Allocation 机制。如果用户没有启用 Dynamic Allocation (spark.dynamicAllocation.enabled=false)或者用户没有为 RDD 设置 ResourceProfile,那么就按照现有的资源申请那套机制运行,否则就使用这个新机制。

因为每个 RDD 都可以指定 ResourceProfile,而 DAGScheduler 是可以把多个 RDD 的转换放到一个 stage 中计算的,所以 Spark 需要解决同一个 stage 中多个 RDD 的资源申请冲突。当然,一些 RDD 也会出现跨 Stages 的情况,比如 reduceByKey,所以针对这种情况 Spark 需要将 ResourceProfile 的设置应用到这两个 Stage 中。

如何使用

那么如果 RDD 添加了上面的方法,我们就可以想下面一样设置每个 Task 的资源使用情况:

val rp = new ResourceProfile()
rp.require(new ExecutorResourceRequest("memory", 2048))
rp.require(new ExecutorResourceRequest("cores", 2))
rp.require(new ExecutorResourceRequest("gpu", 1, Some("/opt/gpuScripts/getGpus")))
rp.require(new TaskResourceRequest("gpu", 1))
​
val rdd = sc.makeRDD(1 to 10, 5).mapPartitions { it =>
  val context = TaskContext.get()
  context.resources().get("gpu").get.addresses.iterator
}.withResources(rp)
​
val gpus = rdd.collect()

上面 ResourceProfile 指定 Executor 需要 2GB 内存、2个 cores 以及一个 GPU;Task 需要一个 GPU。

总结

本文只是介绍了这个功能的简单实现,在真实的设计开发中会有很多需要考虑的问题,具体可以参见 SPARK-27495,对应的设计文件参见 Stage Level Scheduling SPIP Appendices API/Design。因为这是个比较大的功能,所以可能需要花费数个月的时间去实现。相信有了这个功能之后,我们会更合理的使用集群的资源。


原文链接

本文转载自公众号:过往记忆大数据


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。
image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
5月前
|
分布式计算 大数据 数据处理
Apache Spark:提升大规模数据处理效率的秘籍
【4月更文挑战第7天】本文介绍了Apache Spark的大数据处理优势和核心特性,包括内存计算、RDD、一站式解决方案。分享了Spark实战技巧,如选择部署模式、优化作业执行流程、管理内存与磁盘、Spark SQL优化及监控调优工具的使用。通过这些秘籍,可以提升大规模数据处理效率,发挥Spark在实际项目中的潜力。
264 0
|
2月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
40 0
|
3月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
103 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
2月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
157 0
|
3月前
|
分布式计算 Apache Spark
|
4月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
96 6
|
4月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
5月前
|
存储 监控 Apache
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
网易的灵犀办公和云信利用 Apache Doris 改进了大规模日志和时序数据处理,取代了 Elasticsearch 和 InfluxDB。Doris 实现了更低的服务器资源消耗和更高的查询性能,相比 Elasticsearch,查询速度提升至少 11 倍,存储资源节省达 70%。Doris 的列式存储、高压缩比和倒排索引等功能,优化了日志和时序数据的存储与分析,降低了存储成本并提高了查询效率。在灵犀办公和云信的实际应用中,Doris 显示出显著的性能优势,成功应对了数据增长带来的挑战。
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
|
4月前
|
分布式计算 Shell 调度
看看airflow怎样调度python写的spark任务吧
看看airflow怎样调度python写的spark任务吧
46 0
|
4月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

推荐镜像

更多
下一篇
无影云桌面