SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互

简介: SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互

背景

本文基于SPARK 3.2.1

用来更好的理解spark shuffle中的点点滴滴

分析

  1. 我们直接从SortShuffleManager着手,因为这是个shuffle的纽带:
override def registerShuffle[K, V, C](
     shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle {
...
 override def getReader[K, C](
      handle: ShuffleHandle,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
    ...
 override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
...

跟shuffle紧密关联的是这三个方法,

  • 其中registerShuffle方法是在ShuffleDependency实例构建出来的时候机会被调用:
 val shuffleId: Int = _rdd.context.newShuffleId()
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)

其中shuffleId 是全局单调递增的,(其实这是为了一次shuffle的标示,是下游task获取上游task数据的label)

  • getWriter方法是ShuffleMapTask RunTask中被调用的:
ShuffleMapTask.runTask
     ||
     \/
dep.shuffleWriterProcessor.write
     ||
     \/
writer = manager.getWriter[Any, Any](
      handle: ShuffleHandle,

这里会根据已经注册好的shuffleHandle来获取对应的writer

  • getReader方法下游task读取shuffle数的时候被调用的:
ShuffledRowRDD.compure
      ||
      \/
SparkEnv.get.shuffleManager.getReader(
    dependency.shuffleHandle,..)
     ||
     \/
val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)

这里会根据已经注册好的shuffleHandle来获取对应的reader,这里会调用getMapSizesByExecutorId最终根据shuffleId来获取(Array[MapStatus], Array[MergeStatus])

这里包括了所有的MapStatus和MergeStatus,这样reader就能根据策略来向不同的blockManager发送shuffle fetch请求,以保证远程Executor负载均衡。

具体的写操作和读操作细节,读者可以去自己细看代码,我们这里只说一些总体的数据流思路。


再来看BlockManager,BlockManager是每个Executor都会有的,在SparkEnv创建的时候就会创建,用来管理数据块的存储的,

其中shuffle 数据的读取和写入都是和他有关联的。

分析一下BlockManager的跟shuffle有关的重要方法:

  private[spark] val blockStoreClient = externalBlockStoreClient.getOrElse(blockTransferService)
   def initialize(appId: String): Unit = {
        ...
     val id =
      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
         blockManagerId = if (idFromMaster != null) idFromMaster else id
      shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }
     if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
      ...
    }
    ...
   }

blockStoreClient 变量,用来读取其他Executor的Blocks文件的,也就是shuffle数据真正去读数的组件,

这在创建BlockManager的时候,如果开启ESS的话就会创建的

val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
    ...

如果没开启ESS的话,就用自带的BlockTransferService。


shuffleServerId也就是blockManagerId,会在Executor创建的时候初始化,

如果开启ESS,端口就是spark.shuffle.service.port,默认7337,否则就是spark.blockManager.port,默认是随机端口:

Executor中env.blockManager.initialize(conf.getAppId)
      ||
      \/
registerWithExternalShuffleServer()

registerWithExternalShuffleServer这个方法是用来注册ESS的(如果开启ESS的情况下):

val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirsString,
      diskBlockManager.subDirsPerLocalDir,
      shuffleManagerMeta)
// Synchronous and will throw an exception if we cannot connect.
 blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
    shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)

注册的信息包括block本地磁盘的位置,以及shuffleManagerMeta信息,注意如果这里开启push-based shuffle Server的话,就返回的是merge的路径,

否则返回sortShuffleManager的类名。

至于具体向哪个shuffle server实例注册,就是从shuffleServerId中获取的。 之后向对应的ESS发送RegisterExecutor消息

再来看YarnShuffleService对RegisterExecutor消息的回应(实际上是ExternalBlockHandler来处理的):

 else if (msgObj instanceof RegisterExecutor) {
      final Timer.Context responseDelayContext =
        metrics.registerExecutorRequestLatencyMillis.time();
      try {
        RegisterExecutor msg = (RegisterExecutor) msgObj;
        checkAuth(client, msg.appId);
        blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
        mergeManager.registerExecutor(msg.appId, msg.executorInfo);
        callback.onSuccess(ByteBuffer.wrap(new byte[0]));
      } finally {
        responseDelayContext.stop();
      }

其中blockManager是把逻辑shuffle block转换为实际物理存储的组件,这里的注册就是blockManager对物理文件(LocalDirs等)做映射关系

mergeManager就是push based shuffle Manager进行文件merge的组件,也就就是把merge的路径以及物理文件(LocalDirs等给注册上去,便于后续获取shuffle文件。


注意:

blockStoreClient是Executor 存在的的时候才会有的组件,因为是去拉取shuffle数据;

而通过registerWithExternalShuffleServer注册的ESS组件是,可以在Executor不存在的时候提供服务的。这两者是有区别的。


至此SPARK SHUFFLE简单的流程就是这样了。


相关文章
|
7月前
|
SQL 分布式计算 数据安全/隐私保护
如何杜绝 spark history server ui 的未授权访问? 1
如何杜绝 spark history server ui 的未授权访问?
|
2月前
|
分布式计算 Spark 索引
Spark学习---day07、Spark内核(Shuffle、任务执行)
Spark学习---day07、Spark内核(源码提交流程、任务执行)
|
4月前
|
分布式计算 Java 调度
Spark中的Shuffle过程是什么?为什么它在性能上很关键?
Spark中的Shuffle过程是什么?为什么它在性能上很关键?
30 0
|
7月前
|
分布式计算 Hadoop 大数据
如何杜绝 spark history server ui 的未授权访问? 2
如何杜绝 spark history server ui 的未授权访问?
|
9月前
|
分布式计算 监控 Java
Spark学习---7、Spark内核(源码提交流程、任务执行、Shuffle、内存管理)(一)
Spark学习---7、Spark内核(源码提交流程、任务执行、Shuffle、内存管理)(一)
|
10月前
|
分布式计算 算法 Java
Spark shuffle、RDD 算子【重要】
Spark shuffle、RDD 算子【重要】
213 0
|
11月前
|
SQL 分布式计算 开发工具
Spark 3.1.1 shuffle fetch 导致shuffle错位的问题
Spark 3.1.1 shuffle fetch 导致shuffle错位的问题
272 0
|
缓存 分布式计算 Spark
Spark之Shuffle机制及其文件寻址详解
Spark之Shuffle机制及其文件寻址详解
163 0
Spark之Shuffle机制及其文件寻址详解
|
存储 分布式计算 Java
SPARK 是怎么清除Shuffle中间结果数据的
SPARK 是怎么清除Shuffle中间结果数据的
381 0
|
分布式计算 Spark
SPARK push-based shuffle mapTask是怎么获取ESS列表信息
SPARK push-based shuffle mapTask是怎么获取ESS列表信息
200 0