一、应用执行机制
一个应用的生命周期即,用户提交自定义的作业之后,Spark框架进行处理的一系列过程。
在这个过程中,不同的时间段里,应用会被拆分为不同的形态来执行。
1、应用执行过程中的基本组件和形态
Driver:
运行在客户端或者集群中,执行Application的main方法并创建SparkContext,调控整个应用的执行。
Application:
用户自定义并提交的Spark程序。
Job:
一个Application可以包含多个Job,每个Job由Action操作触发。
Stage:
比Job更小的单位,一个Job会根据RDD之间的依赖关系被划分为多个Stage,每个Stage中只存有RDD之间的窄依赖,即Transformation算子。
TaskSet:
每个Stage中包含的一组相同的Task。
Task:
最后被分发到Executor中执行的具体任务,执行Stage中包含的算子。
明确了一个应用的生命周期中会有哪些组件参与之后,再来看看用户是怎么提交Spark程序的。
2、应用的两种提交方式
Driver进程运行在客户端(Client模式):
即用户在客户端直接运行程序。
程序的提交过程大致会经过以下阶段:
- 用户运行程序。
- 启动Driver进行(包括DriverRunner和SchedulerBackend),并向集群的Master注册。
- Driver在客户端初始化DAGScheduler等组件。
- Woker节点向Master节点注册并启动Executor(包括ExecutorRunner和ExecutorBackend)。
- ExecutorBackend启动后,向Driver内部的SchedulerBackend注册,使得Driver可以找到计算节点。
- Driver中的DAGScheduler解析RDD生成Stage等操作。
- Driver将Task分配到各个Executor中并行执行。
Driver进程运行在集群中(某个Worker节点,Cluster模式):
即用户将Spark程序提交给Master分配执行。
大致会经过一下流程:
- 用户启动客户端,提交Spark程序给Master。
- Master针对每个应用分发给指定的Worker启动Driver进行。
- Worker收到命令之后启动Driver进程(即DriverRunner和其中的SchedulerBackend),并向Master注册。
- Master指定其他Worker启动Executor(即ExecutorRunner和其内部的ExecutorBackend)。
- ExecutorBackend向Driver中的SchedulerBackend注册。
- Driver中的DAGScheduler解析RDD生产Stage等。
- Executor内部启动线程池并行化执行Task。
可以看到,两种程序的提交方式在处理过程中,仅仅是在哪个地方启动Driver进程的区别而已。
为Client模式中时(使用Spark Shell直接执行的程序),Driver就在客户端上。
为Cluster模式时(提交Spark程序到Master),Driver运行与集群中的某个Worker节点。
二、调度与任务分配模块
Spark框架就像一个操作系统一样,有着自己的作业调度策略,当集群运行在不同的模式下,调度不同级别的单位,使用的策略也是有所不同的。
1、Application之间的调度
当有多个用户提交多个Spark程序时,Spark是如何调度这些应用并合理地分配资源呢?
Standalone模式下,默认使用FIFO,每个app会独占所有资源
可以通过以下几个参数调整集群相关的资源:
- spark.cores.max:调整app可以在整个集群中申请的CPU core数量
- spark.deploy.defaultCores:默认的CPU core数量
- spark.executor.memory:限制每个Executor可用的内存
在Mesos模式下,可以使用
- spark.mesos.coarse=true设置静态配置资源的策略
- 使用mesos://URL且不配置spark.mesos.coarse=true(每个app会有独立固定的内存分配,空闲时其他机器可以使用其资源)
在Yarn模式下,提交作业时可以使用
- 通过–num-executors控制分配多少个Executor给app
- –executor-memory和–executor-cores分别控制Executor的内存和CPU core
2、Application内部的Job调度机制
一个Application中,由各个Action触发的多个Job之间也是存在调度关系的。
Action操作实现上是调用了SparkContext的runJob方法提交Job。
Spark中调度Job有两种策略
FIFO:
- 第一个Job分配其所需的所有资源
- 第二个Job如果还有剩余资源的话就分配,否则等待
FAIR:
- 使用轮询的方式调度Job
可以通过配置spark.scheduler.mode调整Job的调度方式
另外也可以配置调度池,具体参考官方文档
或者参考conf/fairscheduler.xml.template文件。
3、Job中的Stage调度
Stage是由DAGScheduler组件生产的,在源码中,有三个比较特殊的变量:
- waitingStages:存储等待执行的Stages
- runningStages:存储正在执行的Stages
- failedStages:存储执行失败的Stage
Spark会通过广度优先遍历找到最开始的Stage执行,若有父Stage没有执行完则等待。
4、Stage中的Task调度
暂未了解。。。
三、I/O制度
Spark虽然是基于内存计算的框架,但是不可避免的也会接触到一些存储层,那么在和存储层交互的时候,Spark做了哪些工作?
1、序列化
序列化的本质就是将对象转换为字节流,可以理解为将链表中存储的非连续空间的数据存储转化为连续空间存储的数组中
Spark为什么要做序列化操作?
内存或磁盘中RDD会含有对象的存储,而在节点间数据的传输时,序列化之后的数据可以节约空间和提高效率。
2、压缩
压缩是日常生活中的一个常见操作,好处显而易见,节约空间,从而就可以获得时间上的效率。
Spark中序列化之后的数据可以进行压缩以减少空间开销。
Spark支持两种压缩算法
- Snappy算法:高压缩速度
- LZF算法:高压缩比
在不同的场景中选择不同的压缩算法可以有效的提高程序运行的效率。
压缩配置方式:
- 启动前在spark-env.sh中设置:export SPARK_JAVA_OPTS=”-Dspark.broadcast.compress”
- 在应用程序中配置
conf.getBoolean(“spark.broadcast.compress,true”)
conf.set(“spark.broadcast.compress”,true)
3、块管理
RDD从物理上看是一个元数据结构,记录着Block和Node之间的映射关系。
存储RDD是以Block块为单位的,每个分区对应一个块,PartitionID通过元数据信息可以映射到Block。
BlockManager管理和接口、块读写流程、数据块读写管理等细节待继续深入了解。
四、通信模块
Spark中使用Akka作为通信框架
- Actors是一组包含状态和行为的对象
- 一个Actor接收到其他Actor的信息之后可以根据需求做出各种反应
- Client、Master、Worker等都是一个Actor
Spark各个组件的之间协调工作都是基于Akka机制来的,待深入了解的有:
- Client Actor通信代码逻辑
- Master Actor通信代码逻辑
- Worker Actor消息处理逻辑
五、容错机制
之前讲过,RDD之间的算子操作会形成DAG图,RDD之间的依赖关系会形成Lineage。
要理解Lineage机制首先要明确两种依赖的概念:
Shuffle Dependencies(宽依赖)
父分区可以被多个子分区所用
即多对多的关系Narrow Dependencies(窄依赖)
父分区最多被一个子分区所用
即一对一或者多对一的关系
当出现某个节点计算错误的时候,会顺着RDD的操作顺序往回走
一旦是Narrow Dependencies错误,重新计算父RDD分区即可,因为其不依赖其他节点
而如果Shuffle Dependencies错误,重算代价较高,因为一旦重新计算其依赖的父RDD分区,会造成冗余计算
这时候就需要人为的添加检查点来提高容错机制的执行效率
什么情况下需要加CheckPoint
- DAG中的Lineage过长,如果重算开销太大,故在特定几个Shuffle Dependencies上做CheckPoint是有价值的。
- Checkpoint会产生磁盘开销,因为其就是将数据持久化到磁盘中,所以做检查点的RDD最好是已经在内存中缓存了。
六、Shuffle机制
Shuffle的定义:对无规则的数据进行重组排序等过程
为什么要Shuffle:分布式计算中数据是分布在各个节点上计算的,而汇总统计等操作需要在所有数据上执行
Spark中Shuffle经历的阶段:
Shuffle Write
将各个节点数据写入到指定分区
1、根据下一个Stage分区数分成相应的Bucket
2、将Bucket写入磁盘
Shuffle Fetch
获取各个分区发送的数据
1、在存储有Shuffle数据节点的磁盘Fetch需要的数据
2、Fetch到本地之后进行自定义的聚集函数操作
最后记录一下提交Spark作业的方法
在spark的bin目录下
执行spark-submit脚本
./spark-submit \
–class 入口函数所在的类名全称 \
–master spark master节点的地址(默认端口7077)\
–executor-memory 指定worker中Executor的内存 \
–total-executor-cores 100 \
jar文件所在的目录 \