RDD算子
转换算子
- map(func) 对RDD数据集中的每个元素都使用func,返回一个新的RDD
val sparkconf = new SparkConf().setAppName("Student_score").setMaster("local[2]")
val sc = new SparkContext(sparkconf)
val distData = sc.parallelize(List(1,3,5,45,3,67))
val sq_dist =distData.map(x =>x * x)
println(sq_dist.collect().toBuffer)
- filter(func) 选出所有func返回值为true的元素,生成一个新的分布式数据集返回
val listRDD = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val retRDD = listRDD.filter(num => num % 2 == 0)
println(retRDD.collect().toBuffer)
- flatMap(func) 与map()类似,但flatmap生成的是多个结果
val listRDD = sc.parallelize(List("hello you", "hello he", "hello me"))
val wordsRDD = listRDD.flatMap(line => line.split(" "))
println(wordsRDD.collect().toBuffer)
- union(otherDataset) 将两个元素合并成一个,不去重,而且RDD中元素的值的个数和类型保持一致
val rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
val rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
println(rdd1.union(rdd2).collect().toBuffer)
- redeceByKey(func, [numTasks]): 分组+聚合 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集, key相同的值,都被使用指定的reduce函数聚合到一起。返回的结果是一个数据集
val r_rdd = sc.parallelize(List(("a",1),("a",2),("b",1),("c",1),("c",1),("a",2))).map(x =>(x._1,x._2))
val re_rdd = r_rdd.reduceByKey((a,b) => a+b)
println(re_rdd.collect().toBuffer)
- groupByKey():对数组进行 group by key操作
val g_rdd = r_rdd.groupByKey()
println(g_rdd.collect().toBuffer)
println(g_rdd.map(x =>(x._1,x._2.size)).collect().toBuffer)
- join():对两个RDD进行内连接
将两个RDD中键相同的数据存放在一个元组中,最后返回两个RDD都存在的键的连接结果
(K,V) (K,W) join (K,(V,W))
val j_rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
val j_rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
val j_rdd = j_rdd1.join(j_rdd2)
println(j_rdd.collect().toBuffer)
- rightOuterJoin(): 右外连接,如果左边RDD中有对应的值,连接结果在显示Some类型,如果没有,则为None值
val j_rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
val j_rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
val righr_join = j_rdd1.rightOuterJoin(j_rdd2)
println(righr_join.collect().toBuffer)
- leftOuterJoin(): 左外连接,与右外相反
val j_rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
val j_rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
val left_join = j_rdd1.leftOuterJoin(j_rdd2)
println(left_join.collect().toBuffer)
- fullOuterJoin():全外连接,会保留两个连接的RDD中所有的键的结果
val j_rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
val j_rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
val full_join = j_rdd1.fullOuterJoin(j_rdd2)
println(full_join.collect().toBuffer)
- zip转换(拉链):用于将两个RDD组成的Key/value形式的RDD,要求两个RDD的partition数量和元素数量相同
var z_rdd1 = sc.parallelize(1 to 5)
val z_rdd2 = sc.parallelize(Seq("A","B","C","D","E"))
println(z_rdd1.zip(z_rdd2).collect().toBuffer)
println(z_rdd2.zip(z_rdd1).collect().toBuffer)
- distinct(): 去重
val rdd = sc.makeRDD(List(("a",1),("b",1),("c",1)))
println(rdd.distinct().collect().toBuffer)
- intersction:求出两个RDD的共同元素,交集
val c_rdd1 = sc.parallelize(List(("a",1),("b",1),("a",1),("c",1)))
val c_rdd2 = sc.parallelize(List(("a",1),("b",1),("d",1)))
println(c_rdd1.intersection(c_rdd2).collect().toBuffer)
- subtract:用于将前一个RDD中在后一个RDD出现的元素删除
val d_rdd1 = sc.parallelize(List(("a",1),("b",1),("c",1)))
val d_rdd2 = sc.parallelize(List(("d",1),("e",1),("c",1)))
println(d_rdd1.subtract(d_rdd2).collect().toBuffer)
- cartesian:将两个集合的元素合并成一组
val rdd01 = sc.parallelize(List(1,3,5,3))
val rdd02 = sc.parallelize(List(2,4,5,1))
println(rdd01.cartesian(rdd02).collect().toBuffer)
- keys,values 转换操作 keys返回一个仅包含键的RDD values返回一个仅包含值的RDD
val line_rdd = sc.parallelize(List("this is a test","how are you","i am fine","can you tell me"))
val words = line_rdd.map(x =>(x.split(" ")(0),x))
println(words.keys.collect().toBuffer)
println(words.values.collect().toBuffer)
- sortBy(): 对RDD 排序,有三个参数
第一个参数:func:(T) => K,左边是要被排序的每一个元素,右边返回的值是元素中要进行排序的值
第二个参数ascending:升序还是降序,默认升序true,反正false
第三个numPartitions:决定排序后的分区个数,默认排序后分区个数与排序前相等
val data = sc.parallelize(List((1,3),(45,3),(7,6)))
val sort_data = data.sortBy(x => x._2, ascending = false, numPartitions = 1)
println(sort_data.collect.toBuffer)
- combineByKey:用于将相同键的数据聚合,并且允许返回类型与输入数据类型不同的返回值
val sparkconf = new SparkConf().setAppName("test").setMaster("local[2]")
val sc = new SparkContext(sparkconf)
// 对一个含有多个相同键值对的数据的平均值
val test = sc.parallelize(List(("panda",1),("panda",8),("pink",4),("pink",8),("pirate",5)))
val cb_test = test.combineByKey(
count => (count,1 ),
(acc:(Int,Int),count) => (acc._1+count,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)
)
println(cb_test.map(x =>(x._1,x._2._1.toDouble/x._2._2)).collect().toBuffer)
}
行动算子
- reduce(func):通过函数func聚集数据集中的所有元素。Func函数接受两个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行。
val sparkconf = new SparkConf().setAppName("Student_score").setMaster("local[2]")
val sc = new SparkContext(sparkconf)
val rdd = sc.makeRDD(1 to 10,2)
println(rdd.reduce(_+_))
val rdd2 = sc.makeRDD(Array(("a",1),("b",3),("c",3),("d",5)))
println(rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2)))
- collect(): 以数组的形式返回数据集中的所有元素
println(d_rdd1.subtract(d_rdd2).collect().toBuffer)
- count(): 返回数据集中所有元素的个数
val sparkconf = new SparkConf().setAppName("Student_score").setMaster("local[2]")
val sc = new SparkContext(sparkconf)
val list = sc.parallelize(List(1,2,3,4))
val result =list.count()
println(result)
- take(n): 以数组的形式返回数据集中的前n个元素
val sparkconf = new SparkConf().setAppName("Student_score").setMaster("local[2]")
val sc = new SparkContext(sparkconf)
val data = sc.parallelize(1 to 10)
println(data.take(5).toBuffer)