Spark 3.0 中的屏障执行模式_Spark的MPI时代来了

简介: Spark 3.0 中的屏障执行模式_Spark的MPI时代来了

RDD屏障概念引入

Spark 3.0 引入了一种名为RDDBarrier[T]的新型 RDD ,它表示 RDD 需要使用屏障执行模式来处理。此 RDD 公开了普通 RDD 中不可用的新功能。

RDDBarrier的源码定义如下:

/**
 * :: Experimental ::
 * Wraps an RDD in a barrier stage, which forces Spark to launch tasks of this stage together
@Experimental
@Since("2.4.0")
class RDDBarrier[T: ClassTag] private[spark] (rdd: RDD[T]) {
...
}

注释中便是第一手的定义,首先RDDBarrier是实验性的,把一个Rdd封装在了一个屏障阶段,这样回强制Spark在运行task的时候在这个阶段一起执行。

关于Barrier概念其实和并发环境中的屏障是一样的,直白点说就是多个线程会在同一个时间点开始执行。

实操

来点例子,找找感觉。我们首先还是创建RDDBarrier Rdd

val df = sparkSession.range(0,100).repartition(4)
    val barrierRdd = df.rdd.barrier()
    val count = barrierRdd.mapPartitions(v => v).count()
    println("count is " + count)

需要看出效果,我们把线程数量调整成1

val sparkSession = SparkSession.builder.
          master("local[1]")
          .appName("barrierRdd")
          .getOrCreate()

我们运行,看效果,我找出关键的日志:

21/12/15 22:42:20 INFO SharedState: Warehouse path is 'file:/Users/zhuxuemin/IdeaProjects/spark-3.0-examples/spark-warehouse'.
21/12/15 22:42:22 INFO CodeGenerator: Code generated in 265.914341 ms
21/12/15 22:42:22 INFO SparkContext: Starting job: count at BarrierRddExample.scala:15
21/12/15 22:42:22 WARN DAGScheduler: Barrier stage in job 0 requires 4 slots, but only 1 are available. Will retry up to 40 more times
21/12/15 22:42:37 WARN DAGScheduler: Barrier stage in job 0 requires 4 slots, but only 1 are available. Will retry up to 39 more times

观察最后的提示,以上就是Barrier stage需要4个槽位,但是其实只有一个,因为我们repartition(4)是开了4个并发,线程数量不够4个,就会等着不执行了。

我们调整到线程数量为4,再看看效果:

21/12/15 22:48:36 INFO DAGScheduler: ResultStage 1 (count at BarrierRddExample.scala:15) finished in 0.325 s
21/12/15 22:48:36 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/12/15 22:48:36 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
21/12/15 22:48:36 INFO DAGScheduler: Job 0 finished: count at BarrierRddExample.scala:15, took 1.269630 s
count is 100

结果就是执行完成了。barrierRdd会等待资源足够的时候同时执行。

引入背景

可能最大的疑惑就是不知道有啥子用,这项特性是jira中提到的,原始链接:https://issues.apache.org/jira/browse/SPARK-24374,标题叫做 SPIP: Support Barrier Execution Mode in Apache Spark,页面里面是同时上传了pdf文档的

我简单翻译一下,前面是介绍背景吧啦吧啦,略去,捡中间部分

…考虑一个简单的场景:作为一名数据科学家,我想构建一个管道,从生产数据仓库中获取训练事件并拟合具有数据并行性的 DL 模型。这里,“生产数据仓库”因公司而异,例如 Apache Hive,来自 AWS 的 Redshift、来自 Azure 的 Data Lake、来自 Databricks 的 Delta 等。经过社区多年的发展,Spark 是从这些生产数据仓库读取数据的事实上的选择,而 TensorFlow 和 Horovod 是分布式 DL 训练的流行选择。作为末端用户,如何进行衔接(数仓->模型训练)?我们可以简化吗?这里的建议是在 Apache Spark 中添加一个新的调度模型,以便用户可以将分布式 DL 训练正确嵌入作为 Spark 阶段,以简化分布式训练工作流程。例如,Horovod 使用 MPI 实现 all-reduce 以加速分布式 TensorFlowtraining。计算模型与 Spark 使用的 MapReduce 不同。在 Spark 中,阶段中的任务不依赖于同一阶段中的任何其他任务,因此可以独立调度。在 MPI 中,所有Worker同时启动并传递消息。为了在 Spark 中嵌入这个工作负载,我们需要引入一个新的调度模型,暂命名为“barrierscheduling”,它在启动任务的同时,为用户提供足够的信息和工具来嵌入分布式 DL 训练。 Spark 还可以提供额外的容错层,以防某些任务在中间失败,Spark 会中止所有任务并重新启动阶段。

为什么需要新的执行模式

传统的Spark执行模式我们仍旧算作为Map/Reduce模式,那么这种执行模式多年来一直很好地适用于不同的工作负载。为什么我们现在需要不同的执行模式?

原因之一是在spark上支持深度学习框架。深度学习框架不适合 Map/Reduce 模型。它们与称为 MPI(消息传递接口)的其他类型的执行模型配合得很好。例如,Uber 的大规模深度学习开源框架 Horovod 使用 MPI 为各种 DL 框架实现分布式深度学习。

为了原生支持深度学习,spark 需要支持不同于 Map/Reduce 的执行模型。新的执行模型以 MPI 为模型。

屏障执行模式

  1. 工作是阶段的集合。在这些阶段之间通常会进行洗牌。这与 Map/Reduce 模型相同
  2. 每个阶段都是任务的集合。这些任务都是一起开始的,而且是相互依赖的。与 Map/Reduce 相比,这是主要的区别之一。在 MPI 模型中,任务可以相互通信并相互依赖。所以他们需要一起开始。
  3. 由于任务相互依赖,当其中一个任务失败时,所有任务都会重试。同样,这与 Map/Reduce 模型不同。
  4. 任务数量始终由开发人员决定。这是因为即使数据可能很小,计算也可能复杂得多,并且可能需要比典型处理更多的资源。还应该有足够的资源来一起运行所有任务。

后记

我们其实可以说,Spark的MPI时代来了~

目录
相关文章
|
1月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
148 6
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
64 2
|
3月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
249 3
|
4月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
存储 分布式计算 监控
Spark Standalone模式是一种集群部署方式
【6月更文挑战第17天】Spark Standalone模式是一种集群部署方式
70 7
|
6月前
|
分布式计算 监控 Java
Note_Spark_Day02:Standalone集群模式和使用IDEA开发应用程序
Note_Spark_Day02:Standalone集群模式和使用IDEA开发应用程序
90 0
|
6月前
|
分布式计算 资源调度 调度
利用SparkLauncher实现Spark Cluster模式下的远端交互
利用SparkLauncher实现Spark Cluster模式下的远端交互
119 0
|
6月前
|
分布式计算 资源调度 监控
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
199 1
|
分布式计算 资源调度 监控
Spark Yarn模式部署集群
Spark Yarn模式部署集群
84 1