RDD算子

简介: RDD算子

RDD算子

转换算子

  1. map(func) 对RDD数据集中的每个元素都使用func,返回一个新的RDD
  val sparkconf = new SparkConf().setAppName("Student_score").setMaster("local[2]")
  val sc = new SparkContext(sparkconf)
  val distData = sc.parallelize(List(1,3,5,45,3,67))
  val sq_dist =distData.map(x =>x * x)
  println(sq_dist.collect().toBuffer)
  1. filter(func) 选出所有func返回值为true的元素,生成一个新的分布式数据集返回
    val listRDD = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    val retRDD = listRDD.filter(num => num % 2 == 0)
    println(retRDD.collect().toBuffer)
  1. flatMap(func) 与map()类似,但flatmap生成的是多个结果
    val listRDD = sc.parallelize(List("hello you", "hello he", "hello me"))
    val wordsRDD = listRDD.flatMap(line => line.split(" "))
    println(wordsRDD.collect().toBuffer)
  1. union(otherDataset) 将两个元素合并成一个,不去重,而且RDD中元素的值的个数和类型保持一致
    val rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
    val rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
    println(rdd1.union(rdd2).collect().toBuffer)
  1. redeceByKey(func, [numTasks]): 分组+聚合 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集, key相同的值,都被使用指定的reduce函数聚合到一起。返回的结果是一个数据集
    val r_rdd = sc.parallelize(List(("a",1),("a",2),("b",1),("c",1),("c",1),("a",2))).map(x =>(x._1,x._2))
    val re_rdd = r_rdd.reduceByKey((a,b) => a+b)
    println(re_rdd.collect().toBuffer)
  1. groupByKey():对数组进行 group by key操作
    val g_rdd = r_rdd.groupByKey()
    println(g_rdd.collect().toBuffer)
    println(g_rdd.map(x =>(x._1,x._2.size)).collect().toBuffer)
  1. join():对两个RDD进行内连接
    将两个RDD中键相同的数据存放在一个元组中,最后返回两个RDD都存在的键的连接结果
    (K,V) (K,W) join (K,(V,W))
    val j_rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
    val j_rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
    val j_rdd = j_rdd1.join(j_rdd2)
    println(j_rdd.collect().toBuffer)
  1. rightOuterJoin(): 右外连接,如果左边RDD中有对应的值,连接结果在显示Some类型,如果没有,则为None值
    val j_rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
    val j_rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
    val righr_join = j_rdd1.rightOuterJoin(j_rdd2)
    println(righr_join.collect().toBuffer)
  1. leftOuterJoin(): 左外连接,与右外相反
    val j_rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
    val j_rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
    val left_join = j_rdd1.leftOuterJoin(j_rdd2)
    println(left_join.collect().toBuffer)
  1. fullOuterJoin():全外连接,会保留两个连接的RDD中所有的键的结果
    val j_rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
    val j_rdd2 = sc.parallelize(List(("a",1),("d",4),("e",5)))
    val full_join = j_rdd1.fullOuterJoin(j_rdd2)
    println(full_join.collect().toBuffer)
  1. zip转换(拉链):用于将两个RDD组成的Key/value形式的RDD,要求两个RDD的partition数量和元素数量相同
    var z_rdd1 = sc.parallelize(1 to 5)
    val z_rdd2 = sc.parallelize(Seq("A","B","C","D","E"))
    println(z_rdd1.zip(z_rdd2).collect().toBuffer)
    println(z_rdd2.zip(z_rdd1).collect().toBuffer)
  1. distinct(): 去重
    val rdd = sc.makeRDD(List(("a",1),("b",1),("c",1)))
    println(rdd.distinct().collect().toBuffer)
  1. intersction:求出两个RDD的共同元素,交集
    val c_rdd1 = sc.parallelize(List(("a",1),("b",1),("a",1),("c",1)))
    val c_rdd2 = sc.parallelize(List(("a",1),("b",1),("d",1)))
    println(c_rdd1.intersection(c_rdd2).collect().toBuffer)
  1. subtract:用于将前一个RDD中在后一个RDD出现的元素删除
    val d_rdd1 = sc.parallelize(List(("a",1),("b",1),("c",1)))
    val d_rdd2 = sc.parallelize(List(("d",1),("e",1),("c",1)))
    println(d_rdd1.subtract(d_rdd2).collect().toBuffer)
  1. cartesian:将两个集合的元素合并成一组
    val rdd01 = sc.parallelize(List(1,3,5,3))
    val rdd02 = sc.parallelize(List(2,4,5,1))
    println(rdd01.cartesian(rdd02).collect().toBuffer)
  1. keys,values 转换操作 keys返回一个仅包含键的RDD values返回一个仅包含值的RDD
    val line_rdd = sc.parallelize(List("this is a test","how are you","i am fine","can you tell me"))
    val words = line_rdd.map(x =>(x.split(" ")(0),x))
    println(words.keys.collect().toBuffer)
    println(words.values.collect().toBuffer)
  1. sortBy(): 对RDD 排序,有三个参数
    第一个参数:func:(T) => K,左边是要被排序的每一个元素,右边返回的值是元素中要进行排序的值
    第二个参数ascending:升序还是降序,默认升序true,反正false
    第三个numPartitions:决定排序后的分区个数,默认排序后分区个数与排序前相等
    val data = sc.parallelize(List((1,3),(45,3),(7,6)))
    val sort_data = data.sortBy(x => x._2, ascending = false, numPartitions = 1)
    println(sort_data.collect.toBuffer)
  1. combineByKey:用于将相同键的数据聚合,并且允许返回类型与输入数据类型不同的返回值
    val sparkconf = new SparkConf().setAppName("test").setMaster("local[2]")
    val sc = new SparkContext(sparkconf)
    // 对一个含有多个相同键值对的数据的平均值
    val test = sc.parallelize(List(("panda",1),("panda",8),("pink",4),("pink",8),("pirate",5)))
    val cb_test = test.combineByKey(
      count => (count,1 ),
      (acc:(Int,Int),count) => (acc._1+count,acc._2+1),
      (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)
    )
    println(cb_test.map(x =>(x._1,x._2._1.toDouble/x._2._2)).collect().toBuffer)
  }

行动算子

  1. reduce(func):通过函数func聚集数据集中的所有元素。Func函数接受两个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行。
    val sparkconf = new SparkConf().setAppName("Student_score").setMaster("local[2]")
    val sc = new SparkContext(sparkconf)
    val rdd = sc.makeRDD(1 to 10,2)
    println(rdd.reduce(_+_))
    val rdd2 = sc.makeRDD(Array(("a",1),("b",3),("c",3),("d",5)))
    println(rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2)))
  1. collect(): 以数组的形式返回数据集中的所有元素
    println(d_rdd1.subtract(d_rdd2).collect().toBuffer)
  1. count(): 返回数据集中所有元素的个数
    val sparkconf = new SparkConf().setAppName("Student_score").setMaster("local[2]")
    val sc = new SparkContext(sparkconf)
    val list = sc.parallelize(List(1,2,3,4))
    val result =list.count()
    println(result)
  1. take(n): 以数组的形式返回数据集中的前n个元素
    val sparkconf = new SparkConf().setAppName("Student_score").setMaster("local[2]")
    val sc = new SparkContext(sparkconf)
    val data = sc.parallelize(1 to 10)
    println(data.take(5).toBuffer)
相关文章
|
6月前
|
分布式计算 Spark
[Spark精进]必须掌握的4个RDD算子之flatMap算子
[Spark精进]必须掌握的4个RDD算子之flatMap算子
106 0
|
6月前
|
分布式计算 Scala Spark
[Spark精进]必须掌握的4个RDD算子之map算子
[Spark精进]必须掌握的4个RDD算子之map算子
87 0
|
分布式计算 算法 Java
Spark shuffle、RDD 算子【重要】
Spark shuffle、RDD 算子【重要】
349 0
|
分布式计算 数据处理 Spark
RDD 中 groupByKey 和 reduceByKey 哪个性能好,为什么?
RDD 中 groupByKey 和 reduceByKey 哪个性能好,为什么?
138 0
|
分布式计算 大数据 开发者
Rdd 算子_转换_groupbykey | 学习笔记
快速学习 Rdd 算子_转换_groupbykey
143 0
Rdd 算子_转换_groupbykey | 学习笔记
|
分布式计算 大数据 Scala
RDD 算子_转换_ foldByKey | 学习笔记
快速学习 RDD 算子_转换_ foldByKey
160 0
RDD 算子_转换_  foldByKey | 学习笔记
|
分布式计算 算法 大数据
RDD 算子_转换_ combineByKey | 学习笔记
快速学习 RDD 算子_转换_ combineByKey
126 0
RDD 算子_转换_ combineByKey | 学习笔记
|
分布式计算 大数据 Spark
Rdd 算子_转换_回顾 | 学习笔记
快速学习 Rdd 算子_转换_回顾
Rdd 算子_转换_回顾 | 学习笔记
|
分布式计算 大数据 开发者
RDD 算子_转换_ aggregateByKey | 学习笔记
快速学习 RDD 算子_转换_ aggregateByKey
109 0
RDD 算子_转换_ aggregateByKey | 学习笔记
|
分布式计算 算法 大数据
Rdd 算子_转换_mapvalues | 学习笔记
快速学习 Rdd 算子_转换_mapvalues
130 0
Rdd 算子_转换_mapvalues | 学习笔记