【架构】
Spark采用了分布式计算中的Master-Slave模型。
【1】Master作为整个集群的控制器,负责整个集群的正常运行;【2】Worker是计算节点,接受主节点命令以及进行状态汇报;
【3】Executor负责任务(Tast)的调度和执行;
【4】Client作为用户的客户端负责提交应用;
【5】Driver负责控制一个应用的执行。
Spark集群启动时,需要从主节点和从节点分别启动Master进程和Worker进程,对整个集群进行控制。在一个Spark应用的执行过程中,Driver是应用的逻辑执行起点,运行Application的main函数并创建SparkContext,DAGScheduler把对Job中的RDD有向无环图根据依赖关系划分为多个Stage,每一个Stage是一个TaskSet, TaskScheduler把Task分发给Worker中的Executor;Worker启动Executor,Executor启动线程池用于执行Task。
【RDD】
RDD:弹性分布式数据集,是一种内存抽象,可以理解为一个大数组,数组的元素是RDD的分区Partition,分布在集群上;在物理数据存储上,RDD的每一个Partition对应的就是一个数据块Block,Block可以存储在内存中,当内存不够时可以存储在磁盘上。
数据集分区存储在节点的内存中,减少迭代过程(如机器学习算法)反复的I/O操作从而提高性能。而Hadoop将Mapreduce计算的结果写入磁盘。
【工作机制】
【应用执行的机制】
Spark应用(Application)是用户提交的应用程序,执行模式有Local、Standalone、YARN、Mesos。
根据Application的Driver Program(或者YARN的AppMaster)是否在集群中运行
Spark应用的运行方式又可以分为Cluster模式和Client模式。
【yarn-Cluster模式和yarn-Client模式的区别】
client的driver运行在本地 AppMaster运行在yarn的一个节点上
AM只负责资源申请和释放,远程通信,等待driver完成;
cluster的driver运行在AM所在的container里,driver和AM是同一个进程的不同线程,会通信,AM同样等待driver的完成,从而释放资源。
【spark-yarn】
【shuffle】
定义:对无规则的数据进行重组排序等过程
必要性:分布式计算中数据时分布在各节点上计算的,而汇总统计等操作需要在所有数据上执行
在运行job时,spark是一个stage一个stage执行的
每个Stage由多个Task组成,同一Stage的各Task并行执行互不影响,但是后一个(Stage 1)需要等待前一个(Stage 0)执行结束才能开始执行,更为详细的执行过程如下图。
在Stage 0 和Stage 1之间存在数据交换,Stage 0 的Task无法确定其所产生的结果最终需要传递给Stage 1的哪个Task,因此数据需要按照一定的规则(Partitioner)重新打乱,这个过程称为Shuffle
【宽窄依赖】RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作.窄依赖跟宽依赖的区别是是否发生 shuffle(洗牌) 操作.
宽依赖会发生 shuffle 操作.
窄依赖是子 RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果
宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片
【Hash based shuffle 】Hash based shuffle的每个mapper都需要为每个reducer写一个文件,供reducer读取,即需要产生M*R个数量的文件,如果mapper和reducer的数量比较大,产生的文件数会非常多。
Hadoop Map Reduce被人诟病的地方,很多不需要sort的地方的sort导致了不必要的开销,于是Spark的Hash based shuffle设计的目标之一就是避免不需要的排序,
但是它在处理超大规模数据集的时候,产生了大量的磁盘IO和内存的消耗,很影响性能。
【Sort based shuffle】为了解决hash based shuffle性能差的问题,Spark 1.1 引入了Sort based shuffle,完全借鉴map reduce实现,每个Shuffle Map Task只产生一个文件,不再为每个Reducer生成一个单独的文件,将所有的结果只写到一个Data文件里,同时生成一个index文件,index文件存储了Data中的数据是如何进行分类的。Reducer可以通过这个index文件取得它需要处理的数据。 下一个Stage中的Task就是根据这个Index文件来获取自己所要抓取的上一个Stage中的Shuffle Map Task的输出数据。
Shuffle Map Task产生的结果只写到一个Data文件里, 避免产生大量的文件,从而节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。
而减少文件的数量可以避免同时写多个文件对系统带来的压力。
Sort based shuffle在速度和内存使用方面也优于Hash based shuffle。
【Tungsten-sort Based Shuffle】Tungsten-sort是对普通sort的一种优化,排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗降低,相应的也会减少gc的开销。
Tungsten-sort优化点主要在三个方面:
1)直接在serialized binary data上进行sort而不是java objects,减少了memory的开销和GC的overhead。
2)提供cache-efficient sorter,使用一个8 bytes的指针,把排序转化成了一个指针数组的排序。
3)spill的merge过程也无需反序列化即可完成。
这些优化的实现导致引入了一个新的内存管理模型,类似OS的Page,Page是由MemoryBlock组成的, 支持off-heap(用NIO或者Tachyon管理) 以及 on-heap 两种模式。为了能够对Record 在这些MemoryBlock进行定位,又引入了Pointer的概念。