Apache Spark源码走读(十二)Sort-based Shuffle的设计与实现

简介: Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析。

概要

Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析。

Sort-based Shuffle之初体验

通过一个小的实验来直观的感受一下sort-based shuffle算法会产生哪些中间文件,具体实验步骤如下所述。

步骤1: 修改conf/spark-default.conf, 加入如下内容

spark.shuffle.manager SORT

步骤2: 运行spark-shell

SPARK_LOCAL_IP=127.0.0.1 $SPARK_HOME/bin/spark-shell

 步骤3: 执行wordcount

sc.textFile("README.md").flatMap(l => l.split(" ")).map(w=>(w,1)).reduceByKey(_ + _).collect

 步骤4: 查看生成的中间文件

find /tmp/spark-local* -type f

文件查找结果如下所示

/tmp/spark-local-20140919091822-aa66/0f/shuffle_0_1_0.index
/tmp/spark-local-20140919091822-aa66/30/shuffle_0_0_0.index
/tmp/spark-local-20140919091822-aa66/0c/shuffle_0_0_0.data
/tmp/spark-local-20140919091822-aa66/15/shuffle_0_1_0.data

可以看到生成了两人种后缀的文件,分别为data和index类型,这两者的用途在后续分析中会详细讲述。

如果我们做一下对比实验,将shuffle模式改为Hash,再来观看生成的文件,就会找到区别。将原先配置文件中的SORT改为HASH,重新启动spark-shell,执行相同的wordcount之后,在tmp目录下找到的文件列表如下。

/tmp/spark-local-20140919092949-14cc/10/shuffle_0_1_3
/tmp/spark-local-20140919092949-14cc/0f/shuffle_0_1_2
/tmp/spark-local-20140919092949-14cc/0f/shuffle_0_0_3
/tmp/spark-local-20140919092949-14cc/0c/shuffle_0_0_0
/tmp/spark-local-20140919092949-14cc/0d/shuffle_0_1_0
/tmp/spark-local-20140919092949-14cc/0d/shuffle_0_0_1
/tmp/spark-local-20140919092949-14cc/0e/shuffle_0_1_1
/tmp/spark-local-20140919092949-14cc/0e/shuffle_0_0_2

两者生成的文件数量差异非常大,具体数值计算如下

  1. 在HASH模式下,每一次shuffle会生成M*R的数量的文件,如上述wordcount例子中,整个job有一次shuffle过程,由于输入文件默认分片为2,故M个数为2,而spark.default.parallelism配置的值为4,故R为4,所以总共生成1*2*4=8个文件。shuffle_0_1_2解读为shuffle+shuffle_id+map_id+reduce_id,故0_1_2表示由第0次shuffle中的第1个maptask生成的文件,该文件内容会被第2个reduce task消费
  2. 在SORT模式下,一个Map Task只生成一个文件,而不管生成的文件要被多少的Reduce消费,故文件个数是M的数量,由于wordcount中的默认分片为2,故只生成两个data文件

多次shuffle

刚才的示例中只有一次shuffle过程,我们可以通过小小的改动来达到两次shuffle,代码如下

sc.textFile("README.md").flatMap(l => l.split(" ")).map(w => (w,1)).reduceByKey(_ + _).map(p=>(p._2,p._1)).groupByKey.collect

上述代码将reduceByKey的结果通过map进行反转,即将原来的(w, count)转换为(count,w),然后根据出现次数进行归类。 groupByKey会再次导致数据shuffle过程。

在HASH模式下产生的文件如下所示

/tmp/spark-local-20140919094531-1cb6/12/shuffle_0_3_3
/tmp/spark-local-20140919094531-1cb6/0c/shuffle_0_0_0
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_2_3
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_3_2
/tmp/spark-local-20140919094531-1cb6/11/shuffle_1_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_2_2
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_3_1
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_0_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_0_3
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_3_0
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_2_1
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_0_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_1_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_0_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_1_0
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_1_0_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_2_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_1_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_0_2
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_0_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_1_0

引入一次新的shuffle,产生了大量的中间文件

如果是使用SORT,效果如何呢?只会增加M个文件,由于在新的shuffle过程中,map task数目为4,所以总共的文件是2+4=6。

/tmp/spark-local-20140919094731-034a/29/shuffle_0_3_0.data
/tmp/spark-local-20140919094731-034a/30/shuffle_0_0_0.index
/tmp/spark-local-20140919094731-034a/15/shuffle_0_1_0.data
/tmp/spark-local-20140919094731-034a/36/shuffle_0_2_0.data
/tmp/spark-local-20140919094731-034a/0c/shuffle_0_0_0.data
/tmp/spark-local-20140919094731-034a/32/shuffle_0_2_0.index
/tmp/spark-local-20140919094731-034a/32/shuffle_1_1_0.index
/tmp/spark-local-20140919094731-034a/0f/shuffle_0_1_0.index
/tmp/spark-local-20140919094731-034a/0f/shuffle_1_0_0.index
/tmp/spark-local-20140919094731-034a/0a/shuffle_1_1_0.data
/tmp/spark-local-20140919094731-034a/2b/shuffle_1_0_0.data
/tmp/spark-local-20140919094731-034a/0d/shuffle_0_3_0.index

值得指出的是shuffle_0和shuffle_1的执行次序问题,数字越大越先执行,由于spark job提交的时候是从后往前倒推的,故0是最后将执行,而前面的先执行。

Sort-based Shuffle的设计思想

sort-based shuffle的总体指导思想是一个map task最终只生成一个shuffle文件,那么后续的reduce task是如何从这一个shuffle文件中得到自己的partition呢,这个时候就需要引入一个新的文件类型即index文件。

其具体实现步骤如下:

  1. Map Task在读取自己输入的partition之后,将计算结果写入到ExternalSorter
  2. ExternalSorter会使用一个map来存储新的计算结果,新的计算结果根据partiton分类,如果是有combine操作,则需要将新的值与原有的值进行合并
  3. 如果ExternalSorter中的map占用的内存已经超越了使用的阀值,则将map中的内容spill到磁盘中,每一次spill产生一个不同的文件
  4. 当输入Partition中的所有数据都已经处理完毕之后,这时有可能一部分计算结果在内存中,另一部分计算结果在spill的一到多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里
  5. 最后将每一个partition的在data文件中的起始位置和结束位置写入到index文件

相应的源文件

  1. SortShuffleManager.scala
  2. SortShuffleWriter.scala
  3. ExternalSorter.scala
  4. IndexShuffleBlockManager.scala

几个重要的函数

SortShuffleWriter.write

  override def write(records: Iterator[_ >: Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {
      if (!dep.aggregator.isDefined) {
        throw new IllegalStateException("Aggregator is empty for map-side combine")
      }
      sorter = new ExternalSorter[K, V, C](
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      sorter = new ExternalSorter[K, V, V](
        None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }

    val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
    val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    mapStatus = new MapStatus(blockManager.blockManagerId,
      partitionLengths.map(MapOutputTracker.compressSize))
  }

ExternalSorter.insertAll

def insertAll(records: Iterator[_  {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        elementsRead += 1
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpill(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        elementsRead += 1
        val kv = records.next()
        buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
        maybeSpill(usingMap = false)
      }
    }
  }

writePartitionedFile将内存中的数据和spill文件中内容一起合并到一个文件当中

def writePartitionedFile(
      blockId: BlockId,
      context: TaskContext,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)

    if (bypassMergeSort && partitionWriters != null) {
      // We decided to write separate files for each partition, so just concatenate them. To keep
      // this simple we spill out the current in-memory collection so that everything is in files.
      spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
      partitionWriters.foreach(_.commitAndClose())
      var out: FileOutputStream = null
      var in: FileInputStream = null
      try {
        out = new FileOutputStream(outputFile)
        for (i <- 0 until numPartitions) {
          in = new FileInputStream(partitionWriters(i).fileSegment().file)
          val size = org.apache.spark.util.Utils.copyStream(in, out, false)
          in.close()
          in = null
          lengths(i) = size
        }
      } finally {
        if (out != null) {
          out.close()
        }
        if (in != null) {
          in.close()
        }
      }
    } else {
      // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by
      // partition and just write everything directly.
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) {
          val writer = blockManager.getDiskWriter(
            blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
          for (elem 

而数据读取过程中则需要使用IndexShuffleBlockManager来获取Partiton的具体位置

  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    // The block is actually going to be a range of a single map output file for this map, so
    // find out the consolidated file, then the offset within that from our index
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skip(blockId.reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      new FileSegmentManagedBuffer(
        getDataFile(blockId.shuffleId, blockId.mapId),
        offset,
        nextOffset - offset)
    } finally {
      in.close()
    }
  }

参数资料

  1. 详细探究spark的shuffle 实现
  2. spark-2045 sort-based shuffle implementation
目录
相关文章
|
2月前
|
分布式计算 监控 大数据
如何优化Spark中的shuffle操作?
【10月更文挑战第18天】
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
70 1
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
82 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
103 0
|
6月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
170 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
5月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
86 0
|
5月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
242 0
|
6月前
|
分布式计算 Apache Spark
|
7月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
196 6
|
7月前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。

推荐镜像

更多