Spark源码分析 – Shuffle


参考详细探究Spark的shuffle实现, 写的很清楚, 当前设计的来龙去脉



Hadoop的思路是, 在mapper端每次当memory buffer中的数据快满的时候, 先将memory中的数据, 按partition进行划分, 然后各自存成小文件, 这样当buffer不断的spill的时候, 就会产生大量的小文件 
所以Hadoop后面直到reduce之前做的所有的事情其实就是不断的merge, 基于文件的多路并归排序, 在map端的将相同partition的merge到一起, 在reduce端, 把从mapper端copy来的数据文件进行merge, 以用于最终的reduce 
多路并归排序, 达到两个目的 
merge, 把相同key的value都放到一个arraylist里面 
sort, 最终的结果是按key排序的 
这个方案扩展性很好, 面对大数据也没有问题, 当然问题在效率, 毕竟需要多次进行基于文件的多路并归排序, 多轮的和磁盘进行数据读写……



Spark的优势在于效率, 所以没有做merge sort, 这样省去多次磁盘读写
当然这样会有扩展性问题, 很难两全,
因为不能后面再merge, 所以在写的时候, 需要同时打开corenum * bucketnum个文件, 写完才能关闭
并且在reduce的时候, 由于之前没有做merge, 所以必须在内存里面维护所有key的hashmap, 实时的merge和reduce, 详细参考下面


如何将shuffle数据写入block, 关键看ShuffleMapTask中的逻辑
可用看到使用shuffleBlockManager, Spark从0.8开始将shuffleBlockManager从普通的BlockManager中分离出来, 便于优化



      // Obtain all the block writers for shuffle blocks.
      val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
      shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) // 创建ShuffleBlocks, 参数是shuffleId和目标partitions数目
      buckets = shuffle.acquireWriters(partition) // 生成ShuffleWriterGroup, shuffle目标buckets(对应于partition)

      // Write the map output to its associated buckets.
      for (elem <- rdd.iterator(split, taskContext)) { // 从RDD中取出每个elem数据
        val pair = elem.asInstanceOf[Product2[Any, Any]]
        val bucketId = dep.partitioner.getPartition(pair._1) // 根据pair的key进行shuffle, 得到目标bucketid
        buckets.writers(bucketId).write(pair) // 将pair数据写入bucket.writer (BlockObjectWriter)

      // Commit这些buckets到block, 其他的RDD会从通过shuffleid找到这些block, 并读取数据
      // Commit the writes. Get the size of each bucket block (total block size).
      var totalBytes = 0L
      val compressedSizes: Array[Byte] = { writer: BlockObjectWriter =>
        val size = writer.size()
        totalBytes += size



ShuffleBlockManager的核心函数就是forShuffle, 这个函数返回ShuffleBlocks对象
ShuffleBlocks对象的函数acquireWriters, 返回ShuffleWriterGroup, 其中封装所有partition所对应的BlockObjectWriter

由于Spark的调度是基于task的, task其实对应于partition
如果有m个partitions, 而需要shuffle到n个partition上, 其实就是m个mapper task和n个reducer task
当然在spark中不可能所有的mapper task一起运行, task的并行度取决于core number

1. 如果每个mapper task都要产生n个files, 那么最终产生的文件数就是n*m, 文件数过多...
在Spark 0.8.1中已经优化成使用shuffle consolidation, 即多个mapper task公用一个bucket文件, 怎么公用?
取决于并行度, 因为并行的task是无法公用一个bucket文件的, 所以至少会产生corenum * bucketnum个文件, 而后面被执行的task就可以重用前面创建的bucketfile, 而不用重新创建

2. 在打开文件写的时候, 每个文件的write handler默认需要100KB内存缓存, 所以同时需要corenum * bucketnum * 100kb大小的内存消耗, 这个问题还没有得到解决

其实就是说spark在shuffle的时候碰到了扩展性问题, 这个问题为什么Hadoop没有碰到?
因为hadoop可用容忍多次的磁盘读写, 多次的文件merge, 所以它可以在每次从buffer spill的时候, 把内容写到一个新的文件中, 然后后面再去做文件merge  

class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter])
trait ShuffleBlocks {
  def acquireWriters(mapId: Int): ShuffleWriterGroup
  def releaseWriters(group: ShuffleWriterGroup)

class ShuffleBlockManager(blockManager: BlockManager) {

  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
    new ShuffleBlocks {
      // Get a group of writers for a map task.
      override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
        val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
        val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => // 根据需要shuffle的partition数目创建writers
          val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId)  // blockid =  "shuffle_" + shuffleId + "_" + mapId + "_" + bucketId
          blockManager.getDiskBlockWriter(blockId, serializer, bufferSize) // 从blockManager得到DiskBlockWriter
        new ShuffleWriterGroup(mapId, writers)

      override def releaseWriters(group: ShuffleWriterGroup) = {
        // Nothing really to release here.



关于这部分参考, Spark源码分析 – PairRDD
关键的一点是, 在reduce端的处理中 (可以看没有mapSideCombine的部分, 更清晰一些)
mapPartitions其实是使用的MapPartitionsRDD, 即对于每个item调用aggregator.combineValuesByKey
可以看到这里和Hadoop最大的不同是, Hadoop在reduce时得到的是一个key已经merge好的集合, 所以一次性reduce处理完后, 就可以直接存掉了
而Spark没有merge这块, 所以数据是一个个来的, 所以你必须在内存里面维持所有的key的hashmap, 这里就可能有扩展性问题, Spark在PR303中实现外部排序的方案来应对这样的问题

    //RDD本身的partitioner和传入的partitioner相等时, 即不需要重新shuffle, 直接map即可
    if (self.partitioner == Some(partitioner)) {  
      self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) //2. mapPartitions, map端直接调用combineValuesByKey
    } else if (mapSideCombine) { //如果需要mapSideCombine
      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) //先在partition内部做mapSideCombine
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializerClass) //3. ShuffledRDD, 进行shuffle
      partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true) //Shuffle完后, 在reduce端再做一次combine, 使用combineCombinersByKey
    } else {
      // Don't apply map-side combiner.和上面的区别就是不做mapSideCombine
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
      values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)


  override def compute(split: Partition, context: TaskContext): Iterator[P] = {
    val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
    SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context.taskMetrics, //使用shuffleFetcher.fetch得到shuffle过数据的iterator



从mapOutputTracker查询到(根据shuffleId, reduceId)需要读取的shuffle partition的地址


private[spark] abstract class ShuffleFetcher {
   * Fetch the shuffle outputs for a given ShuffleDependency.
   * @return An iterator over the elements of the fetched shuffle outputs.
  def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,  // reduceId, 就是reduce端的partitionid
      serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[T]
  /** Stop the fetcher */
  def stop() {}


private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {

  override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer)
    : Iterator[T] =
    val blockManager = SparkEnv.get.blockManager
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) // 从mapOutputTracker获取shuffleid的Array[MapStatus] 
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]  // 由于有多个map在同一个node上, 有相同的BlockManagerId, 需要合并
    for (((address, size), index) <- statuses.zipWithIndex) {  // 这里index指,在map端的partitionid
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size)) // {BlockManagerId,((mappartitionid, size),…)} 
    val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] = { // (BlockManagerId, (blockfile地址, size)) 
      case (address, splits) =>
        (address, => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2))) // 可以看到blockfile地址,由shuffleId, mappartitionid, reduceId决定
    val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) // Iterator of (block ID, value) 
    val itr = blockFetcherItr.flatMap(unpackBlock) // unpackBlock会拆开(block ID, value)取出value, 以生成最终获取到数据的iterater

    CompletionIterator[T, Iterator[T]](itr, { // 和普通Iterator的区别是,迭代完时, 会调用后面的completion逻辑
      val shuffleMetrics = new ShuffleReadMetrics
      shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
      shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
      shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
      shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
      shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
      shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
      metrics.shuffleReadMetrics = Some(shuffleMetrics)
  private def convertMapStatuses(
        shuffleId: Int,
        reduceId: Int,
        statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
    assert (statuses != null) {
      status => 
        if (status == null) {
          throw new FetchFailedException(null, shuffleId, -1, reduceId,
            new Exception("Missing an output location for shuffle " + shuffleId))
        } else {
          (status.location, decompressSize(status.compressedSizes(reduceId))) // 关键转化就是, 将decompressSize只取该reduce partition的部分


Shuffle信息注册 - MapOutputTracker

前面有个问题没有说清楚, 当shuffle完成后, reducer端的task怎么知道应该从哪里获取当前partition所需要的所有shuffled blocks
在Hadoop中是通过JobTracker, Mapper会通过Hb告诉JobTracker执行的状况, Reducer不断的去询问JobTracker, 并知道需要copy哪些HDFS文件 


首先每个节点都可能需要查询shuffle信息, 所以需要MapOutputTrackerActor用于通信
参考SparkContext中的逻辑, 只有在master上才创建Actor对象, 其他slaver上只是创建Actor Ref

private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
  def receive = {
    case GetMapOutputStatuses(shuffleId: Int, requester: String) => // 提高用于查询shuffle信息的接口
      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester)
      sender ! tracker.getSerializedLocations(shuffleId)

    case StopMapOutputTracker =>
      logInfo("MapOutputTrackerActor stopped!")
      sender ! true

注意, 只有master上的MapOutputTracker会有所有的最新shuffle信息
但是对于slave, 出于效率考虑, 也会buffer从master得到的shuffle信息, 所以getServerStatuses中会先在local的mapStatuses取数据, 如果没有, 再取remote的master上获取

private[spark] class MapOutputTracker extends Logging {
  var trackerActor: ActorRef = _   // MapOutputTrackerActor
  private var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]  // 用于buffer所有的shuffle信息
  def registerShuffle(shuffleId: Int, numMaps: Int) {  // 注册shuffle id, 初始化Array[MapStatus]
    if (mapStatuses.putIfAbsent(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
      throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")

  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) { // 当task完成时, 注册MapOutput信息
    var array = mapStatuses(shuffleId)
    array.synchronized {
      array(mapId) = status
  // Remembers which map output locations are currently being fetched on a worker
  private val fetching = new HashSet[Int]
  // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
  def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
    val statuses = mapStatuses.get(shuffleId).orNull
    if (statuses == null) {  // local的mapStatuses中没有
      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
      var fetchedStatuses: Array[MapStatus] = null
      fetching.synchronized {
        if (fetching.contains(shuffleId)) { // 已经在fetching中, 所以只需要wait
          // Someone else is fetching it; wait for them to be done
          while (fetching.contains(shuffleId)) {
            try {
            } catch {
              case e: InterruptedException =>

        // Either while we waited the fetch happened successfully, or
        // someone fetched it in between the get and the fetching.synchronized.
        fetchedStatuses = mapStatuses.get(shuffleId).orNull
        if (fetchedStatuses == null) {
          // We have to do the fetch, get others to wait for us.
          fetching += shuffleId  // 如果还没有就加到fetching, 继续fetching
      if (fetchedStatuses == null) {
        // We won the race to fetch the output locs; do so
        val hostPort = Utils.localHostPort()
        // This try-finally prevents hangs due to timeouts:
        try {
          val fetchedBytes =
            askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]] // 从remote master上fetching
          fetchedStatuses = deserializeStatuses(fetchedBytes)
          logInfo("Got the output locations")
          mapStatuses.put(shuffleId, fetchedStatuses) // 把结果buffer到local
        } finally {
          fetching.synchronized {
            fetching -= shuffleId
      if (fetchedStatuses != null) {
        fetchedStatuses.synchronized {
          return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
        throw new FetchFailedException(null, shuffleId, -1, reduceId,
          new Exception("Missing all output locations for shuffle " + shuffleId))
    } else {  // 在local找到, 直接返回
      statuses.synchronized {
        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)



Spark中是以shuffleid来标识每个shuffle, 不同于Hadoop, 一个job中可能有多个shuffle过程, 所以无法通过jobid
1. 在new stage的时候, 需要注册shuffleid, 由于new stage一定是由于遇到shuffleDep

  private def newStage(
      rdd: RDD[_],
      shuffleDep: Option[ShuffleDependency[_,_]],
      jobId: Int,
      callSite: Option[String] = None)
    : Stage =
    if (shuffleDep != None) {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size) // 注册shuffleid和map端RDD的partitions数目

2. 在handle TaskCompletion事件的时候, 当一个ShuffleMapTask完成的时候, 即mapOutput产生的时候, 就可以注册MapStatus(BlockManagerId, compressedSizes)
通过BlockManagerId+partitionid+reduceid就可以知道blockid, 从而读到数据

 private def handleTaskCompletion(event: CompletionEvent) {
    event.reason match {
      case Success =>
        task match {
          case rt: ResultTask[_, _] =>
          case smt: ShuffleMapTask =>
            val status = event.result.asInstanceOf[MapStatus] // 在ShuffleTask的run的返回值本身就是MapStatus, 所以这里做下类型转换
            val execId = status.location.executorId  // class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
            } else {
              stage.addOutputLoc(smt.partition, status) // 把MapStatus buffer到stage中outputLocs上去
              if (stage.shuffleDep != None) {
                // We supply true to increment the epoch number here in case this is a
                // recomputation of the map outputs. In that case, some nodes may have cached
                // locations with holes (from when we detected the error) and will need the
                // epoch incremented to refetch them.
                // TODO: Only increment the epoch number if this is not the first time
                //       we registered these map outputs.
                mapOutputTracker.registerMapOutputs(  // 注册到mapOutputTracker中的mapStatuses上
         => if (list.isEmpty) null else list.head).toArray,
                  changeEpoch = true)

