《Spark大数据分析实战》——1.4节弹性分布式数据集

简介:

本节书摘来自华章社区《Spark大数据分析实战》一书中的第1章,第1.4节弹性分布式数据集,作者高彦杰 倪亚宇,更多章节内容可以访问云栖社区“华章社区”公众号查看

1.4 弹性分布式数据集
本节将介绍弹性分布式数据集RDD。Spark是一个分布式计算框架,而RDD是其对分布式内存数据的抽象,可以认为RDD就是Spark分布式算法的数据结构,而RDD之上的操作是Spark分布式算法的核心原语,由数据结构和原语设计上层算法。Spark最终会将算法(RDD上的一连串操作)翻译为DAG形式的工作流进行调度,并进行分布式任务的分发。
1.4.1 RDD简介
在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD)。它在集群中的多台机器上进行了数据分区,逻辑上可以认为是一个分布式的数组,而数组中每个记录可以是用户自定义的任意数据结构。RDD是Spark的核心数据结构,通过RDD的依赖关系形成Spark的调度顺序,通过对RDD的操作形成整个Spark程序。
(1)RDD创建方式
1)从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、HBase)输入(例如HDFS)创建。
2)从父RDD转换得到新RDD。
3)通过parallelize或makeRDD将单机数据创建为分布式RDD。
(2)RDD的两种操作算子
对于RDD可以有两种操作算子:转换(Transformation)与行动(Action)。
1)转换(Transformation):Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需要等到有Action操作的时候才会真正触发运算。
2)行动(Action):Action算子会触发Spark提交作业(Job),并将数据输出Spark系统。
(3)RDD的重要内部属性
通过RDD的内部属性,用户可以获取相应的元数据信息。通过这些信息可以支持更复杂的算法或优化。
1)分区列表:通过分区列表可以找到一个RDD中包含的所有分区及其所在地址。
2)计算每个分片的函数:通过函数可以对每个数据块进行RDD需要进行的用户自定义函数运算。
3)对父RDD的依赖列表:为了能够回溯到父RDD,为容错等提供支持。
4)对key-value pair数据类型RDD的分区器,控制分区策略和分区数。通过分区函数可以确定数据记录在各个分区和节点上的分配,减少分布不平衡。
5)每个数据分区的地址列表(如HDFS上的数据块的地址)。
如果数据有副本,则通过地址列表可以获知单个数据块的所有副本地址,为负载均衡和容错提供支持。
(4)Spark计算工作流
图1-5中描述了Spark的输入、运行转换、输出。在运行转换中通过算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
输入:在Spark程序运行中,数据从外部数据空间(例如,HDFS、Scala集合或数据)输入到Spark,数据就进入了Spark运行时数据空间,会转化为Spark中的数据块,通过BlockManager进行管理。
运行:在Spark数据输入形成RDD后,便可以通过变换算子f?liter等,对数据操作并将RDD转化为新的RDD,通过行动(Action)算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS)或Scala数据或集合中(collect输出到Scala集合,count返回Scala Int型数据)。


1ef1e957cfde965a439c9ee58e30a0a349a5d521

Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、Shuff?ledRDD等子类。Spark将常用的大数据操作都转化成为RDD的子类。
1.4.2 RDD算子分类
本节将主要介绍Spark算子的作用,以及算子的分类。
Spark算子大致可以分为以下两类。
1)Transformation变换算子:这种变换并不触发提交作业,完成作业中间过程处理。
2)Action行动算子:这类算子会触发SparkContext提交Job作业。
下面分别对两类算子进行详细介绍。
1.?Transformations算子
下文将介绍常用和较为重要的Transformation算子。
(1)map
将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。源码中map算子相当于初始化一个RDD,新RDD叫做MappedRDD(this,sc.clean(f))。
图1-7中每个方框表示一个RDD分区,左侧的分区经过用户自定义函数f:T->U映射为右侧的新RDD分区。但是,实际只有等到Action算子触发后这个f函数才会和其他函数在一个stage中对数据进行运算。在图1-6中的第一个分区,数据记录V1输入f,通过f转换输出为转换后的分区中的数据记录V'1。
(2)f?latMap
将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD的每个集合中的元素合并为一个集合,内部创建FlatMappedRDD(this,sc.clean(f))。
图1-7表示RDD的一个分区进行f?latMap函数操作,f?latMap中传入的函数为f:T->U,T和U可以是任意的数据类型。将分区中的数据通过用户自定义函数f转换为新的数据。外部大方框可以认为是一个RDD分区,小方框代表一个集合。V1、V2、V3在一个集合作为RDD的一个数据项,可能存储为数组或其他容器,转换为V'1、V'2、V'3后,将原来的数组或容器结合拆散,拆散的数据形成为RDD中的数据项。
(3)mapPartitions
mapPartitions函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。内部实现是生成MapPartitionsRDD。图1-8中的方框代表一个RDD分区。
图1-8中,用户通过函数f?(iter)=>iter.f?ilter(_>=3)对分区中所有数据进行过滤,大于和等于3的数据保留。一个方块代表一个RDD分区,含有1、2、3的分区过滤只剩下元素3。


09226f8de163dcb01c3a9ea5e87a0d82d47f4b6d

(4)union
使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。并不进行去重操作,保存所有元素,如果想去重可以使用distinct()。同时Spark还提供更为简洁的使用union的API,通过++符号相当于union函数操作。
图1-9中左侧大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。合并后,V1、V2、V3……V8形成一个分区,其他元素同理进行合并。
(5)cartesian
对两个RDD内的所有元素进行笛卡尔积操作。操作后,内部实现返回CartesianRDD。图1-10中左侧大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。
例如:V1和另一个RDD中的W1、W2、Q5进行笛卡尔积运算形成(V1,W1)、(V1,W2)、(V1,Q5)。
          


eea083208d5cfbd76e98bf84587839d7a3009d96

(6)groupBy
groupBy:将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组。
函数实现如下:
1)将用户函数预处理:

val cleanF = sc.clean(f)

2)对数据map进行函数操作,最后再进行groupByKey分组操作。
this.map(t => (cleanF(t), t)).groupByKey(p)
其中,p确定了分区个数和分区函数,也就决定了并行化的程度。
图1-11中方框代表一个RDD分区,相同key的元素合并到一个组。例如V1和V2合并为V,Value为V1,V2。形成V,Seq(V1,V2)。


661671469b52915dbb252e883a7d58dc2ff89571

(7)f?ilter
f?ilter函数功能是对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回值为false的元素将被过滤掉。内部实现相当于生成FilteredRDD(this,sc.clean(f))。
下面代码为函数的本质实现:

deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

图1-12中每个方框代表一个RDD分区,T可以是任意的类型。通过用户自定义的过滤函数f,对每个数据项操作,将满足条件、返回结果为true的数据项保留。例如,过滤掉V2和V3保留了V1,为区分命名为V'1。
(8)sample
sample将RDD这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。
内部实现是生成SampledRDD(withReplacement,fraction,seed)。
函数参数设置:
withReplacement=true,表示有放回的抽样。
withReplacement=false,表示无放回的抽样。
图1-13中的每个方框是一个RDD分区。通过sample函数,采样50%的数据。V1、V2、U1、U2……U4采样出数据V1和U1、U2形成新的RDD。
        


e07604d61860e63a6a14b435d74c2bb64daca285


299fd547e08b21eba62c2e8387b10c1fa6549e2d

图1-16中方框代表RDD分区。disk代表存储在磁盘,mem代表存储在内存。数据最初全部存储在磁盘,通过persist(MEMORY_AND_DISK)将数据缓存到内存,但是有的分区无法容纳在内存,将含有V1、V2、V3的分区存储到磁盘。
(11)mapValues
mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理。
图1-17中的方框代表RDD分区。a=>a+2代表对(V1,1)这样的Key Value数据对,数据只对Value中的1进行加2操作,返回结果为3。
     


9d3c7e53ae2ce97d96cefab14e140f426b56e78b

(12)combineByKey
下面代码为combineByKey函数的定义:

combineByKey[C](createCombiner:(V)C,
mergeValue:(C, V)C,
mergeCombiners:(C, C)C,
partitioner:Partitioner,
mapSideCombine:Boolean=true,
serializer:Serializer=null):RDD[(K,C)]
说明:
createCombiner:V => C,C不存在的情况下,比如通过V创建seq C。
mergeValue:(C, V) => C,当C已经存在的情况下,需要merge,比如把item V加到seq C中,或者叠加。
mergeCombiners:(C, C) => C,合并两个C。
partitioner:Partitioner, Shuff?le时需要的Partitioner。
mapSideCombine:Boolean = true,为了减小传输量,很多combine可以在map端先做,比如叠加,可以先在一个partition中把所有相同的key的value叠加,再shuff?le。
serializerClass:String = null,传输需要序列化,用户可以自定义序列化类:

例如,相当于将元素为(Int,Int)的RDD转变为了(Int,Seq[Int])类型元素的RDD。


e41bc6c2f16507c366059b2019144126117518c0

图1-21表示foreach算子通过用户自定义函数对每个数据项进行操作。本例中自定义函数为println(),控制台打印所有数据项。
(2)saveAsTextFile
函数将数据输出,存储到HDFS的指定目录。
下面为saveAsTextFile函数的内部实现,其内部通过调用saveAsHadoopFile进行实现:

this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) 

将RDD中的每个元素映射转变为(null,x.toString),然后再将其写入HDFS。
图1-22中左侧方框代表RDD分区,右侧方框代表HDFS的Block。通过函数将RDD的每个分区存储为HDFS中的一个Block。
(3)collect
collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数组。在这个数组上运用scala的函数式操作。
图1-23中左侧方框代表RDD分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到Driver程序所在的节点,以数组形式存储。
     


6b2d79cbb796d731b7aad4bb9c9abc121bffcb43
相关文章
|
2月前
|
消息中间件 RocketMQ 微服务
RocketMQ 分布式事务消息实战指南
RocketMQ 分布式事务消息实战指南
271 1
|
14天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
3月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
2月前
|
Java 数据库连接 API
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
60 0
|
开发框架 Java 数据库连接
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)(下)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
37 0
|
数据库 微服务
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)(上)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
41 0
|
28天前
|
缓存 应用服务中间件 数据库
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
33 1
|
1月前
|
存储 分布式计算 Spark
实战|使用Spark Streaming写入Hudi
实战|使用Spark Streaming写入Hudi
41 0
|
1月前
|
存储 缓存 分布式计算
Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
【2月更文挑战第13天】Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
34 1
|
2月前
|
JSON 分布式计算 MaxCompute
MaxCompute问题之创建数据集失败如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
33 0

热门文章

最新文章