👉引言💎
学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。 热爱写作,愿意让自己成为更好的人............
铭记于心 | ||
🎉✨🎉我唯一知道的,便是我一无所知🎉✨🎉 |
概述
学习目标:
- 了解spark中shuffle的发展历史以及主要实现机制。包括如何划分stage、partition分区、spill、combine等。
- 了解spark shuffle的底层实现原理以及Push-based shuffle总体架构方案。
- 了解shuffle优化,包括实际业务场景下如何避免产生shuffle、减少shuffle数据量、shuffle参数优化等
一、shuffle概述
1 MapReduce
- 《MapReduce:Simplified Data Processing on Large Clusters》
- 开源实现中主要分为Map,Shuffle,Reduce三个阶段
- 1.1 Map阶段
单机上针对一小块数据的计算过程
- 1.2 Shuffle阶段
在map阶段的基础上,进行数据移动,为后续的reduce阶段做准备
- 1.3 Reduce阶段
- 对移动后的数据进行处理,依然是在单机上处理一小份数据
2 Shuffle对性能极为重要
- 经典shuffle过程
各个引擎中,针对数据shuffle的优化都是重中之重,数据shuffle涉及到以下几个方面:
- M*R次网络连接
- 大量的数据移动
- 数据丢失风险
- 可能存在大量排序操作
- 大量数据序列化、反序列化操作
- 数据压缩
- 数据shuffle表示了不同分区数据交换的过程,不同的shuffle策略性能差异较大。目前在各个引擎中shuffle都是优化的重点,在spark框架中,shuffle是支撑spark进行大规模复杂数据处理的基石。
二、shuffle算子
1 常见的触发shuffle的算子
- repartition
- coalesce、repartition
- ByKey
- groupByKey、reduceByKey、aggregateByKey、combineByKey、sortByKeysortBy
- Join
- cogroup、join
- Distinct
- distinct
2 算子使用例子
val text = sc.textFile("mytextfile.txt") val counts = text .flatMap(line => line.split(" ")) .map(word => (word,1)) .reduceByKey(*+*) counts.collect
3 Spark中对shuffle的抽象
- 宽依赖
父RDD的每个分片至多被子RDD中的一个分片所依赖 - 窄依赖
父RDD中的分片可能被子RDD 中的多个分片所依赖
4 Shuffle Dependency算子依赖关系
- 创建会产生shuffle的RDD时,RDD会创建Shuffle Dependency来描述Shuffle相关的信息
- 1 构造函数
- A single key-value pair RDD, i.e. RDD[Product2[K, V]],
- Partitioner (available as partitioner property),
- Serializer,
- Optional key ordering (of Scala’s scala.math.Ordering type),
- Optional Aggregator,
- mapSideCombine flag which is disabled (i.e. false) by default.
- 2 Partitioner
- 用来将record映射到具体的partition的方法
- 接口
- numberPartitions
- getPartition
- 3 Aggregator
- 在map侧合并部分record的函数
- 接口
- createCombiner:只有一个value的时候初始化的方法
- mergeValue:合并一个value到Aggregator中
- mergeCombiners:合并两个Aggregator
🌹写在最后💖: 路漫漫其修远兮,吾将上下而求索!伙伴们,再见!🌹🌹🌹