Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)

Action行动算子

行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。

创建包名:com.zhm.spark.operator.action

1)reduce

聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

package sparkRDDaction.com
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_action {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val reduceResult: Int = listRDD.reduce(_ + _)
    println(reduceResult)  //10

  }

}

2) collect

在驱动程序(Driver)中,以数组Array的形式返回数据集的所有元素

package sparkRDDaction.com
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_action {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val collectResult: Array[Int] = listRDD.collect()
    println(collectResult.mkString(" ,"))  //1 ,2 ,3 ,4

  }

}

3) count

返回RDD中元素的个数

    val countResult: Long = listRDD.count()
    println(countResult)  //4

4) first

返回RDD中的第一个元素

val firstResult: Int = listRDD.first()
    println(firstResult)

5) take

返回一个由RDD的前n个元素组成的数组

 val takeResult: Array[Int] = listRDD.take(3)
    println(takeResult.mkString(" , "))

6) takeOrdered

返回该RDD排序后的前n个元素组成的数组

    val listRDD1: RDD[Int] = sc.makeRDD(List(1, 3, 2, 4), 2)
    val takeOrderedResutl: Array[Int] = listRDD1.takeOrdered(3)
    println(takeOrderedResutl.mkString(" , "))

7) aggregate

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

    //13+17+10
    val aggregateResult: Int = listRDD.aggregate(10)(_ + _, _ + _)
    println(aggregateResult)

7) fold

折叠操作,aggregate的简化版操作

//如果aggregate的分区内和分区间计算逻辑一样, 可以简化成fold
    val foldResult: Int = listRDD.fold(10)(_+_)
    println(foldResult)

8) countByKey

统计每种key的个数

    
    val listRDD2 = sc.makeRDD(
      List(("a", 1), ("a", 2), ("a", 3), ("b", 4))
      , 2
    )
    // wordcount : ("a",2) => ("a",2) ("a",2)
    val countByKeyResult: collection.Map[String, Long] = listRDD2.countByKey()
    println(countByKeyResult)

wordcount几种实现

package com.atguigu
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object wordCount4 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount3")
    val sc = new SparkContext(conf)
    wordcount1(sc)
    wordcount2(sc)
    wordcount4(sc)
    sc.stop()

  }
  //groupby
  def wordcount1(sc:SparkContext):Unit={
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
//    group.foreach(println)
    val wordcount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
//     wordcount.foreach(println)
  }
  //groupbykey
  def wordcount2(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
//    wordOne.foreach(println)
val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
//    group.collect().foreach(println)
val wordcount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
//    wordcount.collect().foreach(println)
  }
  //reduceByKey
  def wordcount3(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    //    wordOne.foreach(println)
    val wordcount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
    wordcount.collect().foreach(println)
  }

  //reduceByKey
  def wordcount4(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    //    wordOne.foreach(println)
    val wordcount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_ + _, _ + _)
    wordcount.collect().foreach(println)
  }
  //foldByKey
  def wordcount5(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    //    wordOne.foreach(println)
    val wordcount: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)

  }

  def wordcount6(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    //    wordOne.foreach(println)
    val wordcount: RDD[(String, Int)] = wordOne.combineByKey(
      v=>v,
      (x:Int,y)=>x+y,
      (x:Int,y:Int)=>x+y
    )

  }

  def wordcount7(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val wordOne: RDD[(String, Int)] = words.map((_, 1))
    //    wordOne.foreach(println)
    val stringToLong: collection.Map[String, Long] = wordOne.countByKey()
//    val stringToLong: collection.Map[(String, Int), Long] = wordOne.countByValue()


  }

  def wordcount8(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val stringToLong: collection.Map[String, Long] = words.countByValue()
    println(stringToLong)
  }

  def wordcount9(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words: RDD[String] = rdd.flatMap(_.split(" "))
    val mapWord: RDD[Map[String, Int]] = words.map(word => {
      Map((word, 1))
    })
    val wordCount: Map[String, Int] = mapWord.reduce((map1, map2) => {
      map1
    })
    println(wordCount)
  }

}

9) save相关算子

数据保存到不同格式的文件中

将数据保存到不同格式的文件中
// 保存成Text文件
rdd.saveAsTextFile("output")

// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")

// 保存成Sequencefile文件
rdd.map((_,1)).saveAsSequenceFile("output2")
//saveAsSequenceFile方法要求数据的格式必须为K-V类型
rdd.saveAsSequenceFile("output2")

10) foreach

image.png

分布式遍历RDD中的每一个元素,调用指定函数

image.png

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

// 收集后打印
rdd.map(num=>num).collect().foreach(println)

println("****************")

// 分布式打印
rdd.foreach(println)

 RDD序列化

1) 闭包检查

从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测Scala2.12版本后闭包编译方式发生了改变

2) 序列化方法和属性

从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行,看如下代码:

 def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.创建一个RDD
        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))

        //3.1创建一个Search对象
        val search = new Search("hello")

        //3.2 函数传递,打印:ERROR Task not serializable
        search.getMatch1(rdd).collect().foreach(println)

        //3.3 属性传递,打印:ERROR Task not serializable
        search.getMatch2(rdd).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

class Search(query:String) extends Serializable {

    def isMatch(s: String): Boolean = {
        s.contains(query)
    }

    // 函数序列化案例
    def getMatch1 (rdd: RDD[String]): RDD[String] = {
        //rdd.filter(this.isMatch)
        rdd.filter(isMatch)
    }

    // 属性序列化案例
    def getMatch2(rdd: RDD[String]): RDD[String] = {
        //rdd.filter(x => x.contains(this.query))
        rdd.filter(x => x.contains(query))
        //val q = query
        //rdd.filter(x => x.contains(q))
    }

1) Kryo序列化框架

参考地址: https://github.com/EsotericSoftware/kryo

Java的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

注意:即使使用Kryo序列化,也要继承Serializable接口。

注意:即使使用Kryo序列化,也要继承Serializable接口。
object serializable_Kryo {

    def main(args: Array[String]): Unit = {

        val conf: SparkConf = new SparkConf()
                .setAppName("SerDemo")
                .setMaster("local[*]")
                // 替换默认的序列化机制
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // 注册需要使用 kryo 序列化的自定义类
                .registerKryoClasses(Array(classOf[Searcher]))

        val sc = new SparkContext(conf)

        val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)

        val searcher = new Searcher("hello")
        val result: RDD[String] = searcher.getMatchedRDD1(rdd)

        result.collect.foreach(println)
    }
}
case class Searcher(val query: String) {

    def isMatch(s: String) = {
        s.contains(query)
    }

    def getMatchedRDD1(rdd: RDD[String]) = {
        rdd.filter(isMatch) 
    }

    def getMatchedRDD2(rdd: RDD[String]) = {
        val q = query
        rdd.filter(_.contains(q))
    }
}


相关文章
|
1月前
|
分布式计算 Shell Scala
学习使用Spark
学习使用Spark
62 3
|
2月前
|
分布式计算 Shell Scala
如何开始学习使用Spark?
【8月更文挑战第31天】如何开始学习使用Spark?
44 2
|
2月前
|
分布式计算 Serverless 数据处理
|
5月前
|
存储 分布式计算 Hadoop
Spark编程实验一:Spark和Hadoop的安装使用
Spark编程实验一:Spark和Hadoop的安装使用
80 4
|
5月前
|
分布式计算 关系型数据库 MySQL
Spark编程实验四:Spark Streaming编程
Spark编程实验四:Spark Streaming编程
99 2
|
5月前
|
SQL 分布式计算 关系型数据库
Spark编程实验三:Spark SQL编程
Spark编程实验三:Spark SQL编程
81 1
|
5月前
|
分布式计算 Shell 开发工具
Spark编程实验二:RDD编程初级实践
Spark编程实验二:RDD编程初级实践
72 1
|
5月前
|
存储 分布式计算 程序员
Spark中的RDD介绍
Spark中的RDD介绍
36 0
|
5月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
295 1
|
5月前
|
存储 分布式计算 Scala
bigdata-36-Spark转换算子与动作算子
bigdata-36-Spark转换算子与动作算子
40 0
下一篇
无影云桌面