Spark入门(一篇就够了)(三)

简介: Spark入门(一篇就够了)(三)

4.3 DStream 相关操作

DStream 上的操作与 RDD 的类似,分为以下两种:

  • Transformations(转换)
  • Output Operations(输出)/Action

4.3.1 Transformations

以下是常见 Transformation—都是无状态转换:即每个批次的处理不依赖于之前批次的数据:

Transformation 含义
map(func) 对 DStream 中的各个元素进行 func 函数操作,然后返回一个新的 DStream
flatMap(func) 与 map 方法类似,只不过各个输入项可以被输出为零个或多个输出项
filter(func) 过滤出所有函数 func 返回值为 true 的 DStream 元素并返回一个新的 DStream
union(otherStream) 将源 DStream 和输入参数为 otherDStream 的元素合并,并返回一个新的 DStream
reduceByKey(func, [numTasks]) 利用 func 函数对源 DStream 中的 key 进行聚合操作,然后返回新的(K,V)对构成的 DStream
join(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的 DStream,返回一个新的(K,(V,W)类型的 DStream
transform(func) 通过 RDD-to-RDD 函数作用于 DStream 中的各个 RDD,可以是任意的 RDD 操作,从而返回一个新的 RDD

除此之外还有一类特殊的 Transformations—有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。

有状态转换包括基于追踪状态变化的转换(updateStateByKey)和滑动窗口的转换:

  • UpdateStateByKey(func)
  • Window Operations 窗口操作

4.3.2 Output/Action

Output Operations 可以将 DStream 的数据输出到外部的数据库或文件系统。

当某个 Output Operations 被调用时,spark streaming 程序才会开始真正的计算过程(与 RDD 的 Action 类似)。

Output Operation 含义
print() 打印到控制台
saveAsTextFiles(prefix, [suffix]) 保存流的内容为文本文件,文件名为"prefix-TIME_IN_MS[.suffix]"
saveAsObjectFiles(prefix,[suffix]) 保存流的内容为 SequenceFile,文件名为 “prefix-TIME_IN_MS[.suffix]”
saveAsHadoopFiles(prefix,[suffix]) 保存流的内容为 hadoop 文件,文件名为"prefix-TIME_IN_MS[.suffix]"
foreachRDD(func) 对 Dstream 里面的每个 RDD 执行 func

5. Structured Streaming

Spark Streaming本质上是一种 micro-batch(微批处理)的方式处理,用批的思想去处理流数据,这种设计让Spark Streaming 面对复杂的流式处理场景时捉襟见肘。所以Structured Streaming就出现了。

Structured Streaming 是一个基于 Spark SQL 引擎的可扩展、容错的流处理引擎。统一了流、批的编程模型,你可以使用静态数据批处理一样的方式来编写流式计算操作。并且支持基于 event_time 的时间窗口的处理逻辑。

Structured Streaming 最核心的思想就是将实时到达的数据看作是一个不断追加的 unbound table 无界表,到达流的每个数据项(RDD)就像是表中的一个新行被附加到无边界的表中,这样用户就可以用静态结构化数据的批处理查询方式进行流计算,如可以使用 SQL 对到来的每一行数据进行实时查询处理

5.1 数据抽象

Structured Streaming 是 Spark2.0 新增的可扩展和高容错性的实时计算框架,它构建于 Spark SQL 引擎,把流式计算也统一到 DataFrame/Dataset 里去了。

其实就类似于 Dataset 相比于 RDD 的进步:

5.2 应用场景

Structured Streaming 将数据源映射为类似于关系数据库中的表,然后将经过计算得到的结果映射为另一张表,完全以结构化的方式去操作流式数据,这种编程模型非常有利于处理分析结构化的实时数据;

下面举个例子。

5.2.1 Source源端

读取 Socket 数据:

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.接收数据
    val dataDF: DataFrame = spark.readStream
      .option("host", "node01")
      .option("port", 9999)
      .format("socket")
      .load()
    //3.处理数据
    import spark.implicits._
    val dataDS: Dataset[String] = dataDF.as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
    //result.show()
    //Queries with streaming sources must be executed with writeStream.start();
    result.writeStream
      .format("console")//往控制台写
      .outputMode("complete")//每次将所有的数据写出
      .trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快
      //.option("checkpointLocation","./ckp")//设置checkpoint目录,socket不支持数据恢复,所以第二次启动会报错,需要注掉
      .start()//开启
      .awaitTermination()//等待停止
  }
}

读取目录下文本数据:

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
  * {"name":"json","age":23,"hobby":"running"}
  * {"name":"charles","age":32,"hobby":"basketball"}
  * {"name":"tom","age":28,"hobby":"football"}
  * {"name":"lili","age":24,"hobby":"running"}
  * {"name":"bob","age":20,"hobby":"swimming"}
  * 统计年龄小于25岁的人群的爱好排行榜
  */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val Schema: StructType = new StructType()
      .add("name","string")
      .add("age","integer")
      .add("hobby","string")
    //2.接收数据
    import spark.implicits._
    // Schema must be specified when creating a streaming source DataFrame.
    val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\\data\\spark\\data")
    //3.处理数据
    val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)
    //4.输出结果
    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}

5.2.2 Transform实时计算

获得到 Source 之后的基本数据处理方式和之前讲的 DataFrameDataSet 一致,不再赘述。

官网示例代码:

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                 // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

5.2.3 输出

计算结果可以选择输出到多种设备并进行如下设定:

  • output mode:以哪种方式将 result table 的数据写入 sink,即是全部输出 complete 还是只输出新增数据;
  • format/output sink 的一些细节:数据格式、位置等。如 console;
  • query name:指定查询的标识。类似 tempview 的名字;
  • trigger interval:触发间隔,如果不指定,默认会尽可能快速地处理数据;
  • checkpointLocation:一般是 hdfs 上的目录。注意:Socket 不支持数据恢复,如果设置了,第二次启动会报错,Kafka 支持。
5.2.3.1 output mode

每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。

这里有三种输出模型:

  • Append mode:默认模式,新增的行才输出,每次更新结果集时,只将新添加到结果集的结果行输出到接收器。仅支持那些添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。例如,仅查询 select,where,map,flatMap,filter,join 等会支持追加模式。不支持聚合
  • Complete mode:所有内容都输出,每次触发后,整个结果表将输出到接收器。聚合查询支持此功能。仅适用于包含聚合操作的查询。
  • Update mode:更新的行才输出,每次更新结果集时,仅将被更新的结果行输出到接收器(自 Spark 2.1.1 起可用),不支持排序
5.2.3.2 output sink

File sink:输出存储到一个目录中。支持 parquet 文件,以及 append 模式。

writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()

Kafka sink:将输出存储到 Kafka 中的一个或多个 topics 中。

writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()

Foreach sink:对输出中的记录运行任意计算

writeStream
    .foreach(...)
    .start()

Console sink:将输出打印到控制台

writeStream
    .format("console")
    .start()

6. Spark 两种核心 Shuffle

在 MapReduce 框架中,Shuffle 阶段是连接 Map 与 Reduce 之间的桥梁, Map 阶段通过 Shuffle 过程将数据输出到 Reduce 阶段中。由于 Shuffle 涉及磁盘的读写和网络 I/O,因此 Shuffle 性能的高低直接影响整个程序的性能。Spark 也有 Map 阶段和 Reduce 阶段,因此也会出现Shuffle。

Spark Shuffle 分为两种:

  • 一种是基于 Hash 的 Shuffle;
  • 另一种是基于 Sort 的 Shuffle。

6.1 Hash Shuffle 解析

以下的讨论都假设每个 Executor 有 1 个 cpu core。

6.1.1 HashShuffleManager

shuffle write 阶段,主要就是在一个 stage 结束计算之后,为了下一个 stage 可以执行 shuffle 类的算子(比如 reduceByKey),而将每个 task 处理的数据按 key 进行“划分”。所谓“划分”,就是对相同的 key 执行 hash 算法,从而将相同 key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件。比如:

  • 下一个 stage 总共有 100 个 task,那么当前 stage 的每个 task 都要创建 100 份磁盘文件。
  • 如果当前 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,那么每个 Executor 上总共就要创建 500 个磁盘文件,所有 Executor 上会创建 5000 个磁盘文件。

由此可见,未经优化的 shuffle write 操作所产生的磁盘文件的数量是极其惊人的。


shuffle read 阶段,通常就是一个 stage 刚开始时要做的事情。此时该 stage 的每一个 task 就需要将上一个 stage 的计算结果中的所有相同 key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行 key 的聚合或连接等操作。由于 shuffle write 的过程中,map task 给下游 stage 的每个 reduce task 都创建了一个磁盘文件,因此 shuffle read 的过程中,每个 reduce task 只要从上游 stage 的所有 map task 所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read 的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个自己的 buffer 缓冲,每次都只能拉取与 buffer 缓冲相同大小的数据,然后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 buffer 缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。


HashShuffleManager 工作原理如下图所示:

6.1.2 优化的HashShuffleManager

为了优化 HashShuffleManager 我们可以设置一个参数:spark.shuffle.consolidateFiles,该参数默认值为 false,将其设置为 true 即可开启优化机制,通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项

开启 consolidate 机制之后,在 shuffle write 过程中,task 就不是为下游 stage 的每个 task 创建一个磁盘文件了,此时会出现 shuffleFileGroup 的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游 stage 的 task 数量是相同的。一个 Executor 上有多少个 cpu core,就可以并行执行多少个 task。而第一批并行执行的每个 task 都会创建一个 shuffleFileGroup,并将数据写入对应的磁盘文件内。

当 Executor 的 cpu core 执行完一批 task,接着执行下一批 task 时,下一批 task 就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件,也就是说,此时 task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制允许不同的 task 复用同一批磁盘文件,这样就可以有效将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能。

假设第二个 stage 有 100 个 task,第一个 stage 有 50 个 task,总共还是有 10 个 Executor(Executor CPU 个数为 1),每个 Executor 执行 5 个 task。那么原本使用未经优化的 HashShuffleManager 时,每个 Executor 会产生 500 个磁盘文件,所有 Executor 会产生 5000 个磁盘文件的。但是此时经过优化之后,每个 Executor 创建的磁盘文件的数量的计算公式为:cpu core的数量 * 下一个stage的task数量,也就是说,每个 Executor 此时只会创建 100 个磁盘文件,所有 Executor 只会创建 1000 个磁盘文件。

优化后的 HashShuffleManager 工作原理如下图所示:

6.1.3 优缺点

优点:

  • 可以省略不必要的排序开销。
  • 避免了排序所需的内存开销。

缺点:

  • 生产的文件过多,会对文件系统造成压力。
  • 大量小文件的随机读写带来一定的磁盘开销。
  • 数据块写入时所需的缓存空间也会随之增加,对内存造成压力。

6.2 SortShuffle 解析

SortShuffleManager 的运行机制主要分成三种:

  • 普通运行机制;
  • bypass 运行机制:当 shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为 200),就会启用 bypass 机制;
  • Tungsten Sort 运行机制:开启此运行机制需设置配置项 spark.shuffle.manager=tungsten-sort。开启此项配置也不能保证就一定采用此运行机制(后面会解释)。

6.2.1 普通运行机制

在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件 ,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。

普通运行机制的 SortShuffleManager 工作原理如下图所示:

6.2.2 bypass 运行机制

Reducer 端任务数比较少的情况下,基于 Hash Shuffle 实现机制明显比基于 Sort Shuffle 实现机制要快,因此基于 Sort huffle 实现机制提供了一个回退方案,就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带 Hash 风格的回退计划。

bypass 运行机制的触发条件如下:

shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。

不是聚合类的 shuffle 算子。

此时,每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

而该机制与普通 SortShuffleManager 运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销

bypass 运行机制的 SortShuffleManager 工作原理如下图所示:

6.2.3 Tungsten Sort Shuffle 运行机制

基于 Tungsten Sort 的 Shuffle 实现机制主要是借助 Tungsten 项目所做的优化来高效处理 Shuffle。

Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用 SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的:

对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort 方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle 方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制

因此,当设置了 spark.shuffle.manager=tungsten-sort 时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。

要实现 Tungsten Sort Shuffle 机制需要满足以下条件:

Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。

Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。

Shuffle 过程中的输出分区个数少于 16777216 个。

实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。

所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。

6.2.4 优缺点

优点:

  • 小文件的数量大量减少,Mapper 端的内存占用变少;
  • Spark 不仅可以处理小规模的数据,即使处理大规模的数据,也不会很容易达到性能瓶颈。

缺点:

  • 如果 Mapper 中 Task 的数量过大,依旧会产生很多小文件,此时在 Shuffle 传数据的过程中到 Reducer 端, Reducer 会需要同时大量地记录进行反序列化,导致大量内存消耗和 GC 负担巨大,造成系统缓慢,甚至崩溃;
  • 强制了在 Mapper 端必须要排序,即使数据本身并不需要排序;
  • 它要基于记录本身进行排序,这就是 Sort-Based Shuffle 最致命的性能消耗。

7. Spark 底层执行原理

7.1 Spark 运行流程

具体运行流程如下:

  1. SparkContext 向资源管理器注册并向资源管理器申请运行 Executor
  2. 资源管理器分配 Executor,然后资源管理器启动 Executor
  3. Executor 发送心跳至资源管理器
  4. SparkContext 构建 DAG 有向无环图
  5. 将 DAG 分解成 Stage(TaskSet)
  6. 把 Stage 发送给 TaskScheduler
  7. Executor 向 SparkContext 申请 Task
  8. TaskScheduler 将 Task 发送给 Executor 运行
  9. 同时 SparkContext 将应用程序代码发放给 Executor
  10. Task 在 Executor 上运行,运行完毕释放所有资源

7.2 从代码角度看 DAG 图的构建

Val lines1 = sc.textFile(inputPath1).map(...).map(...)
Val lines2 = sc.textFile(inputPath2).map(...)
Val lines3 = sc.textFile(inputPath3)
Val dtinone1 = lines2.union(lines3)
Val dtinone = lines1.join(dtinone1)
dtinone.saveAsTextFile(...)
dtinone.filter(...).foreach(...)

上述代码的 DAG 图如下所示:

Spark 内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是如上图所示的 DAG。

Spark 的计算发生在 RDD 的 Action 操作,而对 Action 之前的所有 Transformation,Spark 只是记录下 RDD 生成的轨迹,而不会触发真正的计算。

7.3 将 DAG 划分为 Stage 核心算法

一个 Application 可以有多个 job 多个 Stage:

Spark Application 中可以因为不同的 Action 触发众多的 job,一个 Application 中可以有很多的 job,每个 job 是由一个或者多个 Stage 构成的,后面的 Stage 依赖于前面的 Stage,也就是说只有前面依赖的 Stage 计算完毕后,后面的 Stage 才会运行。

划分依据:Stage 划分的依据就是宽依赖,像 reduceByKey,groupByKey 等算子,会导致宽依赖的产生

回顾下宽窄依赖的划分原则:

  • 窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖。即一对一或者多对一的关系,可理解为独生子女。常见的窄依赖有:map、filter、union、mapPartitions、mapValues、join(父 RDD 是 hash-partitioned)等。
  • 宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。即一对多的关系,可理解为超生。常见的宽依赖有 groupByKey、partitionBy、reduceByKey、join(父 RDD 不是 hash-partitioned)等。

核心算法:回溯算法

从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分

Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推,首先会为最后一个 RDD 创建一个 Stage,然后继续倒推,如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止。

7.4 将 DAG 划分为 Stage 剖析

一个 Spark 程序可以有多个 DAG(有几个 Action,就有几个 DAG,上图最后只有一个 Action(图中未表现),那么就是一个 DAG)。

一个 DAG 可以有多个 Stage(根据宽依赖/shuffle 进行划分)。

同一个 Stage 可以有多个 Task 并行执行(task 数=分区数,如上图,Stage1 中有三个分区 P1、P2、P3,对应的也有三个 Task)。

可以看到这个 DAG 中只 reduceByKey 操作是一个宽依赖,Spark 内核会以此为边界将其前后划分成不同的 Stage。

同时我们可以注意到,在图中 Stage1 中,从 textFile 到 flatMap 到 map 都是窄依赖,这几步操作可以形成一个流水线操作,通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束,而是继续进行 map 操作,这样大大提高了计算的效率

7.5 提交 Stages

调度阶段的提交,最终会被转换成一个任务集的提交,DAGScheduler 通过 TaskScheduler 接口提交任务集,这个任务集最终会触发 TaskScheduler 构建一个 TaskSetManager 的实例来管理这个任务集的生命周期,对于 DAGScheduler 来说,提交调度阶段的工作到此就完成了。

而 TaskScheduler 的具体实现则会在得到计算资源的时候,进一步通过 TaskSetManager 调度具体的任务到对应的 Executor 节点上进行运算。

7.6 监控 Job、Task、Executor

DAGScheduler 监控 Job 与 Task:

  • 要保证相互依赖的作业调度阶段能够得到顺利的调度执行,DAGScheduler 需要监控当前作业调度阶段乃至任务的完成情况。
  • 这通过对外暴露一系列的回调函数来实现的,对于 TaskScheduler 来说,这些回调函数主要包括任务的开始结束失败、任务集的失败,DAGScheduler 根据这些任务的生命周期信息进一步维护作业和调度阶段的状态信息。

DAGScheduler 监控 Executor 的生命状态:

  • TaskScheduler 通过回调函数通知 DAGScheduler 具体的 Executor 的生命状态,如果某一个 Executor 崩溃了,则对应的调度阶段任务集的 ShuffleMapTask 的输出结果也将标志为不可用,这将导致对应任务集状态的变更,进而重新执行相关计算任务,以获取丢失的相关数据。

7.7 获取任务执行结果

结果 DAGScheduler:

  • 一个具体的任务在 Executor 中执行完毕后,其结果需要以某种形式返回给 DAGScheduler,根据任务类型的不同,任务结果的返回方式也不同。

两种结果,中间结果与最终结果:

  • 对于 FinalStage 所对应的任务,返回给 DAGScheduler 的是运算结果本身。
  • 而对于中间调度阶段对应的任务 ShuffleMapTask,返回给 DAGScheduler 的是一个 MapStatus 里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个调度阶段的任务获取输入数据的依据。

两种类型,DirectTaskResult 与 IndirectTaskResult:

根据任务结果大小的不同,ResultTask 返回的结果又分为两类:

  • 如果结果足够小,则直接放在 DirectTaskResult 对象内中。
  • 如果超过特定尺寸则在 Executor 端会将 DirectTaskResult 先序列化,再把序列化的结果作为一个数据块存放在 BlockManager 中,然后将 BlockManager 返回的 BlockID 放在 IndirectTaskResult 对象中返回给 TaskScheduler,TaskScheduler 进而调用 TaskResultGetter 将 IndirectTaskResult 中的 BlockID 取出并通过 BlockManager 最终取得对应的 DirectTaskResult。

7.8 任务调度总体诠释

7.8.1 Executor 进程专属

每个 Application 获取专属的 Executor 进程,该进程在 Application 期间一直驻留,并以多线程方式运行 Tasks。

Spark Application 不能跨应用程序共享数据,除非将数据写入到外部存储系统。如图所示:

7.8.2 支持多种资源管理器

Spark 与资源管理器无关,只要能够获取 Executor 进程,并能保持相互通信就可以了。

Spark 支持资源管理器包含:Standalone、On Mesos、On YARN、Or On EC2。如图所示:

7.8.3 Job 提交就近原则

提交 SparkContext 的 Client 应该靠近 Worker 节点(运行 Executor 的节点),最好是在同一个 Rack(机架)里,因为 Spark Application 运行过程中 SparkContext 和 Executor 之间有大量的信息交换;

如果想在远程集群中运行,最好使用 RPC 将 SparkContext 提交给集群,不要远离 Worker 运行 SparkContext。

如图所示:

7.8.4 移动程序而非移动数据的原则执行

移动程序而非移动数据的原则执行,Task 采用了数据本地性和推测执行的优化机制。

关键方法:taskIdToLocations、getPreferedLocations。

如图所示:

目录
相关文章
|
6月前
|
SQL 分布式计算 Java
Spark入门指南:从基础概念到实践应用全解析
在这个数据驱动的时代,信息的处理和分析变得越来越重要。而在众多的大数据处理框架中, Apache Spark 以其独特的优势脱颖而出。
67 0
|
7月前
|
存储 分布式计算 网络协议
大数据Spark Streaming入门
大数据Spark Streaming入门
80 1
|
3天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
|
5月前
|
存储 缓存 分布式计算
Spark入门(一篇就够了)(一)
Spark入门(一篇就够了)(一)
132 0
|
5月前
|
分布式计算 Hadoop 大数据
178 Spark入门
178 Spark入门
30 0
|
2月前
|
分布式计算 资源调度 监控
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
96 1
|
2月前
|
数据采集 分布式计算 Linux
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
111 0
|
8月前
|
分布式计算 资源调度 Apache
【大数据】Apache Spark入门到实战 1
【大数据】Apache Spark入门到实战
79 0
|
5月前
|
SQL JSON 分布式计算
Spark入门(一篇就够了)(二)
Spark入门(一篇就够了)(二)
68 0
|
6月前
|
SQL 分布式计算 Java
Spark入门指南:从基础概念到实践应用全解析
Spark入门指南:从基础概念到实践应用全解析