Spark源码阅读笔记一——part of core

简介: spark core的部分阅读笔记,持续更新中

内部accumulator通过心跳报告给driver
task运行时可以累加accumulator,但是不能读取value,value只能在driver获取
spark内部用一个weakhashmap保存accumulator,便于gc的清理

CacheManager
spark的类用于负责传递RDD的分区内容给BlockManager,并保证一个节点不会载入一个rdd的两份拷贝,这个通过一个hashset实现,已载入的rdd会将id保存到set中
获取和计算rdd时,先判断是否已经计算,如果没有再从blockmanager获取block然后计算结果。
除非是本地模式,不然rdd的计算结果都会缓存
如果rdd不需要在内存中缓存,则直接将计算结果通过iterator直接传给blockmanager
在rdd需要缓存时,我们必须小心不能在内存中一次性展开全部的partition,否则如果jvm没有足够的空间给这个单个的partition可能会引发OOM异常。
取而代之的是,我们展开这些value,小心的、可能的放弃并丢掉这个partition到磁盘如果合适。
如果空间足够就全部缓存到内存中,否则如果使用磁盘就放到磁盘,不然直接就返回value

Dependency
NarrowDependency:一个子partition依赖于多个父partition
ShuffleDependency:shuffle stage的输出依赖,在shuffle中,rdd是短暂的因为我们在executor端不需要它

ExecutorAllocationClient
与cluster manager请求或杀掉executor的客户端
根据我们的调度需要更新集群,依赖于三个信息
1 executor的数量,我们需要的全部的executor数,cluster manager不能杀掉任何运行中的executor来达到这个数量,这是我们想要分配的executor数量
2 所有要运行的stage中有本地偏好的task数量,包括运行等待和完成的task
3 task到运行host的map

ExecutorAllocationManager(EAM)
一个代理,根据工作负载动态的分配和移除executor
EAM维护一个移动的目标executor数量,定期的同步到cluster manager。target的数量从配置的一个初始值开始,并根据等待和运行task数变化
在当前的target数量多于需要控制的当前的负载时,会减少target数量。target总是会一次性减到可以运行所以当前运行和等待task的数量
当需要响应积压的等待需要调度的task时,会增加target的数量。如果一个队列在N秒内没有排空,则新的executor被加入。如果这个队列仍然在另外的M秒内存在,则更多的executor会被加入。增加的数量在每轮以上一轮的指数级增加,直到达到上限。上限是基于一个配置的属性和当前运行和等待任务的数量。
指数增长有双重理由。
1 executor应该在开始缓慢的增加,以防万一额外需要的executor数量很小。否则我们增加了多于我们需要的executor数量则我们需要在后面移除他们。
2 executor的数量需要快速增加,以防万一executor的数量最大值非常高,否则在繁重的工作负载下性能提升需要很长时间。
executor移除的策略很简单,如果一个executor已经空闲了K秒,意味着它没有被调度用于执行任何task,因而移除它。
这里没有重试的逻辑,因为我们假定cluster manager最终会异步的执行所有它收到的请求。
相关的spark属性如下

成员变量initializing,是否需要一直等待初始化的executor集合被分配,当这个变量为true的时候,我们不会取消未执行的executor请求。这个在下面两种情况会被设置成false
1 一个stage被提交
2 一个executor的空闲时间超时
用于增加减少executor的调度任务是一个定时任务,每100毫秒执行一次
调度方法上,首先基于添加时间和我们当前的需要调整我们请求的executor,然后如果一个已存在的executor已经过期了,则杀掉。

updateAndSyncNumExecutorsTarget:更新target数量并同步结果到cluster manager。检查我们已存在的分配和之前的请求超过我们现在的需要。如果满足,truncate target数量并让cluster manager知道以便于它可以取消不需要的等待的请求。如果不满足,并且添加的时间超时,看看我们是否能请求新的executor,并刷新添加时间。

当一个executor(程它为executor X)因为到达了下限而没有被删除,则它不会再被标记位空闲。当有新的executor加入,我们不再在最低下限,则我们必须再次标记executor X为空闲,以使我们不会忘记它是一个被移除的候选。
当scheduler的队列是空的时候,就会将addtime设为未设置
所有cache的block会被报告给driver,但不包括广播的block
当executor执行任务了(busy),就会清除它的idle time
private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
stageid到tuple的map,tuple是节点和将会在这个节点上运行的task的数量
taskstart和blockmanageradded这些事件是在不同的线程执行的,因而顺序不一定,taskstart事件中将对应的executor置为busy
taskend,如果executor不再运行任何调度的任务,则标记为idle
如果task失败,则会将scheduler置为积压任务的状态,将这个task从这个stage对应的task列表中移除

目录
相关文章
|
8月前
|
设计模式 SQL 分布式计算
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
81 0
|
7月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
50 0
|
8月前
|
分布式计算 Java Hadoop
Spark3.3.0源码编译补充篇-抓狂的证书问题
Spark3.3.0源码编译补充篇-抓狂的证书问题
53 0
|
8月前
|
分布式计算 Java 测试技术
肝Spark源码的若干骚操作
肝Spark源码的若干骚操作
57 0
|
8月前
|
分布式计算 Java 程序员
Spark3.0源码编译打包
Spark3.0源码编译打包
49 0
|
8月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
119 2
|
8月前
|
存储 Java 关系型数据库
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
232 1
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
|
8月前
|
存储 缓存 分布式计算
【Spark】Spark Core Day04
【Spark】Spark Core Day04
57 1
|
8月前
|
分布式计算 监控 分布式数据库
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
184 0
|
8月前
|
机器学习/深度学习 分布式计算 搜索推荐
【大数据技术】Spark MLlib机器学习协同过滤电影推荐实战(附源码和数据集)
【大数据技术】Spark MLlib机器学习协同过滤电影推荐实战(附源码和数据集)
287 0