键值对RDD
mapValues
val rdd = sc.parallelize(List("a","b","c","d")) //通过map创建键值对 var rddp = rdd.map(x=>(x,1)) rddp.collect rddp.keys.collect rddp.values.collect //通过mapValues让所有Value值加一 rddp.mapValues(x=>x+1).collect
val rdd1 = sc.parallelize(List("I am a student","Hello word","Just Play")) val rdd2 = rdd1.map(x=>(x,992)) rdd2.collect rdd2.keys.collect rdd2.values rdd2.values.collect
val rdd3 = sc.parallelize(List("I am a student","Hello word","Just Play")) val rdd4 = rdd1.map(x=>x.split(" ")) rdd4.collect val p1=rdd4.map(x=>(x.split(" "),x)) p1.collect
join按键内连接
val rdd = sc.parallelize(List("a","b","c","d")) //通过map创建键值对 var rddp = rdd.map(x=>(x,1)) //通过mapValues让所有Value值加一 var rdd1 = rddp.mapValues(x=>x+1) //同理得到rdd2 val rdd2 = sc.parallelize(List("a","b","c","d","e")).map(x=>(x,1)) rdd1.collect rdd2.collect //使用join将rdd1和rdd2连接起来 rdd1.join(rdd2).collect rdd2.join(rdd1).collect
leftOuterJoin和rightOuterJoin和fullOuterJoin
rightOuterJoin 右外连接。第二个RDD的键必须存在
leftOuterJoin 左外连接。第一个RDD的键必须存在
fullOuterJoin 全外连接。两个键都要有
//rdd1和rdd2延用上方的 rdd1.collect rdd2.collect //右外连接 rdd1.rightOuterJoin(rdd2).collect //左外连接 rdd1.leftOuterJoin(rdd2).collect //全外连接 rdd1.fullOuterJoin(rdd2).collect
zip
作用:组合两个RDD为键值对RDD
- 两个RDD的分区数必须相同(查询分区数rdd.partitions.size)
- 两个RDD的元素个数必须相同
val rdd1 = sc.parallelize(1 to 3) val rdd2 = sc.parallelize(List("a","b","c")) rdd1.collect rdd2.collect rdd2.zip(rdd1).collect rdd1.zip(rdd2).collect rdd1.partitions.size rdd2.partitions.size val rdd3 = sc.parallelize(1 to 3,3)//3是指的分区数 val rdd4 = sc.parallelize(List("a","b","c"),3)//3是指的分区数 rdd3.partitions.size rdd4.partitions.size
CombineByKey
合并相同键的值,合并后值的类型可以不同
目标:想将值转换为List类型
groupByKey([numPartitions])
按键分组,在(K,V)对组成的RDD上调用时,返回(K,Iterable)对组成的新的RDD。
val rdd1 = sc.parallelize(List("A","B","C","C","C","D","D")).map(x=>(x,1)) rdd1.groupByKey().collect rdd1.groupByKey().collect()
reduceByKey(func, [numPartitions])
将键值对RDD按键分组后进行聚合(Key相同,则只保留一个Key,值+1)
- 当在(K,V)类型的键值对组成的RDD上调用时,返回一个(K,V)类型键值对组成的新RDD
- 其中新RDD每个键的值使用给定的reduce函数func进行聚合,该函数必须是(V,V)=>V类型
- 可用来统计每个键出现的次数
val rdd1 = sc.parallelize(List("A","B","C","C","C","D","D")).map(x=>(x,1)) rdd1.reduceByKey((x,y)=>x+y).collect rdd1.reduceByKey((x,y)=>x+y).collect()
文件读取与存储
结构名称 | 结构化 | 描述 |
文本文件 | 否 | 普通文本文件,每一行一条记录 |
SequenceFile | 是 | 用于键值对数据的常见Hadoop文件格式 |
rdd.partitions.size
saveAsTextFile(Path:String)
把RDD保存到HDFS中
val rdd1 = sc.parallelize(List(1,2,3,4)) rdd1.saveAsTextFile("/302Spark/savetext") //输入IP:50070 查询
saveAsSequenceFile and sc.sequenceFile
saveAsSequenceFile(Path:String)
序列化文件,仅支持键值对RDD
sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int)
读序列化文件:
//为了让Spark支持Hadoop的数据类型,需要导包 import org.apache.hadoop.io. sc.sequenceFile(Path:String,KeyClass:key[K])
实例
//序列化文件储存 val rdd = sc.parallelize(List(("panda",3),("dog",6),("cat",3))) rdd.saveAsSequenceFile("/hadoop-zwj25/testSeq") rdd.partitions.size
//查看序列化文件 hdfs dfs -ls /hadoop-zwj25 hdfs dfs -ls /hadoop-zwj25/testSeq hdfs dfs -cat /hadoop-zwj25/testSeq/part-00000
//引入hadoop数据类型 import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable //序列化文件读取 //第1个classOf[Text]中的Text是键的类型 //第2个classOf[IntSWritable]中的IntSWritable是值的类型 val output = sc.sequenceFile("/hadoop-zwj25/testSeq",classOf[Text],classOf[IntWritable]) output.map{case(x,y)=>(x.toString,y.get())}.collect rdd.collect val rddtest = sc.parallelize(List(1,2,3)) rddtest.map{case 1=>"One";case 2=>"Two";case _=>"other"}.collect rddtest.map{case x=>(x,"a")}.collect
repartition() 重新分区
repartition(numPartitions: Int)
- 可以增加或减少此RDD中的并行级别。在内部,它使用shuffle重新分发数据。
- 如果要减少此RDD中的分区数,请考虑使用coalesce,这样可以避免执行shuffle。
coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
查询分区:partitions.size
rdd.repartition(numPartitions:Int).partitions.size
减少分区数,请考虑使用coalesce,这样可以避免执行
val rdd1 = sc.parallelize(List(1,2,3,4)) rdd1.saveAsTextFile("/302Spark/savetext") //输入IP:50070 查询 //--------------------------------------- rdd1.partitions.size rdd1.repartition(1).partitions.size rdd1.repartition(1).saveAsTextFile("/302Spark/savetext1")
练习
Practice01
题目
找出考试成绩得过100分的学生ID,最终的结果需要集合到一个RDD中。
素材
请将下面代码块中内容粘贴到文本文档result_bigdata.txt中
1001 大数据基础 90 1002 大数据基础 94 1003 大数据基础 100 1004 大数据基础 99 1005 大数据基础 90 1006 大数据基础 94 1007 大数据基础 100 1008 大数据基础 93 1009 大数据基础 89 1010 大数据基础 78 1011 大数据基础 91 1012 大数据基础 84
代码
//从本地文件创建RDD val rdd_bigdata = sc.textFile("file:///home/用户名/result_bigdata.txt") //随便输出一个测试 rdd_bigdata.take(2) //查看所有结果 rdd_bigdata.collect //下面这种方法需要转为Int型 val bigdata_100=rdd_bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(1),x(2).toInt)).filter(x=>x._3==100).map(x=>x._1) bigdata_100.collect //下面这种方法无需转为Int型 val bigdata_100=rdd_bigdata.map(x=>x.split("\t")).filter(x=>x(2)=="100").map(x=>x(0)) bigdata_100.collect
Practice02
题目
输出每位学生的总成绩,要求将两个成绩表中学生ID相同的成绩相加。
素材
请将下面代码块中内容粘贴到文本文档score.txt中
math John 90 math Betty 88 math Mike 95 math Lily 92 chinese John 78 chinese Betty 80 chinese Mike 88 chinese Lily 85 english John 92 english Betty 84 english Mike 90 english Lily 85
请将下面代码块中内容粘贴到文本文档result_math.txt中
1001 应用数学 96 1002 应用数学 94 1003 应用数学 100 1004 应用数学 100 1005 应用数学 94 1006 应用数学 80 1007 应用数学 90 1008 应用数学 94 1009 应用数学 84 1010 应用数学 86 1011 应用数学 79 1012 应用数学 91
请将下面代码块中内容粘贴到文本文档result_bigdata.txt中
1001 大数据基础 90 1002 大数据基础 94 1003 大数据基础 100 1004 大数据基础 99 1005 大数据基础 90 1006 大数据基础 94 1007 大数据基础 100 1008 大数据基础 93 1009 大数据基础 89 1010 大数据基础 78 1011 大数据基础 91 1012 大数据基础 84
代码
//从本地文件创建RDD val rdd_bigdata = sc.textFile("file:///home/hadoop-zwj25/result_bigdata.txt") val rdd_math = sc.textFile("file:///home/hadoop-zwj25/result_math.txt") //返回RDD中所有的元素 rdd_bigdata.collect rdd_math.collect
//合并两个RDD val rddall = rdd_math.union(rdd_bigdata) rddall.collect rddall.map(x=>(x.split("\t"))).map(x=>(x(0),x(2).toInt)).reduceByKey((x,y)=>x+y).collect
Practice03
题目
1.输出每位同学的平均成绩,要求将两个成绩表中学生ID相同的成绩相加并计算出平均分。
2.合并每位同学的总成绩和平均成绩
完成平均分及合并任务
素材
代码
//1.创建RDD并转换 val math_map = math.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt)) math_map.collect val bigdata=sc.textFile("file:///home/hadoop-zwj25/result_bigdata.txt") val bigdata_map = bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt)) bigdata_map.collect //将两个键值对RDD合并,使用take随机测试一个 math_map.union(bigdata_map).take(3) //为成绩添加成绩计数,从1开始,完成后的格式是 (ID,(成绩,计数)) math_map.union(bigdata_map).mapValues(x=>(x,1)).take(3) //注意,转换后的类型需和原值类型保持一致 //reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2)))解释如下 //x是原来的成绩,y是计数的 //(x._1+y._1)代表总成绩(两门学科成绩相加) //(x._2+y._2)代表总门数(两个计数值相加) math_map.union(bigdata_map).mapValues(x=>(x,1)).reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).take(3) //将合并后RDD按键 分组 并 聚合 //平均成绩为 总成绩/学科数 //x_1是ID ; x._2 是(成绩,计数值) //所以求平均成绩为 (x._2._1/x._2._2) val pj = math_map.union(bigdata_map).mapValues(x=>(x,1)).reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).map(x=>(x._1,(x._2._1/x._2._2))) //查询结果 pj.collect //输出总成绩 val zf = math_map.union(bigdata_map).reduceByKey((x,y)=>x+y) zf.collect pj.count zf.count //合并每个学生的总分与平均分 zf.join(pj).count