PySpark入门教程(非常详细)从零基础入门到精通

简介: 本教程聚焦Spark Core核心原理,基于3.5.8版本,用Python详解RDD五大特性(分区、计算函数、依赖关系、分区器、首选位置)、容错机制、Shuffle、DAG调度及共享变量等,并通过WordCount实战演示。

注:本章节将重点阐述基于3.5.8版本的Spark Core,并采用Python语言进行代码实现。尽管在企业级应用中,Spark SQL得到了更为广泛的应用,老言在后续章节将深入探讨Spark SQL的相关内容,但鉴于Spark Core作为Apache Spark的基础组件,掌握其核心概念和技术细节对于全面理解整个生态系统至关重要。因此,我们有必要对Spark Core有一个深刻的认识。

Spark简介

在大数据技术发展迅速的背景下,Apache Spark以卓越性能和灵活编程模型,成为分布式计算核心框架之一。自2009年由加州大学伯克利分校AMPLab提出后,Spark超越传统MapReduce模式,2025年仍引领大数据处理技术趋势。2025年Spark社区报告显示,超80%财富500强企业将其作为大数据平台核心组件,日处理数据超100 EB。

Apache Spark的演进与核心价值

Apache Spark旨在克服MapReduce在迭代计算和交互式查询中的性能局限。与MapReduce将中间结果存于磁盘不同,Spark采用内存计算,显著提升数据处理效率,速度比传统方法快数十甚至上百倍,适用于需频繁访问同一数据集的场景,如机器学习、图算法分析和实时流处理,截至今日,Apache Spark已经发布到了4.1版本了。

版本

时间

核心特性

2.0

2016.10

Dataset/DataFrame 统一 API、Structured Streaming

2.1

2017.01

窗口函数增强、ML Pipeline 改进

2.2

2017.07

Structured Streaming 正式版的、Python UDF

2.3

2018.02

连续流处理、Kubernetes 原生支持

2.4

2018.11

Barrier 调度、高可用 Shuffle

3.0

2020.06

AQE 自适应查询、GPU 调度、性能提升 2 倍

3.1

2021.03

Pandas API on Spark、增量物化视图

3.2

2021.11

Pandas API 正式版、Python 3.9 支持

3.3

2022.05

Spark Connect、Python 3.10、Iceberg 集成

3.4

2023.03

Spark Connect 正式版、Python 3.11

3.5

2023.10

Spark Connect 多语言支持、分布式训练增强

4.0

2025.05

PySpark 原生画图、多态 UDTF、ANSI SQL 模式、Python API 全面改进

4.1

2025.12

支持 ANSI SQL,强化 Python 与 Iceberg

RDD的五大特性

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

RDD 数据结构内部有五个特性(摘录RDD 源码):

  1. A list of partitions(分区列表

对于RDD来说,每个分区都会被一个计算任务处理,分区数决定并行度;用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值;

  1. A function for computing each split(计算会作用于每个分区)

Spark中RDD的计算是以分区为单位的,compute函数会被作用到每个分区上;

  1. A list of dependencies on other RDDs(容错性)

RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算(Spark的容错机制);

  1. Optionally, a Partitioner for key-value RDDs(key- value数据类型的RDD分区器)

当前 Spark 中实现了两种类型的分区函数,一个是基于 Hash 的 HashPartitioner ,另外一个是基于范围的RangePartitioner。

只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。

Partitioner函数不但决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。

  1. Optionally, a list of preferred locations to compute each split on(每个分区都有一个优先位置列表,即首选位置

对于一个HDFS文件来说,列表保存的就是每个Partition所在的块的位置。

基于"移动数据不如移动计算"的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算。(数据本地性),但对于现代存算分离架构来说这一个特性已经不适用了。

RDD的五大特点

我们已经了解了RDD的五大特性了,但那是RDD内部的特性,那以我们作为用户的视角来看,RDD又具备5大特点

  1. 分布式

RDD的数据分布在多个节点上,使得各个节点可以并行处理数据。这种分布式存储方式是Spark高效处理大规模数据集的关键。用户无需关心数据如何分布,Spark框架会自动管理数据的分布和处理过程。

  1. 容错性

RDD通过记录数据转换操作的血缘(lineage)信息来提供弹性和容错功能。当部分节点数据丢失时,RDD能够根据血缘信息重新计算丢失的数据,确保数据处理的可靠性。

  1. 不可变性

一旦创建,RDD中的数据不可更改。每次对RDD的转换操作都会生成一个新的RDD,这种不可变性有助于简化并行计算中的数据管理。

  1. 并行处理

RDD支持在多个计算节点上并行执行作业。通过将数据分为多个分区,Spark能够在不同节点上同时处理数据,从而显著提高处理效率。

  1. 支持多种计算操作

RDD提供了丰富的API,支持多种计算操作,如map、filter、reduce、groupBy等,用户可以根据需求灵活选择合适的操作来处理数据。

RDD 和DataFrame/Dataset、SQL的关系

简单来说,RDD 是底层基石,DataFrame/Dataset 是上层的高级抽象,而 Spark SQL 是操作这些高级抽象的接口。

我们可以把它们的关系比喻成盖房子:

  • RDD 就像是一堆散乱的砖头(底层原材料),你可以随意摆放,但需要自己操心每一块砖怎么搬运、怎么砌。
  • DataFrame/Dataset 就像是预制好的墙体模块(高级抽象),它们内部已经优化了结构,你只需要把它们拼在一起,效率更高。
  • Spark SQL 就像是施工图纸和指令(接口),你用图纸告诉 Spark 想要什么样的房子,Spark 会自动选择最优的模块去搭建。

虽然 DataFrame/Dataset 是高级抽象,但它们底层依然是基于 RDD 的,Spark SQL又是基于DataFrame/Dataset的。

PySpark编程指南

概述

从宏观角度来看,每个 Spark 应用程序都包含一个驱动程序(Driver),它负责运行用户的 main 函数,并在集群上执行各种并行操作。

Spark 提供的核心抽象是弹性分布式数据集(RDD),这是一种分布在集群节点上的元素集合,支持并行操作。RDD 的创建通常始于 Hadoop 文件系统(或其他 Hadoop 支持的文件系统)中的文件,或是驱动程序中的现有 Scala、Python集合,并对其进行转换生成。用户还可以要求 Spark 将 RDD 持久化在内存中,以便在多次并行操作中高效复用。

准备执行环境

通过UV创建python3.11环境

  • windows一键安装uv
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
  • linux/macos一键安装uv
curl -LsSf https://astral.sh/uv/install.sh | sh
  • Pycharm或者Idea(需安装Python插件)新建一个uv项目

  • 添加pyspark依赖项并点击刷新

WordCount

创建一个test01.ipynb文件,pycharm会自动部署一个jupyter环境(这个步骤是可选的,也可以直接创建test01.py文件),只是老言个人比较喜欢用jupyter作为实验环境。

准备一个存放测试数据的文件test01.txt

hello world
hello java
hello scala
hello python
hello spark
hello pyspark
how are you
i am fine thank you
and you

设置环境变量,以防止后续数据处理时报错。注意,老言这里因为是实验环境,因此,在代码中简单指定了一下环境变量,如果想修改系统环境变量的,请自行咨询Google或者百度。

import os
# 一般位于当前uv目录的.venv/Scripts目录下
os.environ["PYSPARK_PYTHON"] = "D:/module/bigdata/Code/tutorials-pyspark/.venv/Scripts/python.exe"
# 如果是本地模式,通常 PYSPARK_DRIVER_PYTHON 也需要设置
os.environ["PYSPARK_DRIVER_PYTHON"] = "D:/module/bigdata/Code/tutorials-pyspark/.venv/Scripts/python.exe"

初始化 Spark

from pyspark import SparkContext, SparkConf
"""
appName 参数是您的应用程序在集群 UI 上显示的名称。
master 是 Spark 或 YARN 集群的 URL,或者是一个特殊的“local”字符串以在本地模式下运行。
实际上,当在集群上运行时,一般不会硬编码master, spark-submit 启动应用程序并在指明master才是最佳实践。然而,对于本地测试和单元测试,您可以传递“local”以在本地运行 Spark。
"""
conf = SparkConf().setAppName("test01").setMaster("local[*]")
sc = SparkContext(conf=conf)

通过Python列表创建 RDD

"""
通过在 Driver 程序中对现有可迭代对象或列表调用 SparkContext 的 parallelize 方法来创建 RDD。列表中的元素被复制以形成一个可以并行操作的 RDD。
例如,以下是如何创建一个包含数字 1 到 5 的 RDD:
"""
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

打印 RDD:

print(distData.collect())

预期输出结果

也可以从存储介质中创建RDD

"""
PySpark 可以从 Hadoop 支持的任何存储源创建 RDD,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。
Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat。
"""
# 注意路径不要用老言的,要换成自己的
distFile = sc.textFile("D:/module/bigdata/Code/dataspire/tutorials-pyspark/data/input/test01.txt")

打印 RDD

print(distFile.collect())

预期输出结果:

RDD 不只是可以打印出来,我们也可以对RDD做后续的数据处理,老言这里会以 WordCount(单词计数) 为例为大家表演一下Spark中 RDD 的过程

print(distFile
      .flatMap(lambda line: line.split(" "))  # 将 RDD 中的每一行按空格切分成单词
      .map(lambda word: (word, 1))  # 将每个单词映射成一个元组,元组的第一个元素是单词,第二个元素是 1
      .reduceByKey(lambda a, b: a + b)  # 将相同单词的元组按照单词进行分组,并计算每个单词出现的次数
      .collect()
      )

到此为止WordCount的数据就被正常输出了

将函数传递给Spark

PySpark 的 API 在很大程度上依赖于将驱动程序中的函数传递到集群上运行。有三种推荐的方式可以做到这一点:

  1. Lambda 表达式,用于可以写成表达式的简单函数。(Lambda 不支持多语句函数或不返回值的语句)
distFile.flatMap(lambda line: line.split(" ")
  1. 本地定义的 Python 函数
def split(line):
    return line.split(" ")
distFile.flatMap(split)
  1. 模块级函数(外部模块)
# my_functions.py
def split(line):
    return line.split(" ")
# 主程序
from my_functions import split
distFile.flatMap(split)

闭包

闭包是指 Spark 算子中使用的函数及其引用的外部变量共同构成的整体。当函数在 Executor 节点上执行时,它会携带这些变量的副本一起运行,这里很多同学会很困惑,为什么会有闭包这个概念呢,是因为Spark 的计算逻辑在驱动程序中定义,但实际执行在Executor 节点上。当算子中的函数需要访问外部变量时,Spark 必须将这些变量连同函数一起发送到Executor 节点上——这就是闭包

概念太空泛,老言也不多巴拉巴拉,直接上案例

# 示例
multiplier = 2
rdd.map(lambda x: x * multiplier)  # multiplier 被"捕获"进闭包

在这个例子中,lambda 函数和它引用的 multiplier 变量一起构成了闭包

同学们只需要记住,闭包就是函数 + 变量

闭包的工作机制

  1. 序列化:驱动程序将闭包(函数 + 引用的变量)序列化
  2. 传输:通过网络发送到每个 Executor 节点
  3. 反序列化:Executor 反序列化闭包并在任务中执行

RDD算子

RDD 支持两种类型的算子:转换算子(从现有 RDD 创建新 RDD)和行动算子(在 RDD 上运行计算后向驱动程序返回一个值)。例如,map 是一种转换算子,它将每个 RDD 中的元素通过一个函数(例如lambda word: (word, 1))计算后返回一个新 RDD。reduce是一种行动算子,它使用某个函数(例如lambda a, b: a + b)根据 Key 聚合 RDD 的所有元素,并将最终结果返回给 Driver。

Spark 中的所有转换算子都是惰性的,它们不会立即计算结果。相反,它们只是记录应用于某个 RDD 的操作。只有出现一个行动算子需要将结果返回给驱动程序时,才会触发计算。这种设计使得 Spark 运行更高效。

默认情况下,每次您对转换后的 RDD 运行行动算子后,再次使用它会重新计算。但是,您可以使用 persist(或 cache)方法将 RDD 持久化到内存中,这样 Spark 会将元素保留在集群上,以便下次使用时更快地访问。

下表列出了 Spark 支持的一些常见转换算子

转换算子

含义

map(func)

通过将 RDD 的每个元素传递给函数 func 来形成新的RDD。

filter(func)

返回一个新的RDD,该 RDD 由 旧RDD 中 func 返回 true 的元素组成。

flatMap(func)

在map算子的基础上,会多做一个扁平化处理。

mapPartitions(func)

类似于 map,但单独运行在 RDD 的每个分区上,因此当在类型为 T 的 RDD 上运行时,func 的类型必须是 Iterator<T> => Iterator<U>。

mapPartitionsWithIndex(func)

类似于 mapPartitions,但还向 func 提供一个表示分区索引的整数值,因此当在类型为 T 的 RDD 上运行时,func 的类型必须是 (Int, Iterator<T>) => Iterator<U>。

sample(withReplacement, fraction, seed)

使用给定的随机数生成器种子,有放回或无放回地采样数据的一个 fraction

union(otherDataset)

返回一个 新RDD,其中包含 旧RDD 和参数中元素的并集。

intersection(otherDataset)

返回一个 新RDD,其中包含 旧RDD 和参数中元素的交集。

distinct([numPartitions]))

返回一个包含 旧RDD 不同元素的 新RDD。

groupByKey([numPartitions])

当在 (K, V) 的 RDD 上调用时,返回一个 (K, Iterable<V>) 对的 RDD 。

注意:如果您是为了对每个 Key 执行聚合(例如求和或平均值)而进行分组,使用 reduceByKey aggregateByKey 将获得更好的性能。
注意:默认情况下,输出中的并行度取决于 父RDD 的分区数量。您可以传递一个可选的
numPartitions 参数来设置不同的任务数量。

reduceByKey(func, [numPartitions])

当在 (K, V) 类型的 RDD 上调用时,返回一个 (K, V) 类型的 RDD,其中每个键的值使用给定的 reduce 函数 func 进行聚合,该函数必须是 (V,V) => V 类型。与 groupByKey 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])

当在 (K, V) 类型的 RDD 上调用时,返回一个 (K, U) 的 RDD ,其中每个key 的值使用给定的组合函数和中性“零”值进行聚合。允许聚合值类型与输入值类型不同,同时避免不必要的分配。与 groupByKey

一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

sortByKey([ascending], [numPartitions])

当在 (K, V) 类型的 RDD 上调用时(其中 K 实现了 Ordered 接口),返回一个 (K, V) 对的数据集,按 key 以升序或降序排列,具体取决于布尔型 ascending 参数的指定。

join(otherDataset, [numPartitions])

当在类型为 (K, V) 和 (K, W) 的 RDD 上调用时,返回一个 (K, (V, W)) 类型的 RDD ,其中包含每个 key 的所有元素对。外部连接通过 leftOuterJoinrightOuterJoinfullOuterJoin 支持。

cogroup(otherDataset, [numPartitions])

当在类型为 (K, V) 和 (K, W) 的 RDD 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) 元组的RDD 。此操作也称为 groupWith

cartesian(otherDataset)

当在类型为 T 和 U 的 RDD 上调用时,返回一个 (T, U) 的 RDD (所有元素对)。

pipe(command, [envVars])

将 RDD 的每个分区通过一个 Shell 命令(例如 Perl 或 bash 脚本)进行管道操作。RDD 元素被写入进程的标准输入,其标准输出的行作为字符串 RDD 返回。

coalesce(numPartitions)

将 RDD 中的分区数量减少到 numPartitions。在对大型 RDD 进行过滤后,这对于更高效地运行操作很有用。

repartition(numPartitions)

随机地重新混洗 RDD 中的数据,以创建更多或更少的分区并使其在这些分区中保持平衡。这总是通过网络混洗所有数据。

repartitionAndSortWithinPartitions(partitioner)

根据给定的分区器重新分区 RDD,并在每个结果分区内按键对记录进行排序。这比调用 repartition然后在每个分区内排序更高效,因为它可以在混洗机制中下推排序。

下表列出了 Spark 支持的一些行动算子

行动

含义

reduce(func)

使用函数 func(接受两个参数并返回一个)聚合 RDD 的元素。该函数应该是可交换和结合的,以便可以并行正确计算。

collect()

将 RDD 的所有元素作为数组返回给驱动程序。这通常在过滤器或其他操作返回足够小的 RDD 后很有用。

count()

返回 RDD 中的元素数量。

first()

返回 RDD 的第一个元素(类似于 take(1))。

take(n)

返回一个包含 RDD 前 n 个元素的数组。

takeSample(withReplacement, num, [seed])

返回一个包含 RDD num 个元素的随机样本数组,有放回或无放回,可选地预先指定随机数生成器种子。

takeOrdered(n, [ordering])

使用元素的自然顺序或自定义比较器,返回 RDD 的前 n 个元素。

saveAsTextFile(path)

将 RDD 的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录。Spark 将对每个元素调用 toString 方法,将其转换为文件中的一行文本。

saveAsSequenceFile(path)
(Java 和 Scala)

将 RDD 的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径。此操作适用于实现了 Hadoop Writable 接口的键值对 RDD。在 Scala 中,它也适用于可以隐式转换为 Writable 的类型(Spark 包含对 Int、Double、String 等基本类型的转换)。

saveAsObjectFile(path)
(Java 和 Scala)

使用 Java 序列化以简单格式写入数据集元素,然后可以使用 SparkContext.objectFile()加载。

countByKey()

仅适用于类型为 (K, V) 的 RDD。返回一个 (K, Int) 对的哈希映射,其中包含每个键的计数。

foreach(func)

对 RDD 的每个元素运行函数 func。这通常用于产生副作用,例如更新累加器或与外部存储系统交互。
注意:在 foreach() 之外修改除了累加器(`Accumulator`)之外的变量可能会导致未定义的行为。有关更多详细信息,请参阅理解闭包

Shuffle 操作

Spark 中的某些算子会触发一个称为shuffle的事件。shuffle是 Spark 重新分发数据的机制,以便数据在分区之间以不同方式分组。这通常涉及在Executor之间复制数据,使得shuffle成为一个复杂且代价高昂的操作。对Spark的shuffle原理感兴趣的同学可以参考老言的《Spark Shuffle原理及实现机制》章节。

RDD依赖关系

RDD 之间的依赖关系描述了一个 RDD 如何由其他 RDD 计算而来,它是 Spark 实现容错和任务调度的基础。

依赖类型

窄依赖(Narrow Dependency)

宽依赖(Wide/Shuffle Dependency)

定义

父 RDD 的每个分区最多被子 RDD 的一个分区使用

父 RDD 的每个分区可能被多个子分区使用

数据流动

同一节点内完成,无需网络传输

需要跨节点传输,触发 Shuffle

典型算子

mapfilterunionflatMap

reduceByKeygroupByKeyjoin(非协同划分)

容错成本

低,只需重新计算丢失的父分区

高,需重新计算所有相关父分区

阶段划分

同一 Stage 内

划分新 Stage 的边界

依赖关系的意义

  1. 容错恢复窄依赖只需重算丢失分区,宽依赖需重算更多数据
  2. 阶段划分Spark 根据宽依赖将作业划分为多个 Stage
  3. 执行优化窄依赖可流水线执行,宽依赖需等待 Shuffle 完成
  4. 血缘追踪依赖关系构成 RDD 的血缘图(Lineage),用于故障恢复

DAG 的生成和划分 Stage

当行动算子(Action)被调用时,Spark 触发 DAG 构建。系统从该 RDD 开始逆向追溯所有父 RDD 及转换算子,记录完整的依赖关系形成计算血缘图。DAG 中的节点代表 RDD 或算子,边代表依赖关系,描述了从数据源到最终结果的完整计算流程。

为什么要划分Stage

在处理具有复杂业务逻辑的任务时,如果涉及到shuffle操作,则表明当前阶段的计算结果是下一阶段执行的前提条件,即下一阶段的数据处理依赖于前一阶段产生的数据。基于这一特性,我们可以依据shuffle(或者说宽依赖)来进行任务划分,从而将整个有向无环图(DAG)分解为若干个独立的Stage或阶段。在每个Stage内部,可以包含一系列连续的算子操作,这些操作能够被组织成一条pipeline流水线。值得注意的是,在这条流水线中,对于同一数据集的不同分区,其上的处理任务是可以并行执行的,这有助于提高整体计算效率。

Stage 划分

DAGScheduler 根据宽依赖(Shuffle Dependency)将 DAG 划分为多个 Stage:

依赖类型

特点

Stage 处理

窄依赖

父 RDD 每个分区最多被子 RDD 一个分区使用

同一 Stage 内,可流水线执行

宽依赖

父 RDD 分区可能被多个子分区使用,需 Shuffle

Stage 划分边界

划分规则是遇到宽依赖就切断并开始新 Stage,从行动算子逆向回溯到源头 RDD,Stage 数量等于宽依赖数量加1。

执行流程

  1. 行动算子触发 DAG 构建
  2. DAGScheduler 根据宽依赖划分 Stage
  3. 每个 Stage 转化为 TaskSet 提交给 TaskScheduler
  4. TaskScheduler 将任务分发到 Executor 执行

RDD 持久化

RDD 持久化是 Spark 最重要的功能之一,它将计算后的 RDD 分区存储在节点内存中,供后续操作复用。这一机制可使后续计算速度提升 10 倍以上,是迭代算法和快速交互查询的关键工具。

目前RDD的实现机制分为两种

  • cache:使用默认存储级别 MEMORY_ONLY(内存中存储反序列化对象)的简写
  • persist:可自定义存储级别,通过传入 StorageLevel 对象设置

示例:

# 示例
distFile.cache()  # 等价于 distFile.persist(StorageLevel.MEMORY_ONLY)
distFile.persist(StorageLevel.MEMORY_AND_DISK)  # 内存+磁盘

通过查看 RDD 的源码发现 cache 最终也是调用了 persist 无参方法(默认存储只存在内存中):

每个持久化 RDD 可选择不同的存储策略:

存储级别

含义

MEMORY_ONLY

在 JVM 中将 RDD 存储为反序列化的 Java 对象。如果内存不足 ,某些分区将不会被缓存,并且每次需要时都会即时重新计算。这是默认级别。

MEMORY_AND_DISK

在 JVM 中将 RDD 存储为反序列化的 Java 对象。如果内存不足,则将不足的分区存储在磁盘上,并在需要时从磁盘读取。

MEMORY_ONLY_SER
(Java 和 Scala)

将 RDD 存储为序列化的 Java 对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,尤其是在使用快速序列化器时,但读取时更耗 CPU。

MEMORY_AND_DISK_SER
(Java 和 Scala)

类似于 MEMORY_ONLY_SER,但将内存溢出的分区溢写到磁盘,而不是每次需要时即时重新计算它们。

DISK_ONLY

仅将 RDD 分区存储在磁盘上。

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

与上述级别相同,但在两个集群节点上复制每个分区,这样即使节点挂掉也有一定的容错性。

OFF_HEAP (实验性)

类似于 MEMORY_ONLY_SER,但将数据存储在堆外内存中。这要求启用堆外内存,对Spark内存模型感兴趣的同学可以参考老言的《Spark 内存模型》章节。

Spark 会自动监控每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式淘汰旧数据分区。如果您想手动移除 RDD 而不是等待它从缓存中淘汰,请使用 RDD.unpersist() 方法。请注意,此方法默认是异步的。要阻塞它直到资源被释放,请在调用此方法时指定 blocking=true

RDD的容错机制

核心原理

Spark 的容错机制基于血缘关系(Lineage)。RDD 本身不可变,但会记录自己是如何从其他 RDD 转换而来的完整过程。当某个分区数据丢失时,Spark 可以根据血缘关系重新计算丢失的分区,而非复制备份数据。

容错策略

依赖类型

恢复方式

开销

窄依赖

只需重新计算丢失分区涉及的父分区

宽依赖

需重新计算所有相关父分区

Checkpoint 机制

对于血缘链过长的 RDD,重新计算开销会很大。Spark 提供 Checkpoint 机制,将 RDD 持久化到可靠存储(如 HDFS),切断血缘链,加速故障恢复。

优势与局限

优势:无需数据复制,节省存储开销;适合计算成本低于数据复制的场景。

局限:血缘链过长时恢复慢;宽依赖故障恢复开销大;需配合 Checkpoint 使用。

共享变量

默认机制的局限

Spark 默认采用变量副本机制:当任务在 Executor 节点上并行执行时,Spark 会将函数中使用的所有变量复制一份发送到每个任务。这种设计简单可靠,但存在两个问题:

  1. 效率低下:如果一个大变量(如查找表、模型参数)被大量任务使用,会导致大量重复数据传输,浪费网络带宽和节点内存
  2. 无法共享状态:任务对变量的修改仅存在于本地副本,无法传递回驱动程序或其他任务,无法实现跨任务的状态累积

广播变量

广播变量是 Spark 提供的一种只读共享变量机制,用于将大变量高效地分发到集群的所有节点上,广播变量能把 Driver 端的大数据(如字典、配置)一次性高效地发给所有节点,让每个节点都存一份只读的副本,避免反复传输浪费网络资源。

但是,广播变量也有自己的局限性,比如说它是是只读的,不能在任务中修改,因此它适合"一次分发,多次读取"的场景。

创建广播变量示例:

# 创建广播变量
broadcast_var = sc.broadcast([1, 2, 3, 4, 5])
# 在算子中使用
rdd.map(lambda x: x + broadcast_var.value)

要释放广播变量复制到执行器上的资源,请调用 .unpersist()。如果之后再次使用该广播变量,它将被重新广播。要永久释放广播变量使用的所有资源,请调用 .destroy()。此后不能再使用该广播变量。请注意,这些方法默认不阻塞。要阻塞直到资源被释放,请在调用它们时指定 blocking=true

释放广播变量示例:

broadcast_var.unpersist()
broadcast_var.destroy()

累加器

累加器是 Spark 提供的一种只写共享变量机制,用于在并行任务中进行累加操作,常用于计数、求和等场景。累加器允许各个任务安全地累加值,最终结果可在驱动程序获取。

创建累加器示例:

# 创建累加器
accum = sc.accumulator(0)
# 在算子中使用
rdd.foreach(lambda x: accum.add(x))
# 在驱动程序获取结果
print(accum.value)

总结

OK,PySpark的入门教程到这里就结束了,后续老言还会带着大家更深入的了解Apache Spark,关注老言,后续会有一系列精彩的Spark SQL内容呈现给大家,让你在大数据处理的道路上越走越顺。

相关文章
|
10天前
|
JSON 监控 安全
小红书笔记详情数据获取实战:从笔记链接提取 ID 到解析详情
小红书笔记详情API可获取标题、正文、作者、互动数据、图文/视频资源及话题标签等结构化信息,支持自定义字段与评论拉取。适用于内容分析、竞品监控、营销优化与用户研究,HTTPS+JSON接口,Python调用便捷。(239字)
|
2月前
|
机器学习/深度学习 缓存 物联网
打造社交APP人物动漫化:通义万相wan2.x训练优化指南
本项目基于通义万相AIGC模型,为社交APP打造“真人变身跳舞动漫仙女”特效视频生成功能。通过LoRA微调与全量训练结合,并引入Sage Attention、TeaCache、xDIT并行等优化技术,实现高质量、高效率的动漫风格视频生成,兼顾视觉效果与落地成本,最终优选性价比最高的wan2.1 lora模型用于生产部署。(239字)
1159 103
|
20小时前
|
存储 人工智能 弹性计算
2026新版OpenClaw/Clawdbot完全指南:一键部署+命令大全+实战Skills,从入门到精通
OpenClaw 作为开源AI助手的标杆,更新速度持续领跑行业,最新 v2026.2.12 版本已原生支持飞书通道,无需额外开发webhook或寻找第三方插件,彻底解决了中国用户的核心痛点。对于新手而言,掌握部署流程与常用命令是发挥其强大功能的基础;对于进阶用户,熟练运用模型切换、插件管理、浏览器自动化等命令,能大幅提升工作效率。
103 13
|
1月前
|
存储 缓存 数据建模
StarRocks + Paimon: 构建 Lakehouse Native 数据引擎
12月10日,Streaming Lakehouse Meetup Online EP.2重磅回归,聚焦StarRocks与Apache Paimon深度集成,探讨Lakehouse Native数据引擎的构建。活动涵盖架构统一、多源联邦分析、性能优化及可观测性提升,助力企业打造高效实时湖仓一体平台。
363 39
|
1月前
|
存储 缓存 调度
阿里云Tair KVCache仿真分析:高精度的计算和缓存模拟设计与实现
在大模型推理迈向“智能体时代”的今天,KVCache 已从性能优化手段升级为系统级基础设施,“显存内缓存”模式在长上下文、多轮交互等场景下难以为继,而“以存代算”的多级 KVCache 架构虽突破了容量瓶颈,却引入了一个由模型结构、硬件平台、推理引擎与缓存策略等因素交织而成的高维配置空间。如何在满足 SLO(如延迟、吞吐等服务等级目标)的前提下,找到“时延–吞吐–成本”的最优平衡点,成为规模化部署的核心挑战。
525 39
阿里云Tair KVCache仿真分析:高精度的计算和缓存模拟设计与实现
|
5天前
|
机器学习/深度学习 SQL 人工智能
别再群发拜年消息了!三步微调AI,让它学会你的“独家语气”
每逢春节,通用AI祝福总显生硬空洞。本文探讨如何通过微调(LoRA),将“人情世故”转化为结构化数据(称呼/关系/细节/风格等),让AI真正学会你的语气与记忆,生成有温度、带梗、专属的个性化祝福——技术不是替代表达,而是帮你把来不及说的情意,说得恰到好处。(239字)
161 15
别再群发拜年消息了!三步微调AI,让它学会你的“独家语气”
|
9天前
|
Java 应用服务中间件 Shell
Apache Tomcat 历史版本下载地址 官网地址
本指南详解Tomcat (以7.0.67为例)的完整部署流程:从官网下载历史版本、解压安装,到启动/停止服务(startup.sh/shutdown.sh),再到配置开机自启(systemctl)。涵盖目录结构说明及端口验证方法,适合Linux服务器快速部署。
247 134
|
12天前
|
弹性计算 人工智能 前端开发
2026年OpenClaw(原Clawdbot)部署步骤+Web页面集成保姆级教程
2026年OpenClaw(前身为Clawdbot)凭借轻量化容器化架构、灵活的插件扩展体系,成为企业快速搭建定制化AI应用的核心框架;阿里云提供的弹性计算资源、成熟的云端运维能力与高可用网络环境,为OpenClaw的稳定运行提供了坚实基础;而将OpenClaw集成至自有Web页面,可实现“网页端交互+云端AI处理”的一体化体验,覆盖智能客服、办公助手、数据查询等多场景。本文基于2026年最新版本实测,从阿里云环境搭建、OpenClaw部署、Web页面集成配置到功能验证,提供包含完整代码命令的保姆级教程,零基础用户也能零失误完成部署与集成。
646 4
|
9天前
|
自然语言处理 安全 机器人
OpenClaw(Clawdbot)一键部署+直连苹果生态Skills教程,无需Mac Mini也能玩转iPhone/iCloud
OpenClaw的爆火让Mac Mini成了数码圈抢手货,二手市场溢价严重,而苹果生态的「围墙花园」似乎也让非Mac用户望而却步——想让OpenClaw对接iPhone、iCloud,难道必须为硬件买单?答案是否定的。只需在阿里云轻量应用服务器完成OpenClaw零基础一键部署,再安装专属苹果生态Skills,就能通过飞书控制台直接接管iPhone、操作iCloud,实现相册同步、日程管理、云盘操作、设备查找等全功能,用低成本云服务器打破苹果的硬件壁垒,真正做到「无Mac也能玩转OpenClaw+苹果生态」。
520 9
|
10天前
|
人工智能 运维 数据可视化
2026年新手零门槛部署OpenClaw(Clawdbot) + 接入WhatsApp保姆级教程
对于零基础新手而言,部署OpenClaw(原Clawdbot,曾用名Moltbot)并接入WhatsApp,很容易陷入“服务器配置混乱、依赖安装失败、WhatsApp绑定无响应”的困境。2026年,阿里云针对OpenClaw推出新手专属一键部署方案,将环境配置、依赖安装、服务启动全流程封装为可视化操作和可复制脚本,无需专业开发、运维知识,无需手动调试Node.js等复杂依赖;同时,OpenClaw优化了WhatsApp接入逻辑,简化二维码绑定、权限配置和参数调试步骤,新手全程“抄作业”,40分钟即可完成从阿里云服务器部署OpenClaw,到接入WhatsApp实现AI智能交互的全流程。
416 8