之前说到了之后工作中会接触到Spark离线任务相关的内容,也预先学习了Scala,所以这篇文章它来了。本篇文章会介绍Spark的相关概念以及原理,帮助初学者快速入门Spark。
Spark是什么
学习一个东西之前总要知道这个东西是什么。
Spark 是一个开源的大数据处理引擎,它提供了一整套开发 API,包括流计算和机器学习。它支持批处理和流处理。
Spark 的一个显著特点是它能够在内存中进行迭代计算,从而加快数据处理速度。尽管 Spark 是用 Scala 开发的,但它也为 Java、Scala、Python 和 R 等高级编程语言提供了开发接口。
Spark组件
Spark提供了6大组件:
- Spark Core
- Spark SQL
- Spark Streaming
- Spark MLlib
- Spark GraphX
- Spark Core
Spark Core 是 Spark 的基础,它提供了内存计算的能力,是分布式处理大数据集的基础。它将分布式数据抽象为弹性分布式数据集(RDD),并为运行在其上的上层组件提供 API。所有 Spark 的上层组件都建立在 Spark Core 的基础之上。
- Spark SQL
Spark SQL 是一个用于处理结构化数据的 Spark 组件。它允许使用 SQL 语句查询数据。Spark 支持多种数据源,包括 Hive 表、Parquet 和 JSON 等。
- Spark Streaming
Spark Streaming 是一个用于处理动态数据流的 Spark 组件。它能够开发出强大的交互和数据查询程序。在处理动态数据流时,流数据会被分割成微小的批处理,这些微小批处理将会在 Spark Core 上按时间顺序快速执行。
- Spark MLlib
Spark MLlib 是 Spark 的机器学习库。它提供了常用的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维等。MLlib 还提供了一些底层优化原语和高层流水线 API,可以帮助开发人员更快地创建和调试机器学习流水线。
- Spark GraphX
Spark GraphX 是 Spark 的图形计算库。它提供了一种分布式图形处理框架,可以帮助开发人员更快地构建和分析大型图形。
Spark的优势
Spark 有许多优势,其中一些主要优势包括:
- 速度:Spark 基于内存计算,能够比基于磁盘的计算快很多。对于迭代式算法和交互式数据挖掘任务,这种速度优势尤为明显。
- 易用性:Spark 支持多种语言,包括 Java、Scala、Python 和 R。它提供了丰富的内置 API,可以帮助开发人员更快地构建和运行应用程序。
- 通用性:Spark 提供了多种组件,可以支持不同类型的计算任务,包括批处理、交互式查询、流处理、机器学习和图形处理等。
- 兼容性:Spark 可以与多种数据源集成,包括 Hadoop 分布式文件系统(HDFS)、Apache Cassandra、Apache HBase 和 Amazon S3 等。
- 容错性:Spark 提供了弹性分布式数据集(RDD)抽象,可以帮助开发人员更快地构建容错应用程序。
Word Count
下面是一个简单的Word Count的Spark程序:
import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount { def main (args:Array [String]): Unit = { //setMaster("local[9]") 表示在本地运行 Spark 程序,使用 9 个线程。local[*] 表示使用所有可用的处理器核心。 //这种模式通常用于本地测试和开发。 val conf = new SparkConf ().setAppName ("Word Count").setMaster("local[9]"); val sc = new SparkContext (conf); sc.setLogLevel("ERROR") val data = List("Hello World", "Hello Spark") val textFile = sc.parallelize(data) val wordCounts = textFile.flatMap (line => line.split (" ")).map ( word => (word, 1)).reduceByKey ( (a, b) => a + b) wordCounts.collect().foreach(println) } }
程序首先创建了一个 SparkConf 对象,用来设置应用程序名称和运行模式。然后,它创建了一个 SparkContext 对象,用来连接到 Spark 集群。
接下来,程序创建了一个包含两个字符串的列表,并使用 parallelize 方法将其转换为一个 RDD。然后,它使用 flatMap 方法将每一行文本拆分成单词,并使用 map 方法将每个单词映射为一个键值对(key-value pair),其中键是单词,值是 1。
最后,程序使用 reduceByKey 方法将具有相同键的键值对进行合并,并对它们的值进行求和。最终结果是一个包含每个单词及其出现次数的 RDD。程序使用 collect 方法将结果收集到驱动程序,并使用 foreach 方法打印出来。
Spark基本概念
Spark的理论较多,所以先了解一下基本概念,有助于后面展开学习Spark。
Application
用户编写的Spark应用程序。
如下,"Word Count"就是该应用程序的名字。
import org.apache.spark.sql.SparkSession object WordCount { def main(args: Array[String]) { // 创建 SparkSession 对象,它是 Spark Application 的入口 val spark = SparkSession.builder.appName("Word Count").getOrCreate() // 读取文本文件并创建 Dataset val textFile = spark.read.textFile("hdfs://...") // 使用 flatMap 转换将文本分割为单词,并使用 reduceByKey 转换计算每个单词的数量 val counts = textFile.flatMap(line => line.split(" ")) .groupByKey(identity) .count() // 将结果保存到文本文件中 counts.write.text("hdfs://...") // 停止 SparkSession spark.stop() } }
Driver
Driver 是运行 Spark Application 的进程,它负责创建 SparkSession 和 SparkContext 对象,并将代码转换为转换和操作操作。它还负责创建逻辑和物理计划,并与集群管理器协调调度任务。
简而言之,Spark Application 是使用 Spark API 编写的程序,而 Spark Driver 是负责运行该程序并与集群管理器协调的进程。
可以将Driver 理解为运行 Spark Application main
方法的进程。
driver的内存大小可以进行设置:
# 设置 driver内存大小 driver-memory 1024m
Master和Worker
在Spark中,Master是独立集群的控制者,而Worker是工作者。一个Spark独立集群需要启动一个Master和多个Worker。Worker就是物理节点,可以在上面启动Executor进程。
Executor
在每个Worker上为某应用启动的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个任务都有各自独立的Executor。Executor是一个执行Task的容器。实际上它是一组计算资源(cpu核心、memory)的集合。
一个Worker节点可以有多个Executor。一个Executor可以运行多个Task。
executor创建成功后,在日志文件会显示如下信息:INFO Executor: Starting executor ID [executorId] on host [executorHostname]
Job
一个Job包含多个RDD及作用于相应RDD上的各种操作,每个Action的触发就会生成一个job。用户提交的Job会提交给DAGScheduler,Job会被分解成Stage,Stage会被细化成Task。
Task
被发送到executor上的工作单元。每个Task负责计算一个分区的数据。
Stage
在 Spark 中,一个作业(job)会被划分为多个阶段(stage)。同一个 Stage 可以有多个 Task 并行执行(task 数=分区数)。
阶段之间的划分是根据数据的依赖关系来确定的。当一个 RDD 的分区依赖于另一个 RDD 的分区时,这两个 RDD 就属于同一个阶段。当一个 RDD 的分区依赖于多个 RDD 的分区时,这些 RDD 就属于不同的阶段。
上图中,stage表示一个可以顺滑完成的阶段,就是可以单机运行。曲线表示Shuffle。
如果stage能够复用前面的stage的话,那么会显示灰色。
Stage的划分
Stage的划分,简单的说是以宽依赖来划分:
对于窄依赖,partition 的转换处理在 stage 中完成计算,不划分(将窄依赖尽量放在在同一个 stage 中,可以实现流水线计算)。对于宽依赖,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,也就是说需要要划分 stage。
Spark 会根据 shuffle/宽依赖使用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的 stage/阶段中。
Spark会根据RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
至于什么是窄依赖和宽依赖,下面马上就会提及。
窄依赖 & 宽依赖
- 窄依赖
父 RDD 的一个分区只会被子 RDD 的一个分区依赖。比如:map/filter和union,这种依赖称之为窄依赖。
窄依赖的多个分区可以并行计算,并且窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。
- 宽依赖
指子RDD的分区依赖于父RDD的所有分区,这是因为shuffle类操作,称之为宽依赖。
对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。
Shuffle
在 Spark 中,shuffle 是指在不同阶段之间重新分配数据的过程。它通常发生在需要对数据进行聚合或分组操作的时候,例如 reduceByKey 或 groupByKey 等操作。
在 shuffle 过程中,Spark 会将数据按照键值进行分区,并将属于同一分区的数据发送到同一个计算节点上。这样,每个计算节点就可以独立地处理属于它自己分区的数据。
RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD的Partition是指数据集的分区。它是数据集中元素的集合,这些元素被分区到集群的节点上,可以并行操作。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
一个函数会被作用在每一个分区。Spark 中 RDD 的计算是以分片为单位的,compute 函数会被作用到每个分区上。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
DAG
有向无环图,其实说白了就是RDD之间的依赖关系图。
- 开始:通过 SparkContext 创建的 RDD;
- 结束:触发 Action,一旦触发 Action 就形成了一个完整的 DAG(有几个 Action,就有几个 DAG)。
Spark执行流程
Spark的执行流程大致如下:
- 构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源。
- 资源管理器为Executor分配资源并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上。
- SparkContext构建DAG图,将DAG图分解成多个Stage,并把每个Stage的TaskSet(任务集)发送给Task Scheduler (任务调度器)。
- Executor向SparkContext申请Task, Task Scheduler将Task发放给Executor,同时,SparkContext将应用程序代码发放给Executor。
- Task在Executor上运行,把执行结果反馈给Task Scheduler,然后再反馈给DAG Scheduler。
- 当一个阶段完成后,Spark 会根据数据依赖关系将结果传输给下一个阶段,并开始执行下一个阶段的任务。
- 最后,当所有阶段都完成后,Spark 会将最终结果返回给驱动程序,并完成作业的执行。
Spark运行模式
Spark 支持多种运行模式,包括本地模式、独立模式、Mesos 模式、YARN 模式和 Kubernetes 模式。
- 本地模式:在本地模式下,Spark 应用程序会在单个机器上运行,不需要连接到集群。这种模式适用于开发和测试,但不适用于生产环境。
- 独立模式:在独立模式下,Spark 应用程序会连接到一个独立的 Spark 集群,并在集群中运行。这种模式适用于小型集群,但不支持动态资源分配。
- Mesos 模式:在 Mesos 模式下,Spark 应用程序会连接到一个 Apache Mesos 集群,并在集群中运行。这种模式支持动态资源分配和细粒度资源共享,目前国内使用较少。
- YARN 模式:在 YARN 模式下,Spark 应用程序会连接到一个 Apache Hadoop YARN 集群,并在集群中运行。这种模式支持动态资源分配和与其他 Hadoop 生态系统组件的集成,Spark在Yarn模式下是不需要Master和Worker的。
- Kubernetes 模式:在 Kubernetes 模式下,Spark 应用程序会连接到一个 Kubernetes 集群,并在集群中运行。这种模式支持动态资源分配和容器化部署。