《循序渐进学Spark》一3.6 Shuffle机制

简介:

本节书摘来自华章出版社《循序渐进学Spark》一书中的第3章,第3.6节,作者 小象学院 杨 磊,更多章节内容可以访问云栖社区“华章计算机”公众号查看。


3.6 Shuffle机制

在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节,Shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark作为MapReduce框架的一种实现,自然也实现了Shuffle的逻辑。对于大数据计算框架而言,Shuffle阶段的效率是决定性能好坏的关键因素之一。

3.6.1 什么是Shuffle

Shuffle是MapReduce框架中的一个特定的阶段,介于Map阶段和Reduce阶段之间,当Map的输出结果要被Reduce使用时,输出结果需要按关键字值(key)哈希,并且分发到每一个Reducer上,这个过程就是Shuffle。直观来讲,Spark Shuffle机制是将一组无规则的数据转换为一组具有一定规则数据的过程。由于Shuffle涉及了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响整个程序的运行效率。

在MapReduce计算框架中,Shuffle连接了Map阶段和Reduce阶段,即每个Reduce Task从每个Map Task产生的数据中读取一片数据,极限情况下可能触发M*R个数据拷贝通道(M是Map Task数目,R是Reduce Task数目)。通常Shuffle分为两部分:Map阶段的数据准备和Reduce阶段的数据拷贝。首先,Map阶段需根据Reduce阶段的Task数量决定每个Map Task输出的数据分片数目,有多种方式存放这些数据分片:

1) 保存在内存中或者磁盘上(Spark和MapReduce都存放在磁盘上)。

2) 每个分片对应一个文件(现在Spark采用的方式,以及以前MapReduce采用的方式),或者所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量(现在MapReduce采用的方式)。

因此可以认为Spark Shuffle与Mapreduce Shuffle的设计思想相同,但在实现细节和优化方式上不同。

在Spark中,任务通常分为两种,Shuffle mapTask和reduceTask,具体逻辑如图3-11所示:

278d5baeff935b9f4339f94fc85ccab67241927b

图3-11 Spark Shuffle

图3-11中的主要逻辑如下:

1)首先每一个MapTask会根据ReduceTask的数量创建出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。

2)其次MapTask产生的结果会根据设置的partition算法填充到每个bucket中。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中。

当ReduceTask启动时,它会根据自己task的id和所依赖的Mapper的id从远端或本地的block manager中取得相应的bucket作为Reducer的输入进行处理。

这里的bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。Spark shuffle可以分为两部分:

1) 将数据分成bucket,并将其写入磁盘的过程称为Shuffle Write。

2) 在存储Shuffle数据的节点Fetch数据,并执行用户定义的聚集操作,这个过程称为Shuffle Fetch。

3.6.2 Shuffle历史及细节

下面介绍Shuffle Write与Fetch。

1. Shuffle Write

在Spark的早期版本实现中,Spark在每一个MapTask中为每个ReduceTask创建一个bucket,并将RDD计算结果放进bucket中。

但早期的Shuffle Write有两个比较大的问题。

1)Map的输出必须先全部存储到内存中,然后写入磁盘。这对内存是非常大的开销,当内存不足以存储所有的Map输出时就会出现OOM(Out of Memory)。

2)每个MapTask会产生与ReduceTask数量一致的Shuffle文件,如果MapTask个数是1k,ReduceTask个数也是1k,就会产生1M个Shuffle文件。这对于文件系统是比较大的压力,同时在Shuffle数据量不大而Shuffle文件又非常多的情况下,随机写也会严重降低IO的性能。

后来到了Spark 0.8版实现时,显著减少了Shuffle的内存压力,现在Map输出不需要先全部存储在内存中,再flush到硬盘,而是record-by-record写入磁盘中。对于Shuffle文件的管理也独立出新的ShuffleBlockManager进行管理,而不是与RDD cache文件在一起了。

但是Spark 0.8版的Shuffle Write仍然有两个大的问题没有解决。

1)Shuffle文件过多的问题。这会导致文件系统的压力过大并降低IO的吞吐量。

2)虽然Map输出数据不再需要预先存储在内存中然后写入磁盘,从而显著减少了内存压力。但是新引入的DiskObjectWriter所带来的buffer开销也是不容小视的内存开销。假定有1k个MapTask和1k个ReduceTask,就会有1M个bucket,相应地就会有1M个write handler,而每一个write handler默认需要100KB内存,那么总共需要100GB内存。这样仅仅是buffer就需要这么多的内存。因此当ReduceTask数量很多时,内存开销会很大。

为了解决shuffle文件过多的情况,Spark后来引入了新的Shuffle consolidation,以期显著减少Shuffle文件的数量。

Shuffle consolidation的原理如图3-12所示:

在图3-12中,假定该job有4个Mapper和4个Reducer,有2个core能并行运行两个task。可以算出Spark的Shuffle Write共需要16个bucket,也就有了16个write handler。在之前的Spark版本中,每个bucket对应一个文件,因此在这里会产生16个shuffle文件。


be9c6297f36c52af0446b73eb595a97a7c4c1d94

图3-12 Shuffle consolidation

而在Shuffle consolidation中,每个bucket并非对应一个文件,而是对应文件中的一个segment。同时Shuffle consolidation产生的Shuffle文件数量与Spark core的个数也有关系。在图3-12中,job中的4个Mapper分为两批运行,在第一批2个Mapper运行时会申请8个bucket,产生8个Shuffle文件;而在第二批Mapper运行时,申请的8个bucket并不会再产生8个新的文件,而是追加写到之前的8个文件后面,这样一共就只有8个Shuffle文件,而在文件内部共有16个不同的segment。因此从理论上讲Shuffle consolidation产生的Shuffle文件数量为C×R,其中C是Spark集群的core number,R是Reducer的个数。

很显然,当M=C时,Shuffle consolidation产生的文件数和之前的实现相同。

Shuffle consolidation显著减少了Shuffle文件的数量,解决了Spark之前实现中一个比较严重的问题。但是Writer handler的buffer开销过大依然没有减少,若要减少Writer handler的buffer开销,只能减少Reducer的数量,但是这又会引入新的问题。

2. Shuffle Fetch与Aggregator

Shuffle Write写出去的数据要被Reducer使用,就需要Shuffle Fetch将所需的数据Fetch过来。这里的Fetch操作包括本地和远端,因为Shuffle数据有可能一部分是存储在本地的。在早期版本中,Spark对Shuffle Fetcher实现了两套不同的框架:NIO通过socket连接Fetch数据;OIO通过netty server去fetch数据。分别对应的类是Basic-BlockFetcherIterator和NettyBlockFetcherIterator。

目前在Spark1.5.0中做了优化。新版本定义了类ShuffleBlockFetcherIterator来完成数据的fetch。对于local的数据,ShuffleBlockFetcherIterator会通过local的BlockMan-ager来fetch。对于远端的数据块,它通过BlockTransferService类来完成。具体实现参见如下代码:


在MapReduce的Shuffle过程中,Shuffle fetch过来的数据会进行归并排序(merge sort),使得相同key下的不同value按序归并到一起供Reducer使用,这个过程如图3-13所示:

这些归并排序都是在磁盘上进行的,这样做虽然有效地控制了内存使用,但磁盘IO却大幅增加了。虽然Spark属于MapReduce体系,但是对传统的MapReduce算法进行了一定的改变。Spark假定在大多数应用场景下,Shuffle数据的排序不是必须的,如word count。强制进行排序只会使性能变差,因此Spark并不在Reducer端做归并排序。既然没有归并排序,那Spark是如何进行reduce的呢?这就涉及下面要讲的Shuffle Aggregator了。

d16bc68966fb6a4fd8daa9cfcbfa7db1ff6514ff

图3-13 Fetch merge

Aggregator本质上是一个hashmap,它是以map output的key为key,以任意所要combine的类型为value的hashmap。

在做word count reduce计算count值时,它会将Shuffle fetch到的每一个key-value对更新或是插入hashmap中(若在hashmap中没有查找到,则插入其中;若查找到,则更新value值)。这样就不需要预先把所有的key-value进行merge sort,而是来一个处理一个,省去了外部排序这一步骤。但同时需要注意的是,reducer的内存必须足以存放这个partition的所有key和count值,因此对内存有一定的要求。

在上面word count的例子中,因为value会不断地更新,而不需要将其全部记录在内存中,因此内存的使用还是比较少的。考虑一下如果是groupByKey这样的操作,Reducer需要得到key对应的所有value。在Hadoop MapReduce中,由于有了归并排序,因此给予Reducer的数据已经是group by key了,而Spark没有这一步,因此需要将key和对应的value全部存放在hashmap中,并将value合并成一个array。可以想象为了能够存放所有数据,用户必须确保每一个partition小到内存能够容纳,这对于内存是非常严峻的考验。因此在Spark文档中,建议用户涉及这类操作时尽量增加partition,也就是增加Mapper和Reducer的数量。

增加Mapper和Reducer的数量固然可以减小partition的大小,使内存可以容纳这个partition。但是在Shuffle write中提到,bucket和对应于bucket的write handler是由Mapper和Reducer的数量决定的,task越多,bucket就会增加得更多,由此带来write handler所需的buffer也会更多。在一方面我们为了减少内存的使用采取了增加task数量的策略,另一方面task数量增多又会带来buffer开销更大的问题,因此陷入了内存使用的两难境地。

为了减少内存的使用,只能将Aggregator的操作从内存移到磁盘上进行,因此Spark新版本中提供了外部排序的实现,以解决这个问题。

Spark将需要聚集的数据分为两类:不需要归并排序和需要归并排序的数据。对于前者,在内存中的AppendOnlyMap中对数据聚集。对于需要归并排序的数据,现在内存中进行聚集,当内存数据达到阈值时,将数据排序后写入磁盘。事实上,磁盘上的数据只是全部数据的一部分,最后将磁盘数据全部进行归并排序和聚集。具体Aggregator的逻辑可以参见Aggregator类的实现。



本节就Shuffle的概念与原理先介绍到这里。在下一章讲解Spark源码时,会对Shuffle的核心机制——Shuffle存储做代码层面的讲解。相信学习完本章和第4章的Shuffle存储机制后,读者会对Shuffle机制掌握得更加深入。

相关文章
|
2月前
|
分布式计算 Spark 索引
Spark学习---day07、Spark内核(Shuffle、任务执行)
Spark学习---day07、Spark内核(源码提交流程、任务执行)
|
4月前
|
分布式计算 Java 调度
Spark中的Shuffle过程是什么?为什么它在性能上很关键?
Spark中的Shuffle过程是什么?为什么它在性能上很关键?
30 0
|
9月前
|
分布式计算 监控 Java
Spark学习---7、Spark内核(源码提交流程、任务执行、Shuffle、内存管理)(一)
Spark学习---7、Spark内核(源码提交流程、任务执行、Shuffle、内存管理)(一)
|
10月前
|
分布式计算 算法 Java
Spark shuffle、RDD 算子【重要】
Spark shuffle、RDD 算子【重要】
212 0
|
11月前
|
SQL 分布式计算 开发工具
Spark 3.1.1 shuffle fetch 导致shuffle错位的问题
Spark 3.1.1 shuffle fetch 导致shuffle错位的问题
270 0
|
11月前
|
存储 缓存 分布式计算
Spark 缓存和检查点机制
Spark 缓存和检查点机制
83 0
|
存储 分布式计算 负载均衡
OPPO 开源高可用、高性能的 Spark Remote Shuffle Service
大数据计算的兴起,源于 Google 的 MapReduce 论文,MapReduce 的原理很简单,其流程核心则是 Map 和 Reduce 两阶段数据交换,也即 Shuffle。
538 0
OPPO 开源高可用、高性能的 Spark Remote Shuffle Service
|
缓存 分布式计算 Spark
Spark之Shuffle机制及其文件寻址详解
Spark之Shuffle机制及其文件寻址详解
163 0
Spark之Shuffle机制及其文件寻址详解
|
存储 分布式计算 Java
SPARK 是怎么清除Shuffle中间结果数据的
SPARK 是怎么清除Shuffle中间结果数据的
381 0
|
分布式计算 Spark
SPARK push-based shuffle mapTask是怎么获取ESS列表信息
SPARK push-based shuffle mapTask是怎么获取ESS列表信息
200 0