一、介绍一下 Spark shuffle:
Spark shuffle 就是将分布在不同结点的数据按照一定的规则进行打乱重组。那么,说起 shuffle 就想到 MapReduce 中的 shuffle,MapReduce 中的 shuffle 是来连接 Map 和 Reduce 的桥梁,Map 的输出要用到 Reduce,则必须经过 shuffle 环节。由于 shuffle 阶段涉及磁盘的读写和网络传输,因此 shuffle 的性能直接影响整个程序的性能和吞吐量。
Spark shuffle 分为 HashShuffle 和 SortShuffle 两种,其中 HashShuffle 分为优化前和优化后。而现在默认用的是 SortShuffle。
注:下文中 Map Task 指的就是 Shuffle Write 阶段,Reduce Task 指的就是 Shuffle Read 阶段。
(1)在早期的版本中,用的是未优化的 HashShuffle,这种 shuffle 方式中,MapTask 过程会按照 Hash 的方式重组 Partition 的数据,但不进行排序,每个 MapTask 会为每个 ReduceTask 生成一个文件。但问题来了,如果有 M 个 MapTask 和 R 个 ReduceTask 的话,那将产生 M*R 个中间文件,文件数多了,也将伴随着大量的磁盘 I/O 与大量的内存开销,导致效率低下,同时容易引发 OOM。
(2)针对上面的问题,Spark 对 HashShuffle 进行了改进。一个 Executor 上所有的 MapTask 生成的分区文件只有一份,即将所有的 MapTask 相同的分区文件进行合并,这样每个 Executor 上最多只生成 R 个分区文件。(R 为 ReduceTask 的数量)
但问题又来了,虽然这样减少了文件数量,但如果下游 Stage 的分区数 N 很大,而一个 Executor 上又 K 个 Core,还是会产生 N*K 个文件,同样容易导致 OOM。
(3)为了更好的解决上面的问题,Spark 参考了 MapReduce 中的 shuffle 处理方式,引入基于排序的 shuffle 写操作机制。
每个 Task 不会为后续的每个 Task 创建单独的文件,而是通过缓冲区溢写的方式,在溢写前先根据 key 进行排序,然后默认以 1w 条数据为批次溢写到磁盘文件。每次溢写都会产生一个磁盘文件,也就是说一个 Task 过程会产生多个临时文件。最后将所有的临时文件进行合并,一次写入到最终文件中。这意味着一个 Task 的所有数据都在这一个文件中,同时单独写一份索引文件,标识各个 Task 的数据在文件中的索引。
总体来看,SortShuffle 解决了 HashShuffle 的所有弊端,但因为其 Shuffle 过程需要对记录进行排序,所以在性能上有所损失。
参考:Spark Shuffle 详解 - 知乎 (zhihu.com)
关于 Shuffle Read,主要了解以下问题:
- 在什么时候获取数据,Parent Stage 中的一个 ShuffleMapTask 执行完还是等全部 ShuffleMapTasks 执行完?
当 Parent Stage 的所有 ShuffleMapTasks 结束后再 fetch。- 边获取边处理还是一次性获取完再处理?
因为 Spark 不要求 Shuffle 后的数据全局有序,因此没必要等到全部数据 shuffle 完成后再处理,所以是边 fetch 边处理。
二、Spark 的 shuffle 过程数据一直放在内存中吗?
Spark 的 shuffle 过程一定会有落盘,因为 shuffle 过程中,分区间的数据需要打乱重组,那么下游的 Stage 必须等到上游的 Stage 全部处理完了之后才能进行处理,分区间不能流水线一样的处理了,所以如果数据全都放在内存,那么容易出现 OOM 问题,所以数据是要落盘处理的。
三、shuffle 操作增加了什么开销?底层用的什么算法?
Shuffle 操作会增加 磁盘 I/O、网络传输、序列化和反序列化的开销。
增加磁盘 I/O 开销:Map 端输出的数据在写入磁盘时,会产生磁盘 IO
增加网络传输开销:Reduce 任务需要从 Map 任务的输出中获取数据进行计算,会产生网络传输的开销。
增加序列化和反序列化的开销:Reduce 任务需要对 Shuffle 数据进行聚合和处理,产生序列化和反序列化开销。
shuffle 有两种算法,一个 HashShuffle、一个 SortShuffle,现在底层用的是 SortShuffle 算法。
四、为什么 Spark shuffle 比 MapReduce shuffle 快?
五、哪些算子会导致 Spark 产生 shuffle?
产生 shuffle 的原因是将分区间的数据进行混洗重组了,常用的如:
还有 distinct、sortBy、partitionBy、foldByKey、leftOuterJoin、rightOuterJoin、intersection、substract 等
六、Spark 中,说说 Transform 算子和 Action 算子的区别:
Transform 算子:即转换算子,用于功能的补充和封装,将旧的 RDD 包装成新的 RDD,常用的如 Map、flatMap、glom、filter 等。
Action 算子:即行动算子,行动算子会触发任务的调度和作业的执行,直接返回处理后的结果,而不是新的 RDD。
七、介绍一下 Spark 的常用算子、常见的行动算子、常见的转换算子(窄依赖、宽依赖算子)
转换算子:
窄依赖:map、flatMap、glom、filter
宽依赖:distinct、sortBy、partitionBy、reduceByKey、groupByKey、combineByKey、aggregateByKey、foldByKey、join、leftOuterJoin、rightOuterJoin、cogroup、coalesce、intersection、substract 等
行动算子:collect、foreach、reduce、fold、count、countByKey、countByValue、first、take、takeOrdered、aggregate