课程资料
课程视频:https://live.juejin.cn/4354/yc_Shuffle
课程PPT:https://bytedance.feishu.cn/file/boxcnQaV9uaxTp4xF0d1vEK5W3c
学员手册:https://juejin.cn/post/7123908203590451207/#heading-46
完整手册:https://bytedance.feishu.cn/docx/doxcnECGEFkCKYqbxaDipK1qrVf
一、shuffle概述
1.1 MapReduce概述
- map阶段:在单机上进行的针对一小块数据的计算过程
- shuffle阶段:在map的基础上,进行数据移动,为后续的reduce阶段做准备
- reduce阶段:对移动后的数据进行处理,依然是在单机上处理一小份数据
1.2 为什么shuffle对性能非常重要
- M * R次网络连接
- 大量的数据移动
- 数据丢失风险
- 可能存在大量的排序操作
- 大量的数据序列化、反序列化操作
- 数据压缩
二、Shuffle算子
2.1 shuffle算子概述
- Spark中会产生的算子大概可以分为四类
- 算子使用例子
2.2 shuffle算子构造
算子内部依赖关系
ShuffleDependency
CoGroupedRDD
Cogroup
- fullOuterJoin、rightOuterJoin、leftOuterJoin
- join
ShuffledRDD
combileByKeyWithClassTag
- combineByKey
- reduceByKey
- Coalesce
sortByKey
- sortBy
Shuffle Dependency 构造
创建会产生shuffle的RDD时,RDD会创建Shuffle Dependency来描述Shuffle相关的信息
- A single key-value pair RDD, i.e. RDD[Product2[K, V]],
- Partitioner (available as partitioner property),
- Serializer,
- Optional key ordering (of Scala’s scala.math.Ordering type),
- Optional Aggregator,
- mapSideCombine flag which is disabled (i.e. false) by default.
构造Partitioner
用来将record映射到具体的partition的方法(partition指的是map映射之后的多个数据存储文件)
Aggregator
- 在map侧合并部分record的函数
接口
- createCombiner:只有一个value的时候初始化的方法
- mergeValue:合并一个value到Aggregator中
- mergeCombiners:合并两个Aggregator
三、Shuffle过程
3.1 spark中的shuffle变迁
HashShuffle
- 优点:不需要排序
- 缺点:打开,创建的文件过多(每个partition会映射到一个独立的文件)
SortShuffle
- 优点:打开的文件少、支持map-side combine(每个task生成一个包含所有partition数据的文件)
- 缺点:需要排序
- 每个reduce task分别获取所有map task生成的属于自己的片段
TungstenSortShuffle
- 优点:更快的排序效率,更高的内存利用效率
- 缺点:不支持map-side combine
3.2 Register Shuffle
- 由action算子触发DAG Scheduler进行shuffle register
- Shuffle Register会根据不同的条件决定注册不同的ShuffleHandle
三种ShuffleHandle对应了三种不同的ShuffleWriter的实现
- BypassMergeSortShuffleWriter:HashShuffle
- UnsafeShuffleWriter:TunstonShuffle
- SortSHuffleWriter:SortShuffle
3.3 ShuffleReader网络请求流程
- 使用netty作为网络框架提供网络服务,并接受reducetask的fetch请求
- 首先发起openBlocks请求获得streamId,然后再处理stream或者chunk请求
Reader的实现—ShuffleBlockFetchIterator
- 区分local和remote节省网络消耗
防止OOM
- maxBytesInFlight
- maxReqsInFlight
- maxBlocksInFlightPerAddress
- maxReqSizeShuffleToMem
- maxAttemptsOnNettyOOM
- External Shuffle Service
为了解决Executor为了服务数据的fetch请求导致无法退出问题,我们在每个节点上部署一个External Shuffle Service,这样产生数据的Executor在不需要继续处理任务时,可以随意退出。
3.4 Shuffle的问题以及优化
常见问题
- 数据存储在本地磁盘,没有备份
- IO并发:大量RPC请求(M*R)
- IO吞吐:随机读、写放大(3M)
- GC频繁,影响NodeManager
优化1. Zero Copy
减少了文件拷配次&程序在拷贝过程中涉及到的用户态和内核态的切换,将文件缓冲区的数据直接输出到目标Channel
Netty 零拷贝
- 可堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。
- CompositeByteBuf 、 Unpooled.wrappedBuffer、 ByteBuf.slice ,可以合并、包装、切分数组,避免发生内存拷贝
- Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝
优化2. map-side预聚合算子
优化3. 倾斜优化
方式一:提高并行度
- 优点:足够简单
- 缺点:只缓解、不根治
- 方式二:Spark AEQ Skew Join
AEQ根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后进行join(会有重复出现)
优化4. 参数优化
四、Push Shuffle
4.1 为何需要
- Avg IO size太小,造成了大量的随机IO,严重影响磁盘的吞吐
- M * R次读请求,造成大量的网络连接,影响稳定性
4.2 Magnet实现原理
- Spark driver组件,协调整体的shuffle操作
- map任务的shuffle writer过程完成后,增加了一个额外的操作push-merge,将数据复制一份到原创的shuffle服务上
- magnet shuffle service是一个强化版的ESS,将隶属于同一个shuffle partition的block,会在远程传输到magnet后被merge到一个文件中
- reduce任务从magnet shuffle service接受合并好的shuffle数据
4.3 Magnet 可靠性
- 如果Map task输出的Block没有成功Push到magnet上,并且反复重试仍然失败,则reduce task直接从ESS上拉取原始的block数据
- 如果magnet上的block因为重复或者冲突等原因,没有正常完成merge的过程,则reduce task直接拉取未完成的merge的block
- 如果reduce拉取已经merge好的block失败,则直接拉取merge前的原始block
- 本质上,magnet中维护了两份shuffle数据的副本(有极小风险,但是选择接受)
- 下方是Cloud Shuffle Service的写入和读取流程
- Cloud Shuffle Service 支持AQE