Action Operators
Action operators trigger the execution of the entire job. Transformation operators are lazily evaluated and do not run immediately; an action is what actually kicks off the computation.
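A minimal sketch of this laziness (the object name LazyEvaluationDemo is illustrative, not from the original text): the map below only builds the lineage; nothing is computed until the collect action is called.

package com.zhm.spark.operator.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LazyEvaluationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lazy").setMaster("local[*]"))

    // Transformation only: builds the lineage, no job is submitted yet
    val mapped: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2).map(_ * 2)
    println("no job has run yet")

    // The action triggers the actual computation
    println(mapped.collect().mkString(", ")) // 2, 4, 6, 8

    sc.stop()
  }
}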
Create the package: com.zhm.spark.operator.action
1) reduce
Aggregates all elements of the RDD: the data within each partition is aggregated first, and the per-partition results are then aggregated across partitions.
package sparkRDDaction.com

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_action {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val reduceResult: Int = listRDD.reduce(_ + _)
    println(reduceResult) // 10
  }
}
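One caveat worth adding (not from the original text): the function passed to reduce should be associative and commutative, because Spark applies it first inside each partition and then across partitions. With a non-commutative function such as subtraction, the result depends on how the data is partitioned. Continuing with the sc from the listing above:

// ((1 - 2) - 3) - 4 with a single partition
println(sc.makeRDD(List(1, 2, 3, 4), 1).reduce(_ - _)) // -8
// (1 - 2) and (3 - 4) per partition, then merged: a different value
println(sc.makeRDD(List(1, 2, 3, 4), 2).reduce(_ - _)) // 0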
2) collect
Returns all elements of the dataset to the driver program (Driver) as an Array.
package sparkRDDaction.com

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_action {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val collectResult: Array[Int] = listRDD.collect()
    println(collectResult.mkString(" ,")) // 1 ,2 ,3 ,4
  }
}
3) count
Returns the number of elements in the RDD.
val countResult: Long = listRDD.count()
println(countResult) // 4
4) first
Returns the first element of the RDD.
val firstResult: Int = listRDD.first()
println(firstResult)
5) take
Returns an array consisting of the first n elements of the RDD.
val takeResult: Array[Int] = listRDD.take(3)
println(takeResult.mkString(" , "))
6) takeOrdered
Returns an array consisting of the first n elements of the RDD after sorting.
val listRDD1: RDD[Int] = sc.makeRDD(List(1, 3, 2, 4), 2)
val takeOrderedResult: Array[Int] = listRDD1.takeOrdered(3)
println(takeOrderedResult.mkString(" , "))
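takeOrdered also accepts an implicit Ordering in its second parameter list, so (as a small addition not in the original text) the same call can return the largest elements by passing a reversed ordering:

// Top 3 elements in descending order, using a reversed Ordering
val top3Desc: Array[Int] = listRDD1.takeOrdered(3)(Ordering[Int].reverse)
println(top3Desc.mkString(" , ")) // 4 , 3 , 2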
7) aggregate
The data in each partition is aggregated together with the initial (zero) value; the per-partition results are then aggregated across partitions, again together with the initial value.
// Partition 1: 10 + 1 + 2 = 13; partition 2: 10 + 3 + 4 = 17; across partitions: 10 + 13 + 17 = 40
val aggregateResult: Int = listRDD.aggregate(10)(_ + _, _ + _)
println(aggregateResult) // 40
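Because the zero value participates once per partition and once more in the cross-partition step, the result depends on the number of partitions. A small illustrative sketch (not from the original text):

// Same data and zero value, but 4 partitions instead of 2:
// per partition: (10+1) + (10+2) + (10+3) + (10+4) = 50, plus 10 across partitions => 60
val fourPartitions: Int = sc.makeRDD(List(1, 2, 3, 4), 4).aggregate(10)(_ + _, _ + _)
println(fourPartitions) // 60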
8) fold
Fold operation: a simplified version of aggregate for the case where the intra-partition and inter-partition logic is the same.
// If aggregate uses the same logic within and between partitions, it can be simplified to fold
val foldResult: Int = listRDD.fold(10)(_ + _)
println(foldResult) // 40
9) countByKey
Counts the number of occurrences of each key.
val listRDD2 = sc.makeRDD(
  List(("a", 1), ("a", 2), ("a", 3), ("b", 4)), 2
)
// countByKey counts how many times each key appears, ignoring the values
val countByKeyResult: collection.Map[String, Long] = listRDD2.countByKey()
println(countByKeyResult) // Map(a -> 3, b -> 1)
Several ways to implement WordCount
package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object wordCount4 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount3")
    val sc = new SparkContext(conf)
    wordcount1(sc)
    wordcount2(sc)
    wordcount4(sc)
    sc.stop()
  }

  // groupBy
  def wordcount1(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    // group.foreach(println)
    val wordcount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
    // wordcount.foreach(println)
  }

  // groupByKey
  def wordcount2(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    // wordOne.foreach(println)
    val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
    // group.collect().foreach(println)
    val wordcount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
    // wordcount.collect().foreach(println)
  }

  // reduceByKey
  def wordcount3(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    // wordOne.foreach(println)
    val wordcount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
    wordcount.collect().foreach(println)
  }

  // aggregateByKey
  def wordcount4(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    // wordOne.foreach(println)
    val wordcount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_ + _, _ + _)
    wordcount.collect().foreach(println)
  }

  // foldByKey
  def wordcount5(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    // wordOne.foreach(println)
    val wordcount: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)
  }

  // combineByKey
  def wordcount6(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    // wordOne.foreach(println)
    val wordcount: RDD[(String, Int)] = wordOne.combineByKey(
      v => v,
      (x: Int, y) => x + y,
      (x: Int, y: Int) => x + y
    )
  }

  // countByKey
  def wordcount7(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    // wordOne.foreach(println)
    val stringToLong: collection.Map[String, Long] = wordOne.countByKey()
    // val stringToLong: collection.Map[(String, Int), Long] = wordOne.countByValue()
  }

  // countByValue
  def wordcount8(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val stringToLong: collection.Map[String, Long] = words.countByValue()
    println(stringToLong)
  }

  // reduce over single-entry Maps
  def wordcount9(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val mapWord: RDD[Map[String, Int]] = words.map(word => Map((word, 1)))
    // Merge the single-entry maps, summing the counts of identical words
    val wordCount: Map[String, Int] = mapWord.reduce((map1, map2) => {
      map2.foldLeft(map1) { case (merged, (word, count)) =>
        merged.updated(word, merged.getOrElse(word, 0) + count)
      }
    })
    println(wordCount)
  }
}
10) Save-related operators
Save the data to files in different formats.
// Save as a text file
rdd.saveAsTextFile("output")
// Serialize the elements as objects and save them to a file
rdd.saveAsObjectFile("output1")
// Save as a SequenceFile; saveAsSequenceFile requires the data to be of key-value (K-V) type
rdd.map((_, 1)).saveAsSequenceFile("output2")
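For completeness (a small addition, not part of the original text), the saved files can be read back with the matching SparkContext methods; the paths below are the ones used above:

// Read the saved files back in
val textBack: RDD[String] = sc.textFile("output")
val objectBack: RDD[Int] = sc.objectFile[Int]("output1")
val seqBack: RDD[(Int, Int)] = sc.sequenceFile[Int, Int]("output2")
println(seqBack.collect().mkString(", "))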
11) foreach
Traverses each element of the RDD in a distributed fashion, applying the specified function.
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
// Collect to the Driver, then print there
rdd.map(num => num).collect().foreach(println)
println("****************")
// Distributed printing on the Executors
rdd.foreach(println)
RDD Serialization
1) Closure check
From a computational standpoint, code outside the operators runs on the Driver, while code inside the operators runs on the Executors. In Scala's functional style, code inside an operator often references data defined outside it, which forms a closure. If the data captured by the closure cannot be serialized, it cannot be shipped to the Executors, and the job fails with an error. Therefore, before the task is executed, Spark checks whether the objects inside the closure are serializable; this check is called closure detection. Note that the way Scala compiles closures changed after Scala 2.12.
2) Serializing methods and properties
From a computational standpoint, code outside the operators runs on the Driver and code inside the operators runs on the Executors, as in the following code:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object serializable_function {
  def main(args: Array[String]): Unit = {
    // 1. Create SparkConf and set the application name
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    // 2. Create SparkContext, the entry point for submitting a Spark application
    val sc: SparkContext = new SparkContext(conf)
    // 3. Create an RDD
    val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
    // 3.1 Create a Search object
    val search = new Search("hello")
    // 3.2 Passing a method: without `extends Serializable` this prints "ERROR Task not serializable"
    search.getMatch1(rdd).collect().foreach(println)
    // 3.3 Passing a property: without `extends Serializable` this also prints "ERROR Task not serializable"
    search.getMatch2(rdd).collect().foreach(println)
    // 4. Close the connection
    sc.stop()
  }
}

class Search(query: String) extends Serializable {

  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  // Method serialization case: the whole Search instance is captured, so it must be serializable
  def getMatch1(rdd: RDD[String]): RDD[String] = {
    // rdd.filter(this.isMatch)
    rdd.filter(isMatch)
  }

  // Property serialization case: referencing `query` also captures `this`
  def getMatch2(rdd: RDD[String]): RDD[String] = {
    // rdd.filter(x => x.contains(this.query))
    rdd.filter(x => x.contains(query))
    // Alternative: copy the field into a local value so only the String is captured
    // val q = query
    // rdd.filter(x => x.contains(q))
  }
}
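Two common alternatives to extending Serializable, consistent with the commented-out lines above (this sketch is an illustrative addition, not from the original text): declare the class as a case class, since case classes are serializable by default, or copy the needed field into a local value inside the method so the closure captures only that value rather than the whole object.

// Sketch: a case class version avoids the explicit `extends Serializable`
case class SearchCase(query: String) {
  def getMatch(rdd: RDD[String]): RDD[String] = {
    val q = query // local copy: the closure captures only the String
    rdd.filter(_.contains(q))
  }
}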
3) The Kryo serialization framework
Reference: https://github.com/EsotericSoftware/kryo
Java serialization can serialize any class, but it is heavyweight: the serialized output is large, and shipping the objects is costly. For performance reasons, starting with Spark 2.0 Spark also supports the Kryo serialization mechanism. Kryo is roughly 10 times faster than Java serialization. When RDD data is shuffled, simple value types, arrays, and strings are already serialized with Kryo internally by Spark.
Note: even when Kryo serialization is used, the class still needs to extend the Serializable interface.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object serializable_Kryo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("SerDemo")
      .setMaster("local[*]")
      // Replace the default serialization mechanism
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Register the custom classes that should be serialized with Kryo
      .registerKryoClasses(Array(classOf[Searcher]))
    val sc = new SparkContext(conf)

    val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)
    val searcher = new Searcher("hello")
    val result: RDD[String] = searcher.getMatchedRDD1(rdd)
    result.collect.foreach(println)
  }
}

case class Searcher(query: String) {
  def isMatch(s: String) = {
    s.contains(query)
  }

  def getMatchedRDD1(rdd: RDD[String]) = {
    rdd.filter(isMatch)
  }

  def getMatchedRDD2(rdd: RDD[String]) = {
    val q = query
    rdd.filter(_.contains(q))
  }
}
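One optional safeguard worth mentioning (an addition, not from the original text): Spark's spark.kryo.registrationRequired setting makes the job fail if a class is serialized with Kryo without having been registered, which helps catch missing registrations early:

// Fail when an unregistered class is serialized with Kryo (off by default)
val strictConf = new SparkConf()
  .setAppName("SerDemo")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(classOf[Searcher]))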