2.1.6 RDD 的依赖关系
RDD
有两种依赖,分别为宽依赖(wide dependency/shuffle dependency
)和窄依赖(narrow dependency
) :
从上图可以看到:
- 窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖;
- 宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。
对于窄依赖:
- 窄依赖的多个分区可以并行计算;
- 窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。
对于宽依赖:
- 划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。
2.1.7 DAG 的生成和划分 Stage
2.1.7.1 DAG
DAG(Directed Acyclic Graph
有向无环图):指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程);
原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。
DAG 的边界:
- 开始:通过 SparkContext 创建的 RDD;
- 结束:触发 Action,一旦触发 Action 就形成了一个完整的 DAG。
2.1.7.2 DAG 划分Stage
从上图可以看出:
- 一个 Spark 程序可以有多个 DAG(有几个 Action,就有几个 DAG,上图最后只有一个 Action(图中未表现),那么就是一个 DAG);
- 一个 DAG 可以有多个 Stage(根据宽依赖/shuffle 进行划分);
- 同一个 Stage 可以有多个 Task 并行执行(task 数=分区数,如上图,Stage1 中有三个分区 P1、P2、P3,对应的也有三个 Task);
- 可以看到这个 DAG 中只 reduceByKey 操作是一个宽依赖,Spark 内核会以此为边界将其前后划分成不同的 Stage;
- 在图中 Stage1 中,从 textFile 到 flatMap 到 map 都是窄依赖,这几步操作可以形成一个流水线操作,通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束,而是继续进行 map 操作,这样大大提高了计算的效率。
为什么要划分 Stage? --并行计算
- 一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。
如何划分 DAG 的 stage?
- 对于窄依赖,partition 的转换处理在 stage 中完成计算,不划分(将窄依赖尽量放在在同一个 stage 中,可以实现流水线计算)。
- 对于宽依赖,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,也就是说需要要划分 stage。
总结:
- Spark 会根据 shuffle/宽依赖使用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的 stage/阶段中。
具体的划分算法请参见 AMP 实验室发表的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
2.1.8 RDD累加器和广播变量
在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark 提供了两种类型的变量:
- 累加器 (accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。
- 广播变量 (broadcast variables):广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。
2.1.8.1 累加器
通常在向 Spark
传递函数时,比如使用 map()
函数或者用filter()
传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果:
语法:val xx: Accumulator[Int] = sc.accumulator(0)
示例代码:
import org.apache.spark.rdd.RDD import org.apache.spark.{Accumulator, SparkConf, SparkContext} object AccumulatorTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //使用scala集合完成累加 var counter1: Int = 0; var data = Seq(1,2,3) data.foreach(x => counter1 += x ) println(counter1)//6 println("+++++++++++++++++++++++++") //使用RDD进行累加 var counter2: Int = 0; val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3] dataRDD.foreach(x => counter2 += x) println(counter2)//0 //注意:上面的RDD操作运行结果是0 //因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量 //而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2 //最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系 //那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊! //如果解决?---使用累加器 val counter3: Accumulator[Int] = sc.accumulator(0) dataRDD.foreach(x => counter3 += x) println(counter3)//6 } }
2.1.8.2 广播变量
关键词:sc.broadcast()
import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object BroadcastVariablesTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //不使用广播变量 val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape"))) val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana) val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3)) //根据水果编号取水果名称 val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x)) fruitNames.foreach(println) //注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多, //那么会导致,被各个Task共用到的fruitMap会被多次传输 //应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可 //如何做到?---使用广播变量 //注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redis println("=====================") val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap) val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x)) fruitNames2.foreach(println) } }
3. Spark SQL
3.1 Spark SQL 概述
Hive 是将 SQL 转为 MapReduce。
SparkSQL 可以理解成是将 SQL 解析成:“RDD + 优化” 再执行
在学习Spark SQL前,需要了解数据分类。
3.2 数据分类
数据分为如下几类:
定义 | 特点 | 举例 | |
结构化数据 | 有固定的 Schema | 有预定义的 Schema | 关系型数据库的表 |
半结构化数据 | 没有固定的 Schema,但是有结构 | 没有固定的 Schema,有结构信息,数据一般是自描述的 | 指一些有结构的文件格式,例如 JSON |
非结构化数据 | 没有固定 Schema,也没有结构 | 没有固定 Schema,也没有结构 | 指图片/音频之类的格式 |
总结:
- RDD 主要用于处理非结构化数据 、半结构化数据、结构化;
- SparkSQL 是一个既支持 SQL 又支持命令式数据处理的工具;
- SparkSQL 主要用于处理结构化数据(较为规范的半结构化数据也可以处理)。
3.3 Spark SQL 数据抽象
3.3.1 DataFrame 和 DataSet
Spark SQL数据抽象可以分为两类:
① DataFrame:DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格,带有 Schema 元信息(可以理解为数据库的列名和类型)。DataFrame = RDD + 泛型 + SQL 的操作 + 优化
② DataSet:DataSet是DataFrame的进一步发展,它比RDD保存了更多的描述信息,概念上等同于关系型数据库中的二维表,它保存了类型信息,是强类型的,提供了编译时类型检查。调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!DataFrame = Dateset[Row]
RDD
、DataFrame
、DataSet
的关系如下:
- RDD[Person]:以 Person 为类型参数,但不了解其内部结构。
- DataFrame:提供了详细的结构信息 schema 列的名称和类型。这样看起来就像一张表了。
- DataSet[Person]:不光有 schema 信息,还有类型信息。
3.3.2 举例
假设 RDD 中的两行数据长这样:
RDD[Person]:
那么 DataFrame 中的数据长这样
DataFrame = RDD[Person] - 泛型 + Schema + SQL 操作 + 优化
那么 Dataset 中的数据长这样:
Dataset[Person] = DataFrame + 泛型:
Dataset 也可能长这样:Dataset[Row]:
即 DataFrame = DataSet[Row]:
总结:
- DataFrame = RDD - 泛型 + Schema + SQL + 优化
- DataSet = DataFrame + 泛型
- DataSet = RDD + Schema + SQL + 优化
3.4 Spark SQL 应用
3.4.1 创建 DataFrame/DataSet
方式一:读取本地文件
① 在本地创建一个文件,有 id、name、age 三列,用空格分隔,然后上传到 hdfs 上。
vim /root/person.txt
内容如下:
1 zhangsan 20 2 lisi 29 3 wangwu 25 4 zhaoliu 30 5 tianqi 35 6 kobe 40
② 打开 spark-shell
spark/bin/spark-shell ##创建 RDD val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]
③ 定义 case class(相当于表的 schema)
case class Person(id:Int, name:String, age:Int)
④ 将 RDD 和 case class 关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]
⑤ 将 RDD 转换成 DataFrame
val personDF = personRDD.toDF //DataFrame
⑥ 查看数据和 schema
personDF.show
⑦ 注册表
personDF.createOrReplaceTempView("t_person")
⑧ 执行 SQL
spark.sql("select id,name from t_person where id > 3").show
⑨ 也可以通过 SparkSession 构建 DataFrame
val dataFrame=spark.read.text("hdfs://node1:8020/person.txt") dataFrame.show //注意:直接读取的文本文件没有完整schema信息 dataFrame.printSchema
方式二:读取 json 文件
val jsonDF= spark.read.json("file:///resources/people.json")
接下来就可以使用 DataFrame
的函数操作
jsonDF.show
注意:直接读取
json
文件有schema
信息,因为json
文件本身含有Schema
信息,SparkSQL
可以自动解析。
方式三:读取 parquet 文件
val parquetDF=spark.read.parquet("file:///resources/users.parquet")
接下来就可以使用 DataFrame
的函数操作
parquetDF.show
注意:直接读取
parquet
文件有schema
信息,因为parquet
文件中保存了列的信息。
3.4.2 两种查询风格:DSL 和 SQL
DSL风格示例:
personDF.select(personDF.col("name")).show personDF.select(personDF("name")).show personDF.select(col("name")).show personDF.select("name").show
SQL 风格示例:
spark.sql("select * from t_person").show
总结:
DataFrame
和DataSet
都可以通过RDD
来进行创建;- 也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过
RDD
+Schema
; - 通过
josn/parquet
会有完整的约束; - 不管是
DataFrame
还是DataSet
都可以注册成表,之后就可以使用SQL
进行查询了! 也可以使用DSL
!
3.4.3 Spark SQL 多数据源交互
读取 json 文件:
spark.read.json("D:\\data\\output\\json").show()
读取 csv 文件:
spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()
读取 parquet 文件:
spark.read.parquet("D:\\data\\output\\parquet").show()
读取 mysql 表:
val prop = new Properties() prop.setProperty("user","root") prop.setProperty("password","root") spark.read.jdbc( "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
写入 json 文件:
personDF.write.json("D:\\data\\output\\json")
写入 csv 文件:
personDF.write.csv("D:\\data\\output\\csv")
写入 parquet 文件:
personDF.write.parquet("D:\\data\\output\\parquet")
写入 mysql 表:
val prop = new Properties() prop.setProperty("user","root") prop.setProperty("password","root") personDF.write.mode(SaveMode.Overwrite).jdbc( "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
4. Spark Streaming
Spark Streaming 是一个基于 Spark Core 之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点。
Spark Streaming 的特点:
- 易用:可以像编写离线批处理一样去编写流式程序,支持 java/scala/python 语言。
- 容错:SparkStreaming 在没有额外代码和配置的情况下可以恢复丢失的工作。
- 易整合到 Spark 体系:流式处理与批处理和交互式查询相结合。
4.1 整体流程
- ① Spark Streaming 中,会有一个接收器组件 Receiver,作为一个长期运行的 task 跑在一个 Executor 上,Receiver 接收外部的数据流形成 input DStream。
- ② DStream 会被按照时间间隔划分成一批一批的 RDD,当批处理间隔缩短到秒级时,便可以用于处理实时数据流(时间间隔的大小可以由参数指定,一般设在 500 毫秒到几秒之间)。
- ③ 对 DStream 进行操作就是对 RDD 进行操作,计算处理的结果可以传给外部系统。
- ④ 接受到实时数据后,给数据分批次,然后传给 Spark Engine 处理最后生成该批次的结果。
4.2 数据抽象
Spark Streaming 的基础抽象是 DStream(Discretized Stream,离散化数据流,连续不断的数据流),代表持续性的数据流和经过各种 Spark 算子操作后的结果数据流。
可以从以下多个角度深入理解 DStream:
① DStream 本质上就是一系列时间上连续的 RDD:
② 对 DStream 的数据的进行操作也是按照 RDD 为单位来进行的:
③ 容错性,底层 RDD 之间存在依赖关系,DStream 直接也有依赖关系,RDD 具有容错性,那么 DStream 也具有容错性。
④ 准实时性/近实时性
- Spark Streaming 将流式计算分解成多个 Spark Job,对于每一时间段数据的处理都会经过 Spark DAG 图分解以及 Spark 的任务集的调度过程。
- 对于目前版本的 Spark Streaming 而言,其最小的 Batch Size 的选取在 0.5~5 秒钟之间。
所以 Spark Streaming 能够满足流式准实时计算场景,对实时性要求非常高的如高频实时交易场景则不太适合。
总结: 简单来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。对于 DataFrame/DataSet/DStream 来说本质上都可以理解成 RDD。