Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(中)

简介: Spark RDD算子进阶(转换算子、行动算子、缓存、持久化)(中)

请看下面的例子(根据相同键,计算其所有值的平均值):


val cbRDD = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 5)))
val result = cbRDD.combineByKey(
// 分区内遇到新的键时,创建一个(累加值,出现次数)的键值对
(v) => (v, 1),
// 分区内遇到已经创建过的相应累加器的旧键时,更新对应累加器
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
// 多个分区遇到同一个键的累加器,更新主累加器
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
// 求平均值
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
// 输出结果
result.collectAsMap().foreach(println(_))


遍历到第一个元素: ("a", 1)时,因为此元素肯定没出现过,所以调用的是第一个参数:(v) => (v, 1),创建一个(累加值,出现次数)的键值对,此处的累加值为1,因为要计算平均值,所以此键值对为:(1,1)。


遍历到第二个元素:("a", 2)时,发现我们的 key 已经在遍历第一个元素时出现过了,所以,需要调用第二个参数:(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),acc 表示已经存在的(累加值,出现次数)键值对,acc._1 表示键值对的键(即为1),acc._2 表示键值对的值(即为1)。第二个元素的累加值为2,所以,此处(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1)得到的结果应该是:(1+2,1+1),即(3,2)。


遍历到第三个元素:("a", 3)时,发现 key 也已经存在了,一样是调用第二个参数。结果为:(3+3,2+1),即(6,3)。


以此类似,扫描完所有元素,扫描完后需要对多个分区的数据进行合并,调用第三个参数:

(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2),acc1、acc2表示具有相同键的键值对,比如"a"在一个分区的结果为:(6,3),一个分区为:(2,3),则进行相加,得到结果:(6+2,3+3),即(8,6)才是最终的结果(此比喻与例子无关)。


最后进行 map 算子操作,将 key 映射回来:.map{ case (key, value) => (key, value._1 / value._2.toFloat) },此处的value指的是我们前面所统计的键值对结果,比如(8,6),8为累加之和,6为一共有多少个数。


最后打印结果,如图:


image.png


【9】subtractByKey()

含义:删掉 rdd1中的与 rdd2的相同的元素。


val rdd1= sc.parallelize(List((1,2),(2,3),(2,4)))
val rdd2= sc.parallelize(List((2,4),(3,5)))
rdd1.subtractByKey(rdd2).collect()


image.png


代码解释:rdd1中有键为2的元素,rdd2中也有,所以,删除rdd1中的两个元素:(2,3),(2,4),最后剩下一个元素(1,2)

【10】cogroup()

含义:将两个 RDD 中拥有相同的数据分组到一起。


rdd1.cogroup(rdd2).collect().foreach(println(_))


image.png


代码解释:value 中第一个 CompactBuffer 为 rdd1 的值,第二个 CompactBuffer 为 rdd2 的值。

3. 行动算子
【1】countByKey()

含义:对每个键对应的元素分别计数。

val rdd = sc.parallelize(List((1,2),(2,3),(2,4),(3,5)))
rdd.countByKey().foreach(println(_))


image.png


代码解释:键为1出现了1次,2出现了2次,3出现了1次。

【2】lookup()

含义:返回给定键对应的所有值。

rdd.lookup(2)


image.png


【3】collectAsMap()

含义:将结果以映射表的形式返回,key 如果重复,后边的元素会覆盖前面的元素。与 collect 类似,但适用于键值 RDD 并且会保留其键值结构。

rdd.collectAsMap()


image.png


0x02 RDD的缓存与持久化


1. 缓存与持久化的意义

在大数据处理场景中,我们的数据量会达到TB、甚至PB级别,并且会重复调用同一组数据,如果每一次调用都要重新计算,将会非常消耗资源,所以我们可以对处理过程中的中间数据进行数据缓存,或者持久化到内存或者磁盘中。


2. 缓存

我们可以对 RDD 使用 cache()方法进行缓存,即在集群相关节点的内存中进行缓存。


首先,我们需要引入相关的模块:


注意:务必先启动HDFS再执行下面的代码!

put.txt 文件为:


shao nai yi
nai nai yi yi
shao nai nai
val textFileRDD = sc.textFile("hdfs://master:9999/files/put.txt")
val wordRDD = textFileRDD.flatMap(line => line.split(" "))
val pairWordRDD = wordRDD.map(word => (word, 1))
val wordCountRDD = pairWordRDD.reduceByKey((a, b) => a + b)
wordCountRDD.cache() // 这里还没有执行缓存
wordCountRDD.collect().foreach(println) // 遇到行动算子操作才真正开始计算RDD并缓存
相关文章
|
23天前
|
缓存 监控 安全
检测 Webpack 5 持久化缓存是否存在安全漏洞
【10月更文挑战第23天】通过全面、系统地检测和评估,能够及时发现 Webpack 5 持久化缓存的安全漏洞,并采取有效的措施进行修复,保障项目的安全稳定运行。同时,要持续关注安全技术的发展和变化,不断提升安全检测能力,以应对日益复杂的安全挑战。
|
22天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
23天前
|
存储 缓存 监控
配置 Webpack 5 持久化缓存时需要注意哪些安全问题?
【10月更文挑战第23天】通过全面、系统地分析和应对安全问题,能够更好地保障 Webpack 5 持久化缓存的安全,为项目的成功构建和运行提供坚实的安全基础。同时,要保持对安全技术的关注和学习,不断提升安全防范能力,以应对日益复杂的安全挑战。
|
23天前
|
存储 缓存 前端开发
利用 Webpack 5 的持久化缓存来提高构建效率
【10月更文挑战第23天】利用 Webpack 5 的持久化缓存是提高构建效率的有效手段。通过合理的配置和管理,我们可以充分发挥缓存的优势,为项目的构建和开发带来更大的便利和效率提升。你可以根据项目的实际情况,结合以上步骤和方法,进一步优化和完善利用持久化缓存的策略,以达到最佳的构建效果。同时,不断探索和实践新的方法和技术,以适应不断变化的前端开发环境和需求。
|
1月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
43 4
|
1月前
|
存储 缓存 API
LangChain-18 Caching 将回答内容进行缓存 可在内存中或数据库中持久化缓存
LangChain-18 Caching 将回答内容进行缓存 可在内存中或数据库中持久化缓存
42 6
|
2月前
|
存储 NoSQL Redis
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
Redis持久化、RDB和AOF方案、Redis主从集群、哨兵、分片集群、散列插槽、自动手动故障转移
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
|
4月前
|
canal 缓存 NoSQL
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;先删除缓存还是先修改数据库,双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
|
6月前
|
存储 缓存 NoSQL
缓存中的主要数据结构和持久化
【5月更文挑战第11天】Redis缓存数据库采用多种数据结构,如动态字符串、链表、字典、跳跃表、整数集合、压缩列表。动态字符串支持高效修改,链表用于列表,字典保存键值对,跳跃表实现有序集合,整数集合存储少量整数,压缩列表节省内存。Redis对象系统支持共享和内存管理,数据库通过键空间和过期策略管理键,过期键通过定时、惰性或定期删除。服务器使用文件事件处理器处理网络I/O,时间事件处理定时任务,如清理过期键。服务器以事件驱动方式运行,兼顾文件事件和时间事件。
148 1

热门文章

最新文章