大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)

简介: 大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


Spark RDD的依赖关系

重回 WordCount

RDD 持久化

RDD 缓存

RDD容错机制

基本概念

涉及到的算子:checkpoint,也是Transformation


Spark中对于数据的保存除了持久化操作外,还提供了检查点的机制

检查点本质是通过RDD写入高可靠的磁盘,主要目的是为了容错。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。

Lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销

cache和checkpoint是有显著区别的,缓存把RDD计算出来然后放到内存中,但RDD的依赖链不能丢掉,当某个点某个Executor宕机了,上面cache的RDD就会丢掉,需要通过依赖链重新计算。不同的是,checkpoint是把RDD保存在HDFS中,是多副本的可靠存储,此时依赖链可以丢弃,所以斩断了依赖链。

适合场景

DAG中的Lineage过长,如果重新计算,开销会很大

在宽依赖上做checkpoint获得的收益更大

启动Shell

# 启动 spark-shell
spark-shell --master local[*]

checkpoint

// 设置检查点目录
sc.setCheckpointDir("/tmp/checkpoint")

val rdd1 = sc.parallelize(1 to 1000)
val rdd2 = rdd1.map(_*2)
rdd2.checkpoint
// checkpoint是lazy操作
rdd2.isCheckpointed

可以发现,返回结果是False

RDD 依赖关系1

checkpoint之前的rdd依赖关系

  • rdd2.dependencies(0).rdd
  • rdd2.dependencies(0).rdd.collect
  • 我们可以观察到,依赖关系是有的,关系到之前的 rdd1 的数据了:

触发checkpoint

我们可以通过执行 Action 的方式,来触发 checkpoint

执行一次action,触发checkpoint的执行

  • rdd2.count
  • rdd2.isCheckpointed
  • 此时观察,可以发现 checkpoint 已经是 True 了:

RDD依赖关系2

我们再次观察RDD的依赖关系:

再次查看RDD的依赖关系。可以看到checkpoint后,RDD的lineage被截断,变成从checkpointRDD开始


rdd2.dependencies(0).rdd

rdd2.dependencies(0).rdd.collect

此时观察到,已经不是最开始的 rdd1 了:

查看checkpoint

我们可以查看对应的保存的文件,查看RDD所依赖的checkpoint文件

  • rdd2.getCheckpointFile
    运行的结果如下图:

RDD的分区

基本概念

spark.default.paralleism: 默认的并发数 2

本地模式

# 此时 spark.default.paralleism 为 N
spark-shell --master local[N]
# 此时 spark.default.paralleism 为 1
spark-shell --master local

伪分布式

  • x为本机上启动的Executor数
  • y为每个Executor使用的core数
  • z为每个Executor使用的内存
  • spark.default.paralleism 为 x * y
spark-shell --master local-cluster[x,y,z]

分布式模式

spark.default.paralleism = max(应用程序持有Executor的core总数, 2)
• 1

创建RDD方式

集合创建

简单的说,RDD分区数等于cores总数

val rdd1 = sc.paralleize(1 to 100)
rdd.getNumPartitions

textFile创建

如果没有指定分区数:


本地文件: rdd的分区数 = max(本地文件分片数,sc.defaultMinPartitions)

HDFS文件:rdd的分区数 = max(HDFS文件block数,sc.defaultMinPartitions)

需要额外注意的是:


本地文件分片数 = 本地文件大小 / 32M

  • 读取 HDFS 文件,同时指定了分区数 < HDFS文件的Block数,指定的数将不会生效
val rdd = sc.textFile("data/1.txt")
rdd.getNumPartitions

RDD分区器

判断分区器

以下RDD分别是否有分区器,是什么类型的分区器

val rdd1 = sc.textFile("/wcinput/wc.txt")
rdd1.partitioner

val rdd2 = sc.flatMap(_.split("\\s+"))
rdd2.partitioner

val rdd3 = rdd2.map((_, 1))
rdd3.partitioner

val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.partitioner

val rdd5 = rdd4.sortByKey()
rdd5.partitioner

接下篇:https://developer.aliyun.com/article/1622536

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
9月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
469 0
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
1117 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
619 79
|
8月前
|
数据采集 分布式计算 大数据
不会Python,还敢说搞大数据?一文带你入门大数据编程的“硬核”真相
不会Python,还敢说搞大数据?一文带你入门大数据编程的“硬核”真相
193 1
|
人工智能 分布式计算 调度
打破资源边界、告别资源浪费:ACK One 多集群Spark和AI作业调度
ACK One多集群Spark作业调度,可以帮助您在不影响集群中正在运行的在线业务的前提下,打破资源边界,根据各集群实际剩余资源来进行调度,最大化您多集群中闲置资源的利用率。
|
存储 分布式计算 调度
Spark Master HA 主从切换过程不会影响到集群已有作业的运行, 为什么?
Spark Master 的高可用性(HA)机制确保主节点故障时,备用主节点能无缝接管集群管理,保障稳定运行。关键在于: 1. **Driver 和 Executor 独立**:任务执行不依赖 Master。 2. **应用状态保持**:备用 Master 通过 ZooKeeper 恢复集群状态。 3. **ZooKeeper 协调**:快速选举新 Master 并同步状态。 4. **容错机制**:任务可在其他 Executor 上重新调度。 这些特性保证了集群在 Master 故障时仍能正常运行。
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
571 2
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
673 2
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
571 1