Spark Source Code Analysis - BlockManager

Introduction:

Reference: Spark Source Code Analysis - the Storage Module

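Why does Spark need a storage module at all? To cache RDDs.
A defining feature of Spark is that RDDs can be cached in memory or on disk. An RDD consists of partitions, and each partition corresponds to a block.
So the job of the storage module is to make RDDs persistent in memory and on disk.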

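Every node has a BlockManager. The one on the Driver acts as the master, and all the others are slaves.
The master tracks the BlockManagerInfo of every slave BlockManager. Each BlockManagerInfo in turn tracks the BlockStatus of every block managed by that BlockManager.
Whenever a block on a slave changes, the slave sends an updateBlockInfo message to update the block information on the master.
This is a typical centralized design. Master and slaves communicate through actors. Block data, which can be large, is transferred through the connectionManager (NIO or Netty) instead.
This is why BlockManagerMasterActor and BlockManagerSlaveActor exist; see Spark Source Code Analysis - BlockManagerMaster&Slave.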
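The following is a minimal sketch of the master-side bookkeeping described above. It assumes plain in-process Scala collections instead of actor messages. `MasterBookkeeping`, `ManagerId` and `Status` are hypothetical stand-ins for BlockManagerMasterActor, BlockManagerId and BlockStatus. A `false` result mimics the master telling a slave that it must re-register.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for BlockManagerId and BlockStatus (not Spark's classes)
final case class ManagerId(executorId: String, host: String, port: Int)
final case class Status(memSize: Long, diskSize: Long)

class MasterBookkeeping {
  // ManagerId -> (blockId -> Status): the role BlockManagerInfo plays on the master
  private val infos = mutable.Map.empty[ManagerId, mutable.Map[String, Status]]

  def register(id: ManagerId): Unit = {
    infos.getOrElseUpdate(id, mutable.Map.empty[String, Status])
    ()
  }

  // Handles an updateBlockInfo report; false tells the slave it must re-register first
  def updateBlockInfo(id: ManagerId, blockId: String, memSize: Long, diskSize: Long): Boolean =
    infos.get(id) match {
      case None => false
      case Some(blocks) =>
        // Simplification: zero memory and zero disk size is treated as "block removed"
        if (memSize == 0L && diskSize == 0L) blocks.remove(blockId)
        else blocks.update(blockId, Status(memSize, diskSize))
        true
    }

  // Every manager that currently reports holding the block
  def locations(blockId: String): Seq[ManagerId] =
    infos.iterator.collect { case (id, blocks) if blocks.contains(blockId) => id }.toList
}
```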

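There is also BlockManagerMaster, which wraps BlockManagerMasterActor. Confusingly, every node creates a BlockManagerMaster. On slaves, however, no real BlockManagerMasterActor is created, only an ActorRef to the driver's actor. This is not a good design.
Moreover, BlockManager is shared by master and slaves, so it exposes most of the interfaces of both. The master-side operations simply wrap BlockManagerMaster, while slave-side operations such as reading and writing data are implemented directly. The design is inconsistent.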

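In short, the storage module is designed rather casually and not very cleanly. This also shows in some small naming choices, and it makes the code harder to analyze and understand.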

 

During SparkEnv initialization, the BlockManagerMaster and the blockManager are created:

    // Create the actor or the actor ref.
    // For BlockManagerMaster, the driver creates the BlockManagerMasterActor itself,
    // while every other node only looks up an ActorRef to the driver's actor.
    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        val driverHost: String = System.getProperty("spark.driver.host", "localhost")
        val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
        Utils.checkHost(driverHost, "Expected hostname")
        val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
        logInfo("Connecting to " + name + ": " + url)
        actorSystem.actorFor(url)
      }
    }

    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal)))
    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
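The create-versus-lookup decision in registerOrLookup can be isolated from Akka. In this sketch, `Endpoint`, `Local` and `Remote` are hypothetical types that stand in for a newly created actor and an ActorRef. The driver host and port are read from a plain map instead of system properties. The defaults and the URL format are the ones used in the snippet above.

```scala
// Hypothetical stand-ins: Local ~ an actor created on the driver, Remote ~ an ActorRef to it
sealed trait Endpoint
final case class Local(name: String) extends Endpoint
final case class Remote(url: String) extends Endpoint

// Simplified variant of registerOrLookup without an ActorSystem
def registerOrLookup(name: String, isDriver: Boolean, props: Map[String, String]): Endpoint =
  if (isDriver) Local(name)
  else {
    // Same defaults as SparkEnv: localhost and port 7077
    val host = props.getOrElse("spark.driver.host", "localhost")
    val port = props.getOrElse("spark.driver.port", "7077").toInt
    Remote("akka://spark@%s:%s/user/%s".format(host, port, name))
  }
```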

 

1 BlockManagerId

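BlockManagerId is the unique identifier of a BlockManager, so ideally only one BlockManagerId object exists per BlockManager.
This resembles a Singleton scenario. More precisely it is instance de-duplication (interning): equal IDs should always resolve to the same object.
Implementing this in Scala is somewhat obscure, and this class is a typical example.
All constructors are made private, and instances are created only through the companion object's apply methods, which look each new ID up in a cache.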

/**
 * This class represents a unique identifier for a BlockManager.
 * The first 2 constructors of this class are made private to ensure that
 * BlockManagerId objects can be created only using the apply method in
 * the companion object. This allows de-duplication of ID objects.
 * Also, constructor parameters are private to ensure that parameters cannot
 * be modified from outside this class.
 */
private[spark] class BlockManagerId private (
    private var executorId_ : String,
    private var host_ : String,
    private var port_ : Int,
    private var nettyPort_ : Int
  ) extends Externalizable {
  private def this() = this(null, null, 0, 0)  // For deserialization only
}
 
private[spark] object BlockManagerId {

  /**
   * Returns a [[org.apache.spark.storage.BlockManagerId]] for the given configuration.
   *
   * @param execId ID of the executor.
   * @param host Host name of the block manager.
   * @param port Port of the block manager.
   * @param nettyPort Optional port for the Netty-based shuffle sender.
   * @return A new [[org.apache.spark.storage.BlockManagerId]].
   */
  def apply(execId: String, host: String, port: Int, nettyPort: Int) =
    getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))

  def apply(in: ObjectInput) = {
    val obj = new BlockManagerId()
    obj.readExternal(in)
    getCachedBlockManagerId(obj)
  }

  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()

  def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
    blockManagerIdCache.putIfAbsent(id, id)
    blockManagerIdCache.get(id)
  }
}
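The interning pattern can be reproduced without Spark. `NodeId` is a hypothetical, reduced version of BlockManagerId without Externalizable support. The essential ingredients are a private constructor, value-based equals/hashCode (elided in the excerpt above, but required for the cache to find matches), and a companion apply that goes through a ConcurrentHashMap. This sketch uses the value returned by putIfAbsent directly instead of a second get, which yields the same instance.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical reduced BlockManagerId: the constructor is private to force use of apply
class NodeId private (val executorId: String, val host: String, val port: Int) {
  // Value-based equality is required, otherwise the cache can never find a match
  override def equals(other: Any): Boolean = other match {
    case o: NodeId => executorId == o.executorId && host == o.host && port == o.port
    case _         => false
  }
  override def hashCode(): Int = (executorId, host, port).hashCode()
}

object NodeId {
  private val cache = new ConcurrentHashMap[NodeId, NodeId]()

  // The only way to obtain a NodeId: equal IDs always come back as the same instance
  def apply(executorId: String, host: String, port: Int): NodeId = {
    val fresh = new NodeId(executorId, host, port)
    val existing = cache.putIfAbsent(fresh, fresh) // null if fresh was inserted
    if (existing == null) fresh else existing
  }
}
```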

 

2 BlockManager

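BlockManager is shared by master and slaves, but the master-side logic is already wrapped in BlockManagerMaster.
So this section mainly analyzes the slave-related interfaces: reportBlockStatus, get and put.
put and get use memoryStore and diskStore; see Spark Source Code Analysis - BlockStore.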

private[spark] class BlockManager(
    executorId: String,
    actorSystem: ActorSystem,
    val master: BlockManagerMaster,
    val defaultSerializer: Serializer,
    maxMemory: Long)
  extends Logging {
  private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {} // BlockInfo definition; details below

  val shuffleBlockManager = new ShuffleBlockManager(this)
  private val blockInfo = new TimeStampedHashMap[String, BlockInfo] // BlockInfo of every managed block: [blockId, blockInfo]

  private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
  private[storage] val diskStore: DiskStore = new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
  
  val blockManagerId = BlockManagerId(executorId, connectionManager.id.host, connectionManager.id.port, nettyPort) // BlockManagerId
  val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),   // create the slaveActor; it appears every BlockManager creates one
    name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
 
  /**
   * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
   * BlockManagerWorker actor.
   */
  private def initialize() {
    master.registerBlockManager(blockManagerId, maxMemory, slaveActor) // register with the master; if this is the driver itself, nothing is actually registered
    BlockManagerWorker.startBlockManagerWorker(this) // start a BlockManagerWorker to transfer blocks to/from remote nodes; blocks are too large to send via Akka
    if (!BlockManager.getDisableHeartBeatsForTesting) {
      heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { // schedule periodic heartbeats
        heartBeat()
      }
    }
  }
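initialize() schedules heartBeat() at a fixed period through the Akka scheduler. A minimal equivalent using the JDK's ScheduledExecutorService might look like this. `HeartbeatSender` is a hypothetical name, and what a heartbeat actually sends to the master is left to the caller.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}

// Hypothetical fixed-rate heartbeat, analogous to scheduler.schedule(0.seconds, period)
class HeartbeatSender(periodMs: Long)(sendHeartbeat: () => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private var task: ScheduledFuture[_] = null

  def start(): Unit = {
    // Initial delay 0, then one heartbeat every periodMs milliseconds
    task = scheduler.scheduleAtFixedRate(() => sendHeartbeat(), 0L, periodMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    if (task != null) task.cancel(false)
    scheduler.shutdown()
  }
}
```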

2.1 BlockInfo

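The key role of BlockInfo is to coordinate access to a block. A block can be read only after its writer has finished, so every access must first call waitForReady.
That is why every block gets its own BlockInfo to manage this.
But why is it called BlockInfo?
The updateBlockInfo message in BlockManagerMasterActor updates BlockManagerInfo.BlockStatus, not this BlockInfo. The naming is rather misleading!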

  private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
    @volatile var pending: Boolean = true
    @volatile var size: Long = -1L
    @volatile var initThread: Thread = null
    @volatile var failed = false

    setInitThread()

    private def setInitThread() {
      // Set current thread as init thread - waitForReady will not block this thread
      // (in case there is non trivial initialization which ends up calling waitForReady as part of
      // initialization itself)
      this.initThread = Thread.currentThread()
    }

    /**
     * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
     * Return true if the block is available, false otherwise.
     */
    def waitForReady(): Boolean = {
      if (initThread != Thread.currentThread() && pending) {
        synchronized {
          while (pending) this.wait()
        }
      }
      !failed
    }

    /** Mark this BlockInfo as ready (i.e. block is finished writing) */
    def markReady(sizeInBytes: Long) {
      assert (pending)
      size = sizeInBytes
      initThread = null
      failed = false
      pending = false
      synchronized {
        this.notifyAll()
      }
    }

    /** Mark this BlockInfo as ready but failed */
    def markFailure() {
      assert (pending)
      size = 0
      initThread = null
      failed = true
      pending = false
      synchronized {
        this.notifyAll()
      }
    }
  }
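The wait/notify protocol behind waitForReady, markReady and markFailure can be shown in isolation. `SimpleBlockInfo` is a hypothetical reduced version. It drops the initThread exemption and the storage level, and keeps only the pending/failed flags and the size.

```scala
// Hypothetical reduced BlockInfo: readers block until the writer marks ready or failed
class SimpleBlockInfo {
  @volatile private var pending = true
  @volatile private var failed = false
  @volatile var size: Long = -1L

  // Returns true if the block is usable, false if the write failed
  def waitForReady(): Boolean = {
    if (pending) {
      synchronized {
        while (pending) wait() // loop guards against spurious wakeups
      }
    }
    !failed
  }

  def markReady(sizeInBytes: Long): Unit = synchronized {
    size = sizeInBytes
    failed = false
    pending = false
    notifyAll() // wake every waiting reader
  }

  def markFailure(): Unit = synchronized {
    size = 0L
    failed = true
    pending = false
    notifyAll()
  }
}
```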

2.2 reportBlockStatus

  /**
   * Tell the master about the current storage status of a block. This will send a block update
   * message reflecting the current status, *not* the desired storage level in its block info.
   * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
   *
   * droppedMemorySize exists to account for when block is dropped from memory to disk (so it is still valid).
   * This ensures that update in master will compensate for the increase in memory on slave.
   */
  def reportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L) {
    val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize) // false means the master has no record of this BlockManager, so it must re-register
    if (needReregister) {
      logInfo("Got told to reregister updating block " + blockId)
      // Reregistering will report our new block for free.
      asyncReregister()
    }
    logDebug("Told master about block " + blockId)
  }
  /**
   * Actually send an UpdateBlockInfo message. Returns the master's response,
   * which will be true if the block was successfully recorded and false if
   * the slave needs to re-register.
   */
  private def tryToReportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
    val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
      info.level match {
        case null =>
          (StorageLevel.NONE, 0L, 0L, false)
        case level =>
          val inMem = level.useMemory && memoryStore.contains(blockId)
          val onDisk = level.useDisk && diskStore.contains(blockId)
          val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication)
          val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize
          val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
          (storageLevel, memSize, diskSize, info.tellMaster)
      }
    }
    if (tellMaster) {  // report the block's current state and its memory/disk usage to the master
      master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
    } else {
      true
    }
  }
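The key point of tryToReportBlockStatus is that the reported level describes where the block actually is, not the level it was stored with. The sketch below uses hypothetical `Level` and `ReportedStatus` types that mirror StorageLevel's four fields, with maps in place of memoryStore and diskStore.

```scala
// Hypothetical simplified StorageLevel; field names follow Spark's, but this is not Spark's class
final case class Level(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int)
final case class ReportedStatus(level: Level, memSize: Long, diskSize: Long)

// Report where the block actually lives, not where its level says it should live
def currentStatus(
    desired: Level,
    memSizes: Map[String, Long],  // blockId -> size in the memory store
    diskSizes: Map[String, Long], // blockId -> size in the disk store
    blockId: String,
    droppedMemorySize: Long = 0L): ReportedStatus = {
  val inMem = desired.useMemory && memSizes.contains(blockId)
  val onDisk = desired.useDisk && diskSizes.contains(blockId)
  val level = Level(onDisk, inMem, desired.deserialized, desired.replication)
  // As in tryToReportBlockStatus: if not in memory, fall back to droppedMemorySize
  val memSize = if (inMem) memSizes(blockId) else droppedMemorySize
  val diskSize = if (onDisk) diskSizes(blockId) else 0L
  ReportedStatus(level, memSize, diskSize)
}
```

For example, a MEMORY_AND_DISK block that was dropped to disk is reported as disk-only.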

2.3 Get

  /**
   * Get block from local block manager (read the block locally)
   */
  def getLocal(blockId: String): Option[Iterator[Any]] = {
    val info = blockInfo.get(blockId).orNull
    if (info != null) {
      info.synchronized { // exclusive access to the block
        // If another thread is writing the block, wait for it to become ready.
        if (!info.waitForReady()) { // wait until the block is ready; a block is written by only one writer at a time
          // If we get here, the block write failed.
          logWarning("Block " + blockId + " was marked as failure.")
          return None
        }

        val level = info.level
        // Look for the block in memory
        if (level.useMemory) { // if the storage level uses memory, first try to fetch the block from memoryStore
          memoryStore.getValues(blockId) match {
            case Some(iterator) =>
              return Some(iterator) // return the iterator directly
            case None =>
              logDebug("Block " + blockId + " not found in memory")
          }
        }

        // Not found in memory above, so keep looking on disk
        // Look for block on disk, potentially loading it back into memory if required
        if (level.useDisk) {
          if (level.useMemory && level.deserialized) { // MEMORY_AND_DISK, deserialized: not in memory, so it must have been dropped to disk
            diskStore.getValues(blockId) match {
              case Some(iterator) =>  // read the block from disk and put it back into memory
                // Put the block back in memory before returning it
                // TODO: Consider creating a putValues that also takes in a iterator ?
                val elements = new ArrayBuffer[Any]
                elements ++= iterator
                memoryStore.putValues(blockId, elements, level, true).data match {
                  case Left(iterator2) => // putValues is expected to return an iterator over the stored values
                    return Some(iterator2)
                  case _ =>
                    throw new Exception("Memory store did not return back an iterator")
                }
              case None =>
                throw new Exception("Block " + blockId + " not found on disk, though it should be")
            }
          } else if (level.useMemory && !level.deserialized) { // MEMORY_AND_DISK_SER: stored serialized
            // Read it as a byte buffer into memory first, then return it
            diskStore.getBytes(blockId) match { // the data is serialized, so read it with getBytes
              case Some(bytes) =>
                // Put a copy of the block back in memory before returning it. Note that we can't
                // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
                // The use of rewind assumes this.
                assert (0 == bytes.position())
                val copyForMemory = ByteBuffer.allocate(bytes.limit)
                copyForMemory.put(bytes)
                memoryStore.putBytes(blockId, copyForMemory, level) // memoryStore still caches the serialized bytes
                bytes.rewind() // deserialization has to read the bytes again, so rewind
                return Some(dataDeserialize(blockId, bytes)) // but the caller gets deserialized values
              case None =>
                throw new Exception("Block " + blockId + " not found on disk, though it should be")
            }
          } else { // DISK_ONLY: nothing special, read straight from disk
            diskStore.getValues(blockId) match {
              case Some(iterator) =>
                return Some(iterator)
              case None =>
                throw new Exception("Block " + blockId + " not found on disk, though it should be")
            }
          }
        }
      }
    } else {
      logDebug("Block " + blockId + " not registered locally")
    }
    return None
  }
  /**
   * Get block from the local block manager as serialized bytes.
   */
  def getLocalBytes(blockId: String): Option[ByteBuffer] = {
    // simpler logic, omitted ......
  }
 
  /**
   * Get block from remote block managers.
   */
  def getRemote(blockId: String): Option[Iterator[Any]] = {
    // Get locations of block
    val locations = master.getLocations(blockId)
    // Get block from remote locations
    for (loc <- locations) {
      val data = BlockManagerWorker.syncGetBlock( // fetch the block from a remote node via BlockManagerWorker
          GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
      if (data != null) {
        return Some(dataDeserialize(blockId, data))
      }
      logDebug("The value of block " + blockId + " is null")
    }
    logDebug("Block " + blockId + " not found")
    return None
  }
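The lookup order of getLocal can be sketched with in-memory maps: memory first, then disk, promoting the block back into memory when its level allows memory. The sketch then tries remote locations one by one, as getRemote does. `TieredStore` and `get` are hypothetical. Serialization, locking and the "should be on disk" exceptions are omitted, so a miss simply returns None.

```scala
import scala.collection.mutable

// Hypothetical two-tier store illustrating getLocal's lookup order
final class TieredStore(useMemory: Boolean, useDisk: Boolean) {
  val memory = mutable.Map.empty[String, Vector[Any]]
  val disk = mutable.Map.empty[String, Vector[Any]]

  def getLocal(blockId: String): Option[Iterator[Any]] = {
    // 1. Try the memory store first
    if (useMemory) {
      memory.get(blockId) match {
        case Some(values) => return Some(values.iterator)
        case None =>
      }
    }
    // 2. Fall back to disk; if the level also allows memory, put the block back into memory
    if (useDisk) {
      disk.get(blockId) match {
        case Some(values) =>
          if (useMemory) memory.update(blockId, values)
          return Some(values.iterator)
        case None =>
      }
    }
    None
  }
}

// Local store first, then each remote location in turn (remote fetches are plain functions here)
def get(local: TieredStore, blockId: String,
        remotes: Seq[String => Option[Vector[Any]]]): Option[Iterator[Any]] =
  local.getLocal(blockId).orElse {
    remotes.iterator.map(fetch => fetch(blockId)).collectFirst { case Some(v) => v.iterator }
  }
```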

2.4 Put

  /**
   * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
   */
  def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
    tellMaster: Boolean = true) : Long = {
    // Remember the block's storage level so that we can correctly drop it to disk if it needs
    // to be dropped right after it got put into memory. Note, however, that other threads will
    // not be able to get() this block until we call markReady on its BlockInfo.
    val myInfo = {
      val tinfo = new BlockInfo(level, tellMaster) // create a new BlockInfo
      // Do atomically !
      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) // check whether a BlockInfo for this blockId already exists

      if (oldBlockOpt.isDefined) { // if it does, wait for the other writer to finish
        if (oldBlockOpt.get.waitForReady()) {
          logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
          return oldBlockOpt.get.size
        }
        // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
        oldBlockOpt.get
      } else {
        tinfo
      }
    }

    // If we need to replicate the data, we'll want access to the values, but because our
    // put will read the whole iterator, there will be no values left. For the case where
    // the put serializes data, we'll remember the bytes, above; but for the case where it
    // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
    var valuesAfterPut: Iterator[Any] = null

    // Ditto for the bytes after the put
    var bytesAfterPut: ByteBuffer = null

    // Size of the block in bytes (to return to caller)
    var size = 0L

    myInfo.synchronized { // take the lock and do the actual put
      var marked = false
      try {
        if (level.useMemory) { // if memory may be used, store the block in memory first
          // Save it just to memory first, even if it also has useDisk set to true; we will later
          // drop it to disk if the memory store can't hold it.
          val res = memoryStore.putValues(blockId, values, level, true)
          size = res.size
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case Left(newIterator) => valuesAfterPut = newIterator
          }
        } else { // otherwise store it on disk
          // Save directly to disk.
          // Don't get back the bytes unless we replicate them.
          val askForBytes = level.replication > 1
          val res = diskStore.putValues(blockId, values, level, askForBytes)
          size = res.size
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case _ =>
          }
        }

        // Now that the block is in either the memory or disk store, let other threads read it,
        // and tell the master about it.
        marked = true  // release the wait condition on the BlockInfo so other threads can access the block
        myInfo.markReady(size)
        if (tellMaster) {
          reportBlockStatus(blockId, myInfo) // notify the master that the block status changed
        }
      } finally {
        // If we failed at putting the block to memory/disk, notify other possible readers
        // that it has failed, and then remove it from the block info map.
        if (! marked) { // the put failed, so clean up
          // Note that the remove must happen before markFailure otherwise another thread
          // could've inserted a new BlockInfo before we remove it.
          blockInfo.remove(blockId)
          myInfo.markFailure()
          logWarning("Putting block " + blockId + " failed")
        }
      }
    }
    // Replicate block if required
    if (level.replication > 1) {
      val remoteStartTime = System.currentTimeMillis
      // Serialize the block if not already done
      if (bytesAfterPut == null) {
        if (valuesAfterPut == null) {
          throw new SparkException(
            "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
        }
        bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
      }
      replicate(blockId, bytesAfterPut, level) // replicate to other nodes
      logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
    }
    BlockManager.dispose(bytesAfterPut)

    return size
  }
 
  /**
   * Put a new block of serialized bytes to the block manager.
   */
  def putBytes(
    blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
    // fairly simple logic, omitted ......
  }
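The core of put() is a reservation protocol. putIfAbsent claims the block id, concurrent callers wait on the existing BlockInfo, and a failed write removes the entry before marking failure. The sketch below is hypothetical: `Info` and `PutDemo` are invented names, and the `write` function stands in for memoryStore/diskStore. Spark reuses a failed BlockInfo (see its TODO); this version simply retries the reservation instead. Reporting to the master and replication are left out.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical minimal BlockInfo for this sketch
final class Info {
  @volatile var size: Long = -1L
  private var pending = true
  private var failed = false
  def waitForReady(): Boolean = synchronized { while (pending) wait(); !failed }
  def markReady(s: Long): Unit = synchronized { size = s; pending = false; notifyAll() }
  def markFailure(): Unit = synchronized { failed = true; pending = false; notifyAll() }
}

final class PutDemo(write: (String, Seq[Any]) => Long) {
  val infos = new ConcurrentHashMap[String, Info]()

  def put(blockId: String, values: Seq[Any]): Long = {
    val fresh = new Info
    // Atomically reserve the block id; a concurrent put for the same id sees our Info instead
    val old = infos.putIfAbsent(blockId, fresh)
    if (old != null) {
      // Someone else reserved it: wait for their write to finish
      if (old.waitForReady()) old.size // already stored: do not re-add
      else put(blockId, values)        // their write failed and its entry was removed: try again
    } else {
      var marked = false
      try {
        val size = write(blockId, values) // store in memory or on disk
        fresh.markReady(size)
        marked = true
        size
      } finally {
        if (!marked) {
          // Remove before markFailure, so a waiter that sees the failure can re-reserve the id
          infos.remove(blockId, fresh)
          fresh.markFailure()
        }
      }
    }
  }
}
```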

 

  var cachedPeers: Seq[BlockManagerId] = null

  /**
   * Replicate block to another node.
   */
  private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
    val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
    if (cachedPeers == null) {
      cachedPeers = master.getPeers(blockManagerId, level.replication - 1) // find peers that can hold the replicas
    }
    for (peer: BlockManagerId <- cachedPeers) {  // send the block to each of these peers
      val start = System.nanoTime
      data.rewind()
      if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel), // transfer the block data via BlockManagerWorker
        new ConnectionManagerId(peer.host, peer.port))) {
        logError("Failed to call syncPutBlock to " + peer)
      }
      logDebug("Replicated BlockId " + blockId + " once used " +
        (System.nanoTime - start) / 1e6 + " ms; The size of the data is " +
        data.limit() + " bytes.")
    }
  }
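replicate() sends copies with tLevel: the same flags, but with replication = 1. A toy model makes the reason easy to see. If a peer received the original replication factor, it would replicate again, possibly bouncing the block back and forth forever. `Node` and `Level` are hypothetical. Peers come from a fixed list here, whereas Spark asks the master via getPeers and caches the answer in cachedPeers.

```scala
import scala.collection.mutable

// Hypothetical simplified StorageLevel and node; not Spark's classes
final case class Level(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int)

final class Node(val name: String) {
  val stored = mutable.Map.empty[String, Level]
  var peers: Seq[Node] = Seq.empty

  def put(blockId: String, level: Level): Unit = {
    stored.update(blockId, level)
    if (level.replication > 1) replicate(blockId, level)
  }

  private def replicate(blockId: String, level: Level): Unit = {
    // Copies keep the disk/memory/deserialized flags but use replication 1,
    // so a receiving peer stores the block without replicating it further
    val copyLevel = level.copy(replication = 1)
    peers.take(level.replication - 1).foreach(_.put(blockId, copyLevel))
  }
}
```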

2.5 dropFromMemory

  /**
   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
   * store reaches its limit and needs to free up space.
   */
  def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
    logInfo("Dropping block " + blockId + " from memory")
    val info = blockInfo.get(blockId).orNull
    if (info != null)  {
      info.synchronized {  // acquire exclusive access through the BlockInfo
        // required ? As of now, this will be invoked only for blocks which are ready
        // But in case this changes in future, adding for consistency sake.
        if (! info.waitForReady() ) {
          // If we get here, the block write failed.
          logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
          return
        }

        val level = info.level
        if (level.useDisk && !diskStore.contains(blockId)) { // if the level uses disk, write the block being dropped from memory to disk
          logInfo("Writing block " + blockId + " to disk")
          data match {
            case Left(elements) =>
              diskStore.putValues(blockId, elements, level, false)
            case Right(bytes) =>
              diskStore.putBytes(blockId, bytes, level)
          }
        }
        val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L // size freed from memory
        val blockWasRemoved = memoryStore.remove(blockId)  // drop the block from memoryStore
        if (info.tellMaster) {
          reportBlockStatus(blockId, info, droppedMemorySize) // notify the master that the block status changed
        }
        if (!level.useDisk) {
          // The block is completely gone from this node; forget it so we can put() it again later.
          blockInfo.remove(blockId) // without disk, dropping the block from memory means it is gone entirely
        }
      }
    } else {
      // The block has already been dropped
    }
  }
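The decisions made by dropFromMemory can be reduced to maps. `Stores` and `DropResult` are hypothetical. Unlike Spark, the data to spill is read from the memory map rather than passed in, and reporting to the master is only marked by a comment.

```scala
import scala.collection.mutable

// Hypothetical result type: what the drop did
final case class DropResult(writtenToDisk: Boolean, droppedMemorySize: Long, forgotten: Boolean)

final class Stores {
  val memory = mutable.Map.empty[String, Array[Byte]]
  val disk = mutable.Map.empty[String, Array[Byte]]
  val known = mutable.Set.empty[String] // stands in for the blockInfo map

  def dropFromMemory(blockId: String, useDisk: Boolean): DropResult = {
    // 1. Spill to disk only if the level allows disk and the block is not already there
    val spill = useDisk && !disk.contains(blockId)
    if (spill) memory.get(blockId).foreach(bytes => disk.update(blockId, bytes))
    // 2. Record how much memory is freed, then remove the block from memory
    val dropped = memory.get(blockId).map(_.length.toLong).getOrElse(0L)
    memory.remove(blockId)
    // Spark calls reportBlockStatus(blockId, info, droppedMemorySize) at this point
    // 3. Without disk, the block is gone from this node entirely
    if (!useDisk) known.remove(blockId)
    DropResult(spill, dropped, !useDisk)
  }
}
```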

This article is excerpted from cnblogs (博客园); originally published 2014-01-10.