大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


Spark程序的优化:广播变量、累加器

研究广播变量

研究累加器

Standalone提交

Standalone组成

Standalone 模式下有四个重要的组成部分,分别是:


Driver: 用户编写的Spark应用程序就运行在Driver上,由Driver进程执行

Master:主要负责资源的调度和分配,并进行集群的监控等职责

Worker:Worker运行在集群中的一台服务器上,负责管理该节点上的资源,负责启动节点上的Executor

Executor:一个Worker上可以运行多个Executor,Executor通过启动多个线程(Task)对RDD的分区进行计算

SparkContext组件

什么是SparkContext

SparkContext 是 Spark 应用程序的主控制器,它负责与 Spark 集群的管理节点(Driver)和工作节点(Workers)进行交互。通过 SparkContext,用户可以提交作业、管理 RDD(弹性分布式数据集)和其他数据集,并执行各种操作。SparkContext 是 Spark 应用程序的基础,每个应用程序在启动时都会创建一个 SparkContext 实例。


SparkContext 的主要职责

集群连接: SparkContext 负责连接到集群管理器(如 YARN、Mesos 或 Spark 的独立集群管理器),并获取集群的资源,以便在集群上执行任务。

作业调度: SparkContext 通过 DAG(有向无环图)将用户的应用程序逻辑转换为一系列任务(Tasks),然后将这些任务分配给集群中的工作节点执行。

RDD 管理: RDD 是 Spark 的核心抽象,用于表示分布式数据集。SparkContext 提供了创建 RDD 的方法,如从外部存储系统(HDFS、S3 等)中加载数据,或者从 Scala 集合创建 RDD。

广播变量和累加器: SparkContext 提供了广播变量和累加器的支持,广播变量用于在集群中的所有节点间共享只读数据,累加器用于在集群中执行全局计数或求和操作。

检查点: 为了支持容错,SparkContext 提供了将 RDD 存储到可靠存储中的功能,这称为检查点。这样,在发生故障时,Spark 可以从检查点恢复 RDD。

SparkContext中的三大组件:

DAGScheduler:负责将DAG划分若干个Stage

TaskScheduler:将DAGScheduler提交的Stage(Taskset)进行优先排序,再将Task发送到Executor

SchedulerBackend:定义了许多与Executor事件相关的处理,包括:新的Executor注册进来的时候记录Executor的信息,增加全局的资源量(核数),Executor更新状态,若任务完成的话,回收Core,其他停止Executor、Remove Executor等事件

常用的 SparkContext 方法

parallelize: 将本地集合转换为 RDD。

textFile: 从文本文件中读取数据并创建 RDD。

stop: 停止 SparkContext。

broadcast: 创建广播变量。

accumulator: 创建累加器。

Standalone提交

启动应用程序,完成SparkContext的初始化

Driver向Master注册,申请资源

Master检查集群资源状况,若集群资源满足,通知Worker启动Executor

Executor启动后向Driver注册(称为反向注册)

Driver完成DAG的解析,得到Tasks,然后向Executor发送Task

Executor向Driver汇总任务的执行情况

应用程序执行完毕,回收资源

Shuffle原理

基本概念

Shuffle的本意是洗牌,目的是为了把牌弄乱。


Spark、Hadoop中的Shuffle可不是为了把数据弄乱,而是为了将随机排列的数据转换成具有一定规则的数据。

Shuffle是MapReduce计算框架中的一个特殊的阶段,介于Map和Reduce之间。

Shuffle涉及到了本地磁盘(非HDFS)的读写和网络传输,大多数Spark作业的性能都消耗在了Shuffle阶段,因此Shuffle性能的高低直接影响到了整个程序的运行效率

Shuffle历史

Spark 0.8 及以前 Hash Based Shuflle

Spark 0.8.1 为 Hash Based Shuflle 引入 File Consolidation 机制

Spark 0.9 引入 External Append Only Map

Spark 1.1 引入 Sort Based Shuffle,但默认仍为 Hash Based Shuffle

Spark 1.2 默认的 Shuffle方式改为 Sort Based Shuffle

Spark 1.4 引入 Tungsten-Sort Based Shuffle

Spark 1.6 Tungsten-Sort 并入 Sort Based Shuffle

Spark 2.0 Hash Based Shuffle 退出历史舞台

Hash Base Shuffle V1

简单介绍

Hash-based Shuffle 是 Apache Spark 中数据分布和重新排序的一种方式。Shuffle 是指在不同阶段的任务之间重新分配数据的过程。Hash-based Shuffle 在 Spark 1.x 版本中引入,被称为 Shuffle V1。

Shuffle V1 是 Spark 最初版本使用的 Shuffle 机制,基于 Hash 方法实现数据分布。它的主要特点是通过对数据的键进行哈希处理,将数据分配到相应的 reducer 节点上。Shuffle V1 的实现相对简单,但在大规模数据处理时存在一些局限性,如磁盘 I/O 过多、垃圾回收压力大等。


每个 Shuffle Map Task 需要为每个下游的Task创建一个单独的文件

Shuffle 过程中会生成海量的小文件,同时打开过多的文件、IO效率低

工作原理

Map 端处理:


每个 map 任务在完成后,会根据键的哈希值将数据划分到不同的 bucket 中,这些 bucket 对应下游的 reduce 任务。

Map 任务会将这些数据块(称为 partition)写入本地磁盘,并为每个 reduce 任务生成一个文件(包括索引文件和数据文件)。

Reduce 端处理:


当 reduce 任务启动时,它会从所有 map 任务生成的输出中拉取对应的数据块。

Reduce 任务根据 map 任务输出的索引文件来读取相应的 partition 数据,并在本地进行聚合或其他处理。

局限性

磁盘 I/O: 每个 map 任务为每个 reduce 任务生成单独的文件,这会导致大量的小文件和频繁的磁盘 I/O 操作。当集群规模和数据量增大时,I/O 开销变得非常大。

垃圾回收: Shuffle V1 在处理过程中会产生大量的中间结果,导致 JVM 内存中会积累大量对象,增加了垃圾回收的压力,可能导致频繁的 GC 暂停(Stop-the-world)。

容错性: 如果某个任务失败,Spark 需要重新计算该任务的所有中间结果,Shuffle V1 没有很好的机制来优化这一过程。

适用场景

尽管 Shuffle V1 存在一些问题,但在小规模数据处理或集群中,Shuffle V1 的性能表现还是可以接受的,特别是对资源消耗较少的作业。不过,随着数据规模的增大,Shuffle V1 的局限性会变得明显,因此后续的 Spark 版本引入了更优化的 Shuffle 机制(Shuffle V2 和 Tungsten-Sort Based Shuffle)。


Hash Base Shuffle V2

简单介绍

Hash-Based Shuffle V2 是 Apache Spark 中对最初版本的 Hash-Based Shuffle 进行的改进,旨在解决 Shuffle V1 中存在的一些性能和稳定性问题。Shuffle 是分布式计算中数据重新分布的重要机制,而 Shuffle V2 的引入大大提高了 Spark 在处理大规模数据集时的性能和效率。


核心思想

Hash Base Shuffle V2 核心思想:

允许不同Task复用同一批磁盘文件,有效将多个Task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升ShuffleWrite的性能,一定程度上解决了HashV1中的问题,但不彻底。

Hash Shuffle 规避了排序,提高了性能,总的来说在 Hash Shuffle过程中生成了海量的小文件

Shuffle V2 的改进点

合并输出文件:


在 Shuffle V2 中,map 任务不再为每个 reduce 任务生成一个单独的文件,而是将多个 partition 的输出合并到一个文件中。这样,每个 map 任务只生成一个数据文件和一个索引文件,大大减少了生成的小文件数量。

索引文件记录了每个 reduce 任务的数据在数据文件中的偏移量和长度,reduce 任务可以根据这个索引文件来定位它所需的数据。

磁盘 I/O 优化:


通过合并输出文件,Shuffle V2 大幅减少了磁盘 I/O 操作,减少了文件系统的压力,并且降低了与小文件相关的元数据管理开销。

内存消耗优化:


由于减少了文件数量,Shuffle V2 对 JVM 的内存压力也有所降低,垃圾回收(GC)的频率和时长得到了优化。

容错性改进:


Shuffle V2 采用了更加高效的数据管理机制,使得在任务失败时,重新拉取数据的开销更小。此外,数据文件的合并也使得在节点故障时可以更容易地恢复数据。

工作原理

Shuffle V2 的工作原理

Map 端处理:


每个 map 任务在处理数据时,基于键的哈希值将数据分配到不同的 partition。与 Shuffle V1 不同的是,Shuffle V2 将多个 partition 的数据写入同一个文件。

同时生成一个索引文件,记录每个 partition 在数据文件中的位置和长度。

Reduce 端处理:

Reduce 任务通过索引文件,定位需要处理的数据块,并从 Map 任务的输出文件中读取相应的数据。

通过这种方式,减少了 I/O 开销,并优化了数据拉取的效率。

适用场景

Shuffle V2 适用于绝大多数的 Spark 作业,特别是在处理大规模数据集时效果尤为明显。它减少了磁盘 I/O 操作,优化了内存消耗,并提高了系统的容错性。对于需要高性能和稳定性的场景,Shuffle V2 是更好的选择。


Sort Base Shuffle

Sort Base Shuffle 大大减少了 Shuffle 过程中产生的文件数,提高 Shuffle 的效率。

Spark Shuffle 与 Hadoop Shuffle 从目的、意义、功能上看是类似的,实现上有区别。

RDD编程优化

RDD复用

避免创建重复的RDD,在开发过程中要注意,对于同一份数据,只应该创建一个RDD,不要创建过多个RDD来表示同一份数据。


RDD缓存/持久化

当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费

对多次使用的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据

RDD的持久化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据的体积,将数据完整存储在内存中

巧用 filter

尽可能过早地执行filter操作,过滤无用数据

在filter过滤较多数据后,使用 coalesce 对数据进行重分区

使用高性能算子

避免使用 groupByKey,根据场景选择使用高性能的聚合算子:reduceByKey、aggregateByKey

coalesce、repartition,在可能得情况下优先选择没有Shuffle的操作

foreachPartition 优化输出操作

map、mapPartition,选择合理的选择算子,mapPartitions性能更好,但数据量过大时可能会OOM

用 repartitionAndSortWithinPartitions 替代 repartition + Sort 操作

合理使用 cache、persist、checkpoint,选择合理的数据存储级别

filter 的使用

减少对数据源的扫描(算法复杂)

设置合理的并行度

Spark作业中的并行度指各个Stage的Task的数量

设置合理的并行度,让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度尽可能大,达到充分利用集群资源。

广播大变量

默认情况下,Task中的算子中如果使用了外部变量,每个Task都会获取一份变量的副本,这会造多余的网络传输和内存消耗

使用广播变量,只会在每个Executor保存一个副本,Executor的所有Task共用此广播变量,这样就节约了网络及内存资源


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
120 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
28天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
42 2
zdl
|
28天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
144 56
|
25天前
|
存储 算法 固态存储
大数据分区优化存储成本
大数据分区优化存储成本
31 4
|
26天前
|
SQL 存储 大数据
单机顶集群的大数据技术来了
大数据时代,分布式数仓如MPP成为热门技术,但其高昂的成本让人望而却步。对于多数任务,数据量并未达到PB级,单体数据库即可胜任。然而,由于SQL语法的局限性和计算任务的复杂性,分布式解决方案显得更为必要。esProc SPL作为一种开源轻量级计算引擎,通过高效的算法和存储机制,实现了单机性能超越集群的效果,为低成本、高效能的数据处理提供了新选择。
|
27天前
|
存储 大数据 Serverless
大数据增加分区优化资源使用
大数据增加分区优化资源使用
24 1
|
1月前
|
存储 NoSQL 大数据
大数据 数据存储优化
【10月更文挑战第25天】
76 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
102 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
71 1
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。