什么是BloclkManager?
BlockManager是管理整个Spark运行时数据的读写,包含数据存储本身,在数据存储的基础之上进行数据读写。由于Spark是分布式的,所有BlockManager也是分布式的,BlockManager本身相对而言是一个比较大的模块,Spark中有非常多的模块:调度模块、资源管理模块等等。BlockManager是另外一个非常重要的模块,BlockManager本身源码量非常大。本篇从BlockManager原理流程对BlockManager做深刻的理解。在Shuffle读写数据的时候, 我们需要读写BlockManager的内容。
BlockManager是整个Spark底层负责数据存储与管理的一个组件,Driver和Executor的所有数据都由对应的BlockManager进行管理。
BlockManager相关类之间的关系如下:
spark rpc 整体架构图如下
spark 统一内存模型
MemoryStore 的内存模型
MemoryStore负责将 Block 存储到内存,减少对磁盘的依赖,MemoryStory依赖MemoryManager(与之相对应的是DiskStore,负责将Block存储在磁盘上)。
maxMemory = memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
MemoryStore分为三部分:
blocksMemoryUsed:Block存储占用(存储为MemoryEntry,一种特制,有两种实现:DeserializedMemoryEntry、SerializedMemoryEntry)
currentUnrollMemory:将要被展开的Block数据占用的内存,称为currentUnrollMemory 未被使用的内存
其他补充:在MemoryStore中,存储/执行内存的软边界,堆内/堆外内存的隔阂都是透明的,其原因:MemoryStory依赖MemoryManager 展开(Unroll)操作的内存必须是将整个Block内存提前申请好的,防止向内存真正写入数据的时候发生内存溢出 展开(Unroll)所申请的这部分其实并没有被真正的占用,是先过一遍partition的数据,看一下全部cache到内存需要占用多大,然后向MemoryManager预约这么大的内存,如果完全足够,那么才将数据完全存储到内存中,这时候占用内存是StorageMemory中的。
MemoryConsumer 分布情况
在 Spark 中,使用抽象类 MemoryConsumer 来表示需要使用内存的消费者。在这个类中定义了分配,释放以及 Spill 内存数据到磁盘的一些方法或者接口。
具体的消费者可以继承 MemoryConsumer 从而实现具体的行为,其有各种实现(包括:Shuffle、Join、Sort、Aggregation 等类型),统计有13个。Execution Memory 详细分布情况内存存储相关spark MemoryManager
BlockManagerMasterEndpoint:存在于Driver端,主要对Executor和BlockManager的关系,BlockManager和Block的关系等进行管理。并作为一个RpcEndpoint接受各类消息事件进行处理。 BlockManager:运行在各个节点包括Driver和Executor,提供接口来支持发送和接受blocks处理。 BlockManagerStorageEndpoint:RpcEndpoint来响应Master发来的各种命令,如删除block,删除shuffle,删除Broadcast等。 BlockInfoManager:BlockManager中负责管理blocks元数据和block的锁管理 BlockManagerMaster:BlockManager中负责和BlockManagerMasterEndpoint进行通信处理DiskBlockManager:管理存放在磁盘上的Block,如block和磁盘存储路径的关系。 DiskStore:磁盘存储,负责对block存放到磁盘上进行操作处理 MemoryManager:内存块管理,这里为了方便灵活使用内存,把存储使用的内存和运行使用的内存都通过该类来实现了,这个后面会单独介绍。这里管理的内存包括堆内存和堆外内存 MemoryStore:存储block到内存中,可以为java对象和序列化后的bytebuffer SerializerManager: 序列化处理,负责block的序列化和反序列化处理 BlockTransferService:块传输服务,如提交一个块到一个远端节点 ShuffleManager:shuffle管理,主要支持计算过程中的shuffle处理 MapOutputTracker:map任务输出跟踪管理
缓存RDD内存分配
1: 缓存非序列化 RDD(只支持 ON_HEAP)
MemoryStore#putIteratorAsValues[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]
整个流程还可以细化为以下两个子流程:
unroll block:展开迭代器 store unrolled to storage memory:将展开后的数据存入 storage 内存
1-1: unroll block1-2: store unrolled to storage memory2: 缓存序列化 RDD(支持 ON_HEAP 和 OFF_HEAP)
有了上面分析缓存非序列化 RDD 至内存的经验,再来看下面的缓存序列化 RDD 至内存的图会发现有一些相似,也有一些不同。在下面的流程图中,包含了 unroll 过程和 store block to storage memory 过程。为了方便分析,我将整个流程分为三大块:
红框部分:初始化 allocator、bbos、serializationStream 灰框部分:展开 values 并 append 到 serializationStream 中 篮框部分:store block to storage memory
磁盘存储
DiskBlockManager 创建并维护逻辑block和block落地的物理文件的映射关系。一个逻辑block通过它的BlockId的name属性映射到具体的文件。 CountingWritableChannel 它主要对sink做了包装,在写入sink的同时,还记录向sink写的数据的总量 ManagedBuffer ManagedBuffer以字节数组的方式提供数据,并且数据不能被修改。 它的实现应该制定数据是怎么提供的。具体的缓冲区实现可能不被JVM垃圾收回收器管理。 ManagedBuffer有三种具体的实现 1. FileSegmentManagedBuffer: 文件的一部分支持数据。 2. NioManagedBuffer: NIO ByteBuffer支持数据。 3. NettyManageBuffer: Netty ByteBuf支持数据。 EncryptedManagedBuffer 它是一个适配器,它将几乎所以转换的请求委托给了 blockData,它的父类是lockData BlockData 它是一个接口,它定义了存储方式以及如何提供不同的方式来读去底层的block 数据。 DiskBlockData 该类主要用于将磁盘中的block文件转换为指定的流或对象。 EncryptedBlockData 这个类主要是用于加密的block磁盘文件转换为特定的流或对象。 ByteBufferBlockData 这个类主要是用于内存中的block数据转换为指定的流或对象 ReferenceCounted 这是netty包下的一个接口。它是一个引用计数对象,需要显示调用deallocation。ReferenceCounted对象实例化时,引用计数设为1,调用retain方法增加引用计数,release方法则释放引用计数。 FileRegion 它也是netty下的一个包,FileRegion数据通过支持零拷贝的channel将数据传输到目标channel。 AbstractReferenceCounted 这个类是通过一个变量来记录引用的增加或减少情况。 AbstractFileRegion AbstractFileRegion 继承了AbstractReferenceCounted, 但他还是一个抽象类,只是实现了部分的功能。 DefaultFileRegion 它通过 RandomeAccessFile 获取 可以支持随机访问 FileChannelImpl 的FileChannel,然后根据相对位置计算出绝对位置以及需要传输的字节总大小,最后将数据传输到target。 其引用计数的处理调用其父类 AbstractReferenceCounted的对应方法。 ReadableChannelFileRegion 其内部的buffer 的大小时 64KB,_traferred 变量记录了已经传输的字节数量。ReadableByteChannel 是按顺序读的,所以pos参数没有用。 DiskStore 它就是用来保存block 到磁盘的。
BlockManager相关
SecurityManager 主要负责底层通信的安全认证。BlockManagerMaster 主要负责在executor端和driver的通信,封装了 driver的RpcEndpointRef。NettyBlockTransferService 使用netty来获取一组数据块。MapOutputTracker 是一个跟踪 stage 的map 输出位置的类,driver 和 executor 有对应的实现,分别是 MapOutputTrackerMaster 和 MapOutputTrackerWorker。ShuffleManager在SparkEnv中初始化,它在driver端和executor端都有,负责driver端生成shuffle以及executor的数据读写。BlockManager 是Spark存储体系里面的核心类,它运行在每一个节点上(drievr或executor),提供写或读本地或远程的block到各种各样的存储介质中,包括磁盘、堆内内存、堆外内存。
SecurityManager
这个类主要就是负责Spark的安全的。它是由SparkEnv初始化的。BlockManagerMaster
BlockManagerMaster 这个类是对 driver的 EndpointRef 的包装,可以说是 driver EndpointRef的一个代理类,在请求访问driver的时候,调用driver的EndpointRef的对应方法,并处理其返回。BlockManagerMaster中保存中BlockManager内部管理数据的元数据,进行维护,当BlockManager进行Block增删改等操作时,都会在BlockManagerMaster中进行元数据的变更。message为 BlockManagerMasterEndpoint中的receiveAndReply方法发送过去的。
1. 移除executor,有同步和异步两种方案,这两个方法只会在driver端使用。如下:removeExecutor,removeExecutorAsync 2. 向driver注册blockmanager,方法:registerBlockManager 3. 更新block信息,方法:updateBlockInfo 4. 向driver请求获取block对应的 location信息,方法:getLocations 5. 向driver 请求获得集群中所有的 blockManager的信息,方法:getPeers 6. 向driver 请求executor endpoint ref 对象,方法:getExecutorEndpointRef 7. 移除block、RDD、shuffle、broadcast,方法:removeBlock,removeRDD,removeShuffle,removeBroadcast 8. 向driver 请求获取每一个BlockManager内存状态,方法:getMemoryStatus 9. 向driver请求获取磁盘状态,方法:getStorageStatus 10. 向driver请求获取block状态,方法:getBlockStatus 11. 是否有匹配的block,方法:getMatchingBlockIds 12.检查是否缓存了block,方法:hasCachedBlocks,依赖于teil方法
ShuffleClient
它定义了从executor或者是外部服务读取shuffle数据的接口。核心方法:
1. init方法用于初始化ShuffleClient,需要指定executor 的appId 2. fetchBlocks 用于异步从另一个节点请求获取blocks,参数解释如下: host – the host of the remote node. port – the port of the remote node. execId – the executor id. blockIds – block ids to fetch. listener – the listener to receive block fetching status. downloadFileManager – DownloadFileManager to create and clean temp files. If it's not null, the remote blocks will be streamed into temp shuffle files to reduce the memory usage, otherwise, they will be kept in memory. 3. shuffleMetrics 用于记录shuffle相关的metrics信息
BlockTransferService
它是ShuffleClient的子类。它是ShuffleClient的抽象实现类,定义了读取shuffle的基础框架。核心方法:
init 方法,它额外提供了使用BlockDataManager初始化的方法,方便从本地获取block或者将block存入本地。 close:关闭ShuffleClient port:服务正在监听的端口 hostname:服务正在监听的hostname fetchBlocks 跟继承类一样,没有实现,由于继承关系可以不写。 uploadBlocks:上传block到远程节点,返回一个future对象 fetchBlockSync:同步抓取远程节点的block,直到block数据获取成功才返回 uploadBlockSync 方法:同步上传信息,直到上传成功才结束。
NettyBlockTransferService
它是BlockTransferService,使用netty来一次性获取shuffle的block数据。
MapOutputTracker
MapOutputTracker 是一个定位跟踪 stage 的map 输出位置的类,driver 和 executor 有对应的实现,分别是 MapOutputTrackerMaster 和 MapOutputTrackerWorker。核心方法:
1. 向driver端trackerEndpoint 发送消息,askTracker 2. excutor 获取每一个shuffle中task 需要读取的范围的 block信息,partition范围包头不包尾。getMapSizeByExcutorId 3. 删除指定的shuffle的状态信息,unregisterShuffle 4. 停止服务,stop
ShuffleManager
它是一个可插拔的shuffle系统,ShuffleManager 在driver和每一个executor的SparkEnv中基于spark.shuffle.manager参数创建,driver使用这个类来注册shuffle,executor或driver本地任务可以请求ShuffleManager 来读写任务。
BlockManager
BlockManager,运行在每个节点(driver和executors)上,提供接口用于读写本地和远程各种存储设备(内存、磁盘和off-heap)。BlockManager是Spark存储体系中的核心组件。
BlockManager主要功能有 向Driver注册当前的BlockManger 向Driver上报所管理的数据块信息 从本地获取序列化/非序列化数据块的方法 保存数据块到本地 从Driver获取集群中持有某个数据块的节点信息 从其他节点获取数据块的方法 注册任务,获取/释放数据块上的锁 将所持有的数据块复制到其他节点
BlockManager主要由以下部分组成:
1.shuffle客户端ShuffleClient; 2.BlockManagerMaster(对存在于所有Executor上的BlockManager统一管理) 3.磁盘块管理器DiskBlockManager,主要是用来映射blockId和磁盘文件的关系,维护blockID的锁,在读或写数据的时候,都是根据blockId去读取或创建一个文件; 4.内存存储MemoryStore; 5.磁盘存储DiskStore; 6.非广播Block清理器metadataCleanner和广播Block清理器broadcastCleaner; 7.压缩算法实现CompressionCodee。
executor上启动BlockManager,负责具体的读写实现;
MemoryStore:负责内存数据的读写
DiskStore:负责磁盘数据的读写
BlockTransferService:负责远程数据的读写
1)什么时候启动的BlockManagerMaster和BlockManager?
1:driver在sparkContext中初始化BlockManager2:executor中初始化3:接下来来围绕driver的初始化梳理BlockManager,在SparkEnv中的创建:BlockManagerMaster、NettyBlockTransferService、BlockManagerMasterEndpoint、BlockManager。核心:new BlockManagerMaster,new BlockManager4: 调用BlockManagerMaster的registerBlockManager方法向Driver上的BlockManagerMaster注册
def initialize(appId: String): Unit = { //初始化BlockTransferService,其实是它的子类NettyBlockTransferService是下了init方法, //该方法的作用就是初始化传输服务,通过传输服务可以从不同的节点上拉取Block数据 blockTransferService.init(this) shuffleClient.init(appId) //设置block的复制分片策略,由spark.storage.replication.policy指定 blockReplicationPolicy = { val priorityClass = conf.get( "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName) val clazz = Utils.classForName(priorityClass) val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy] logInfo(s"Using $priorityClass for block replication policy") ret } //根据给定参数为对对应的Executor封装一个BlockManagerId对象(块存储的唯一标识) //executorID:executor的Id,blockTransferService.hostName:传输Block数据的服务的主机名 //blockTransferService.port:传输Block数据的服务的主机名 val id = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None) //调用BlockManagerMaster的registerBlockManager方法向Driver上的BlockManagerMaster注册 val idFromMaster = master.registerBlockManager( id, maxMemory, slaveEndpoint) //更新BlockManagerId blockManagerId = if (idFromMaster != null) idFromMaster else id //判断是否开了外部shuffle服务 shuffleServerId = if (externalShuffleServiceEnabled) { logInfo(s"external shuffle service port = $externalShuffleServicePort") BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) } else { blockManagerId } // 如果开启了外部shuffle服务,并且该节点是Driver的话就调用registerWithExternalShuffleServer方法 //将BlockManager注册在本地 if (externalShuffleServiceEnabled && !blockManagerId.isDriver) { registerWithExternalShuffleServer() } logInfo(s"Initialized BlockManager: $blockManagerId") }
2)BlockManager如何将数据写入内存 —— MemoryStore
1:通过内存管理器memoryManager申请指定大小的内存,如果申请到再进行存储操作,申请不到则直接返回false
2:将数据封装成entry对象存储。entry有两个实现类,分别是SerializedMemoryEntry和DeserializedMemoryEntry,表示序列化和反序列化后的entry信息,从这可以看出,spark内存存储默认都是要序列化的,序列化后会放在LinkedHashMap集合中。3:读取
def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { //通过blockId从LinkedHashMap内存中获取entry对象 val entry = entries.synchronized { entries.get(blockId) } entry match { case null => None case e: DeserializedMemoryEntry[_] => throw new IllegalArgumentException("should only call getBytes on serialized blocks") //通过模式匹配,验证entry类型,并提取序列化类中的数据信息 case SerializedMemoryEntry(bytes, _, _) => Some(bytes) } }
3)BlockManager如何将数据写入本地磁盘——DiskStore可以看到磁盘写逻辑也很好理解,就是通过FileOutputStream的channel将数据写入到磁盘的文件中,唯一需要留意的就是获取File文件时,我们实际上是在对应目录下创建一个逻辑file,并没有存储数据,只有通过channel将数据写入后,这个file才能算是一个实实在在的文件。
4)BlockManager如何读写远程数据——BlockTransferService
远程数据下载:
override def fetchBlocks( //需要传入ip和port host: String, port: Int, execId: String, blockIds: Array[String], listener: BlockFetchingListener, tempFileManager: DownloadFileManager): Unit = { if (logger.isTraceEnabled) { logger.trace(s"Fetch blocks from $host:$port (executor id $execId)") } try { val maxRetries = transportConf.maxIORetries() //创建远程块数据下载的启动模块,并实现启动方法 val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener): Unit = { try { //创建传输客户端,用于连接远程节点 val client = clientFactory.createClient(host, port, maxRetries > 0) //启动一对一的数据块获取 new OneForOneBlockFetcher(client, appId, execId, blockIds, listener, transportConf, tempFileManager).start() } catch { case e: IOException => Try { driverEndPointRef.askSync[Boolean](IsExecutorAlive(execId)) } match { case Success(v) if v == false => throw new ExecutorDeadException(s"The relative remote executor(Id: $execId)," + " which maintains the block data to fetch is dead.") case _ => throw e } } } } //只有最大重试次数大于0,才会走封装的重试类,否则直接是开启一对一下载 if (maxRetries > 0) { // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's // a bug in this code. We should remove the if statement once we're sure of the stability. new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start() } else { blockFetchStarter.createAndStart(blockIds, listener) } } catch { case e: Exception => logger.error("Exception while beginning fetchBlocks", e) blockIds.foreach(listener.onBlockFetchFailure(_, e)) } }
一对一接收方法中就是向远程节点发送rpc请求获取数据,然后在回调函数中等待接收数据
public void start() { //想远程节点发送rpc请求,并在回调函数中监听远程节点的响应 client.sendRpc(message.toByteBuffer(), new RpcResponseCallback() { @Override public void onSuccess(ByteBuffer response) { try { //创建流处理器处理远程节点返回的数据 streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response); logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle); // 遍历获取远程节点提供的block数据 for (int i = 0; i < streamHandle.numChunks; i++) { if (downloadFileManager != null) { client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i), new DownloadCallback(i)); } else { client.fetchChunk(streamHandle.streamId, i, chunkCallback); } } } catch (Exception e) { logger.error("Failed while starting block fetches after success", e); failRemainingBlocks(blockIds, e); } } @Override public void onFailure(Throwable e) { logger.error("Failed while starting block fetches", e); failRemainingBlocks(blockIds, e); } }); }
远程数据上传
override def uploadBlock( hostname: String, port: Int, execId: String, blockId: BlockId, blockData: ManagedBuffer, level: StorageLevel, classTag: ClassTag[_]): Future[Unit] = { val result = Promise[Unit]() val client = clientFactory.createClient(hostname, port) // 序列化元数据 val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag))) // 如果上传的数据量超过一定量则通过流式处理器上传 val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) //上传成功或者失败的回调函数 val callback = new RpcResponseCallback { override def onSuccess(response: ByteBuffer): Unit = { logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}") result.success((): Unit) } override def onFailure(e: Throwable): Unit = { logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e) result.failure(e) } } //根据是否需要流处理进而走不通的逻辑 if (asStream) { //如果是流式处理,则封装流处理器,然后分批上传 val streamHeader = new UploadBlockStream(blockId.name, metadata).toByteBuffer client.uploadStream(new NioManagedBuffer(streamHeader), blockData, callback) } else { // 如果数据量比较小,则一次性传输完,而不需要分批处理 val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer, callback) } result.future }
总结:
1、MemoryStore管理内存存储,默认是将block数据封装成序列化entry存储在LinkedHashMap中
2、写磁盘时,一开始获取的File文件是一种逻辑上的存在,此时并不包含具体数据。3、远程block数据的上传和下载是两个节点协调配合的结果,他们之间通过rpc方式通信。