【Spark】【RDD】初次学习RDD 笔记 汇总 (2)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 【Spark】【RDD】初次学习RDD 笔记 汇总 (2)

键值对RDD


mapValues

val rdd = sc.parallelize(List("a","b","c","d"))
//通过map创建键值对
var rddp = rdd.map(x=>(x,1))
rddp.collect
rddp.keys.collect
rddp.values.collect
//通过mapValues让所有Value值加一
rddp.mapValues(x=>x+1).collect

q6.png

q5.png

val rdd1 = sc.parallelize(List("I am a student","Hello word","Just Play"))
val rdd2 = rdd1.map(x=>(x,992))
rdd2.collect
rdd2.keys.collect
rdd2.values
rdd2.values.collect

q4.png

val rdd3 = sc.parallelize(List("I am a student","Hello word","Just Play"))
val rdd4 = rdd1.map(x=>x.split(" "))
rdd4.collect
val p1=rdd4.map(x=>(x.split(" "),x))
p1.collect

join按键内连接

val rdd = sc.parallelize(List("a","b","c","d"))
//通过map创建键值对
var rddp = rdd.map(x=>(x,1))
//通过mapValues让所有Value值加一
var rdd1 = rddp.mapValues(x=>x+1)
//同理得到rdd2
val rdd2 = sc.parallelize(List("a","b","c","d","e")).map(x=>(x,1))
rdd1.collect
rdd2.collect
//使用join将rdd1和rdd2连接起来
rdd1.join(rdd2).collect
rdd2.join(rdd1).collect

q3.png

q2.png

leftOuterJoin和rightOuterJoin和fullOuterJoin

rightOuterJoin 右外连接。第二个RDD的键必须存在

leftOuterJoin 左外连接。第一个RDD的键必须存在

fullOuterJoin 全外连接。两个键都要有

//rdd1和rdd2延用上方的
rdd1.collect
rdd2.collect
//右外连接
rdd1.rightOuterJoin(rdd2).collect
//左外连接
rdd1.leftOuterJoin(rdd2).collect
//全外连接
rdd1.fullOuterJoin(rdd2).collect

q1.png

zip

作用:组合两个RDD为键值对RDD

  • 两个RDD的分区数必须相同(查询分区数rdd.partitions.size)
  • 两个RDD的元素个数必须相同
val rdd1  = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(List("a","b","c"))
rdd1.collect
rdd2.collect
rdd2.zip(rdd1).collect
rdd1.zip(rdd2).collect
rdd1.partitions.size
rdd2.partitions.size
val rdd3  = sc.parallelize(1 to 3,3)//3是指的分区数
val rdd4 = sc.parallelize(List("a","b","c"),3)//3是指的分区数
rdd3.partitions.size
rdd4.partitions.size

q6.png

q5.png

CombineByKey

合并相同键的值,合并后值的类型可以不同

目标:想将值转换为List类型

groupByKey([numPartitions])

按键分组,在(K,V)对组成的RDD上调用时,返回(K,Iterable)对组成的新的RDD。

val rdd1 = sc.parallelize(List("A","B","C","C","C","D","D")).map(x=>(x,1))
rdd1.groupByKey().collect
rdd1.groupByKey().collect()

q4.png

reduceByKey(func, [numPartitions])

将键值对RDD按键分组后进行聚合(Key相同,则只保留一个Key,值+1)

  • 当在(K,V)类型的键值对组成的RDD上调用时,返回一个(K,V)类型键值对组成的新RDD
  • 其中新RDD每个键的值使用给定的reduce函数func进行聚合,该函数必须是(V,V)=>V类型
  • 可用来统计每个键出现的次数
val rdd1 = sc.parallelize(List("A","B","C","C","C","D","D")).map(x=>(x,1))
rdd1.reduceByKey((x,y)=>x+y).collect
rdd1.reduceByKey((x,y)=>x+y).collect()

q3.png

文件读取与存储


结构名称 结构化 描述
文本文件 普通文本文件,每一行一条记录
SequenceFile 用于键值对数据的常见Hadoop文件格式
rdd.partitions.size

saveAsTextFile(Path:String)

把RDD保存到HDFS中

val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1.saveAsTextFile("/302Spark/savetext")
//输入IP:50070 查询

q2.png

q1.png

q6.png

saveAsSequenceFile and sc.sequenceFile

saveAsSequenceFile(Path:String)

序列化文件,仅支持键值对RDD

sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int)

读序列化文件:

//为了让Spark支持Hadoop的数据类型,需要导包
import org.apache.hadoop.io.
sc.sequenceFile(Path:String,KeyClass:key[K])

实例

//序列化文件储存
val rdd = sc.parallelize(List(("panda",3),("dog",6),("cat",3)))
rdd.saveAsSequenceFile("/hadoop-zwj25/testSeq")
rdd.partitions.size

q5.png

q4.png

q3.png

q2.png

//查看序列化文件
hdfs dfs -ls /hadoop-zwj25
hdfs dfs -ls /hadoop-zwj25/testSeq
hdfs dfs -cat /hadoop-zwj25/testSeq/part-00000

q1.png

//引入hadoop数据类型
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
//序列化文件读取
//第1个classOf[Text]中的Text是键的类型
//第2个classOf[IntSWritable]中的IntSWritable是值的类型
val output = sc.sequenceFile("/hadoop-zwj25/testSeq",classOf[Text],classOf[IntWritable])
output.map{case(x,y)=>(x.toString,y.get())}.collect
rdd.collect
val rddtest = sc.parallelize(List(1,2,3))
rddtest.map{case 1=>"One";case 2=>"Two";case _=>"other"}.collect
rddtest.map{case x=>(x,"a")}.collect

q6.png

repartition() 重新分区

repartition(numPartitions: Int)

  • 可以增加或减少此RDD中的并行级别。在内部,它使用shuffle重新分发数据。
  • 如果要减少此RDD中的分区数,请考虑使用coalesce,这样可以避免执行shuffle。
coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

查询分区:partitions.size

rdd.repartition(numPartitions:Int).partitions.size

减少分区数,请考虑使用coalesce,这样可以避免执行

val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1.saveAsTextFile("/302Spark/savetext")
//输入IP:50070 查询
//---------------------------------------
rdd1.partitions.size
rdd1.repartition(1).partitions.size
rdd1.repartition(1).saveAsTextFile("/302Spark/savetext1")

q5.png

q4.png

练习


Practice01

题目

找出考试成绩得过100分的学生ID,最终的结果需要集合到一个RDD中。

素材

请将下面代码块中内容粘贴到文本文档result_bigdata.txt中

1001  大数据基础 90
1002  大数据基础 94
1003  大数据基础 100
1004  大数据基础 99
1005  大数据基础 90
1006  大数据基础 94
1007  大数据基础 100
1008  大数据基础 93
1009  大数据基础 89
1010  大数据基础 78
1011  大数据基础 91
1012  大数据基础 84

代码

//从本地文件创建RDD
val rdd_bigdata = sc.textFile("file:///home/用户名/result_bigdata.txt")
//随便输出一个测试
rdd_bigdata.take(2)
//查看所有结果
rdd_bigdata.collect
//下面这种方法需要转为Int型
val bigdata_100=rdd_bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(1),x(2).toInt)).filter(x=>x._3==100).map(x=>x._1)
bigdata_100.collect
//下面这种方法无需转为Int型
val bigdata_100=rdd_bigdata.map(x=>x.split("\t")).filter(x=>x(2)=="100").map(x=>x(0))
bigdata_100.collect

Practice02

题目

输出每位学生的总成绩,要求将两个成绩表中学生ID相同的成绩相加。

素材

请将下面代码块中内容粘贴到文本文档score.txt中

math John 90
math Betty 88
math Mike 95
math Lily 92
chinese John 78
chinese Betty 80
chinese Mike 88
chinese Lily 85
english John 92
english Betty 84
english Mike 90
english Lily 85

请将下面代码块中内容粘贴到文本文档result_math.txt中

1001  应用数学  96
1002  应用数学  94
1003  应用数学  100
1004  应用数学  100
1005  应用数学  94
1006  应用数学  80
1007  应用数学  90
1008  应用数学  94
1009  应用数学  84
1010  应用数学  86
1011  应用数学  79
1012  应用数学  91

请将下面代码块中内容粘贴到文本文档result_bigdata.txt中

1001  大数据基础 90
1002  大数据基础 94
1003  大数据基础 100
1004  大数据基础 99
1005  大数据基础 90
1006  大数据基础 94
1007  大数据基础 100
1008  大数据基础 93
1009  大数据基础 89
1010  大数据基础 78
1011  大数据基础 91
1012  大数据基础 84

代码

//从本地文件创建RDD
val rdd_bigdata = sc.textFile("file:///home/hadoop-zwj25/result_bigdata.txt")
val rdd_math = sc.textFile("file:///home/hadoop-zwj25/result_math.txt")
//返回RDD中所有的元素
rdd_bigdata.collect
rdd_math.collect
//合并两个RDD
val rddall = rdd_math.union(rdd_bigdata)
rddall.collect
rddall.map(x=>(x.split("\t"))).map(x=>(x(0),x(2).toInt)).reduceByKey((x,y)=>x+y).collect

q3.png

q2.png

Practice03

题目

1.输出每位同学的平均成绩,要求将两个成绩表中学生ID相同的成绩相加并计算出平均分。

2.合并每位同学的总成绩和平均成绩

完成平均分及合并任务

素材

代码

//1.创建RDD并转换
val math_map = math.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt))
math_map.collect
val bigdata=sc.textFile("file:///home/hadoop-zwj25/result_bigdata.txt")
val bigdata_map = bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt))
bigdata_map.collect
//将两个键值对RDD合并,使用take随机测试一个
math_map.union(bigdata_map).take(3)
//为成绩添加成绩计数,从1开始,完成后的格式是 (ID,(成绩,计数))
math_map.union(bigdata_map).mapValues(x=>(x,1)).take(3)
//注意,转换后的类型需和原值类型保持一致
//reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2)))解释如下
//x是原来的成绩,y是计数的
//(x._1+y._1)代表总成绩(两门学科成绩相加)
//(x._2+y._2)代表总门数(两个计数值相加)
math_map.union(bigdata_map).mapValues(x=>(x,1)).reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).take(3)
//将合并后RDD按键 分组 并 聚合
//平均成绩为 总成绩/学科数
//x_1是ID ; x._2 是(成绩,计数值)
//所以求平均成绩为 (x._2._1/x._2._2)
val pj = math_map.union(bigdata_map).mapValues(x=>(x,1)).reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).map(x=>(x._1,(x._2._1/x._2._2)))
//查询结果
pj.collect
//输出总成绩
val zf = math_map.union(bigdata_map).reduceByKey((x,y)=>x+y)
zf.collect
pj.count
zf.count
//合并每个学生的总分与平均分
zf.join(pj).count

q1.png


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
12天前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
61 0
|
1月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
39 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
1月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
28 0
|
1月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
16 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
31 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
54 0
|
1月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
39 0
|
21天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
56 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
42 0