注:本章节将重点阐述基于3.5.8版本的Spark Core,并采用Python语言进行代码实现。尽管在企业级应用中,Spark SQL得到了更为广泛的应用,老言在后续章节将深入探讨Spark SQL的相关内容,但鉴于Spark Core作为Apache Spark的基础组件,掌握其核心概念和技术细节对于全面理解整个生态系统至关重要。因此,我们有必要对Spark Core有一个深刻的认识。
Spark简介
在大数据技术发展迅速的背景下,Apache Spark以卓越性能和灵活编程模型,成为分布式计算核心框架之一。自2009年由加州大学伯克利分校AMPLab提出后,Spark超越传统MapReduce模式,2025年仍引领大数据处理技术趋势。2025年Spark社区报告显示,超80%财富500强企业将其作为大数据平台核心组件,日处理数据超100 EB。
Apache Spark的演进与核心价值
Apache Spark旨在克服MapReduce在迭代计算和交互式查询中的性能局限。与MapReduce将中间结果存于磁盘不同,Spark采用内存计算,显著提升数据处理效率,速度比传统方法快数十甚至上百倍,适用于需频繁访问同一数据集的场景,如机器学习、图算法分析和实时流处理,截至今日,Apache Spark已经发布到了4.1版本了。
版本 |
时间 |
核心特性 |
2.0 |
2016.10 |
Dataset/DataFrame 统一 API、Structured Streaming |
2.1 |
2017.01 |
窗口函数增强、ML Pipeline 改进 |
2.2 |
2017.07 |
Structured Streaming 正式版的、Python UDF |
2.3 |
2018.02 |
连续流处理、Kubernetes 原生支持 |
2.4 |
2018.11 |
Barrier 调度、高可用 Shuffle |
3.0 |
2020.06 |
AQE 自适应查询、GPU 调度、性能提升 2 倍 |
3.1 |
2021.03 |
Pandas API on Spark、增量物化视图 |
3.2 |
2021.11 |
Pandas API 正式版、Python 3.9 支持 |
3.3 |
2022.05 |
Spark Connect、Python 3.10、Iceberg 集成 |
3.4 |
2023.03 |
Spark Connect 正式版、Python 3.11 |
3.5 |
2023.10 |
Spark Connect 多语言支持、分布式训练增强 |
4.0 |
2025.05 |
PySpark 原生画图、多态 UDTF、ANSI SQL 模式、Python API 全面改进 |
4.1 |
2025.12 |
支持 ANSI SQL,强化 Python 与 Iceberg |
RDD的五大特性
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD 数据结构内部有五个特性(摘录RDD 源码):
- A list of partitions(分区列表)
对于RDD来说,每个分区都会被一个计算任务处理,分区数决定并行度;用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值;
- A function for computing each split(计算会作用于每个分区)
Spark中RDD的计算是以分区为单位的,compute函数会被作用到每个分区上;
- A list of dependencies on other RDDs(容错性)
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算(Spark的容错机制);
- Optionally, a Partitioner for key-value RDDs(key- value数据类型的RDD分区器)
当前 Spark 中实现了两种类型的分区函数,一个是基于 Hash 的 HashPartitioner ,另外一个是基于范围的RangePartitioner。
只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。
Partitioner函数不但决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。
- Optionally, a list of preferred locations to compute each split on(每个分区都有一个优先位置列表,即首选位置)
对于一个HDFS文件来说,列表保存的就是每个Partition所在的块的位置。
基于"移动数据不如移动计算"的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算。(数据本地性),但对于现代存算分离架构来说这一个特性已经不适用了。
RDD的五大特点
我们已经了解了RDD的五大特性了,但那是RDD内部的特性,那以我们作为用户的视角来看,RDD又具备5大特点
- 分布式
RDD的数据分布在多个节点上,使得各个节点可以并行处理数据。这种分布式存储方式是Spark高效处理大规模数据集的关键。用户无需关心数据如何分布,Spark框架会自动管理数据的分布和处理过程。
- 容错性
RDD通过记录数据转换操作的血缘(lineage)信息来提供弹性和容错功能。当部分节点数据丢失时,RDD能够根据血缘信息重新计算丢失的数据,确保数据处理的可靠性。
- 不可变性
一旦创建,RDD中的数据不可更改。每次对RDD的转换操作都会生成一个新的RDD,这种不可变性有助于简化并行计算中的数据管理。
- 并行处理
RDD支持在多个计算节点上并行执行作业。通过将数据分为多个分区,Spark能够在不同节点上同时处理数据,从而显著提高处理效率。
- 支持多种计算操作
RDD提供了丰富的API,支持多种计算操作,如map、filter、reduce、groupBy等,用户可以根据需求灵活选择合适的操作来处理数据。
RDD 和DataFrame/Dataset、SQL的关系
简单来说,RDD 是底层基石,DataFrame/Dataset 是上层的高级抽象,而 Spark SQL 是操作这些高级抽象的接口。
我们可以把它们的关系比喻成盖房子:
- RDD 就像是一堆散乱的砖头(底层原材料),你可以随意摆放,但需要自己操心每一块砖怎么搬运、怎么砌。
- DataFrame/Dataset 就像是预制好的墙体模块(高级抽象),它们内部已经优化了结构,你只需要把它们拼在一起,效率更高。
- Spark SQL 就像是施工图纸和指令(接口),你用图纸告诉 Spark 想要什么样的房子,Spark 会自动选择最优的模块去搭建。
虽然 DataFrame/Dataset 是高级抽象,但它们底层依然是基于 RDD 的,Spark SQL又是基于DataFrame/Dataset的。
PySpark编程指南
概述
从宏观角度来看,每个 Spark 应用程序都包含一个驱动程序(Driver),它负责运行用户的 main 函数,并在集群上执行各种并行操作。
Spark 提供的核心抽象是弹性分布式数据集(RDD),这是一种分布在集群节点上的元素集合,支持并行操作。RDD 的创建通常始于 Hadoop 文件系统(或其他 Hadoop 支持的文件系统)中的文件,或是驱动程序中的现有 Scala、Python集合,并对其进行转换生成。用户还可以要求 Spark 将 RDD 持久化在内存中,以便在多次并行操作中高效复用。
准备执行环境
通过UV创建python3.11环境
- windows一键安装uv
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
- linux/macos一键安装uv
curl -LsSf https://astral.sh/uv/install.sh | sh
- Pycharm或者Idea(需安装Python插件)新建一个uv项目
- 添加pyspark依赖项并点击刷新
WordCount
创建一个test01.ipynb文件,pycharm会自动部署一个jupyter环境(这个步骤是可选的,也可以直接创建test01.py文件),只是老言个人比较喜欢用jupyter作为实验环境。
准备一个存放测试数据的文件test01.txt
hello world hello java hello scala hello python hello spark hello pyspark how are you i am fine thank you and you
设置环境变量,以防止后续数据处理时报错。注意,老言这里因为是实验环境,因此,在代码中简单指定了一下环境变量,如果想修改系统环境变量的,请自行咨询Google或者百度。
import os # 一般位于当前uv目录的.venv/Scripts目录下 os.environ["PYSPARK_PYTHON"] = "D:/module/bigdata/Code/tutorials-pyspark/.venv/Scripts/python.exe" # 如果是本地模式,通常 PYSPARK_DRIVER_PYTHON 也需要设置 os.environ["PYSPARK_DRIVER_PYTHON"] = "D:/module/bigdata/Code/tutorials-pyspark/.venv/Scripts/python.exe"
初始化 Spark
from pyspark import SparkContext, SparkConf """ appName 参数是您的应用程序在集群 UI 上显示的名称。 master 是 Spark 或 YARN 集群的 URL,或者是一个特殊的“local”字符串以在本地模式下运行。 实际上,当在集群上运行时,一般不会硬编码master, spark-submit 启动应用程序并在指明master才是最佳实践。然而,对于本地测试和单元测试,您可以传递“local”以在本地运行 Spark。 """ conf = SparkConf().setAppName("test01").setMaster("local[*]") sc = SparkContext(conf=conf)
通过Python列表创建 RDD
""" 通过在 Driver 程序中对现有可迭代对象或列表调用 SparkContext 的 parallelize 方法来创建 RDD。列表中的元素被复制以形成一个可以并行操作的 RDD。 例如,以下是如何创建一个包含数字 1 到 5 的 RDD: """ data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
打印 RDD:
print(distData.collect())
预期输出结果
也可以从存储介质中创建RDD
""" PySpark 可以从 Hadoop 支持的任何存储源创建 RDD,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。 Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat。 """ # 注意路径不要用老言的,要换成自己的 distFile = sc.textFile("D:/module/bigdata/Code/dataspire/tutorials-pyspark/data/input/test01.txt")
打印 RDD
print(distFile.collect())
预期输出结果:
RDD 不只是可以打印出来,我们也可以对RDD做后续的数据处理,老言这里会以 WordCount(单词计数) 为例为大家表演一下Spark中 RDD 的过程
print(distFile .flatMap(lambda line: line.split(" ")) # 将 RDD 中的每一行按空格切分成单词 .map(lambda word: (word, 1)) # 将每个单词映射成一个元组,元组的第一个元素是单词,第二个元素是 1 .reduceByKey(lambda a, b: a + b) # 将相同单词的元组按照单词进行分组,并计算每个单词出现的次数 .collect() )
到此为止WordCount的数据就被正常输出了
将函数传递给Spark
PySpark 的 API 在很大程度上依赖于将驱动程序中的函数传递到集群上运行。有三种推荐的方式可以做到这一点:
- Lambda 表达式,用于可以写成表达式的简单函数。(Lambda 不支持多语句函数或不返回值的语句)
distFile.flatMap(lambda line: line.split(" ")
- 本地定义的 Python 函数
def split(line): return line.split(" ") distFile.flatMap(split)
- 模块级函数(外部模块)
# my_functions.py def split(line): return line.split(" ") # 主程序 from my_functions import split distFile.flatMap(split)
闭包
闭包是指 Spark 算子中使用的函数及其引用的外部变量共同构成的整体。当函数在 Executor 节点上执行时,它会携带这些变量的副本一起运行,这里很多同学会很困惑,为什么会有闭包这个概念呢,是因为Spark 的计算逻辑在驱动程序中定义,但实际执行在Executor 节点上。当算子中的函数需要访问外部变量时,Spark 必须将这些变量连同函数一起发送到Executor 节点上——这就是闭包。
概念太空泛,老言也不多巴拉巴拉,直接上案例
# 示例 multiplier = 2 rdd.map(lambda x: x * multiplier) # multiplier 被"捕获"进闭包
在这个例子中,lambda 函数和它引用的 multiplier 变量一起构成了闭包
同学们只需要记住,闭包就是函数 + 变量
闭包的工作机制
- 序列化:驱动程序将闭包(函数 + 引用的变量)序列化
- 传输:通过网络发送到每个 Executor 节点
- 反序列化:Executor 反序列化闭包并在任务中执行
RDD算子
RDD 支持两种类型的算子:转换算子(从现有 RDD 创建新 RDD)和行动算子(在 RDD 上运行计算后向驱动程序返回一个值)。例如,map 是一种转换算子,它将每个 RDD 中的元素通过一个函数(例如lambda word: (word, 1))计算后返回一个新 RDD。reduce是一种行动算子,它使用某个函数(例如lambda a, b: a + b)根据 Key 聚合 RDD 的所有元素,并将最终结果返回给 Driver。
Spark 中的所有转换算子都是惰性的,它们不会立即计算结果。相反,它们只是记录应用于某个 RDD 的操作。只有出现一个行动算子需要将结果返回给驱动程序时,才会触发计算。这种设计使得 Spark 运行更高效。
默认情况下,每次您对转换后的 RDD 运行行动算子后,再次使用它会重新计算。但是,您可以使用 persist(或 cache)方法将 RDD 持久化到内存中,这样 Spark 会将元素保留在集群上,以便下次使用时更快地访问。
下表列出了 Spark 支持的一些常见转换算子。
转换算子 |
含义 |
map(func) |
通过将 RDD 的每个元素传递给函数 func 来形成新的RDD。 |
filter(func) |
返回一个新的RDD,该 RDD 由 旧RDD 中 func 返回 true 的元素组成。 |
flatMap(func) |
在map算子的基础上,会多做一个扁平化处理。 |
mapPartitions(func) |
类似于 map,但单独运行在 RDD 的每个分区上,因此当在类型为 T 的 RDD 上运行时,func 的类型必须是 Iterator<T> => Iterator<U>。 |
mapPartitionsWithIndex(func) |
类似于 mapPartitions,但还向 func 提供一个表示分区索引的整数值,因此当在类型为 T 的 RDD 上运行时,func 的类型必须是 (Int, Iterator<T>) => Iterator<U>。 |
sample(withReplacement, fraction, seed) |
使用给定的随机数生成器种子,有放回或无放回地采样数据的一个 fraction。 |
union(otherDataset) |
返回一个 新RDD,其中包含 旧RDD 和参数中元素的并集。 |
intersection(otherDataset) |
返回一个 新RDD,其中包含 旧RDD 和参数中元素的交集。 |
distinct([numPartitions])) |
返回一个包含 旧RDD 不同元素的 新RDD。 |
groupByKey([numPartitions]) |
当在 (K, V) 的 RDD 上调用时,返回一个 (K, Iterable<V>) 对的 RDD 。 注意:如果您是为了对每个 Key 执行聚合(例如求和或平均值)而进行分组,使用 |
reduceByKey(func, [numPartitions]) |
当在 (K, V) 类型的 RDD 上调用时,返回一个 (K, V) 类型的 RDD,其中每个键的值使用给定的 reduce 函数 func 进行聚合,该函数必须是 (V,V) => V 类型。与 |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) |
当在 (K, V) 类型的 RDD 上调用时,返回一个 (K, U) 的 RDD ,其中每个key 的值使用给定的组合函数和中性“零”值进行聚合。允许聚合值类型与输入值类型不同,同时避免不必要的分配。与 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。 |
sortByKey([ascending], [numPartitions]) |
当在 (K, V) 类型的 RDD 上调用时(其中 K 实现了 Ordered 接口),返回一个 (K, V) 对的数据集,按 key 以升序或降序排列,具体取决于布尔型 |
join(otherDataset, [numPartitions]) |
当在类型为 (K, V) 和 (K, W) 的 RDD 上调用时,返回一个 (K, (V, W)) 类型的 RDD ,其中包含每个 key 的所有元素对。外部连接通过 |
cogroup(otherDataset, [numPartitions]) |
当在类型为 (K, V) 和 (K, W) 的 RDD 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) 元组的RDD 。此操作也称为 |
cartesian(otherDataset) |
当在类型为 T 和 U 的 RDD 上调用时,返回一个 (T, U) 的 RDD (所有元素对)。 |
pipe(command, [envVars]) |
将 RDD 的每个分区通过一个 Shell 命令(例如 Perl 或 bash 脚本)进行管道操作。RDD 元素被写入进程的标准输入,其标准输出的行作为字符串 RDD 返回。 |
coalesce(numPartitions) |
将 RDD 中的分区数量减少到 numPartitions。在对大型 RDD 进行过滤后,这对于更高效地运行操作很有用。 |
repartition(numPartitions) |
随机地重新混洗 RDD 中的数据,以创建更多或更少的分区并使其在这些分区中保持平衡。这总是通过网络混洗所有数据。 |
repartitionAndSortWithinPartitions(partitioner) |
根据给定的分区器重新分区 RDD,并在每个结果分区内按键对记录进行排序。这比调用 |
下表列出了 Spark 支持的一些行动算子
行动 |
含义 |
reduce(func) |
使用函数 func(接受两个参数并返回一个)聚合 RDD 的元素。该函数应该是可交换和结合的,以便可以并行正确计算。 |
collect() |
将 RDD 的所有元素作为数组返回给驱动程序。这通常在过滤器或其他操作返回足够小的 RDD 后很有用。 |
count() |
返回 RDD 中的元素数量。 |
first() |
返回 RDD 的第一个元素(类似于 take(1))。 |
take(n) |
返回一个包含 RDD 前 n 个元素的数组。 |
takeSample(withReplacement, num, [seed]) |
返回一个包含 RDD num 个元素的随机样本数组,有放回或无放回,可选地预先指定随机数生成器种子。 |
takeOrdered(n, [ordering]) |
使用元素的自然顺序或自定义比较器,返回 RDD 的前 n 个元素。 |
saveAsTextFile(path) |
将 RDD 的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录。Spark 将对每个元素调用 toString 方法,将其转换为文件中的一行文本。 |
saveAsSequenceFile(path) |
将 RDD 的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径。此操作适用于实现了 Hadoop Writable 接口的键值对 RDD。在 Scala 中,它也适用于可以隐式转换为 Writable 的类型(Spark 包含对 Int、Double、String 等基本类型的转换)。 |
saveAsObjectFile(path) |
使用 Java 序列化以简单格式写入数据集元素,然后可以使用 |
countByKey() |
仅适用于类型为 (K, V) 的 RDD。返回一个 (K, Int) 对的哈希映射,其中包含每个键的计数。 |
foreach(func) |
对 RDD 的每个元素运行函数 func。这通常用于产生副作用,例如更新累加器或与外部存储系统交互。 |
Shuffle 操作
Spark 中的某些算子会触发一个称为shuffle的事件。shuffle是 Spark 重新分发数据的机制,以便数据在分区之间以不同方式分组。这通常涉及在Executor之间复制数据,使得shuffle成为一个复杂且代价高昂的操作。对Spark的shuffle原理感兴趣的同学可以参考老言的《Spark Shuffle原理及实现机制》章节。
RDD依赖关系
RDD 之间的依赖关系描述了一个 RDD 如何由其他 RDD 计算而来,它是 Spark 实现容错和任务调度的基础。
依赖类型 |
窄依赖(Narrow Dependency) |
宽依赖(Wide/Shuffle Dependency) |
定义 |
父 RDD 的每个分区最多被子 RDD 的一个分区使用 |
父 RDD 的每个分区可能被多个子分区使用 |
数据流动 |
同一节点内完成,无需网络传输 |
需要跨节点传输,触发 Shuffle |
典型算子 |
|
|
容错成本 |
低,只需重新计算丢失的父分区 |
高,需重新计算所有相关父分区 |
阶段划分 |
同一 Stage 内 |
划分新 Stage 的边界 |
依赖关系的意义
- 容错恢复:窄依赖只需重算丢失分区,宽依赖需重算更多数据
- 阶段划分:Spark 根据宽依赖将作业划分为多个 Stage
- 执行优化:窄依赖可流水线执行,宽依赖需等待 Shuffle 完成
- 血缘追踪:依赖关系构成 RDD 的血缘图(Lineage),用于故障恢复
DAG 的生成和划分 Stage
当行动算子(Action)被调用时,Spark 触发 DAG 构建。系统从该 RDD 开始逆向追溯所有父 RDD 及转换算子,记录完整的依赖关系形成计算血缘图。DAG 中的节点代表 RDD 或算子,边代表依赖关系,描述了从数据源到最终结果的完整计算流程。
为什么要划分Stage
在处理具有复杂业务逻辑的任务时,如果涉及到shuffle操作,则表明当前阶段的计算结果是下一阶段执行的前提条件,即下一阶段的数据处理依赖于前一阶段产生的数据。基于这一特性,我们可以依据shuffle(或者说宽依赖)来进行任务划分,从而将整个有向无环图(DAG)分解为若干个独立的Stage或阶段。在每个Stage内部,可以包含一系列连续的算子操作,这些操作能够被组织成一条pipeline流水线。值得注意的是,在这条流水线中,对于同一数据集的不同分区,其上的处理任务是可以并行执行的,这有助于提高整体计算效率。
Stage 划分
DAGScheduler 根据宽依赖(Shuffle Dependency)将 DAG 划分为多个 Stage:
依赖类型 |
特点 |
Stage 处理 |
窄依赖 |
父 RDD 每个分区最多被子 RDD 一个分区使用 |
同一 Stage 内,可流水线执行 |
宽依赖 |
父 RDD 分区可能被多个子分区使用,需 Shuffle |
Stage 划分边界 |
划分规则是遇到宽依赖就切断并开始新 Stage,从行动算子逆向回溯到源头 RDD,Stage 数量等于宽依赖数量加1。
执行流程
- 行动算子触发 DAG 构建
- DAGScheduler 根据宽依赖划分 Stage
- 每个 Stage 转化为 TaskSet 提交给 TaskScheduler
- TaskScheduler 将任务分发到 Executor 执行
RDD 持久化
RDD 持久化是 Spark 最重要的功能之一,它将计算后的 RDD 分区存储在节点内存中,供后续操作复用。这一机制可使后续计算速度提升 10 倍以上,是迭代算法和快速交互查询的关键工具。
目前RDD的实现机制分为两种
- cache:使用默认存储级别
MEMORY_ONLY(内存中存储反序列化对象)的简写 - persist:可自定义存储级别,通过传入
StorageLevel对象设置
示例:
# 示例 distFile.cache() # 等价于 distFile.persist(StorageLevel.MEMORY_ONLY) distFile.persist(StorageLevel.MEMORY_AND_DISK) # 内存+磁盘
通过查看 RDD 的源码发现 cache 最终也是调用了 persist 无参方法(默认存储只存在内存中):
每个持久化 RDD 可选择不同的存储策略:
存储级别 |
含义 |
MEMORY_ONLY |
在 JVM 中将 RDD 存储为反序列化的 Java 对象。如果内存不足 ,某些分区将不会被缓存,并且每次需要时都会即时重新计算。这是默认级别。 |
MEMORY_AND_DISK |
在 JVM 中将 RDD 存储为反序列化的 Java 对象。如果内存不足,则将不足的分区存储在磁盘上,并在需要时从磁盘读取。 |
MEMORY_ONLY_SER |
将 RDD 存储为序列化的 Java 对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,尤其是在使用快速序列化器时,但读取时更耗 CPU。 |
MEMORY_AND_DISK_SER |
类似于 MEMORY_ONLY_SER,但将内存溢出的分区溢写到磁盘,而不是每次需要时即时重新计算它们。 |
DISK_ONLY |
仅将 RDD 分区存储在磁盘上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. |
与上述级别相同,但在两个集群节点上复制每个分区,这样即使节点挂掉也有一定的容错性。 |
OFF_HEAP (实验性) |
类似于 MEMORY_ONLY_SER,但将数据存储在堆外内存中。这要求启用堆外内存,对Spark内存模型感兴趣的同学可以参考老言的《Spark 内存模型》章节。 |
Spark 会自动监控每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式淘汰旧数据分区。如果您想手动移除 RDD 而不是等待它从缓存中淘汰,请使用 RDD.unpersist() 方法。请注意,此方法默认是异步的。要阻塞它直到资源被释放,请在调用此方法时指定 blocking=true。
RDD的容错机制
核心原理
Spark 的容错机制基于血缘关系(Lineage)。RDD 本身不可变,但会记录自己是如何从其他 RDD 转换而来的完整过程。当某个分区数据丢失时,Spark 可以根据血缘关系重新计算丢失的分区,而非复制备份数据。
容错策略
依赖类型 |
恢复方式 |
开销 |
窄依赖 |
只需重新计算丢失分区涉及的父分区 |
低 |
宽依赖 |
需重新计算所有相关父分区 |
高 |
Checkpoint 机制
对于血缘链过长的 RDD,重新计算开销会很大。Spark 提供 Checkpoint 机制,将 RDD 持久化到可靠存储(如 HDFS),切断血缘链,加速故障恢复。
优势与局限
优势:无需数据复制,节省存储开销;适合计算成本低于数据复制的场景。
局限:血缘链过长时恢复慢;宽依赖故障恢复开销大;需配合 Checkpoint 使用。
共享变量
默认机制的局限
Spark 默认采用变量副本机制:当任务在 Executor 节点上并行执行时,Spark 会将函数中使用的所有变量复制一份发送到每个任务。这种设计简单可靠,但存在两个问题:
- 效率低下:如果一个大变量(如查找表、模型参数)被大量任务使用,会导致大量重复数据传输,浪费网络带宽和节点内存
- 无法共享状态:任务对变量的修改仅存在于本地副本,无法传递回驱动程序或其他任务,无法实现跨任务的状态累积
广播变量
广播变量是 Spark 提供的一种只读共享变量机制,用于将大变量高效地分发到集群的所有节点上,广播变量能把 Driver 端的大数据(如字典、配置)一次性高效地发给所有节点,让每个节点都存一份只读的副本,避免反复传输浪费网络资源。
但是,广播变量也有自己的局限性,比如说它是是只读的,不能在任务中修改,因此它适合"一次分发,多次读取"的场景。
创建广播变量示例:
# 创建广播变量 broadcast_var = sc.broadcast([1, 2, 3, 4, 5]) # 在算子中使用 rdd.map(lambda x: x + broadcast_var.value)
要释放广播变量复制到执行器上的资源,请调用 .unpersist()。如果之后再次使用该广播变量,它将被重新广播。要永久释放广播变量使用的所有资源,请调用 .destroy()。此后不能再使用该广播变量。请注意,这些方法默认不阻塞。要阻塞直到资源被释放,请在调用它们时指定 blocking=true。
释放广播变量示例:
broadcast_var.unpersist() broadcast_var.destroy()
累加器
累加器是 Spark 提供的一种只写共享变量机制,用于在并行任务中进行累加操作,常用于计数、求和等场景。累加器允许各个任务安全地累加值,最终结果可在驱动程序获取。
创建累加器示例:
# 创建累加器 accum = sc.accumulator(0) # 在算子中使用 rdd.foreach(lambda x: accum.add(x)) # 在驱动程序获取结果 print(accum.value)
总结
OK,PySpark的入门教程到这里就结束了,后续老言还会带着大家更深入的了解Apache Spark,关注老言,后续会有一系列精彩的Spark SQL内容呈现给大家,让你在大数据处理的道路上越走越顺。