Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(中)

简介: Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(中)

请看下面的例子(根据相同键,计算其所有值的平均值):


val cbRDD = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 5)))
val result = cbRDD.combineByKey(
// 分区内遇到新的键时,创建一个(累加值,出现次数)的键值对
(v) => (v, 1),
// 分区内遇到已经创建过的相应累加器的旧键时,更新对应累加器
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
// 多个分区遇到同一个键的累加器,更新主累加器
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
// 求平均值
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
// 输出结果
result.collectAsMap().foreach(println(_))


遍历到第一个元素: ("a", 1)时,因为此元素肯定没出现过,所以调用的是第一个参数:(v) => (v, 1),创建一个(累加值,出现次数)的键值对,此处的累加值为1,因为要计算平均值,所以此键值对为:(1,1)。


遍历到第二个元素:("a", 2)时,发现我们的 key 已经在遍历第一个元素时出现过了,所以,需要调用第二个参数:(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),acc 表示已经存在的(累加值,出现次数)键值对,acc._1 表示键值对的键(即为1),acc._2 表示键值对的值(即为1)。第二个元素的累加值为2,所以,此处(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1)得到的结果应该是:(1+2,1+1),即(3,2)。


遍历到第三个元素:("a", 3)时,发现 key 也已经存在了,一样是调用第二个参数。结果为:(3+3,2+1),即(6,3)。


以此类似,扫描完所有元素,扫描完后需要对多个分区的数据进行合并,调用第三个参数:

(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),acc1、acc2表示具有相同键的键值对,比如"a"在一个分区的结果为:(6,3),一个分区为:(2,3),则进行相加,得到结果:(6+2,3+3),即(8,6)才是最终的结果(此比喻与例子无关)。


最后进行 map 算子操作,将 key 映射回来:.map{ case (key, value) => (key, value._1 / value._2.toFloat) },此处的value指的是我们前面所统计的键值对结果,比如(8,6),8为累加之和,6为一共有多少个数。


最后打印结果,如图:


image.png


【9】subtractByKey()

含义:删掉 rdd1中的与 rdd2的相同的元素。


val rdd1= sc.parallelize(List((1,2),(2,3),(2,4)))
val rdd2= sc.parallelize(List((2,4),(3,5)))
rdd1.subtractByKey(rdd2).collect()


image.png


代码解释:rdd1中有键为2的元素,rdd2中也有,所以,删除rdd1中的两个元素:(2,3),(2,4),最后剩下一个元素(1,2)

【10】cogroup()

含义:将两个 RDD 中拥有相同的数据分组到一起。


rdd1.cogroup(rdd2).collect().foreach(println(_))


image.png


代码解释:value 中第一个 CompactBuffer 为 rdd1 的值,第二个 CompactBuffer 为 rdd2 的值。

3. 行动算子
【1】countByKey()

含义:对每个键对应的元素分别计数。

val rdd = sc.parallelize(List((1,2),(2,3),(2,4),(3,5)))
rdd.countByKey().foreach(println(_))


image.png


代码解释:键为1出现了1次,2出现了2次,3出现了1次。

【2】lookup()

含义:返回给定键对应的所有值。

rdd.lookup(2)


image.png


【3】collectAsMap()

含义:将结果以映射表的形式返回,key 如果重复,后边的元素会覆盖前面的元素。与 collect 类似,但适用于键值 RDD 并且会保留其键值结构。

rdd.collectAsMap()


image.png


0x02 RDD的缓存与持久化


1. 缓存与持久化的意义

在大数据处理场景中,我们的数据量会达到TB、甚至PB级别,并且会重复调用同一组数据,如果每一次调用都要重新计算,将会非常消耗资源,所以我们可以对处理过程中的中间数据进行数据缓存,或者持久化到内存或者磁盘中。


2. 缓存

我们可以对 RDD 使用 cache()方法进行缓存,即在集群相关节点的内存中进行缓存。


首先,我们需要引入相关的模块:


注意:务必先启动HDFS再执行下面的代码!

put.txt 文件为:


shao nai yi
nai nai yi yi
shao nai nai
val textFileRDD = sc.textFile("hdfs://master:9999/files/put.txt")
val wordRDD = textFileRDD.flatMap(line => line.split(" "))
val pairWordRDD = wordRDD.map(word => (word, 1))
val wordCountRDD = pairWordRDD.reduceByKey((a, b) => a + b)
wordCountRDD.cache() // 这里还没有执行缓存
wordCountRDD.collect().foreach(println) // 遇到行动算子操作才真正开始计算RDD并缓存
相关文章
|
7月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
393 1
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
49 4
|
7月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
7月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
150 2
|
7月前
|
分布式计算 Hadoop Java
Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
【2月更文挑战第14天】Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
122 1
|
7月前
|
存储 缓存 分布式计算
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
149 1
|
7月前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
122 1
|
7月前
|
存储 分布式计算 Scala
bigdata-36-Spark转换算子与动作算子
bigdata-36-Spark转换算子与动作算子
55 0
|
7月前
|
存储 缓存 分布式计算
Spark RDD持久化与缓存:提高性能的关键
Spark RDD持久化与缓存:提高性能的关键