💨Shuffle概述
🎈在开源实现的MapReduce中,存在Map、 Shuffle、 Reduce三个阶段。
Shuffle过程是MapReduce的核心。
Map阶段:是在单机上进行的针对-一小块数据的计算过程。Shuffle阶段:在map阶段的基础,上,进行数据移动,为后续的reduce阶段做准备。reduce阶段:对移动后的数据进行处理,依然是在单机上处理一小份数据。
🎈为什么需要Shuffle?
在分布式计算框架中,数据本地化是一个很重要的考虑,即计算需要被分发到数据所在的位置,从而减少数据的移动,提高运行效率。 hadoop中,map负责数据的初级拆分获取解析, reduce负责最终数据的集总,除了业务逻辑的功能外,其他的核心数据处理都是由shuffle来支持。
🎈Shuffle有什么?
简单来说,shuffle中有三次的数据排序
🚩第一次是 快速排序,这是因为第一次的数据全部在内存中开辟了一个缓冲区,数据从map出来后,分批进入缓冲区,对它们的索引进行排序,并且按照map的逻辑进行分区,在出缓冲区落盘的时候,完成排序。🚩第二次是归并排序,将第一次分批出来的文件进行区内归并排序。🚩第三次也是归并排序,将所有的map Task第二次产生的文件进行区内归并排序
这三次可以看做是一个整体的过程,从这里应该可以看出,shuffle是一个比较耗费资源并且时间开销比较大的环节。
🎈为什么Shuffle对性能非常重要?
🚩M * R次网络连接🚩大量的数据移动🚩数据丢失风险🚩可能存在大量的排序操作🚩大量的数据序列化、反序列化操作🚩数据压缩
🍳在大数据场景下,数据shuffle表示了不同分区数据交换的过程,不同的shufle策略性能差异较大。 目前在各个引擎中shuffle都是优化的重点,在spark框架中,shuffle 是支撑spark进行大规模复杂 数据处理的基石。
shuffle的数据来源于map,所以可以对map端出来的数据进行处理,我们可以采用压缩的方式尽量减少数据的规模。
💨Shuffle算子
💨分类
spark中会导致shuffle操作的有以下几种算子:
🚩repartition类的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等
🚩byKey类的操作:比如reduceByKey、groupByKey、sortByKey等
🚩join类的操作:比如join、cogroup等
🚩Distinct类的操作:distinct
💨Spark中对shuffle的抽象
窄依赖:父RDD的每个分片至多被子RDD中的一个分片所依赖
宽依赖:父RDD中的分片可能被子RDD中的多个分片所依赖
💨算子内部的依赖关系
ShuffleDependency:
🍳CoGroupedRDD含Cogroup
🚩fullOuterJoin、rightOuterJoin、 leftOuterJoin🚩join
🍳ShuffledRDD
🚩combineByKeyWithClassTag combineByKey reduceByKey🚩Coalesce🚩sortByKey sortBy
💨Shuffle过程
💨Write
Spark中需要Shuffle 输出的Map任务会为每个Reduce创建对应的bucket,Map产生的结果会根据设置的partitioner得到对应的bucketId,然后填 充到相应的bucket中去。每个Map的输出结果可能包含所有的Reduce所需要的数据,所以每个Map会创建R个bucket(R是reduce的 个数),M个Map总共会创建M*R个bucket。
💨Fether
Reduce去拖Map的输出数据,Spark提供了两套不同的拉取数据框架:通过socket连接去取数据;使用netty框架去取数据。Spark Map输出的数据没有经过排序,Spark Shuffle过来的数据也不会进行排序,Spark认为Shuffle过程中的排序不是必须的,并不是所有类型的Reduce需要的数据都需要排序,强 制地进行排序只会增加Shuffle的负担。educe拖过来的数据会放在一个HashMap中,HashMap中存储的也是对,key是Map输出的key,Map输出对应这个key的所有value组成HashMap的value。Spark将 Shuffle取过来的每一个对插入或者更新到HashMap中,来一个处理一个。HashMap全部放在内存中。
💨Shuffle Handle创建
🎈Register Shuffle时做的最重要的事情是根据不同条件创建不同的shuffle Handle
🎈Shuffle Handle与Shuffle Writer的对应关系
BypassMergeSortShuffleHandle——>BypassMergeSortShuffleWriter
SerializedShuffleHandle——>UnsafeShuffleWriter
BaseShuffleHandle——>SortShuffleWriter
💨Reader实现-网络时序图
🎈使用基于netty的网络通信框架
🎈位置信息记录在MapOutputTracker中
🎈主要会发送两种类型的请求
🚩OpenBlocks请求
🚩Chunk请求或Stream请求
💨Shuffle优化使用的技术: Netty Zero Copy
🚩可堆外内存,避免JVM堆内存到堆外内存的数据拷贝。🚩CompositeByteBuf、Unpooled.wrappedBuffer. ByteBuf.slice,可以合并、包装、切分数组,避免发生内存拷贝🚩Netty使用FileRegion实现文件传输,FileRegion底层封装了FileChannel#transferTo()方法,可以将文件缓冲区的数据直接传输到目标Channel, 避免内核缓冲区和用户态缓冲区之间的数据拷贝
在第一次排序之后,此时由于原数据中各个字段可能会有数据分布不均,这样会导致reduce端处理数据时的数据倾斜——各个Task的处理量相差悬殊,可以在此处进行初步的数据合并处理。
应用场景:
去重操作;聚合,byKey类操作;排序操作等
💨常见问题
🚩数据存储在本地磁盘,没有备份
🚩I0并发:大量RPC请求(M*R)
🚩I0吞吐:随机读、写放大(3X)
🚩GC频繁,影响NodeManager
💨Shuffle优化
🚩避免shuffle,使用broadcast替代join
🚩使用可以map-side预聚合的算子