Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(中)

简介: Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(中)

请看下面的例子(根据相同键,计算其所有值的平均值):


val cbRDD = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 5)))
val result = cbRDD.combineByKey(
// 分区内遇到新的键时,创建一个(累加值,出现次数)的键值对
(v) => (v, 1),
// 分区内遇到已经创建过的相应累加器的旧键时,更新对应累加器
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
// 多个分区遇到同一个键的累加器,更新主累加器
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
// 求平均值
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
// 输出结果
result.collectAsMap().foreach(println(_))


遍历到第一个元素: ("a", 1)时,因为此元素肯定没出现过,所以调用的是第一个参数:(v) => (v, 1),创建一个(累加值,出现次数)的键值对,此处的累加值为1,因为要计算平均值,所以此键值对为:(1,1)。


遍历到第二个元素:("a", 2)时,发现我们的 key 已经在遍历第一个元素时出现过了,所以,需要调用第二个参数:(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),acc 表示已经存在的(累加值,出现次数)键值对,acc._1 表示键值对的键(即为1),acc._2 表示键值对的值(即为1)。第二个元素的累加值为2,所以,此处(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1)得到的结果应该是:(1+2,1+1),即(3,2)。


遍历到第三个元素:("a", 3)时,发现 key 也已经存在了,一样是调用第二个参数。结果为:(3+3,2+1),即(6,3)。


以此类似,扫描完所有元素,扫描完后需要对多个分区的数据进行合并,调用第三个参数:

(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),acc1、acc2表示具有相同键的键值对,比如"a"在一个分区的结果为:(6,3),一个分区为:(2,3),则进行相加,得到结果:(6+2,3+3),即(8,6)才是最终的结果(此比喻与例子无关)。


最后进行 map 算子操作,将 key 映射回来:.map{ case (key, value) => (key, value._1 / value._2.toFloat) },此处的value指的是我们前面所统计的键值对结果,比如(8,6),8为累加之和,6为一共有多少个数。


最后打印结果,如图:


image.png


【9】subtractByKey()

含义:删掉 rdd1中的与 rdd2的相同的元素。


val rdd1= sc.parallelize(List((1,2),(2,3),(2,4)))
val rdd2= sc.parallelize(List((2,4),(3,5)))
rdd1.subtractByKey(rdd2).collect()


image.png


代码解释:rdd1中有键为2的元素,rdd2中也有,所以,删除rdd1中的两个元素:(2,3),(2,4),最后剩下一个元素(1,2)

【10】cogroup()

含义:将两个 RDD 中拥有相同的数据分组到一起。


rdd1.cogroup(rdd2).collect().foreach(println(_))


image.png


代码解释:value 中第一个 CompactBuffer 为 rdd1 的值,第二个 CompactBuffer 为 rdd2 的值。

3. 行动算子
【1】countByKey()

含义:对每个键对应的元素分别计数。

val rdd = sc.parallelize(List((1,2),(2,3),(2,4),(3,5)))
rdd.countByKey().foreach(println(_))


image.png


代码解释:键为1出现了1次,2出现了2次,3出现了1次。

【2】lookup()

含义:返回给定键对应的所有值。

rdd.lookup(2)


image.png


【3】collectAsMap()

含义:将结果以映射表的形式返回,key 如果重复,后边的元素会覆盖前面的元素。与 collect 类似,但适用于键值 RDD 并且会保留其键值结构。

rdd.collectAsMap()


image.png


0x02 RDD的缓存与持久化


1. 缓存与持久化的意义

在大数据处理场景中,我们的数据量会达到TB、甚至PB级别,并且会重复调用同一组数据,如果每一次调用都要重新计算,将会非常消耗资源,所以我们可以对处理过程中的中间数据进行数据缓存,或者持久化到内存或者磁盘中。


2. 缓存

我们可以对 RDD 使用 cache()方法进行缓存,即在集群相关节点的内存中进行缓存。


首先,我们需要引入相关的模块:


注意:务必先启动HDFS再执行下面的代码!

put.txt 文件为:


shao nai yi
nai nai yi yi
shao nai nai
val textFileRDD = sc.textFile("hdfs://master:9999/files/put.txt")
val wordRDD = textFileRDD.flatMap(line => line.split(" "))
val pairWordRDD = wordRDD.map(word => (word, 1))
val wordCountRDD = pairWordRDD.reduceByKey((a, b) => a + b)
wordCountRDD.cache() // 这里还没有执行缓存
wordCountRDD.collect().foreach(println) // 遇到行动算子操作才真正开始计算RDD并缓存
相关文章
|
2月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
2月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
48 5
|
2月前
|
缓存 监控 安全
检测 Webpack 5 持久化缓存是否存在安全漏洞
【10月更文挑战第23天】通过全面、系统地检测和评估,能够及时发现 Webpack 5 持久化缓存的安全漏洞,并采取有效的措施进行修复,保障项目的安全稳定运行。同时,要持续关注安全技术的发展和变化,不断提升安全检测能力,以应对日益复杂的安全挑战。
|
2月前
|
存储 缓存 监控
配置 Webpack 5 持久化缓存时需要注意哪些安全问题?
【10月更文挑战第23天】通过全面、系统地分析和应对安全问题,能够更好地保障 Webpack 5 持久化缓存的安全,为项目的成功构建和运行提供坚实的安全基础。同时,要保持对安全技术的关注和学习,不断提升安全防范能力,以应对日益复杂的安全挑战。
|
2月前
|
存储 缓存 前端开发
利用 Webpack 5 的持久化缓存来提高构建效率
【10月更文挑战第23天】利用 Webpack 5 的持久化缓存是提高构建效率的有效手段。通过合理的配置和管理,我们可以充分发挥缓存的优势,为项目的构建和开发带来更大的便利和效率提升。你可以根据项目的实际情况,结合以上步骤和方法,进一步优化和完善利用持久化缓存的策略,以达到最佳的构建效果。同时,不断探索和实践新的方法和技术,以适应不断变化的前端开发环境和需求。
|
3月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
56 4
|
3月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
52 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
43 0
|
3月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
100 0
|
3月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
58 0