【大数据】Apache Spark入门到实战 2

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【大数据】Apache Spark入门到实战

RDD

RDD的概念在Spark中十分重要,上面只是简单的介绍了一下,下面详细的对RDD展开介绍。

RDD是“Resilient Distributed Dataset”的缩写,从全称就可以了解到RDD的一些典型特性:

  • Resilient(弹性):RDD之间会形成有向无环图(DAG),如果RDD丢失了或者失效了,可以从父RDD重新计算得到。即容错性。
  • Distributed(分布式):RDD的数据是以逻辑分区的形式分布在集群的不同节点的。
  • Dataset(数据集):即RDD存储的数据记录,可以从外部数据生成RDD,例如Json文件,CSV文件,文本文件,数据库等。

RDD里面的数据集会被逻辑分成若干个分区,这些分区是分布在集群的不同节点的,基于这样的特性,RDD才能在集群不同节点并行计算。

RDD特性

  • 内存计算

Spark RDD运算数据是在内存中进行的,在内存足够的情况下,不会把中间结果存储在磁盘,所以计算速度非常高效。

  • 惰性求值

所有的转换操作都是惰性的,也就是说不会立即执行任务,只是把对数据的转换操作记录下来而已。只有碰到action操作才会被真正的执行。

  • 容错性

Spark RDD具备容错特性,在RDD失效或者数据丢失的时候,可以根据DAG从父RDD重新把数据集计算出来,以达到数据容错的效果。

  • 不变性

RDD是进程安全的,因为RDD是不可修改的。它可以在任何时间点被创建和查询,使得缓存,共享,备份都非常简单。在计算过程中,是RDD的不可修改特性保证了数据的一致性。

  • 持久化

可以调用cache或者persist函数,把RDD缓存在内存、磁盘,下次使用的时候不需要重新计算而是直接使用。

RDD操作

RDD支持两种操作:

  • 转换操作(Transformation)
  • 行动操作(Actions)

转换操作(Transformation)

转换操作以RDD做为输入参数,然后输出一个或者多个RDD。转换操作不会修改输入RDD。Map()Filter()这些都属于转换操作。

转换操作是惰性求值操作,只有在碰到行动操作(Actions)的时候,转换操作才会真正实行。转换操作分两种:窄依赖宽依赖(上文提到过)。

下面是一些常见的转换操作:

转换操作 描述
map 将函数应用于 RDD 中的每个元素,并返回一个新的 RDD
filter 返回一个新的 RDD,其中包含满足给定谓词的元素
flatMap 将函数应用于 RDD 中的每个元素,并将返回的迭代器展平为一个新的 RDD
union 返回一个新的 RDD,其中包含两个 RDD 的元素
distinct 返回一个新的 RDD,其中包含原始 RDD 中不同的元素
groupByKey 将键值对 RDD 中具有相同键的元素分组到一起,并返回一个新的 RDD
reduceByKey 将键值对 RDD 中具有相同键的元素聚合到一起,并返回一个新的 RDD
sortByKey 返回一个新的键值对 RDD,其中元素按照键排序

行动操作(Action)

Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。

Action 操作 描述
reduce 通过函数聚合 RDD 中的所有元素
collect 将 RDD 中的所有元素返回到驱动程序
count 返回 RDD 中的元素个数
first 返回 RDD 中的第一个元素
take 返回 RDD 中的前 n 个元素
takeOrdered 返回 RDD 中的前 n 个元素,按照自然顺序或指定的顺序排序
saveAsTextFile 将 RDD 中的元素保存到文本文件中
foreach 将函数应用于 RDD 中的每个元素

RDD 的创建方式

创建RDD有3种不同方式:

  • 从外部存储系统
  • 从其他RDD
  • 由一个已经存在的 Scala 集合创建

从外部存储系统

由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等:

val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

从其他RDD

通过已有的 RDD 经过算子转换生成新的 RDD:

val rdd2=rdd1.flatMap(_.split(" "))

由一个已经存在的 Scala 集合创建

val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

其实makeRDD 方法底层调用了 parallelize 方法:

RDD 缓存机制

RDD 缓存是在内存存储RDD计算结果的一种优化技术。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能。

要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了

需要注意的是,在触发action的时候,才会去执行持久化

cache()persist()的区别在于,cache()persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。

rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.unpersist()

存储级别

RDD存储级别主要有以下几种。

级别 使用空间 CPU时间 是否在内存中 是否在磁盘上 备注
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。
MEMORY_ONLY_2 数据存2份
MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化。这种方式更加节省内存
MEMORY_ONLY_SER_2 数据序列化,数据存2份
MEMORY_AND_DISK 中等 部分 部分 如果数据在内存中放不下,则溢写到磁盘
MEMORY_AND_DISK_2 中等 部分 部分 数据存2份
MEMORY_AND_DISK_SER 部分 部分 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化
MEMORY_AND_DISK_SER_2 部分 部分 数据存2份
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
DISK_ONLY_2 数据存2份
OFF_HEAP



这个目前是试验型选项,类似MEMORY_ONLY_SER,但是数据是存储在堆外内存的。

对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉了,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

RDD的血缘关系

血缘关系是指 RDD 之间的依赖关系。当你对一个 RDD 执行转换操作时,Spark 会生成一个新的 RDD,并记录这两个 RDD 之间的依赖关系。这种依赖关系就是血缘关系。

血缘关系可以帮助 Spark 在发生故障时恢复数据。当一个分区丢失时,Spark 可以根据血缘关系重新计算丢失的分区,而不需要从头开始重新计算整个 RDD。

血缘关系还可以帮助 Spark 优化计算过程。Spark 可以根据血缘关系合并多个连续的窄依赖转换,减少数据传输和通信开销。

我们可以执行toDebugString打印RDD的依赖关系。

下面是一个简单的例子:

val conf = new SparkConf().setAppName("Lineage Example").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedData = data.map(x => x + 1)
val filteredData = mappedData.filter(x => x % 2 == 0)
println(filteredData.toDebugString)

在这个例子中,我们首先创建了一个包含 5 个元素的 RDD,并对它执行了两个转换操作:mapfilter。然后,我们使用 toDebugString 方法打印了最终 RDD 的血缘关系。

运行这段代码后,你会看到类似下面的输出:

(2) MapPartitionsRDD[2] at filter at <console>:26 []
 |  MapPartitionsRDD[1] at map at <console>:24 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:22 []

这个输出表示最终的 RDD 是通过两个转换操作(mapfilter)从原始的 ParallelCollectionRDD 转换而来的。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
205 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
113 5
|
3月前
|
消息中间件 分布式计算 大数据
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
110 5
|
3月前
|
SQL 分布式计算 NoSQL
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
51 1
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
89 1
|
2月前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集
|
3月前
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
60 1
|
3月前
|
Oracle 大数据 数据挖掘
企业内训|大数据产品运营实战培训-某电信运营商大数据产品研发中心
本课程是TsingtaoAI专为某电信运营商的大数据产品研发中心的产品支撑组设计,旨在深入探讨大数据在电信运营商领域的应用与运营策略。通过密集的培训,从数据的本质与价值出发,系统解析大数据工具和技术的最新进展,深入剖析行业内外的实践案例。课程涵盖如何理解和评估数据、如何有效运用大数据技术、以及如何在不同业务场景中实现数据的价值转化。
72 0
|
分布式计算 算法 大数据
大数据实战之spark安装部署
楔子 我是在2013年底第一次听说Spark,当时我对Scala很感兴趣,而Spark就是使用Scala编写的。一段时间之后,我做了一个有趣的数据科学项目,它试着去预测在泰坦尼克号上幸存。
3093 0
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
86 0

热门文章

最新文章

推荐镜像

更多