背景
本文基于SPARK 3.2.1
用来更好的理解spark shuffle中的点点滴滴
分析
- 我们直接从SortShuffleManager着手,因为这是个shuffle的纽带:
override def registerShuffle[K, V, C]( shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle { ... override def getReader[K, C]( handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { ... override def getWriter[K, V]( handle: ShuffleHandle, mapId: Long, context: TaskContext, metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = { ...
跟shuffle紧密关联的是这三个方法,
- 其中
registerShuffle
方法是在ShuffleDependency实例构建出来的时候机会被调用:
val shuffleId: Int = _rdd.context.newShuffleId() val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, this)
其中shuffleId 是全局单调递增的,(其实这是为了一次shuffle的标示,是下游task获取上游task数据的label)
getWriter
方法是ShuffleMapTask RunTask中被调用的:
ShuffleMapTask.runTask || \/ dep.shuffleWriterProcessor.write || \/ writer = manager.getWriter[Any, Any]( handle: ShuffleHandle,
这里会根据已经注册好的shuffleHandle来获取对应的writer
getReader
方法下游task读取shuffle数的时候被调用的:
ShuffledRowRDD.compure || \/ SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle,..) || \/ val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
这里会根据已经注册好的shuffleHandle来获取对应的reader,这里会调用getMapSizesByExecutorId最终根据shuffleId来获取(Array[MapStatus], Array[MergeStatus])
这里包括了所有的MapStatus和MergeStatus,这样reader就能根据策略来向不同的blockManager发送shuffle fetch请求,以保证远程Executor负载均衡。
具体的写操作和读操作细节,读者可以去自己细看代码,我们这里只说一些总体的数据流思路。
再来看BlockManager,BlockManager是每个Executor都会有的,在SparkEnv创建的时候就会创建,用来管理数据块的存储的,
其中shuffle 数据的读取和写入都是和他有关联的。
分析一下BlockManager的跟shuffle有关的重要方法:
private[spark] val blockStoreClient = externalBlockStoreClient.getOrElse(blockTransferService) def initialize(appId: String): Unit = { ... val id = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None) blockManagerId = if (idFromMaster != null) idFromMaster else id shuffleServerId = if (externalShuffleServiceEnabled) { logInfo(s"external shuffle service port = $externalShuffleServicePort") BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) } else { blockManagerId } if (externalShuffleServiceEnabled && !blockManagerId.isDriver) { registerWithExternalShuffleServer() ... } ... }
blockStoreClient
变量,用来读取其他Executor的Blocks文件的,也就是shuffle数据真正去读数的组件,
这在创建BlockManager的时候,如果开启ESS的话就会创建的
val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) { ...
如果没开启ESS的话,就用自带的BlockTransferService。
shuffleServerId也就是blockManagerId,会在Executor创建的时候初始化,
如果开启ESS,端口就是spark.shuffle.service.port,默认7337,否则就是spark.blockManager.port,默认是随机端口:
Executor中env.blockManager.initialize(conf.getAppId) || \/ registerWithExternalShuffleServer()
registerWithExternalShuffleServer
这个方法是用来注册ESS的(如果开启ESS的情况下):
val shuffleConfig = new ExecutorShuffleInfo( diskBlockManager.localDirsString, diskBlockManager.subDirsPerLocalDir, shuffleManagerMeta) // Synchronous and will throw an exception if we cannot connect. blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer( shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
注册的信息包括block本地磁盘的位置,以及shuffleManagerMeta信息,注意如果这里开启push-based shuffle Server的话,就返回的是merge的路径,
否则返回sortShuffleManager的类名。
至于具体向哪个shuffle server实例注册,就是从shuffleServerId中获取的。 之后向对应的ESS发送RegisterExecutor消息
再来看YarnShuffleService对RegisterExecutor消息的回应(实际上是ExternalBlockHandler来处理的):
else if (msgObj instanceof RegisterExecutor) { final Timer.Context responseDelayContext = metrics.registerExecutorRequestLatencyMillis.time(); try { RegisterExecutor msg = (RegisterExecutor) msgObj; checkAuth(client, msg.appId); blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo); mergeManager.registerExecutor(msg.appId, msg.executorInfo); callback.onSuccess(ByteBuffer.wrap(new byte[0])); } finally { responseDelayContext.stop(); }
其中blockManager是把逻辑shuffle block转换为实际物理存储的组件,这里的注册就是blockManager对物理文件(LocalDirs等)做映射关系
mergeManager就是push based shuffle Manager进行文件merge的组件,也就就是把merge的路径以及物理文件(LocalDirs等给注册上去,便于后续获取shuffle文件。
注意:
blockStoreClient是Executor 存在的的时候才会有的组件,因为是去拉取shuffle数据;
而通过registerWithExternalShuffleServer注册的ESS组件是,可以在Executor不存在的时候提供服务的。这两者是有区别的。
至此SPARK SHUFFLE简单的流程就是这样了。