前言
Spark RDD的算子分为转换算子(Transformation)和行动算子(Action)。
转换算子
转换算子分为:Value类型、双Value类型和K-V类型。
一、Value类型
1. map
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(1, 2, 3, 4) ) val mapRDD = rdd.map(_*2) mapRDD.collect().foreach(println) sc.stop()
2. mapPartitions
以分区为单位对数据进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(1, 2, 3, 4), 2 ) val mapRDD = rdd.mapPartitions(datas => datas.map(_*2)) mapRDD.collect().foreach(println) sc.stop()
注意:
1、会将整个分区的数据加载到内存,如果处理完不被释放,在内存较小并且数据量较大的情况下,容易出现内存溢出(OOM)
2、可以实现一些特殊功能,比如取每个分区中最大值,map无法实现
3. mapPartitionsWithIndex
类似于mapPartitions,比mapPartitions多一个参数来表示分区号
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(1, 2, 3, 4), 2 ) val mapRDD = rdd.mapPartitionsWithIndex((index, datas) =>{ index match { case 1 => datas.map(_ * 2) case _ => datas } }) mapRDD.collect().foreach(println) sc.stop()
4. flatMap
将处理的数据进行扁平化后再进行映射处理,所以算子也称为扁平映射。返回一个可迭代的集合。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(List(1, 2), List(3, 4)) ) val fmRDD = rdd.flatMap( list => { list } ) fmRDD.collect().foreach(println) sc.stop()
当集合中的数据类型不同时,可以使用match case进行模式匹配,转换成集合类型。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(List(1, 2),3, List(3, 4)) ) val fmRDD = rdd.flatMap { case list: List[_] => list case d => List(d) } fmRDD.collect().foreach(println) sc.stop()
5. glom
将RDD中每一个分区变成一个数组,数组中元素类型与原分区中元素类型一致。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(1, 2, 3, 4),2 ) val gRDD = rdd.glom() gRDD.collect().foreach(data => println(data.mkString(","))) sc.stop()
6. groupBy
根据指定的规则进行分组,分区默认不变,数据会被打乱(shuffle)。极限情况下,数据可能会被分到同一个分区中。
一个分区可以有多个组,一个组只能在一个分区中。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(1, 2, 3, 4),2 ) // groupBy 会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组,相同的key值的数据会被放置在一个组中 def groupFunction(num:Int):Int = { num % 2 } val groupRDD = rdd.groupBy(groupFunction) groupRDD.collect().foreach(println) sc.stop()
7. filter
根据指定的规则进行筛选过滤,符合规则的数据保留,不符合的丢弃。
当数据进行筛选过滤后,分区不变,但是分区内数据可能不均衡,导致数据倾斜。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(1, 2, 3, 4),2 ) val filterRDD = rdd.filter(_ % 2 == 1) filterRDD.collect().foreach(println) sc.stop()
8. sample
根据指定规则从数据集中采样数据。通过它可以找到数据倾斜的key。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(1, 2, 3, 4),2 ) println(rdd.sample( true, 2 // 1 ).collect().mkString((","))) sc.stop()
9. distinct
将数据集中的数据去重。使用分布式处理方式实现,与内存集合使用HashSet去重方式不同。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(1, 2, 3, 4, 2, 3, 1),2 ) val distinctRDD = rdd.distinct() distinctRDD.collect().foreach(println) // 内存集合distinct的去重方式使用 HashSet 去重 // List(1,1,2,2).distinct sc.stop()
10. coalesce
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。
当Spark程序中存在过多的小任务时,可以通过coalesce合并分区,减少分区个数,进而减少任务调度成本。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(1, 2, 3, 4, 5, 6),3 ) // 无shuffle val coaRDD = rdd.coalesce(2) // 有shuffle // val coaRDD = rdd.coalesce(2, true) coaRDD.saveAsTextFile("output1") sc.stop()
注意:
1、coalesce默认不会将分区数据打乱重新组合,这种情况会导致数据不均衡,出现数据倾斜
2、可以设置第二个参数为true,进行shuffle处理,让数据均衡
3、扩大分区时,可以使用coalesce(,true)或者repartition
11. sortBy
根据指定规则进行排序,默认升序,设置第二个参数改变排序方式。
默认情况下,不会改变分区个数,但是中间存在shuffle处理。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD( List(4, 5, 1, 3, 2, 6),2 ) val sortRDD = rdd.sortBy(num => num) sortRDD.saveAsTextFile("output") sc.stop()
二、双Value类型
1. intersection
两个RDD求交集
2. union
两个RDD求并集
3. subtract
两个RDD求差集
4. zip
拉链操作,以键值对的形式进行合并。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd1 = sc.makeRDD(List(1,2,3,4)) val rdd2 = sc.makeRDD(List(3,4,5,6)) // 要求两个数据源数据类型保持一致 // 求交集 // val newRDD = rdd1.intersection(rdd2) // 求并集 : 只是合并不去重,要想去重可以使用 distinct 算子进行去重 // val newRDD = rdd1.union(rdd2) // 求差集 // val newRDD = rdd1.subtract(rdd2) // 拉链, 对应位置一对一映射,组成(key,value),需要每个对应分区上的数据个数相同 val newRDD = rdd1.zip(rdd2) println(newRDD.collect().mkString(",")) sc.stop()
注意:
1、intersection,union和subtract要求两个RDD中的数据类型保持一致
2、zip:不要求两个RDD中的数据类型保持一致,但要求分区个数以及对应分区上的数据个数保持一致
三、K-V类型
1. partitionBy
将数据按照指定artitioner重新进行分区,默认的分区器是HashPartitioner。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(1,2,3,4),2) val newRDD:RDD[(Int, Int)] = rdd.map((_, 1)) // partitionBy 根据指定的分区规则对数据进行重分区 newRDD.partitionBy(new HashPartitioner(2)) .saveAsTextFile("output") sc.stop()
2. reduceByKey
将数据按照相同的key对value进行聚合。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",2))) // reduceByKey 相同的Key的数据进行value数据的聚合操作 // scala 语言中一般的聚合都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合的 // reduceByKey 中如果key的数据只有一个,是不会参与运算的。 val reduceRDD = rdd.reduceByKey(_ + _) reduceRDD.collect().foreach(println) sc.stop()
3. groupByKey
将数据按照相同的key对value进行分组,形成一个对偶元祖。
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",2))) // groupByKey : 将数据源中的数据,相同的key的数据分到一个组中,形成一个对偶元组 // 元组中的第一个元素就是key,第二个元素就是相同key的value集合 val groupRDD = rdd.groupByKey() groupRDD.collect().foreach(println) sc.stop()
4. aggregateByKey
根据不同的规则进行分区内计算和分区间计算。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2) // 分区内和分区间的计算规则可以不同,也可以相同 rdd.aggregateByKey(0)( (x,y) => math.max(x,y), (x,y) => x + y ).collect().foreach(println) sc.stop()
5. foldByKey
aggregateByKey的简化操作,分区内和分区间的计算规则一样
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2) // 分区内和分区间的计算规则可以相同 // rdd.aggregateByKey(0)( // (x,y) => x + y, // (x,y) => x + y // ).collect().foreach(println) // 可以使用foldByKey来简化 rdd.foldByKey(0)(_+_).collect().foreach(println) sc.stop()
6. combineByKey
针对相同K,将V合并成一个集合。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3), ("b",4),("b",5),("a",6)),2) // 获取相同key的数据的平均值 => (a,3) (b,4) // combineByKey 需要三个参数 // 第一个参数表示:将相同key的第一个数据进行数据结构的转换,实现操作 // 第二个参数表示:分区内的计算规则 // 第三个参数表示:分区间的计算规则 val newRDD = rdd.combineByKey( v => (v, 1), // 转换为 tuple是在运行当中动态得到的,所以下面的tuple需要添加数据类型 (t:(Int, Int), v) => { (t._1 + v, t._2 + 1) }, (t1:(Int, Int), t2:(Int, Int)) => { (t1._1 + t2._1, t1._2 + t2._2) } ) val resultRDD = newRDD.mapValues { case (sum, cnt) => sum / cnt } sc.stop()
7. join
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同的key对应的所有元素连接在一起的(K,(V,W))的RDD。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4))) val rdd2 = sc.makeRDD(List(("a",5),("a",6),("e",8),("c",7))) // join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组。 // 如果两个数据源中key没有匹配上,那么数据不会出现在结果中。 // 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔积,数据量会几何性增长,会导致性能降低 rdd.join(rdd2).collect().foreach(println) sc.stop()
8. sortByKey
在一个(K,V)的RDD上调用,K必须实现ordered接口,返回一个按照key进行排序的(K,V)的RDD
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4))) //按照key对rdd中的元素进行排序,默认升序 rdd.sortByKey().collect().foreach(println) //降序 rdd.sortByKey(false).collect().foreach(println) sc.stop()
9. mapValues
针对于(K,V)形式的类型只对V进行操作
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4))) rdd.mapValues("pre_"+_).collect().foreach(println) sc.stop()
10. cogroup
相同的key,value分组后连接起来。
例子:
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Operator")) val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4))) val rdd2 = sc.makeRDD(List(("a",5),("a",6),("e",8),("c",7))) // cogroup : connect + group (分组,连接) // 可以有多个参数 rdd.cogroup(rdd2).collect().foreach(println) sc.stop()
行动算子
1. reduce
聚合RDD中的所有数据,先聚合分区内数据,在聚合分区间数据。
2. collect
采集,该方法会将不同分区间的数据按照分区顺序采集到Driver端,形成数组。
3. count
统计数据个数。
4. first
获取RDD中的第一个元素。
5. take
获取RDD前n个元素组成的数组。
6. takeOrdered
获取RDD排序后的前n个元素组成的数组。
7. aggregate
将每个分区里面的元素通过分区内逻辑和初始值进行聚合,然后用分区间逻辑和初始值(zeroValue)进行操作。注意:分区间逻辑再次使用初始值和aggregateByKey是有区别的。
8. fold
折叠操作,aggregate的简化操作,分区内逻辑和分区间逻辑相同。
9. countByValue
统计每个value的个数
10. countByKey
统计每种key的个数。
11. foreach
遍历RDD中每一个元素。
12. save
(1)saveAsTextFile(path)保存成Text文件
(2)saveAsSequenceFile(path) 保存成Sequencefile文件
(3)saveAsObjectFile(path) 序列化成对象保存到文件
例子:
val conf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(conf) val rdd = sc.makeRDD(List(1,2,3,4),2) // 行动算子,其实就是触发作业执行的方法 // 底层代码调用的是环境对象中 runJob 方法,调用dagScheduler的runJob方法创建ActiveJob,并提交执行。 // reduce // val i = rdd.reduce(_ + _) // println(i) // collect : 采集,该方法会将不同分区间的数据按照分区顺序采集到Driver端,形成数组 // val ints = rdd.collect() // ints.foreach(println) // count : 数据源中数据个数 // val l = rdd.count() // println(l) // first : 获取数据源中数据的第一个 // val i = rdd.first() // println(i) // take : 获取数据源中数据的前N个 // val ints = rdd.take(2) // println(ints.mkString(",")) // takeOrdered : 数据先排序,再取N个,默认升序排序,可以使用第二个参数列表(比如 : Ordering.Int.reverse)实现倒序功能 // val rdd1 = sc.makeRDD(List(4,3,2,1)) // val ints1 = rdd1.takeOrdered(2)(Ordering.Int.reverse) // println(ints1.mkString(",")) //aggregate // val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8) // println(rdd.aggregate(10)(_ + _, _ + _)) //fold 是aggregate的简化版 // rdd.fold(10)(_+_) //countByKey 统计每种key出现的次数 // val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c"))) // println(rdd.countByKey()) // val intToLong = rdd.countByValue() // println(intToLong) // save val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)),2) rdd.saveAsTextFile("output") rdd.saveAsObjectFile("output1") // saveAsSequenceFile 方法要求数据的格式必须为 K-V 键值对类型 rdd.saveAsSequenceFile("output2") sc.stop()
结语
以上就是帮大家总结的Spark常用算子了。好了,今天就为大家分享到这里了,如果本文对你有帮助的话,欢迎点赞&收藏&分享,这对我继续分享&创作优质文章非常重要。感谢🙏🏻