PySpark入门教程(非常详细)从零基础入门到精通

简介: 本教程聚焦Spark Core核心原理,基于3.5.8版本,用Python详解RDD五大特性(分区、计算函数、依赖关系、分区器、首选位置)、容错机制、Shuffle、DAG调度及共享变量等,并通过WordCount实战演示。

注:本章节将重点阐述基于3.5.8版本的Spark Core,并采用Python语言进行代码实现。尽管在企业级应用中,Spark SQL得到了更为广泛的应用,老言在后续章节将深入探讨Spark SQL的相关内容,但鉴于Spark Core作为Apache Spark的基础组件,掌握其核心概念和技术细节对于全面理解整个生态系统至关重要。因此,我们有必要对Spark Core有一个深刻的认识。

Spark简介

在大数据技术发展迅速的背景下,Apache Spark以卓越性能和灵活编程模型,成为分布式计算核心框架之一。自2009年由加州大学伯克利分校AMPLab提出后,Spark超越传统MapReduce模式,2025年仍引领大数据处理技术趋势。2025年Spark社区报告显示,超80%财富500强企业将其作为大数据平台核心组件,日处理数据超100 EB。

Apache Spark的演进与核心价值

Apache Spark旨在克服MapReduce在迭代计算和交互式查询中的性能局限。与MapReduce将中间结果存于磁盘不同,Spark采用内存计算,显著提升数据处理效率,速度比传统方法快数十甚至上百倍,适用于需频繁访问同一数据集的场景,如机器学习、图算法分析和实时流处理,截至今日,Apache Spark已经发布到了4.1版本了。

版本

时间

核心特性

2.0

2016.10

Dataset/DataFrame 统一 API、Structured Streaming

2.1

2017.01

窗口函数增强、ML Pipeline 改进

2.2

2017.07

Structured Streaming 正式版的、Python UDF

2.3

2018.02

连续流处理、Kubernetes 原生支持

2.4

2018.11

Barrier 调度、高可用 Shuffle

3.0

2020.06

AQE 自适应查询、GPU 调度、性能提升 2 倍

3.1

2021.03

Pandas API on Spark、增量物化视图

3.2

2021.11

Pandas API 正式版、Python 3.9 支持

3.3

2022.05

Spark Connect、Python 3.10、Iceberg 集成

3.4

2023.03

Spark Connect 正式版、Python 3.11

3.5

2023.10

Spark Connect 多语言支持、分布式训练增强

4.0

2025.05

PySpark 原生画图、多态 UDTF、ANSI SQL 模式、Python API 全面改进

4.1

2025.12

支持 ANSI SQL,强化 Python 与 Iceberg

RDD的五大特性

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

RDD 数据结构内部有五个特性(摘录RDD 源码):

  1. A list of partitions(分区列表

对于RDD来说,每个分区都会被一个计算任务处理,分区数决定并行度;用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值;

  1. A function for computing each split(计算会作用于每个分区)

Spark中RDD的计算是以分区为单位的,compute函数会被作用到每个分区上;

  1. A list of dependencies on other RDDs(容错性)

RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算(Spark的容错机制);

  1. Optionally, a Partitioner for key-value RDDs(key- value数据类型的RDD分区器)

当前 Spark 中实现了两种类型的分区函数,一个是基于 Hash 的 HashPartitioner ,另外一个是基于范围的RangePartitioner。

只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。

Partitioner函数不但决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。

  1. Optionally, a list of preferred locations to compute each split on(每个分区都有一个优先位置列表,即首选位置

对于一个HDFS文件来说,列表保存的就是每个Partition所在的块的位置。

基于"移动数据不如移动计算"的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算。(数据本地性),但对于现代存算分离架构来说这一个特性已经不适用了。

RDD的五大特点

我们已经了解了RDD的五大特性了,但那是RDD内部的特性,那以我们作为用户的视角来看,RDD又具备5大特点

  1. 分布式

RDD的数据分布在多个节点上,使得各个节点可以并行处理数据。这种分布式存储方式是Spark高效处理大规模数据集的关键。用户无需关心数据如何分布,Spark框架会自动管理数据的分布和处理过程。

  1. 容错性

RDD通过记录数据转换操作的血缘(lineage)信息来提供弹性和容错功能。当部分节点数据丢失时,RDD能够根据血缘信息重新计算丢失的数据,确保数据处理的可靠性。

  1. 不可变性

一旦创建,RDD中的数据不可更改。每次对RDD的转换操作都会生成一个新的RDD,这种不可变性有助于简化并行计算中的数据管理。

  1. 并行处理

RDD支持在多个计算节点上并行执行作业。通过将数据分为多个分区,Spark能够在不同节点上同时处理数据,从而显著提高处理效率。

  1. 支持多种计算操作

RDD提供了丰富的API,支持多种计算操作,如map、filter、reduce、groupBy等,用户可以根据需求灵活选择合适的操作来处理数据。

RDD 和DataFrame/Dataset、SQL的关系

简单来说,RDD 是底层基石,DataFrame/Dataset 是上层的高级抽象,而 Spark SQL 是操作这些高级抽象的接口。

我们可以把它们的关系比喻成盖房子:

  • RDD 就像是一堆散乱的砖头(底层原材料),你可以随意摆放,但需要自己操心每一块砖怎么搬运、怎么砌。
  • DataFrame/Dataset 就像是预制好的墙体模块(高级抽象),它们内部已经优化了结构,你只需要把它们拼在一起,效率更高。
  • Spark SQL 就像是施工图纸和指令(接口),你用图纸告诉 Spark 想要什么样的房子,Spark 会自动选择最优的模块去搭建。

虽然 DataFrame/Dataset 是高级抽象,但它们底层依然是基于 RDD 的,Spark SQL又是基于DataFrame/Dataset的。

PySpark编程指南

概述

从宏观角度来看,每个 Spark 应用程序都包含一个驱动程序(Driver),它负责运行用户的 main 函数,并在集群上执行各种并行操作。

Spark 提供的核心抽象是弹性分布式数据集(RDD),这是一种分布在集群节点上的元素集合,支持并行操作。RDD 的创建通常始于 Hadoop 文件系统(或其他 Hadoop 支持的文件系统)中的文件,或是驱动程序中的现有 Scala、Python集合,并对其进行转换生成。用户还可以要求 Spark 将 RDD 持久化在内存中,以便在多次并行操作中高效复用。

准备执行环境

通过UV创建python3.11环境

  • windows一键安装uv
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
  • linux/macos一键安装uv
curl -LsSf https://astral.sh/uv/install.sh | sh
  • Pycharm或者Idea(需安装Python插件)新建一个uv项目

  • 添加pyspark依赖项并点击刷新

WordCount

创建一个test01.ipynb文件,pycharm会自动部署一个jupyter环境(这个步骤是可选的,也可以直接创建test01.py文件),只是老言个人比较喜欢用jupyter作为实验环境。

准备一个存放测试数据的文件test01.txt

hello world
hello java
hello scala
hello python
hello spark
hello pyspark
how are you
i am fine thank you
and you

设置环境变量,以防止后续数据处理时报错。注意,老言这里因为是实验环境,因此,在代码中简单指定了一下环境变量,如果想修改系统环境变量的,请自行咨询Google或者百度。

import os
# 一般位于当前uv目录的.venv/Scripts目录下
os.environ["PYSPARK_PYTHON"] = "D:/module/bigdata/Code/tutorials-pyspark/.venv/Scripts/python.exe"
# 如果是本地模式,通常 PYSPARK_DRIVER_PYTHON 也需要设置
os.environ["PYSPARK_DRIVER_PYTHON"] = "D:/module/bigdata/Code/tutorials-pyspark/.venv/Scripts/python.exe"

初始化 Spark

from pyspark import SparkContext, SparkConf
"""
appName 参数是您的应用程序在集群 UI 上显示的名称。
master 是 Spark 或 YARN 集群的 URL,或者是一个特殊的“local”字符串以在本地模式下运行。
实际上,当在集群上运行时,一般不会硬编码master, spark-submit 启动应用程序并在指明master才是最佳实践。然而,对于本地测试和单元测试,您可以传递“local”以在本地运行 Spark。
"""
conf = SparkConf().setAppName("test01").setMaster("local[*]")
sc = SparkContext(conf=conf)

通过Python列表创建 RDD

"""
通过在 Driver 程序中对现有可迭代对象或列表调用 SparkContext 的 parallelize 方法来创建 RDD。列表中的元素被复制以形成一个可以并行操作的 RDD。
例如,以下是如何创建一个包含数字 1 到 5 的 RDD:
"""
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

打印 RDD:

print(distData.collect())

预期输出结果

也可以从存储介质中创建RDD

"""
PySpark 可以从 Hadoop 支持的任何存储源创建 RDD,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。
Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat。
"""
# 注意路径不要用老言的,要换成自己的
distFile = sc.textFile("D:/module/bigdata/Code/dataspire/tutorials-pyspark/data/input/test01.txt")

打印 RDD

print(distFile.collect())

预期输出结果:

RDD 不只是可以打印出来,我们也可以对RDD做后续的数据处理,老言这里会以 WordCount(单词计数) 为例为大家表演一下Spark中 RDD 的过程

print(distFile
      .flatMap(lambda line: line.split(" "))  # 将 RDD 中的每一行按空格切分成单词
      .map(lambda word: (word, 1))  # 将每个单词映射成一个元组,元组的第一个元素是单词,第二个元素是 1
      .reduceByKey(lambda a, b: a + b)  # 将相同单词的元组按照单词进行分组,并计算每个单词出现的次数
      .collect()
      )

到此为止WordCount的数据就被正常输出了

将函数传递给Spark

PySpark 的 API 在很大程度上依赖于将驱动程序中的函数传递到集群上运行。有三种推荐的方式可以做到这一点:

  1. Lambda 表达式,用于可以写成表达式的简单函数。(Lambda 不支持多语句函数或不返回值的语句)
distFile.flatMap(lambda line: line.split(" ")
  1. 本地定义的 Python 函数
def split(line):
    return line.split(" ")
distFile.flatMap(split)
  1. 模块级函数(外部模块)
# my_functions.py
def split(line):
    return line.split(" ")
# 主程序
from my_functions import split
distFile.flatMap(split)

闭包

闭包是指 Spark 算子中使用的函数及其引用的外部变量共同构成的整体。当函数在 Executor 节点上执行时,它会携带这些变量的副本一起运行,这里很多同学会很困惑,为什么会有闭包这个概念呢,是因为Spark 的计算逻辑在驱动程序中定义,但实际执行在Executor 节点上。当算子中的函数需要访问外部变量时,Spark 必须将这些变量连同函数一起发送到Executor 节点上——这就是闭包

概念太空泛,老言也不多巴拉巴拉,直接上案例

# 示例
multiplier = 2
rdd.map(lambda x: x * multiplier)  # multiplier 被"捕获"进闭包

在这个例子中,lambda 函数和它引用的 multiplier 变量一起构成了闭包

同学们只需要记住,闭包就是函数 + 变量

闭包的工作机制

  1. 序列化:驱动程序将闭包(函数 + 引用的变量)序列化
  2. 传输:通过网络发送到每个 Executor 节点
  3. 反序列化:Executor 反序列化闭包并在任务中执行

RDD算子

RDD 支持两种类型的算子:转换算子(从现有 RDD 创建新 RDD)和行动算子(在 RDD 上运行计算后向驱动程序返回一个值)。例如,map 是一种转换算子,它将每个 RDD 中的元素通过一个函数(例如lambda word: (word, 1))计算后返回一个新 RDD。reduce是一种行动算子,它使用某个函数(例如lambda a, b: a + b)根据 Key 聚合 RDD 的所有元素,并将最终结果返回给 Driver。

Spark 中的所有转换算子都是惰性的,它们不会立即计算结果。相反,它们只是记录应用于某个 RDD 的操作。只有出现一个行动算子需要将结果返回给驱动程序时,才会触发计算。这种设计使得 Spark 运行更高效。

默认情况下,每次您对转换后的 RDD 运行行动算子后,再次使用它会重新计算。但是,您可以使用 persist(或 cache)方法将 RDD 持久化到内存中,这样 Spark 会将元素保留在集群上,以便下次使用时更快地访问。

下表列出了 Spark 支持的一些常见转换算子

转换算子

含义

map(func)

通过将 RDD 的每个元素传递给函数 func 来形成新的RDD。

filter(func)

返回一个新的RDD,该 RDD 由 旧RDD 中 func 返回 true 的元素组成。

flatMap(func)

在map算子的基础上,会多做一个扁平化处理。

mapPartitions(func)

类似于 map,但单独运行在 RDD 的每个分区上,因此当在类型为 T 的 RDD 上运行时,func 的类型必须是 Iterator<T> => Iterator<U>。

mapPartitionsWithIndex(func)

类似于 mapPartitions,但还向 func 提供一个表示分区索引的整数值,因此当在类型为 T 的 RDD 上运行时,func 的类型必须是 (Int, Iterator<T>) => Iterator<U>。

sample(withReplacement, fraction, seed)

使用给定的随机数生成器种子,有放回或无放回地采样数据的一个 fraction

union(otherDataset)

返回一个 新RDD,其中包含 旧RDD 和参数中元素的并集。

intersection(otherDataset)

返回一个 新RDD,其中包含 旧RDD 和参数中元素的交集。

distinct([numPartitions]))

返回一个包含 旧RDD 不同元素的 新RDD。

groupByKey([numPartitions])

当在 (K, V) 的 RDD 上调用时,返回一个 (K, Iterable<V>) 对的 RDD 。

注意:如果您是为了对每个 Key 执行聚合(例如求和或平均值)而进行分组,使用 reduceByKey aggregateByKey 将获得更好的性能。
注意:默认情况下,输出中的并行度取决于 父RDD 的分区数量。您可以传递一个可选的
numPartitions 参数来设置不同的任务数量。

reduceByKey(func, [numPartitions])

当在 (K, V) 类型的 RDD 上调用时,返回一个 (K, V) 类型的 RDD,其中每个键的值使用给定的 reduce 函数 func 进行聚合,该函数必须是 (V,V) => V 类型。与 groupByKey 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])

当在 (K, V) 类型的 RDD 上调用时,返回一个 (K, U) 的 RDD ,其中每个key 的值使用给定的组合函数和中性“零”值进行聚合。允许聚合值类型与输入值类型不同,同时避免不必要的分配。与 groupByKey

一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

sortByKey([ascending], [numPartitions])

当在 (K, V) 类型的 RDD 上调用时(其中 K 实现了 Ordered 接口),返回一个 (K, V) 对的数据集,按 key 以升序或降序排列,具体取决于布尔型 ascending 参数的指定。

join(otherDataset, [numPartitions])

当在类型为 (K, V) 和 (K, W) 的 RDD 上调用时,返回一个 (K, (V, W)) 类型的 RDD ,其中包含每个 key 的所有元素对。外部连接通过 leftOuterJoinrightOuterJoinfullOuterJoin 支持。

cogroup(otherDataset, [numPartitions])

当在类型为 (K, V) 和 (K, W) 的 RDD 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) 元组的RDD 。此操作也称为 groupWith

cartesian(otherDataset)

当在类型为 T 和 U 的 RDD 上调用时,返回一个 (T, U) 的 RDD (所有元素对)。

pipe(command, [envVars])

将 RDD 的每个分区通过一个 Shell 命令(例如 Perl 或 bash 脚本)进行管道操作。RDD 元素被写入进程的标准输入,其标准输出的行作为字符串 RDD 返回。

coalesce(numPartitions)

将 RDD 中的分区数量减少到 numPartitions。在对大型 RDD 进行过滤后,这对于更高效地运行操作很有用。

repartition(numPartitions)

随机地重新混洗 RDD 中的数据,以创建更多或更少的分区并使其在这些分区中保持平衡。这总是通过网络混洗所有数据。

repartitionAndSortWithinPartitions(partitioner)

根据给定的分区器重新分区 RDD,并在每个结果分区内按键对记录进行排序。这比调用 repartition然后在每个分区内排序更高效,因为它可以在混洗机制中下推排序。

下表列出了 Spark 支持的一些行动算子

行动

含义

reduce(func)

使用函数 func(接受两个参数并返回一个)聚合 RDD 的元素。该函数应该是可交换和结合的,以便可以并行正确计算。

collect()

将 RDD 的所有元素作为数组返回给驱动程序。这通常在过滤器或其他操作返回足够小的 RDD 后很有用。

count()

返回 RDD 中的元素数量。

first()

返回 RDD 的第一个元素(类似于 take(1))。

take(n)

返回一个包含 RDD 前 n 个元素的数组。

takeSample(withReplacement, num, [seed])

返回一个包含 RDD num 个元素的随机样本数组,有放回或无放回,可选地预先指定随机数生成器种子。

takeOrdered(n, [ordering])

使用元素的自然顺序或自定义比较器,返回 RDD 的前 n 个元素。

saveAsTextFile(path)

将 RDD 的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录。Spark 将对每个元素调用 toString 方法,将其转换为文件中的一行文本。

saveAsSequenceFile(path)
(Java 和 Scala)

将 RDD 的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径。此操作适用于实现了 Hadoop Writable 接口的键值对 RDD。在 Scala 中,它也适用于可以隐式转换为 Writable 的类型(Spark 包含对 Int、Double、String 等基本类型的转换)。

saveAsObjectFile(path)
(Java 和 Scala)

使用 Java 序列化以简单格式写入数据集元素,然后可以使用 SparkContext.objectFile()加载。

countByKey()

仅适用于类型为 (K, V) 的 RDD。返回一个 (K, Int) 对的哈希映射,其中包含每个键的计数。

foreach(func)

对 RDD 的每个元素运行函数 func。这通常用于产生副作用,例如更新累加器或与外部存储系统交互。
注意:在 foreach() 之外修改除了累加器(`Accumulator`)之外的变量可能会导致未定义的行为。有关更多详细信息,请参阅理解闭包

Shuffle 操作

Spark 中的某些算子会触发一个称为shuffle的事件。shuffle是 Spark 重新分发数据的机制,以便数据在分区之间以不同方式分组。这通常涉及在Executor之间复制数据,使得shuffle成为一个复杂且代价高昂的操作。对Spark的shuffle原理感兴趣的同学可以参考老言的《Spark Shuffle原理及实现机制》章节。

RDD依赖关系

RDD 之间的依赖关系描述了一个 RDD 如何由其他 RDD 计算而来,它是 Spark 实现容错和任务调度的基础。

依赖类型

窄依赖(Narrow Dependency)

宽依赖(Wide/Shuffle Dependency)

定义

父 RDD 的每个分区最多被子 RDD 的一个分区使用

父 RDD 的每个分区可能被多个子分区使用

数据流动

同一节点内完成,无需网络传输

需要跨节点传输,触发 Shuffle

典型算子

mapfilterunionflatMap

reduceByKeygroupByKeyjoin(非协同划分)

容错成本

低,只需重新计算丢失的父分区

高,需重新计算所有相关父分区

阶段划分

同一 Stage 内

划分新 Stage 的边界

依赖关系的意义

  1. 容错恢复窄依赖只需重算丢失分区,宽依赖需重算更多数据
  2. 阶段划分Spark 根据宽依赖将作业划分为多个 Stage
  3. 执行优化窄依赖可流水线执行,宽依赖需等待 Shuffle 完成
  4. 血缘追踪依赖关系构成 RDD 的血缘图(Lineage),用于故障恢复

DAG 的生成和划分 Stage

当行动算子(Action)被调用时,Spark 触发 DAG 构建。系统从该 RDD 开始逆向追溯所有父 RDD 及转换算子,记录完整的依赖关系形成计算血缘图。DAG 中的节点代表 RDD 或算子,边代表依赖关系,描述了从数据源到最终结果的完整计算流程。

为什么要划分Stage

在处理具有复杂业务逻辑的任务时,如果涉及到shuffle操作,则表明当前阶段的计算结果是下一阶段执行的前提条件,即下一阶段的数据处理依赖于前一阶段产生的数据。基于这一特性,我们可以依据shuffle(或者说宽依赖)来进行任务划分,从而将整个有向无环图(DAG)分解为若干个独立的Stage或阶段。在每个Stage内部,可以包含一系列连续的算子操作,这些操作能够被组织成一条pipeline流水线。值得注意的是,在这条流水线中,对于同一数据集的不同分区,其上的处理任务是可以并行执行的,这有助于提高整体计算效率。

Stage 划分

DAGScheduler 根据宽依赖(Shuffle Dependency)将 DAG 划分为多个 Stage:

依赖类型

特点

Stage 处理

窄依赖

父 RDD 每个分区最多被子 RDD 一个分区使用

同一 Stage 内,可流水线执行

宽依赖

父 RDD 分区可能被多个子分区使用,需 Shuffle

Stage 划分边界

划分规则是遇到宽依赖就切断并开始新 Stage,从行动算子逆向回溯到源头 RDD,Stage 数量等于宽依赖数量加1。

执行流程

  1. 行动算子触发 DAG 构建
  2. DAGScheduler 根据宽依赖划分 Stage
  3. 每个 Stage 转化为 TaskSet 提交给 TaskScheduler
  4. TaskScheduler 将任务分发到 Executor 执行

RDD 持久化

RDD 持久化是 Spark 最重要的功能之一,它将计算后的 RDD 分区存储在节点内存中,供后续操作复用。这一机制可使后续计算速度提升 10 倍以上,是迭代算法和快速交互查询的关键工具。

目前RDD的实现机制分为两种

  • cache:使用默认存储级别 MEMORY_ONLY(内存中存储反序列化对象)的简写
  • persist:可自定义存储级别,通过传入 StorageLevel 对象设置

示例:

# 示例
distFile.cache()  # 等价于 distFile.persist(StorageLevel.MEMORY_ONLY)
distFile.persist(StorageLevel.MEMORY_AND_DISK)  # 内存+磁盘

通过查看 RDD 的源码发现 cache 最终也是调用了 persist 无参方法(默认存储只存在内存中):

每个持久化 RDD 可选择不同的存储策略:

存储级别

含义

MEMORY_ONLY

在 JVM 中将 RDD 存储为反序列化的 Java 对象。如果内存不足 ,某些分区将不会被缓存,并且每次需要时都会即时重新计算。这是默认级别。

MEMORY_AND_DISK

在 JVM 中将 RDD 存储为反序列化的 Java 对象。如果内存不足,则将不足的分区存储在磁盘上,并在需要时从磁盘读取。

MEMORY_ONLY_SER
(Java 和 Scala)

将 RDD 存储为序列化的 Java 对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,尤其是在使用快速序列化器时,但读取时更耗 CPU。

MEMORY_AND_DISK_SER
(Java 和 Scala)

类似于 MEMORY_ONLY_SER,但将内存溢出的分区溢写到磁盘,而不是每次需要时即时重新计算它们。

DISK_ONLY

仅将 RDD 分区存储在磁盘上。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

与上述级别相同,但在两个集群节点上复制每个分区,这样即使节点挂掉也有一定的容错性。

OFF_HEAP (实验性)

类似于 MEMORY_ONLY_SER,但将数据存储在堆外内存中。这要求启用堆外内存,对Spark内存模型感兴趣的同学可以参考老言的《Spark 内存模型》章节。

Spark 会自动监控每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式淘汰旧数据分区。如果您想手动移除 RDD 而不是等待它从缓存中淘汰,请使用 RDD.unpersist() 方法。请注意,此方法默认是异步的。要阻塞它直到资源被释放,请在调用此方法时指定 blocking=true

RDD的容错机制

核心原理

Spark 的容错机制基于血缘关系(Lineage)。RDD 本身不可变,但会记录自己是如何从其他 RDD 转换而来的完整过程。当某个分区数据丢失时,Spark 可以根据血缘关系重新计算丢失的分区,而非复制备份数据。

容错策略

依赖类型

恢复方式

开销

窄依赖

只需重新计算丢失分区涉及的父分区

宽依赖

需重新计算所有相关父分区

Checkpoint 机制

对于血缘链过长的 RDD,重新计算开销会很大。Spark 提供 Checkpoint 机制,将 RDD 持久化到可靠存储(如 HDFS),切断血缘链,加速故障恢复。

优势与局限

优势:无需数据复制,节省存储开销;适合计算成本低于数据复制的场景。

局限:血缘链过长时恢复慢;宽依赖故障恢复开销大;需配合 Checkpoint 使用。

共享变量

默认机制的局限

Spark 默认采用变量副本机制:当任务在 Executor 节点上并行执行时,Spark 会将函数中使用的所有变量复制一份发送到每个任务。这种设计简单可靠,但存在两个问题:

  1. 效率低下:如果一个大变量(如查找表、模型参数)被大量任务使用,会导致大量重复数据传输,浪费网络带宽和节点内存
  2. 无法共享状态:任务对变量的修改仅存在于本地副本,无法传递回驱动程序或其他任务,无法实现跨任务的状态累积

广播变量

广播变量是 Spark 提供的一种只读共享变量机制,用于将大变量高效地分发到集群的所有节点上,广播变量能把 Driver 端的大数据(如字典、配置)一次性高效地发给所有节点,让每个节点都存一份只读的副本,避免反复传输浪费网络资源。

但是,广播变量也有自己的局限性,比如说它是是只读的,不能在任务中修改,因此它适合"一次分发,多次读取"的场景。

创建广播变量示例:

# 创建广播变量
broadcast_var = sc.broadcast([1, 2, 3, 4, 5])
# 在算子中使用
rdd.map(lambda x: x + broadcast_var.value)

要释放广播变量复制到执行器上的资源,请调用 .unpersist()。如果之后再次使用该广播变量,它将被重新广播。要永久释放广播变量使用的所有资源,请调用 .destroy()。此后不能再使用该广播变量。请注意,这些方法默认不阻塞。要阻塞直到资源被释放,请在调用它们时指定 blocking=true

释放广播变量示例:

broadcast_var.unpersist()
broadcast_var.destroy()

累加器

累加器是 Spark 提供的一种只写共享变量机制,用于在并行任务中进行累加操作,常用于计数、求和等场景。累加器允许各个任务安全地累加值,最终结果可在驱动程序获取。

创建累加器示例:

# 创建累加器
accum = sc.accumulator(0)
# 在算子中使用
rdd.foreach(lambda x: accum.add(x))
# 在驱动程序获取结果
print(accum.value)

总结

OK,PySpark的入门教程到这里就结束了,后续老言还会带着大家更深入的了解Apache Spark,关注老言,后续会有一系列精彩的Spark SQL内容呈现给大家,让你在大数据处理的道路上越走越顺。

相关文章
|
22天前
|
人工智能 自然语言处理 Shell
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
本教程指导用户在开源AI助手Clawdbot中集成阿里云百炼API,涵盖安装Clawdbot、获取百炼API Key、配置环境变量与模型参数、验证调用等完整流程,支持Qwen3-max thinking (Qwen3-Max-2026-01-23)/Qwen - Plus等主流模型,助力本地化智能自动化。
33197 132
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
|
5天前
|
人工智能 自然语言处理 监控
OpenClaw skills重构量化交易逻辑:部署+AI全自动炒股指南(2026终极版)
2026年,AI Agent领域最震撼的突破来自OpenClaw(原Clawdbot)——这个能自主规划、执行任务的智能体,用50美元启动资金创造了48小时滚雪球至2980美元的奇迹,收益率高达5860%。其核心逻辑堪称教科书级:每10分钟扫描Polymarket近千个预测市场,借助Claude API深度推理,交叉验证NOAA天气数据、体育伤病报告、加密货币链上情绪等多维度信息,捕捉8%以上的定价偏差,再通过凯利准则将单仓位严格控制在总资金6%以内,实现低风险高频套利。
2059 9
|
18天前
|
人工智能 安全 机器人
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI助手,支持钉钉、飞书等多平台接入。本教程手把手指导Linux下部署与钉钉机器人对接,涵盖环境配置、模型选择(如Qwen)、权限设置及调试,助你快速打造私有、安全、高权限的专属AI助理。(239字)
7085 21
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
|
16天前
|
人工智能 机器人 Linux
OpenClaw(Clawdbot、Moltbot)汉化版部署教程指南(零门槛)
OpenClaw作为2026年GitHub上增长最快的开源项目之一,一周内Stars从7800飙升至12万+,其核心优势在于打破传统聊天机器人的局限,能真正执行读写文件、运行脚本、浏览器自动化等实操任务。但原版全英文界面对中文用户存在上手门槛,汉化版通过覆盖命令行(CLI)与网页控制台(Dashboard)核心模块,解决了语言障碍,同时保持与官方版本的实时同步,确保新功能最快1小时内可用。本文将详细拆解汉化版OpenClaw的搭建流程,涵盖本地安装、Docker部署、服务器远程访问等场景,同时提供环境适配、问题排查与国内应用集成方案,助力中文用户高效搭建专属AI助手。
5024 12
|
19天前
|
人工智能 机器人 Linux
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI智能体,支持飞书等多平台对接。本教程手把手教你Linux下部署,实现数据私有、系统控制、网页浏览与代码编写,全程保姆级操作,240字内搞定专属AI助手搭建!
5836 23
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手