大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


Spark RDD的依赖关系

重回 WordCount

RDD 持久化

RDD 缓存

RDD容错机制

基本概念

涉及到的算子:checkpoint,也是Transformation


Spark中对于数据的保存除了持久化操作外,还提供了检查点的机制

检查点本质是通过RDD写入高可靠的磁盘,主要目的是为了容错。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。

Lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销

cache和checkpoint是有显著区别的,缓存把RDD计算出来然后放到内存中,但RDD的依赖链不能丢掉,当某个点某个Executor宕机了,上面cache的RDD就会丢掉,需要通过依赖链重新计算。不同的是,checkpoint是把RDD保存在HDFS中,是多副本的可靠存储,此时依赖链可以丢弃,所以斩断了依赖链。

适合场景

DAG中的Lineage过长,如果重新计算,开销会很大

在宽依赖上做checkpoint获得的收益更大

启动Shell

# 启动 spark-shell
spark-shell --master local[*]

checkpoint

// 设置检查点目录
sc.setCheckpointDir("/tmp/checkpoint")

val rdd1 = sc.parallelize(1 to 1000)
val rdd2 = rdd1.map(_*2)
rdd2.checkpoint
// checkpoint是lazy操作
rdd2.isCheckpointed

可以发现,返回结果是False

RDD 依赖关系1

checkpoint之前的rdd依赖关系

  • rdd2.dependencies(0).rdd
  • rdd2.dependencies(0).rdd.collect
  • 我们可以观察到,依赖关系是有的,关系到之前的 rdd1 的数据了:

触发checkpoint

我们可以通过执行 Action 的方式,来触发 checkpoint

执行一次action,触发checkpoint的执行

  • rdd2.count
  • rdd2.isCheckpointed
  • 此时观察,可以发现 checkpoint 已经是 True 了:

RDD依赖关系2

我们再次观察RDD的依赖关系:

再次查看RDD的依赖关系。可以看到checkpoint后,RDD的lineage被截断,变成从checkpointRDD开始


rdd2.dependencies(0).rdd

rdd2.dependencies(0).rdd.collect

此时观察到,已经不是最开始的 rdd1 了:

查看checkpoint

我们可以查看对应的保存的文件,查看RDD所依赖的checkpoint文件

  • rdd2.getCheckpointFile
    运行的结果如下图:

RDD的分区

基本概念

spark.default.paralleism: 默认的并发数 2

本地模式

# 此时 spark.default.paralleism 为 N
spark-shell --master local[N]
# 此时 spark.default.paralleism 为 1
spark-shell --master local

伪分布式

  • x为本机上启动的Executor数
  • y为每个Executor使用的core数
  • z为每个Executor使用的内存
  • spark.default.paralleism 为 x * y
spark-shell --master local-cluster[x,y,z]

分布式模式

spark.default.paralleism = max(应用程序持有Executor的core总数, 2)
• 1

创建RDD方式

集合创建

简单的说,RDD分区数等于cores总数

val rdd1 = sc.paralleize(1 to 100)
rdd.getNumPartitions

textFile创建

如果没有指定分区数:


本地文件: rdd的分区数 = max(本地文件分片数,sc.defaultMinPartitions)

HDFS文件:rdd的分区数 = max(HDFS文件block数,sc.defaultMinPartitions)

需要额外注意的是:


本地文件分片数 = 本地文件大小 / 32M

  • 读取 HDFS 文件,同时指定了分区数 < HDFS文件的Block数,指定的数将不会生效
val rdd = sc.textFile("data/1.txt")
rdd.getNumPartitions

RDD分区器

判断分区器

以下RDD分别是否有分区器,是什么类型的分区器

val rdd1 = sc.textFile("/wcinput/wc.txt")
rdd1.partitioner

val rdd2 = sc.flatMap(_.split("\\s+"))
rdd2.partitioner

val rdd3 = rdd2.map((_, 1))
rdd3.partitioner

val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.partitioner

val rdd5 = rdd4.sortByKey()
rdd5.partitioner

接下篇:https://developer.aliyun.com/article/1622536

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
2天前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
15 0
|
2天前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
13 0
|
2天前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
11 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
2天前
|
存储 分布式计算 大数据
大数据-145 Apache Kudu 架构解读 Master Table 分区 读写
大数据-145 Apache Kudu 架构解读 Master Table 分区 读写
9 0
|
2天前
|
存储 算法 NoSQL
大数据-138 - ClickHouse 集群 表引擎详解3 - MergeTree 存储结构 数据标记 分区 索引 标记 压缩协同
大数据-138 - ClickHouse 集群 表引擎详解3 - MergeTree 存储结构 数据标记 分区 索引 标记 压缩协同
10 0
|
2天前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
8 0
|
2天前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
9 0
|
存储 分布式计算 Spark
聊聊Spark的分区
通过上篇文章【Spark RDD详解】,大家应该了解到Spark会通过DAG将一个Spark job中用到的所有RDD划分为不同的stage,每个stage内部都会有很多子任务处理数据,而每个stage的任务数是决定性能优劣的关键指标。
聊聊Spark的分区
|
2天前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
12 0
|
2天前
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
10 0