【Spark】Spark Core Day04

简介: 【Spark】Spark Core Day04

Spark Day04:Spark Core

02-[了解]-今日课程内容提纲

主要讲解RDD函数,分为2类:Transformation转换函数和Action触发函数

RDD中函数:
  - 函数分类,不同类型函数功能
  - 常见函数概述
  - 5种类型RDD函数
    实际项目中使用最多的,必须要掌握
  - RDD 持久化函数
    可以将RDD分布式集合数据进行缓存,比如缓存到Executor内存中,再次处理数据时,直接从内存读取
  - RDD Checkpoint
    将RDD数据保存到可靠文件系统中,比如HDFS

首先创建Maven Module模块,编写好代码模块,讲解某个知识点时,在编写核心代码

03-[掌握]-RDD 函数分类

RDD 的操作主要可以分为 TransformationAction 两种。

  • Transformation 转换,将1个RDD转换为另一个RDD
  • Action 触发,当1个RDD调用函数以后,触发一个Job执行(调用Action函数以后,返回值不是RDD)

官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations

RDD中2种类型操作函数:Transformation(lazy)和Action(eager)函数

  • Transformation转换函数

  • Action触发函数,触发一个Job执行

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-a1fQcH5e-1638793130130)(/img/image-20210422150349862.png)]

04-[了解]-RDD 中常见函数概述

RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。

主要常见使用函数如下,每个函数通过演示范例讲解。

1、分区操作函数
  对RDD中每个分区数据进行操作
2、重分区函数
  调整RDD中分区数目,要么变大,要么变小
3、聚合函数
  对RDD中数据进行聚合统计,比如使用reduce、redueBykey等
4、关联函数
  对2个RDD进行JOIN操作,类似SQL中JOIN,分为:等值JOIN、左外连接和右外连接、全外连接fullOuterJoin

RDD函数练习:运行spark-shell命令行,在本地模式运行,执行函数使用

05-[掌握]-RDD 函数之基本函数使用

RDD中map、filter、flatMap及foreach等函数为最基本函数,都是对RDD中每个元素进行操作,将元素传递到函数中进行转换

编写词频统计WordCount程序,使用基本函数

package cn.itcast.spark.func.basic
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * 演示RDD中基本函数使用
 */
object _01SparkBasicTest {
  def main(args: Array[String]): Unit = {
    // 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
    val sc: SparkContext = {
      // a. 创建SparkConf对象
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // b. 传递sparkConf对象,构建SparkContext实例
      SparkContext.getOrCreate(sparkConf)
    }
    // step1. 读取数据
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount/input.data", minPartitions = 2)
    // step2. 处理数据
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 过滤数据
      .filter(line => null != line && line.trim.length > 0)
      // 分割单词
      .flatMap(line => line.trim.split("\\s+"))
      // 转换为二元组
      .map(word => word -> 1)
      // 按照单词分组,对组内数据进行聚合求和
      .reduceByKey((tmp, item) => tmp + item) // TODO: 隐式转换,将RDD对象抓好为PairRDDFunctions对象,调用方法
    // step3. 输出数据
    resultRDD.foreach(item => println(item))
    // 应用结束,关闭资源
    sc.stop()
  }
}

06-[掌握]-RDD 函数之分区操作函数

每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,map函数使用mapPartitions代替foreach函数使用foreachPartition代替

前面编写WordCount词频统计代码中,使用map函数和forearch函数,针对RDD中每个元素操作,并不是针对每个分区数据操作的,如果针对分区操作:mapPartitions和foreachPartition

针对分区数据进行操作时,函数的参数类型:迭代器Iterator,封装分区中所有数据

针对词频统计WordCount代码进行修改,针对分区数据操作,范例代码如下:

package cn.itcast.spark.func.iter
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * 分区操作函数:mapPartitions和foreachPartition
 */
object _02SparkIterTest {
  def main(args: Array[String]): Unit = {
    // 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
    val sc: SparkContext = {
      // a. 创建SparkConf对象
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // b. 传递sparkConf对象,构建SparkContext实例
      SparkContext.getOrCreate(sparkConf)
    }
    // step1. 读取数据
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
    // step2. 处理数据
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 过滤数据
      .filter(line => line.trim.length != 0 )
      // 对每行数据进行单词分割
      .flatMap(line => line.trim.split("\\s+"))
      // 转换为二元组
        //.map(word => word -> 1)
      /*
        def mapPartitions[U: ClassTag](
            f: Iterator[T] => Iterator[U],
            preservesPartitioning: Boolean = false
        ): RDD[U]
       */
        .mapPartitions(iter => iter.map(word => (word, 1)))
      // 分组聚合
      .reduceByKey((tmp, item) => tmp + item)
    // step3. 输出数据
    //resultRDD.foreach(item => println(item))
    /*
      def foreachPartition(f: Iterator[T] => Unit): Unit
     */
    resultRDD.foreachPartition(iter => iter.foreach(item => println(item)))
    // 应用结束,关闭资源
    sc.stop()
  }
}

为什么要对分区操作,而不是对每个数据操作,好处在哪里呢???

07-[掌握]-RDD 函数之重分区函数

如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。

上述2个函数最为关键:
  - 增加RDD分区数目:repartition
  - 减少RDD分区数目:coalesce,不产生Shuffle
package cn.itcast.spark.func.iter
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * 分区操作函数:mapPartitions和foreachPartition
 */
object _02SparkPartitionTest {
  def main(args: Array[String]): Unit = {
    // 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
    val sc: SparkContext = {
      // a. 创建SparkConf对象
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // b. 传递sparkConf对象,构建SparkContext实例
      SparkContext.getOrCreate(sparkConf)
    }
    // step1. 读取数据
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
    println(s"raw rdd partitions = ${inputRDD.getNumPartitions}")
    // TODO: 增加RDD分区数目
    val etlRDD: RDD[String] = inputRDD.repartition(3)
    println(s"etl rdd partitions = ${etlRDD.getNumPartitions}")
    // step2. 处理数据
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 过滤数据
      .filter(line => line.trim.length != 0 )
      // 对每行数据进行单词分割
      .flatMap(line => line.trim.split("\\s+"))
      // 转换为二元组
        //.map(word => word -> 1)
      /*
        def mapPartitions[U: ClassTag](
            f: Iterator[T] => Iterator[U],
            preservesPartitioning: Boolean = false
        ): RDD[U]
       */
        .mapPartitions(iter => iter.map(word => (word, 1)))
      // 分组聚合
      .reduceByKey((tmp, item) => tmp + item)
    // step3. 输出数据
    //resultRDD.foreach(item => println(item))
    /*
      def foreachPartition(f: Iterator[T] => Unit): Unit
     */
    // TODO: 降低结果RDD分区数目
    val outputRDD: RDD[(String, Int)] = resultRDD.coalesce(1)
    println(s"output rdd partitions = ${outputRDD.getNumPartitions}")
    outputRDD.foreachPartition(iter => iter.foreach(item => println(item)))
    // 应用结束,关闭资源
    sc.stop()
  }
}

在实际开发中,什么时候适当调整RDD的分区数目呢?让程序性能更好好呢????

08-[掌握]-RDD 函数之RDD 中聚合函数

回顾列表List中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量。查看列表List中聚合函数reduce和fold源码如下:

通过代码,看看列表List中聚合函数使用:

运行截图如下所示:

fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:

聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:

在RDD中提供类似列表List中聚合函数reduce和fold,查看如下:

案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:

运行结果解析如下:

查看RDD中高级聚合函数aggregate,函数声明如下:

业务需求:对RDD中数据进行求和sum。

// TODO:aggregate函数,累计求和
    /*
    def aggregate[U: ClassTag]
    (zeroValue: U)
    (
       seqOp: (U, T) => U,
       combOp: (U, U) => U
    ): U
     */
    val aggSum: Int = datasRDD.aggregate(0)(
      // seqOp: (U, T) => U    分区内数据聚合
      (tmp: Int, item: Int) => {
        println(s"seq -> p-${TaskContext.getPartitionId()}: tmp = ${tmp}, item = ${item}, sum = ${tmp + item}")
        tmp + item
      },
      // combOp: (U, U) => U    分区间数据聚合
      (tmp, item) => {
        println(s"comb -> p-${TaskContext.getPartitionId()}: tmp = ${tmp}, item = ${item}, sum = ${tmp + item}")
        tmp + item
      }
    )
    println(s"aggSum = ${aggSum}")

09-[掌握]-RDD 函数之PairRDDFunctions 聚合函数

在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。

*ByKey函数将相同Key的Value进行聚合操作的,省去先分组再聚合。

  • 第一类:分组函数groupByKey

  • 第二类:分组聚合函数reduceByKey和foldByKey

  • 第三类:分组聚合函数aggregateByKey

在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,基本上都能完成任意聚合功能。

10-[掌握]-RDD 函数之关联JOIN函数

当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。

RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下:

具体看一下join(等值连接)函数说明:

范例演示代码:

package cn.itcast.spark.func.join
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * RDD中关联函数Join,针对RDD中数据类型为Key/Value对
 */
object _04SparkJoinTest {
  def main(args: Array[String]): Unit = {
    // 创建SparkContext实例对象,传递SparkConf对象,设置应用配置信息
    val sc: SparkContext = {
      // a. 创建SparkConf对象
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // b. 传递sparkConf对象,构建SparkContext实例
      SparkContext.getOrCreate(sparkConf)
    }
    // 模拟数据集
    val empRDD: RDD[(Int, String)] = sc.parallelize(
      Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu"))
    )
    val deptRDD: RDD[(Int, String)] = sc.parallelize(
      Seq((1001, "sales"), (1002, "tech"))
    )
    // TODO: 等值连接
    //                deptno  empname  deptname
    val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD)
    joinRDD.foreach{case (deptno, (empname, deptname)) =>
      println(s"deptno = ${deptno}, empname = ${empname}, deptname = ${deptname}")
    }
    println("======================================================")
    // TODO:左外连接
    val leftRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)
    leftRDD.foreach{case (deptno, (empname, option)) =>
      val deptname: String = option match {
        case Some(name) => name
        case None => "未知"
      }
      println(s"deptno = ${deptno}, empname = ${empname}, deptname = ${deptname}")
    }
    // 应用结束,关闭资源
    sc.stop()
  }
}

11-[掌握]-RDD 持久化

在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

将RDD数据进行缓存时,本质上就是将RDD各个分区数据进行缓存

  • 缓存函数

可以将RDD数据直接缓存到内存中,函数声明如下:

但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bxNtlFD7-1638793130145)(/img/image-20210422172215367.png)]

  • 缓存级别

在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:

实际项目中缓存数据时,往往选择如下两种级别:

缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count函数触发

  • 释放缓存

缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:

此函数属于eager,立即执行。

  • 何时缓存数据

在实际项目开发中,什么时候缓存RDD数据,最好呢???

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TKk5WJgJ-1638793130147)(img/image-20210422172821282.png)]

12-[了解]-RDD Checkpoint

RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。

在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;

案例演示代码如下:

package cn.itcast.spark.ckpt
import org.apache.spark.{SparkConf, SparkContext}
/**
 * RDD数据Checkpoint设置,案例演示
 */
object _06SparkCkptTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    // TODO: 设置检查点目录,将RDD数据保存到那个目录
    sc.setCheckpointDir("datas/ckpt/")
    // 读取文件数据
    val datasRDD = sc.textFile("datas/wordcount.data")
    // TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
    datasRDD.checkpoint()
    datasRDD.count()
    // TODO: 再次执行count函数, 此时从checkpoint读取数据
    println(datasRDD.count())
    // 应用程序运行结束,关闭资源
    Thread.sleep(1000000000)
    sc.stop()
  }
}

面试题:持久化和Checkpoint的区别:


目录
相关文章
|
7月前
|
设计模式 SQL 分布式计算
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
80 0
|
7月前
|
分布式计算 监控 分布式数据库
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
174 0
|
SQL 机器学习/深度学习 存储
Spark Core
Spark Core
238 0
|
存储 缓存 分布式计算
|
存储 SQL 分布式计算
|
分布式计算 Spark C++
Spark的一个经典问题(1个Core5个Executor和5个Core1个Executor有什么区别)
Spark的一个经典问题(1个Core5个Executor和5个Core1个Executor有什么区别)
|
消息中间件 JSON 分布式计算
Spark Core读取ES的分区问题分析
写这篇文章的原因是前两天星球球友去面试,面试管问了一下,Spark 分析ES的数据,生成的RDD分区数跟什么有关系呢? 稍微猜测一下就能想到跟分片数有关,但是具体是什么关系呢? 可想的具体关系可能是以下两种: 1).就像KafkaRDD的分区与kafka topic分区数的关系一样,一对一。 2).ES支持游标查询,那么是不是也可以对比较大ES 索引的分片进行拆分成多个RDD分区呢? 那么下面浪尖带着大家翻一下源码看看具体情况。
260 0
|
分布式计算 Spark 存储
Spark中Task,Partition,RDD、节点数、Executor数、core数目的关系
梳理一下Spark中关于并发度涉及的几个概念File,Block,Split,Task,Partition,RDD以及节点数、Executor数、core数目的关系。
1595 0
|
存储 分布式计算 API
Spark Core组件:RDD、DataFrame和DataSet
1. 介绍 spark生态系统中,Spark Core,包括各种Spark的各种核心组件,它们能够对内存和硬盘进行操作,或者调用CPU进行计算。
1668 0