【大数据】Apache Spark入门到实战 2

简介: 【大数据】Apache Spark入门到实战

RDD

RDD的概念在Spark中十分重要,上面只是简单的介绍了一下,下面详细的对RDD展开介绍。

RDD是“Resilient Distributed Dataset”的缩写,从全称就可以了解到RDD的一些典型特性:

  • Resilient(弹性):RDD之间会形成有向无环图(DAG),如果RDD丢失了或者失效了,可以从父RDD重新计算得到。即容错性。
  • Distributed(分布式):RDD的数据是以逻辑分区的形式分布在集群的不同节点的。
  • Dataset(数据集):即RDD存储的数据记录,可以从外部数据生成RDD,例如Json文件,CSV文件,文本文件,数据库等。

RDD里面的数据集会被逻辑分成若干个分区,这些分区是分布在集群的不同节点的,基于这样的特性,RDD才能在集群不同节点并行计算。

RDD特性

  • 内存计算

Spark RDD运算数据是在内存中进行的,在内存足够的情况下,不会把中间结果存储在磁盘,所以计算速度非常高效。

  • 惰性求值

所有的转换操作都是惰性的,也就是说不会立即执行任务,只是把对数据的转换操作记录下来而已。只有碰到action操作才会被真正的执行。

  • 容错性

Spark RDD具备容错特性,在RDD失效或者数据丢失的时候,可以根据DAG从父RDD重新把数据集计算出来,以达到数据容错的效果。

  • 不变性

RDD是进程安全的,因为RDD是不可修改的。它可以在任何时间点被创建和查询,使得缓存,共享,备份都非常简单。在计算过程中,是RDD的不可修改特性保证了数据的一致性。

  • 持久化

可以调用cache或者persist函数,把RDD缓存在内存、磁盘,下次使用的时候不需要重新计算而是直接使用。

RDD操作

RDD支持两种操作:

  • 转换操作(Transformation)
  • 行动操作(Actions)

转换操作(Transformation)

转换操作以RDD做为输入参数,然后输出一个或者多个RDD。转换操作不会修改输入RDD。Map()Filter()这些都属于转换操作。

转换操作是惰性求值操作,只有在碰到行动操作(Actions)的时候,转换操作才会真正实行。转换操作分两种:窄依赖宽依赖(上文提到过)。

下面是一些常见的转换操作:

转换操作 描述
map 将函数应用于 RDD 中的每个元素,并返回一个新的 RDD
filter 返回一个新的 RDD,其中包含满足给定谓词的元素
flatMap 将函数应用于 RDD 中的每个元素,并将返回的迭代器展平为一个新的 RDD
union 返回一个新的 RDD,其中包含两个 RDD 的元素
distinct 返回一个新的 RDD,其中包含原始 RDD 中不同的元素
groupByKey 将键值对 RDD 中具有相同键的元素分组到一起,并返回一个新的 RDD
reduceByKey 将键值对 RDD 中具有相同键的元素聚合到一起,并返回一个新的 RDD
sortByKey 返回一个新的键值对 RDD,其中元素按照键排序

行动操作(Action)

Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。

Action 操作 描述
reduce 通过函数聚合 RDD 中的所有元素
collect 将 RDD 中的所有元素返回到驱动程序
count 返回 RDD 中的元素个数
first 返回 RDD 中的第一个元素
take 返回 RDD 中的前 n 个元素
takeOrdered 返回 RDD 中的前 n 个元素,按照自然顺序或指定的顺序排序
saveAsTextFile 将 RDD 中的元素保存到文本文件中
foreach 将函数应用于 RDD 中的每个元素

RDD 的创建方式

创建RDD有3种不同方式:

  • 从外部存储系统
  • 从其他RDD
  • 由一个已经存在的 Scala 集合创建

从外部存储系统

由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等:

val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

从其他RDD

通过已有的 RDD 经过算子转换生成新的 RDD:

val rdd2=rdd1.flatMap(_.split(" "))

由一个已经存在的 Scala 集合创建

val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

其实makeRDD 方法底层调用了 parallelize 方法:

RDD 缓存机制

RDD 缓存是在内存存储RDD计算结果的一种优化技术。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能。

要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了

需要注意的是,在触发action的时候,才会去执行持久化

cache()persist()的区别在于,cache()persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。

rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.unpersist()

存储级别

RDD存储级别主要有以下几种。

级别 使用空间 CPU时间 是否在内存中 是否在磁盘上 备注
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。
MEMORY_ONLY_2 数据存2份
MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化。这种方式更加节省内存
MEMORY_ONLY_SER_2 数据序列化,数据存2份
MEMORY_AND_DISK 中等 部分 部分 如果数据在内存中放不下,则溢写到磁盘
MEMORY_AND_DISK_2 中等 部分 部分 数据存2份
MEMORY_AND_DISK_SER 部分 部分 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化
MEMORY_AND_DISK_SER_2 部分 部分 数据存2份
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
DISK_ONLY_2 数据存2份
OFF_HEAP



这个目前是试验型选项,类似MEMORY_ONLY_SER,但是数据是存储在堆外内存的。

对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉了,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

RDD的血缘关系

血缘关系是指 RDD 之间的依赖关系。当你对一个 RDD 执行转换操作时,Spark 会生成一个新的 RDD,并记录这两个 RDD 之间的依赖关系。这种依赖关系就是血缘关系。

血缘关系可以帮助 Spark 在发生故障时恢复数据。当一个分区丢失时,Spark 可以根据血缘关系重新计算丢失的分区,而不需要从头开始重新计算整个 RDD。

血缘关系还可以帮助 Spark 优化计算过程。Spark 可以根据血缘关系合并多个连续的窄依赖转换,减少数据传输和通信开销。

我们可以执行toDebugString打印RDD的依赖关系。

下面是一个简单的例子:

val conf = new SparkConf().setAppName("Lineage Example").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedData = data.map(x => x + 1)
val filteredData = mappedData.filter(x => x % 2 == 0)
println(filteredData.toDebugString)

在这个例子中,我们首先创建了一个包含 5 个元素的 RDD,并对它执行了两个转换操作:mapfilter。然后,我们使用 toDebugString 方法打印了最终 RDD 的血缘关系。

运行这段代码后,你会看到类似下面的输出:

(2) MapPartitionsRDD[2] at filter at <console>:26 []
 |  MapPartitionsRDD[1] at map at <console>:24 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:22 []

这个输出表示最终的 RDD 是通过两个转换操作(mapfilter)从原始的 ParallelCollectionRDD 转换而来的。

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
6月前
|
存储 SQL 监控
数据中台架构解析:湖仓一体的实战设计
在数据量激增的数字化时代,企业面临数据分散、使用效率低等问题。数据中台作为统一管理与应用数据的核心平台,结合湖仓一体架构,打通数据壁垒,实现高效流转与分析。本文详解湖仓一体的设计与落地实践,助力企业构建统一、灵活的数据底座,驱动业务决策与创新。
|
8月前
|
存储 SQL 分布式计算
别让你的数据“裸奔”!大数据时代的数据隐私保护实战指南
别让你的数据“裸奔”!大数据时代的数据隐私保护实战指南
381 19
|
7月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
384 0
|
8月前
|
SQL 分布式计算 大数据
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
本文深入介绍 Hive 与大数据融合构建强大数据仓库的实战指南。涵盖 Hive 简介、优势、安装配置、数据处理、性能优化及安全管理等内容,并通过互联网广告和物流行业案例分析,展示其实际应用。具有专业性、可操作性和参考价值。
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
|
6月前
|
数据采集 分布式计算 大数据
不会Python,还敢说搞大数据?一文带你入门大数据编程的“硬核”真相
不会Python,还敢说搞大数据?一文带你入门大数据编程的“硬核”真相
155 1
|
5月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
7月前
|
人工智能 运维 监控
Aipy实战:分析apache2日志中的网站攻击痕迹
Apache2日志系统灵活且信息全面,但安全分析、实时分析和合规性审计存在较高技术门槛。为降低难度,可借助AI工具如aipy高效分析日志,快速发现攻击痕迹并提供反制措施。通过结合AI与学习技术知识,新手运维人员能更轻松掌握复杂日志分析任务,提升工作效率与技能水平。
|
4月前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
326 14
|
5月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
190 0

推荐镜像

更多