Spark之Shuffle机制及其文件寻址详解
一、SparkShuffle概念
Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
Spark 官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operationsShuffle operations部分对Shuffle做了简要介绍。
Spark中有两种Shuffle类型,HashShuffle和SortShuffle,Spark1.2之前是HashShuffleManager, Spark1.2引入SortShuffleManager,在Spark 2.0+版本中已经将HashShuffleManager丢弃。
二、SparkShuffle 寻址
Spark中Shuffle文件的寻址是一个文件底层的管理机制,所以还是有必要了解一下。
2.1 基本概念
1) MapOutputTracker
MapOutputTracker是Spark架构中的一个模块,是一个主从架构。管理磁盘小文件的地址。
MapOutputTrackerMaster是主对象,存在于Driver中。MapOutputTrackerWorker是从对象,存在于Excutor中。
2) BlockManager
BlockManager块管理者,是Spark架构中的一个模块,也是一个主从架构。
BlockManagerMaster,主对象,存在于Driver中。(BlockManagerMaster会在集群中有用到广播变量和缓存数据或者删除缓存数据的时候,通知BlockManagerSlave传输或者删除数据。) BlockManagerWorker,从对象,存在于Excutor中。BlockManagerWorker会与BlockManagerWorker之间通信。
无论在Driver端的BlockManager还是在Excutor端的BlockManager都含有四个对象:
① DiskStore:负责磁盘的管理。② MemoryStore:负责内存的管理。③ ConnectionManager:负责连接其他的 BlockManagerWorker。④ BlockTransferService:负责数据的传输。
2.2 架构图
Shuffle文件寻址流程:
a) 当map task执行完成后,会将task的执行情况和磁盘小文件的地址封装到MpStatus对象中,通过MapOutputTrackerWorker对象向Driver中的MapOutputTrackerMaster汇报。b) 在所有的map task执行完毕后,Driver中就掌握了所有的磁盘小文件的地址。c) 在reduce task执行之前,会通过Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster获取磁盘小文件的地址。d) 获取到磁盘小文件的地址后,会通过BlockManager中的ConnectionManager连接数据所在节点上的ConnectionManager,然后通过BlockTransferService进行数据的传输。e) BlockTransferService默认启动5个task去节点拉取数据。默认情况下,5个task拉取数据量不能超过48M。拉取过来的数据放在Executor端的shuffle聚合内存中(spark.shuffle.memeoryFraction=0.2), 如果5个task一次拉取的数据放不到shuffle内存中会有OOM,如果放下一次,不会有OOM,以后放不下的会放磁盘。
扩展补充如何避免OOM:
- 拉去数据 少一些。
- 提高ExecutorShuffle聚合内存。
- 提高executor内存。