Spark技术内幕: Shuffle详解(一)

简介:

通过上面一系列文章,我们知道在集群启动时,在Standalone模式下,Worker会向Master注册,使得Master可以感知进而管理整个集群;Master通过借助ZK,可以简单的实现HA;而应用方通过SparkContext这个与集群的交互接口,在创建SparkContext时就完成了Application的注册,Master为其分配Executor;在应用方创建了RDD并且在这个RDD上进行了很多的Transformation后,触发action,通过DAGScheduler将DAG划分为不同的Stage后,将Stage转换为TaskSet交给TaskSchedulerImpl;TaskSchedulerImpl通过SparkDeploySchedulerBackend的reviveOffers,最终向ExecutorBackend发送LaunchTask的消息;ExecutorBackend接收到消息后,启动Task,开始在集群中启动计算。

接下来,会介绍一些更详细的细节实现。

Shuffle,无疑是性能调优的一个重点,本文将从源码实现的角度,深入解析Spark Shuffle的实现细节。

每个Stage的上边界,要不是需要从外部存储读取数据,要么需要读取上一个Stage的输出;而下边界,要么是需要写入本地文件系统,以供child Stage读取,要么是ResultTask,需要输出结果了。

首先从org.apache.spark.rdd.ShuffledRDD开始, 因为ShuffledRDD是一个Stage的开始,它需要获取上一个Stage的输出结果,然后进行接下来的运算。那么这个数据获取是如何实现的?顺着ShuffledRDD的实现,我们可以理清这条线。首先可以看一下compute是如何实现的。

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

它需要从ShuffleManager获取shuffleReader,然后读取数据进行计算。看一下shuffleManager:

 // Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

ShuffleManager分为hash和sort,hash是默认的,即Shuffle时不排序。熟悉MapReduce的同学都知道,MapReduce是无论如何都要排序的,即到Reduce端的都是已经排序好的,当然这么做也是为了可以处理海量的数据。在Spark1.1之前,只支持hash based的Shuffle,sort based Shuffle是1.1新加入的实验功能。

hash顾名思义,在Reduce时的数据需要求有序,因此可以在Reduce获得了数据后,立即进行处理;而不需要等待所有的数据都得到后再处理。这个接下来会通过源码进行解释。而sort,意味着排序,实际上对于sortByKey这种转换可能sort是更有意义的。

ShuffledRDD是通过org.apache.spark.shuffle.hash.HashShuffleReader获取上一个Stage的结果。而HashShuffleReader通过org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$#fetch来获取结果。而fetch通过调用org.apache.spark.storage.BlockManager#getMultiple来转发请求:

  def getMultiple(
      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
      serializer: Serializer,
      readMetrics: ShuffleReadMetrics): BlockFetcherIterator = {
    val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
      readMetrics)
    iter.initialize()
    iter
  }


而最终的实现在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#initialize中,

  override def initialize() {
      // Split local and remote blocks.
      // 获得需要远程请求的数据列表,并且将已经在本地的数据的blockid放在localBlocksToFetch中,
      // 并且在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator.getLocalBlocks进行本地读取
      val remoteRequests = splitLocalRemoteBlocks()
      // Add the remote requests into our queue in a random order
      fetchRequests ++= Utils.randomize(remoteRequests)

      // Send out initial requests for blocks, up to our maxBytesInFlight
      while (!fetchRequests.isEmpty && //保证占用内存不超过设定的值spark.reducer.maxMbInFlight,默认值是48M
        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
        sendRequest(fetchRequests.dequeue())
      }

      val numFetches = remoteRequests.size - fetchRequests.size
      logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

      // Get Local Blocks
      startTime = System.currentTimeMillis
      getLocalBlocks() // 从本地获取
      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
    }

具体获取如何获取的策略都在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks中。这个会在下一篇博文中详解。
目录
相关文章
|
8月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
771 1
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
152 2
|
2月前
|
分布式计算 监控 大数据
如何优化Spark中的shuffle操作?
【10月更文挑战第18天】
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
137 1
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
106 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
117 0
|
4月前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
88 0
|
6月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
179 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
7月前
|
分布式计算 Hadoop 大数据
大数据技术:Hadoop与Spark的对比
【6月更文挑战第15天】**Hadoop与Spark对比摘要** Hadoop是分布式系统基础架构,擅长处理大规模批处理任务,依赖HDFS和MapReduce,具有高可靠性和生态多样性。Spark是快速数据处理引擎,侧重内存计算,提供多语言接口,支持机器学习和流处理,处理速度远超Hadoop,适合实时分析和交互式查询。两者在资源占用和生态系统上有差异,适用于不同应用场景。选择时需依据具体需求。
|
8月前
|
分布式计算 Hadoop 大数据
探索大数据技术:Hadoop与Spark的奥秘之旅
【5月更文挑战第28天】本文探讨了大数据技术中的Hadoop和Spark,Hadoop作为分布式系统基础架构,通过HDFS和MapReduce处理大规模数据,适用于搜索引擎等场景。Spark是快速数据处理引擎,采用内存计算和DAG模型,适用于实时推荐和机器学习。两者各有优势,未来将继续发展和完善,助力大数据时代的发展。