Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门

Spark Day06:Spark Core

01-[了解]-内容回顾

主要讲解三个方面内容:Sogou日志分析、外部数据源(HBase和MySQL)和共享变量。

1、Sogou日志分析
  以搜狗官方提供用户搜索查询日志为基础,使用SparkCore(RDD)业务分析
  数据格式:
    文本文件数据,每条数据就是用户搜索时点击网页日志数据
    各个字段之间使用制表符分割
  业务需求:
    - 搜索关键词统计,涉及知识点中文分词:HanLP
    - 用户搜索点击统计
    - 搜索时间段统计
  编码实现
    第一步、读取日志数据,封装到实体类对象SougouRecord
    第二步、按照业务需求分析数据
      词频统计WordCount变形
2、外部数据源
  SparkCore与HBase和MySQL数据库交互
  - HBase数据源,底层MapReduce从HBase表读写数据API
    保存数据到HBase表
      TableOutputFormat
      RDD[(RowKey, Put)],其中RowKey = ImmutableBytesWritable
    从HBase表加载数据
      TableInputFormat
      RDD[(RowKey, Result)]
    从HBase 表读写数据,首先找HBase数据库依赖Zookeeper地址信息
  - MySQL数据源
    保存数据RDD到MySQL表中,考虑性能问题,5个方面
      考虑降低RDD分区数目
      针对分区数据进行操作,每个分区创建1个连接
      每个分区数据写入到MySQL数据库表中,批量写入
        可以将每个分区数据加入批次
        批量将所有数据写入
      事务性,批次中数据要么都成功,要么都失败
        人为提交事务
      考虑大数据分析特殊性,重复运行程序,处理相同数据,保存到MySQL表中
        主键存在时,更新数据;不存在时,插入数据
        REPLACE INTO ............
3、共享变量(Shared Variables)
  表示某个值(变量)被所有Task共享
  - 广播变量
    Broadcast Variables,共享变量值不能被改变
    解决问题:
      共享变量存储问题,将变量广播以后,仅仅在每个Executor中存储一份;如果没有对变量进行广播的话,每个Task中存储一份。
    广播变量节省内存使用
  - 累加器
    Accumulators,共享变量值可以被改变,只能“累加”
    类似MapReduce框架种计数器Counter,起到累加统计作用
    Spark框架提供三种类型累加器:
      LongAccumulator、DoubleAccumulator、CollectionAccumulator

02-[了解]-内容提纲

主要讲解2个方面内容:Spark 内核调度和SparkSQL快速入门

1、Spark 内核调度(理解)
  了解Spark框架如何执行Job程序,以词频统计WordCount程序为例,如何执行程序
    RDD 依赖
    DAG图、Stage阶段
    Shuffle
    Job 调度流程
    Spark 基本概念
    并行度
2、SparkSQL快速入门
  SparkSQL中程序入口:SparkSession
  基于SparkSQL实现词频统计
    SQL语句,类似Hive
    DSL语句,类似RDD中调用API,链式编程
  SparkSQL模块概述
    前世今生
    官方定义
    几大特性

03-[掌握]-Spark 内核调度之引例WordCount

Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。

Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行

以词频统计WordCount程序为例,Job执行是DAG图:

运行词频统计WordCount,截取4040监控页面上DAG图:

当RDD调用Action函数(Job触发函数)时,产出1个Job,执行Job。

  • 1、将Job中所有RDD按照依赖关系构建图:DAG图(有向无环图)
  • 2、将DAG图划分为Stage阶段,分为2种类型
  • ResultStage,对结果RDD进行处理Stage阶段
  • ShuffleMapStage,此Stage阶段中最后1个RDD产生Shuffle

04-[掌握]-Spark 内核调度之RDD 依赖

RDD 间存在着血统继承关系,其本质上是 RDD之间的依赖(Dependency)关系

每个RDD记录,如何从父RDD得到的,调用哪个转换函数

从DAG图上来看,RDD之间依赖关系存在2种类型:

  • 窄依赖(Narrow Dependency)

定义:父 RDD 与子 RDD 间的分区是一对一的一(父RDD)对一(子RDD)

  • Shuffle 依赖(宽依赖 Wide Dependency)

定义:父 RDD 中的分区可能会被多个子 RDD 分区使用,一(父)对多(子)

05-[掌握]-Spark 内核调度之DAG和Stage

在Spark应用执行时,每个Job执行时(RDD调用Action函数时),依据最后一个RDD(调用Action函数RDD),依据RDD依赖关系,向前推到,构建Job中所有RDD依赖关系图,称之为DAG图。

当构建完成Job DAG图以后,继续从Job最后一个RDD开始,依据RDD之间依赖关系,将DAG图划分为Stage阶段,当RDD之间依赖为Shuffle依赖时,划分一个Stage。

  • 对于窄依赖,RDD之间的数据不需要进行Shuffle,多个数据处理可以在同一台机器的内存中完
    成,所以窄依赖在Spark中被划分为同一个Stage;
  • 对于宽依赖,由于Shuffle的存在,必须等到父RDD的Shuffle处理完成后,才能开始接下来的计
    算,所以会在此处进行Stage的切分。

可以运行词频统计WordCount查看对应DAG图和Stage阶段

把DAG划分成互相依赖的多个Stage,划分依据是RDD之间的宽依赖,Stage是由一组并行的Task组成。

1、Stage切割规则:从后往前,遇到宽依赖就切割Stage。
2、Stage计算模式:pipeline管道计算模式
  pipeline只是一种计算思想、模式,来一条数据然后计算一条数据,把所有的逻辑走完,然后落地。
  以词频统计WordCount为例:
    从HDFS上读取数据,每个Block对应1个分区,当从Block中读取一条数据以后,经过flatMap、map和reduceByKey操作,最后将结果数据写入到本地磁盘中(Shuffle Write)。
    block0:         hadoop spark spark
              |textFile
    RDD-0     hadoop spark spark
                  |flatMap
        RDD-1     hadoop\spark\spark
                  |map
        RDD-2     (hadoop, 1)\(spark, 1)\(spark, 1)
                  |reduceByKey
       写入磁盘     hadoop, 1   ||       spark, 1\  spark, 1
3、准确的说:一个task处理一串分区的数据,整个计算逻辑全部走完

面试题如下:Spark Core中一段代码,判断执行结果

前提条件:11.data中三条数据
结果A:
  filter..................
  filter..................
  filter..................
  map..................
  map..................
  map..................
  flatMap..................
  flatMap..................
  flatMap..................
  Count = 3
结果B:
  filter..................
  map..................
  flatMap..................
  filter..................
  map..................
  flatMap.................. 
  filter..................
  map..................
  flatMap..................
  Count = 3

在1个Spark Application应用中,如果某个RDD,调用多次Action函数,触发Job执行,重用RDD结果产生过程中Shuffle数据(写入到本地磁盘),节省重新计算RDD时间,提升性能。

可以将某个多次使用RDD数据,认为手动进行缓存。

06-[了解]-Spark 内核调度之Spark Shuffle

首先回顾MapReduce框架中Shuffle过程,整体流程图如下:

Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。

Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等。

Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步。

Stage划分为2种类型:

  • 1)、ShuffleMapStage,在Spark 1个Job中,除了最后一个Stage之外,其他所有的Stage都是此类型
  • 将Shuffle数据写入到本地磁盘,ShuffleWriter
  • 在此Stage中,所有的Task称为:ShuffleMapTask
  • 2)、ResultStage,在Spark的1个Job中,最后一个Stage,对结果RDD进行操作
  • 会读取前一个Stage中数据,ShuffleReader
  • 在此Stage中,所有的Task任务称为ResultTask。

ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。

Spark Shuffle实现历史:
  - Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式
  - 到1.1版本时参考HadoopMapReduce的实现开始引入Sort Shuffle
  - 在1.5版本时开始Tungsten钨丝计划,引入UnSafe Shuffle优化内存及CPU的使用
  - 在1.6中将Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式
  - 到的2.0版本,Hash Shuffle已被删除,所有Shuffle方式全部统一到Sort Shuffle一个实现中。

具体各阶段Shuffle如何实现,参考思维导图XMIND,大纲如下:

07-[掌握]-Spark 内核调度之Job 调度流程

当启动Spark Application的时候,运行MAIN函数,首先创建SparkContext对象(构建DAGSchedulerTaskScheduler)。

  • 第一点、DAGScheduler实例对象
  • 将每个Job的DAG图划分为Stage,依据RDD之间依赖为宽依赖(产生Shuffle)
  • 第二点、TaskScheduler实例对象
  • 调度每个Stage中所有Task:TaskSet,发送到Executor上执行
  • 每个Stage中会有多个Task,所有Task处理数据不一样(每个分区数据被1个Task处理),但是处理逻辑一样的。
  • 将每个Stage中所有Task任务,放在一起称为TaskSet

当RDD调用Action函数(比如count、saveTextFile或foreachPartition)时,触发一个Job执行,调度中流程如下图所示:

Spark RDD通过其Transactions操作,形成了RDD血缘关系图,即DAG,最后通过Action的调用,触发Job并调度执行。

  • 1)、DAGScheduler负责Stage级的调度,主要是将DAG切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。
  • 2)、TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统。

Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度。

一个Spark应用程序包括Job、Stage及Task:
    第一、Job是以Action方法为界,遇到一个Action方法则触发一个Job;
    第二、Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
    第三、Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。

08-[掌握]-Spark 内核调度之Spark 基本概念

Spark Application运行时,涵盖很多概念,主要如下表格:

官方文档:http://spark.apache.org/docs/2.4.5/cluster-overview.html#glossary

09-[理解]-Spark 内核调度之并行度

在Spark Application运行时,并行度可以从两个方面理解:

  • 1)、资源的并行度:由节点数(executor)和cpu数(core)决定的
  • 2)、数据的并行度:task的数据,partition大小

Task数目要是core总数的2-3倍为佳

参数spark.defalut.parallelism默认是没有值的,如果设置了值,是在shuffle的过程才会起作用

在实际项目中,运行某个Spark Application应用时,需要设置资源,尤其Executor个数和CPU核数,如何计算?

分析网站日志数据:20GB,存储在HDFS上,160Block,从HDFS读取数据,
  RDD 分区数目:160 个分区
1、RDD分区数目160,那么Task数目为160个
2、总CPU Core核数
  160/2 = 80
            CPU Core = 60
  160/3 = 50
3、假设每个Executor:6 Core
  60 / 6 = 10 个
4、每个Executor内存
  6 * 2 = 12 GB
  6 * 3 = 18 GB
5、参数设置
  --executor-memory= 12GB
  --executor-cores= 6
  --num-executors=10

10-[掌握]-SparkSQL应用入口SparkSession

Spark 2.0开始,应用程序入口为SparkSession,加载不同数据源的数据,封装到DataFrame/Dataset集合数据结构中,使得编程更加简单,程序运行更加快速高效。

1、SparkSession
  程序入口,加载数据
  底层SparkContext,进行封装
2、DataFrame/Dataset
  Dataset[Row] = DataFrame
  数据结构,从Spark 1.3开始出现,一直到2.0版本,确定下来
  底层RDD,加上Schema约束(元数据):字段名称和字段类型
  • 1)、SparkSession在SparkSQL模块中,添加MAVEN依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
  • 2)、SparkSession对象实例通过建造者模式构建,代码如下:

其中①表示导入SparkSession所在的包,②表示建造者模式构建对象和设置属性,③表示导入SparkSession类中implicits对象object中隐式转换函数。

  • 3)、范例演示:构建SparkSession实例,加载文本数据,统计条目数。
package cn.itcast.spark.sql.start
import org.apache.spark.sql.{Dataset, SparkSession}
/**
 * Spark 2.x开始,提供了SparkSession类,作为Spark Application程序入口,
 *      用于读取数据和调度Job,底层依然为SparkContext
 */
object _03SparkStartPoint {
  def main(args: Array[String]): Unit = {
    // 使用建造者设计模式,创建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
        .getOrCreate()
    import spark.implicits._
    // TODO: 使用SparkSession加载数据
    val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount.data")
    // 显示前5条数据
    println(s"Count = ${inputDS.count()}")
    inputDS.show(5, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

学习任务:Java中设计模式【建造者设计模式】,在大数据很多框架种,API设计都是建造者设计模式。

11-[掌握]-词频统计WordCount之基于DSL编程

DataFrame 数据结构相当于给RDD加上约束Schema,知道数据内部结构(字段名称、字段类型),提供两种方式分析处理数据:DataFrame API(DSL编程)SQL(类似HiveQL编程),下面以WordCount程序为例编程实现,体验DataFrame使用。

使用SparkSession加载文本数据,封装到Dataset/DataFrame中,调用API函数处理分析数据(类似RDD中API函数,如flatMap、map、filter等),编程步骤:
  第一步、构建SparkSession实例对象,设置应用名称和运行本地模式;
  第二步、读取HDFS上文本文件数据;
  第三步、使用DSL(Dataset API),类似RDD API处理分析数据;
  第四步、控制台打印结果数据和关闭SparkSession;
package cn.itcast.spark.sql.wordcount
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * 使用SparkSQL进行词频统计WordCount:DSL
 */
object _04SparkDSLWordCount {
  def main(args: Array[String]): Unit = {
    // 使用建造设设计模式,创建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
      .getOrCreate()
    import spark.implicits._
    // TODO: 使用SparkSession加载数据
    val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount.data")
    // DataFrame/Dataset = RDD + schema
    /*
    root
            |-- value: string (nullable = true)
     */
    //inputDS.printSchema()
    /*
      +----------------------------------------+
      |value                                   |
      +----------------------------------------+
      |hadoop spark hadoop spark spark         |
      |mapreduce spark spark hive              |
      |hive spark hadoop mapreduce spark       |
      |spark hive sql sql spark hive hive spark|
      |hdfs hdfs mapreduce mapreduce spark hive|
      +----------------------------------------+
     */
    //inputDS.show(10, truncate = false)
    // TODO: 使用DSL(Dataset API),类似RDD API处理分析数据
    val wordDS: Dataset[String] = inputDS.flatMap(line => line.trim.split("\\s+"))
    /*
    root
     |-- value: string (nullable = true)
     */
    //wordDS.printSchema()
    /*
    +---------+
    |value    |
    +---------+
    |hadoop   |
    |spark    |
    +---------+
     */
    // wordDS.show(10, truncate = false)
    /*
      table: words , column: value
          SQL: SELECT value, COUNT(1) AS count  FROM words GROUP BY value
     */
    val resultDS: DataFrame = wordDS.groupBy("value").count()
    /*
    root
     |-- value: string (nullable = true)
     |-- count: long (nullable = false)
     */
    resultDS.printSchema()
    /*
      +---------+-----+
      |value    |count|
      +---------+-----+
      |sql      |2    |
      |spark    |11   |
      |mapreduce|4    |
      |hdfs     |2    |
      |hadoop   |3    |
      |hive     |6    |
      +---------+-----+
     */
    resultDS.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}

12-[掌握]-词频统计WordCount之基于SQL编程

类似HiveQL方式进行词频统计,直接对单词分组group by,再进行count即可,步骤如下:

第一步、构建SparkSession对象,加载文件数据,分割每行数据为单词;
第二步、将DataFrame/Dataset注册为临时视图(Spark 1.x中为临时表);
第三步、编写SQL语句,使用SparkSession执行获取结果;
第四步、控制台打印结果数据和关闭SparkSession;
package cn.itcast.spark.sql.wordcount
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * 使用SparkSQL进行词频统计WordCount:SQL
 */
object _05SparkSQLWordCount {
  def main(args: Array[String]): Unit = {
    // 使用建造设设计模式,创建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
        .getOrCreate()
    import spark.implicits._
    // TODO: 使用SparkSession加载数据
    val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount.data")
    /*
      root
       |-- value: string (nullable = true)
     */
    //inputDS.printSchema()
    /*
      +--------------------+
      |               value|
      +--------------------+
      |hadoop spark hado...|
      |mapreduce spark  ...|
      |hive spark hadoop...|
      +--------------------+
     */
    //inputDS.show(5, truncate = false)
    // 将每行数据按照分割划分为单词
    val wordDS: Dataset[String] = inputDS.flatMap(line => line.trim.split("\\s+"))
    /*
      table: words , column: value
          SQL: SELECT value, COUNT(1) AS count  FROM words GROUP BY value
     */
    // step 1. 将Dataset或DataFrame注册为临时视图
    wordDS.createOrReplaceTempView("tmp_view_word")
    // step 2. 编写SQL并执行
    val resultDF: DataFrame = spark.sql(
      """
        |SELECT value as word, COUNT(1) AS count  FROM tmp_view_word GROUP BY value
        |""".stripMargin)
    /*
      +---------+-----+
      |word     |count|
      +---------+-----+
      |sql      |2    |
      |spark    |11   |
      |mapreduce|4    |
      |hdfs     |2    |
      |hadoop   |3    |
      |hive     |6    |
      +---------+-----+
     */
    resultDF.show(10, truncate = false)
    // 应用结束,关闭资源
    spark.stop()
  }
}


相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
5月前
|
SQL 机器学习/深度学习 分布式计算
Spark5:SparkSQL
Spark5:SparkSQL
62 0
|
7月前
|
SQL 分布式计算 大数据
大数据Spark SQL快速入门
大数据Spark SQL快速入门
92 0
|
2月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
57 2
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
106 1
|
1月前
|
分布式计算 Spark 索引
Spark学习---day07、Spark内核(Shuffle、任务执行)
Spark学习---day07、Spark内核(源码提交流程、任务执行)
41 2
|
1月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
41 2
|
7月前
|
分布式计算 监控 大数据
大数据Spark快速入门
大数据Spark快速入门
93 0
|
3月前
|
存储 分布式计算 监控
Spark作业的调度与执行流程
Spark作业的调度与执行流程
|
4月前
|
分布式计算 监控 分布式数据库
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
54 0
|
4月前
|
存储 缓存 分布式计算
【Spark】Spark Core Day04
【Spark】Spark Core Day04
33 1