RDD 的分区和 Shuflee_Shuffle 过程扫盲 | 学习笔记

简介: 快速学习 RDD 的分区和 Shuflee_Shuffle 过程扫盲

开发者学堂课程【大数据Spark2020最新课程(知识精讲与实战演练)第二阶段RDD 的分区和 Shuflee_Shuffle 过程扫盲学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/689/detail/11995


RDD 的分区和 Shuflee_Shuffle 过程扫盲


内容介绍

一、Shuffle 说明

二、Hash base shuffle

三、Sort base shuffle

 

一、Shuffle 说明

关于 RDD 分区,在最后一部分简要介绍一个 shuffle 过程,接下来在后面的内容当中一部分是 spark 的原理,而 spark 的原理中一部分是讲 shuffle 的原理。

在讲 shuffle 原理之前,在 sparkRDD 特性的介绍部分,要进行一个简单的 shuffle 说明,把这些概念要扫一个盲,通过这个视频可以让大家理解 shuffle 过程中间所涉及的概念以及 shuffle spark 当中的进化。

RDD Shuffle 是什么

val sourceRdd = sc.textFile (" hdfs ://node01:9θ2θ/dataset/ wordcount . txt ")

val flattenCountRdd = sourceRdd.flatMap (_.split (“ ”)). map ((_,1))

val aggCountRdd = flattenCountRdd . reduceByKey (_+_)

val result = aggCountRdd . collect

这份代码其实就是 flatmapreduce 这三个算子配合而成的一个磁片统计。

image.png

步骤

数据存放在 hdfs 上的一个文件,接下来读取文件生成一个 RDD ,生成的 RDD 是一行一行的字符串,再通过 flatmap RDD 进行一对多,把单词拆解出来,变成第二列这样的 RDD ,这个 RDD 中的每一条数据就是一个单词。接下来为每一个数据指定一个初始词频,然后按照key 把所有的初始词频汇总起来,汇总的过程就是一个 shuffle 过程,假设第三列为 rdd1,第四列是 rdd2,整个过程就相当于 rdd2=rdd1.reduceByKey(),reduceByKey 在这个 rdd2=rdd1.reduceByKey()函数当中应该是rdd2的计算函数,因为 rdd2是通过 reduceByKey rdd1中拉取数据, reduceByKey 应该是属于 rdd2的。

rdd2是整个算子的 reduce 端,rdd1是整个算子的 map 端,数据从 rdd1流向 rdd2

通过 rdd 五大属性中的 partitioner 函数来确定 Hadoop 应该分发到哪一台机器。这个函数默认就是一个 hashpartitioner Hashpartitioner 的作用是对 Hadoop 求一个 hashcode 和三取一个模,模是几就分发到哪一个 reducer

第一张图是从数据的视角上说明了 shuffle 的过程,接下来从分区视角上说明 shuffle 过程。

image.png

对于分区过程,RDD2RDD3 shuffle 之间把 RDD2的数据分发到 RDD3当中他们之间一个交错的关系。即一个RDD3中的分区可能会有三个副分区,RDD2中的每一个分区都有可能是 RDD3的副分区。Shuffle 一般分为端:第一端为 mapper 端,第二端为 reducer 端,如果 RDD2mapperRDD3reducer

reducer mapper 中拉取数据有两种方法,第一种是 mapper 直接把数据推送到 reducer 中去,第二种是 mapper把数据放在内存或磁盘里 reducer 去拉。在分布式框架计算当中 Shuffle 的过程 mapper reducer 之间,Mapper就是放在一个文件当中,reducer 去拉取文件这样来拷贝数据。Reducer 一般通过拉取文件获取数据,mapper 的任务是把发给 reducer 的数据放在文件中。

reduceByKey 这个算子本质上是先按照 Key 分组,再对每一组数据进行 reduce ,但是 Key 相同的所有数据分布在不同的 Partition 分区中,甚至可能在不同的节点中,但是它们必须被共同计算。为了让来自相同 Key 的所有数据都在 reduceByKey 的同一个 reduce 中处理,需要执行 all - to - all 的操作,需要在不同的节点(不同的分区)之间拷贝数据,必须跨分区聚集相同 Key 的所有数据,这个过程叫做 Shuffle

mapper reducer 之间进行数据拷贝有两种方式,第一种方式叫 Hash base shuffle,第二个叫做 Sort base shuffle。这两个 Shuffle 方式其实也是大数据平台,在数据处理学术界所研究的内容之一。

RDD Shuffle 原理。

Spark Shuffle 发展大致有两个阶段:Hash base shuffleSort base shuffle

 

二、Hash base shuffle

 image.png

Hash base shuffle Spark 早期版本所用的 Shuffle 过程,Hash base shuffle 的原理其实是在描述 mapper reducer 之间如何去存放文件,如何去拷贝文件的过程。

现在所说的 Hash base shuffleSort base shuffle partitioner 没有关系,partitioner 用于计算一条数据应该发往哪一个机器上,Hash base shuffle Sort base shuffle 用于描述中间过程如何存文件。如果用 Hash base shuffle,加入要分发给两个 reducer,应该生成两个文件,应该把发往 reduce1的数据存放到第一个文件当中,把发往 reduce2的数据存放在第二个文件当中。

第二个 mapper 端同理,他也有两个文件,一个发往 reducer1,一个发往 reducer2Map3也是一样的也是两个文件,一个给1,一个给2

第二步过程,reducer 可以直接到第一个 map 1这个文件当中去拷贝,也可以到第二个 map 1这个文件当中去拷贝,也可以到第三个 map 1这个文件当中去拷贝。Reducer2同理。这个过程称为 Hash base shuffle 过程。一条数据先取不同的 hash,然后放在不同的文件当中。

假设有1000mapper1000reducer,中间产生1000*1000个文件,即1000000个文件。Spark 在早期被很多人指责,说他不适合作大规模的数据处理,大规模数据处理还是用 spark 的原因为 mapreduce 不是 Hash base shuffle,而 spark rdd Hash base shuffle,它产生的中间文件过多。

其实最根本解决这个问题的办法是使用 Sort base shuffle

 

三、Sort base shuffle

image.png

Sort base shuffle 不会像 Hash base shuffle 把数据存到不同的文件当中,Sort base shuffle 会把所有的数据计算出应该发往哪个分区以后,比如第一个 Map 会把所有的数据都放在一个集合当中,这个集合叫做 AppendOnlyMap,指的是追加一个 map,接下来所有的数据都在这一个 map 中。

这个 map,类似于 map reduce 的缓冲区,即环形缓冲区。接下来使用一个叫做 Tim Sort,使用这个 Tim Sort 来对这一个 map 进行排序。首先按照 Partition ID 来进行排序,然后按照 K hash Code 来进行排序,这个时候会产生一组发往 Reduce1的数据叫做r1m1r1 m1r2 m1r3这样的一个过程,对于 reduce 可以去找到对应的某一个部分有一个 renge。第一部分从第一条到第十条都是发给 reduce1的,这些数据就发给 reduce1。从第20条到第200条都是发给 reduce2的,就发给 reduce2

这种排列方式 shuffle 的方式,叫做 Sort base shuffleSort base shuffle 是明显可以解决 shuffle 文件过多的问题。对于每一个 map 来说只有一个文件。

这两种 shuffle 方式是不太一样的。但是在 Spark2.0以后又产生了一种新的 shuffle 方式叫做 Unsafe APIUnsafe API 解决了内存的问题。

总结,首先说了一个 Partitioner 函数,然后说了 shuffle 的两种方式,分别为 Sort base shuffleHash base shuffle

相关文章
|
存储 人工智能 算法
【五子棋实战】第2章 博弈树负值极大alpha-beta剪枝算法(二)
  博弈树(Game Tree)是博弈论中的一个概念,用于表示博弈过程中的各种可能走法和对应的结果。它是树结构,树的每个节点表示游戏的一个状态,每个节点的子节点表示在该状态下可能的下一步行动。
455 0
|
数据处理 索引 Python
Pandas中concat的用法
Pandas中concat的用法
609 1
|
运维 监控 数据可视化
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
403 1
|
11月前
|
存储 机器学习/深度学习 搜索推荐
去中心化的模型训练
去中心化的模型训练(Decentralized Model Training)是一种不依赖单一中心服务器或数据存储中心,而是在多个节点(如设备或数据拥有者)上进行联合训练的方法。这种训练模式可以更好地保护数据隐私、降低数据传输成本,并提升模型的鲁棒性和可扩展性。随着数据安全和隐私保护需求的提升,去中心化训练在深度学习和人工智能应用中的重要性逐渐增加。
322 4
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之运行DDL任务时出现异常,具体错误是ODPS-0110061,该如何处理
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
272 3
|
JSON NoSQL MongoDB
mongodb导出聚合查询的数据
mongodb导出聚合查询的数据
|
存储 安全 API
C++一分钟之-C++中的枚举类型(enum class)
【6月更文挑战第25天】C++的`enum class`(强类型枚举)在C++11中引入,增强了枚举的作用域和类型安全,减少命名冲突。它要求使用全名(如`Color::Green`)访问枚举成员,并能显式指定底层类型。常见问题包括默认值非0、隐式转换和范围溢出,解决办法是明确赋值、显式转换和选择合适底层类型。高效技巧包括用于状态机、作为函数参数、创建别名和迭代。掌握这些能提升代码质量。
559 0
|
缓存 前端开发 JavaScript
深入理解React Hooks:原理、应用与最佳实践
【4月更文挑战第6天】React Hooks是16.8版引入的更新,允许在函数组件中处理状态和生命周期。useState用于添加状态,返回状态值和更新函数。useEffect处理副作用,根据依赖项执行和清理。其他Hooks如useContext和useReducer进一步扩展功能。Hooks适用于状态管理、生命周期逻辑、性能优化和跨组件共享。最佳实践包括明确依赖、避免滥用、编写自定义Hook和遵循规则。它们提高了代码可读性和复用性,通过理解原理和实践,开发者能更好地掌握React开发。
967 1
|
分布式计算 算法 Java
Spark shuffle、RDD 算子【重要】
Spark shuffle、RDD 算子【重要】
642 0
|
分布式计算 监控 数据处理
Spark Streaming的DStream与窗口操作
Spark Streaming的DStream与窗口操作