RDD 的分区和 Shuflee_Shuffle 过程扫盲 | 学习笔记

简介: 快速学习 RDD 的分区和 Shuflee_Shuffle 过程扫盲

开发者学堂课程【大数据Spark2020最新课程(知识精讲与实战演练)第二阶段RDD 的分区和 Shuflee_Shuffle 过程扫盲学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/689/detail/11995


RDD 的分区和 Shuflee_Shuffle 过程扫盲


内容介绍

一、Shuffle 说明

二、Hash base shuffle

三、Sort base shuffle

 

一、Shuffle 说明

关于 RDD 分区,在最后一部分简要介绍一个 shuffle 过程,接下来在后面的内容当中一部分是 spark 的原理,而 spark 的原理中一部分是讲 shuffle 的原理。

在讲 shuffle 原理之前,在 sparkRDD 特性的介绍部分,要进行一个简单的 shuffle 说明,把这些概念要扫一个盲,通过这个视频可以让大家理解 shuffle 过程中间所涉及的概念以及 shuffle spark 当中的进化。

RDD Shuffle 是什么

val sourceRdd = sc.textFile (" hdfs ://node01:9θ2θ/dataset/ wordcount . txt ")

val flattenCountRdd = sourceRdd.flatMap (_.split (“ ”)). map ((_,1))

val aggCountRdd = flattenCountRdd . reduceByKey (_+_)

val result = aggCountRdd . collect

这份代码其实就是 flatmapreduce 这三个算子配合而成的一个磁片统计。

image.png

步骤

数据存放在 hdfs 上的一个文件,接下来读取文件生成一个 RDD ,生成的 RDD 是一行一行的字符串,再通过 flatmap RDD 进行一对多,把单词拆解出来,变成第二列这样的 RDD ,这个 RDD 中的每一条数据就是一个单词。接下来为每一个数据指定一个初始词频,然后按照key 把所有的初始词频汇总起来,汇总的过程就是一个 shuffle 过程,假设第三列为 rdd1,第四列是 rdd2,整个过程就相当于 rdd2=rdd1.reduceByKey(),reduceByKey 在这个 rdd2=rdd1.reduceByKey()函数当中应该是rdd2的计算函数,因为 rdd2是通过 reduceByKey rdd1中拉取数据, reduceByKey 应该是属于 rdd2的。

rdd2是整个算子的 reduce 端,rdd1是整个算子的 map 端,数据从 rdd1流向 rdd2

通过 rdd 五大属性中的 partitioner 函数来确定 Hadoop 应该分发到哪一台机器。这个函数默认就是一个 hashpartitioner Hashpartitioner 的作用是对 Hadoop 求一个 hashcode 和三取一个模,模是几就分发到哪一个 reducer

第一张图是从数据的视角上说明了 shuffle 的过程,接下来从分区视角上说明 shuffle 过程。

image.png

对于分区过程,RDD2RDD3 shuffle 之间把 RDD2的数据分发到 RDD3当中他们之间一个交错的关系。即一个RDD3中的分区可能会有三个副分区,RDD2中的每一个分区都有可能是 RDD3的副分区。Shuffle 一般分为端:第一端为 mapper 端,第二端为 reducer 端,如果 RDD2mapperRDD3reducer

reducer mapper 中拉取数据有两种方法,第一种是 mapper 直接把数据推送到 reducer 中去,第二种是 mapper把数据放在内存或磁盘里 reducer 去拉。在分布式框架计算当中 Shuffle 的过程 mapper reducer 之间,Mapper就是放在一个文件当中,reducer 去拉取文件这样来拷贝数据。Reducer 一般通过拉取文件获取数据,mapper 的任务是把发给 reducer 的数据放在文件中。

reduceByKey 这个算子本质上是先按照 Key 分组,再对每一组数据进行 reduce ,但是 Key 相同的所有数据分布在不同的 Partition 分区中,甚至可能在不同的节点中,但是它们必须被共同计算。为了让来自相同 Key 的所有数据都在 reduceByKey 的同一个 reduce 中处理,需要执行 all - to - all 的操作,需要在不同的节点(不同的分区)之间拷贝数据,必须跨分区聚集相同 Key 的所有数据,这个过程叫做 Shuffle

mapper reducer 之间进行数据拷贝有两种方式,第一种方式叫 Hash base shuffle,第二个叫做 Sort base shuffle。这两个 Shuffle 方式其实也是大数据平台,在数据处理学术界所研究的内容之一。

RDD Shuffle 原理。

Spark Shuffle 发展大致有两个阶段:Hash base shuffleSort base shuffle

 

二、Hash base shuffle

 image.png

Hash base shuffle Spark 早期版本所用的 Shuffle 过程,Hash base shuffle 的原理其实是在描述 mapper reducer 之间如何去存放文件,如何去拷贝文件的过程。

现在所说的 Hash base shuffleSort base shuffle partitioner 没有关系,partitioner 用于计算一条数据应该发往哪一个机器上,Hash base shuffle Sort base shuffle 用于描述中间过程如何存文件。如果用 Hash base shuffle,加入要分发给两个 reducer,应该生成两个文件,应该把发往 reduce1的数据存放到第一个文件当中,把发往 reduce2的数据存放在第二个文件当中。

第二个 mapper 端同理,他也有两个文件,一个发往 reducer1,一个发往 reducer2Map3也是一样的也是两个文件,一个给1,一个给2

第二步过程,reducer 可以直接到第一个 map 1这个文件当中去拷贝,也可以到第二个 map 1这个文件当中去拷贝,也可以到第三个 map 1这个文件当中去拷贝。Reducer2同理。这个过程称为 Hash base shuffle 过程。一条数据先取不同的 hash,然后放在不同的文件当中。

假设有1000mapper1000reducer,中间产生1000*1000个文件,即1000000个文件。Spark 在早期被很多人指责,说他不适合作大规模的数据处理,大规模数据处理还是用 spark 的原因为 mapreduce 不是 Hash base shuffle,而 spark rdd Hash base shuffle,它产生的中间文件过多。

其实最根本解决这个问题的办法是使用 Sort base shuffle

 

三、Sort base shuffle

image.png

Sort base shuffle 不会像 Hash base shuffle 把数据存到不同的文件当中,Sort base shuffle 会把所有的数据计算出应该发往哪个分区以后,比如第一个 Map 会把所有的数据都放在一个集合当中,这个集合叫做 AppendOnlyMap,指的是追加一个 map,接下来所有的数据都在这一个 map 中。

这个 map,类似于 map reduce 的缓冲区,即环形缓冲区。接下来使用一个叫做 Tim Sort,使用这个 Tim Sort 来对这一个 map 进行排序。首先按照 Partition ID 来进行排序,然后按照 K hash Code 来进行排序,这个时候会产生一组发往 Reduce1的数据叫做r1m1r1 m1r2 m1r3这样的一个过程,对于 reduce 可以去找到对应的某一个部分有一个 renge。第一部分从第一条到第十条都是发给 reduce1的,这些数据就发给 reduce1。从第20条到第200条都是发给 reduce2的,就发给 reduce2

这种排列方式 shuffle 的方式,叫做 Sort base shuffleSort base shuffle 是明显可以解决 shuffle 文件过多的问题。对于每一个 map 来说只有一个文件。

这两种 shuffle 方式是不太一样的。但是在 Spark2.0以后又产生了一种新的 shuffle 方式叫做 Unsafe APIUnsafe API 解决了内存的问题。

总结,首先说了一个 Partitioner 函数,然后说了 shuffle 的两种方式,分别为 Sort base shuffleHash base shuffle

相关文章
|
1月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
42 4
|
1月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
45 0
|
1月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
50 0
|
1月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
38 0
|
6月前
|
分布式计算 Hadoop Java
Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
【2月更文挑战第14天】Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
115 1
|
存储 缓存 分布式计算
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
150 0
Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(下)
|
分布式计算 大数据 Spark
RDD 算子_转换_重分区 | 学习笔记
快速学习 RDD 算子_转换_重分区
RDD 算子_转换_重分区 | 学习笔记
|
分布式计算 大数据 Scala
RDD 的分区和 Shuflee_通过算子重分区 | 学习笔记
快速学习 RDD 的分区和 Shuflee_通过算子重分区
RDD 的分区和 Shuflee_通过算子重分区 | 学习笔记
|
存储 缓存 分布式计算
RDD 的分区和 Shuffle 介绍 | 学习笔记
快速学习 RDD的分区和Shuffle介绍
RDD 的分区和 Shuffle 介绍 | 学习笔记
|
分布式计算 大数据 Shell
RDD 的分区和 shuffle 创建 RDD 时指定分区数 | 学习笔记
快速学习 RDD 的分区和 shuffle 创建 RDD 时指定分区数
101 0
RDD 的分区和 shuffle 创建 RDD 时指定分区数 | 学习笔记