Spark2.X弹性分布式数据集(一)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 笔记

一、三大弹性分布式数据集介绍


RDD、DataFrame、DataSet是Spark平台下的弹性分布式数据集,为高效处理超大型数据集提供便利。


RDD

优点:


编译时类型安全,编译时就能检查出类型错误

面向对象的编程风格,直接通过类名点的方式来操作数据

缺点:


序列化和反序列化的性能开销,无论是集群间的通信、还是IO操作都需要对对象的结构和数据进行序列化和反序列化

GC的性能开销,频繁的创建和销毁对象,势必会增加GC

DataFrame

DataFrame引入了schema和off-heap


schema:RDD中每一个元素的结构都是一致的,在DataFrame中这个结构就存储在schema中. 因此,在通信和IO中,就只用序列化和反序列化数据,而结构的部分就可以省略了

off-heap:意味着DataFrame使用了「JVM堆以外」的内存,这些内存直接受操作系统管理(而不是JVM),Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中,当要操作数据时,就直接操作off-heap内存

通过off-heap,spark一定程度上脱离了JVM的控制,也就极大程度上免于GC的困扰. 通过schema和off-heap,DataFrame解决了RDD的缺点,但是却丢了RDD的优点, DataFrame不是类型安全的, API也不是面向对象风格的。


DataSet

DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder


当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。


二、Spark RDD概述与创建方式


学习之前可以先了解RDD编程指南:

http://spark.apache.org/docs/2.4.6/rdd-programming-guide.html


在较高级别上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的main功能并在集群上执行各种并行操作。Spark提供的主要抽象是弹性分布式数据集(RDD),它是跨集群节点划分的元素的集合,可以并行操作。 通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或驱动程序中现有的Scala集合开始并进行转换来创建RDD。用户还可以要求Spark将RDD 保留在内存中,以使其能够在并行操作中有效地重用。最后,RDD会自动从节点故障中恢复。


(1)连接Spark

默认情况下,Spark 2.4.6已构建并分发为可与Scala 2.12一起使用。(可以将Spark构建为与其他版本的Scala一起使用。)要在Scala中编写应用程序,您将需要使用兼容的Scala版本(例如2.12.X)。


要编写Spark应用程序,您需要在Spark上添加Maven依赖项。可通过Maven Central在以下位置获得Spark:

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 2.4.6

另外,如果您想访问HDFS群集,则需要hadoop-client为您的HDFS版本添加依赖项 。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>


最后,您需要将一些Spark类导入程序。添加以下行:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


(2)初始化Spark

Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建一个,SparkContext您首先需要构建一个SparkConf对象,其中包含有关您的应用程序的信息。在spark2.0以后也可以通过初始化SparkSession来创建rdd和dataset。

每个JVM只能激活一个SparkContext。stop()在创建新的SparkContext之前,您必须先激活它。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

该appName参数是您的应用程序显示在集群UI上的名称。 master是Spark,Mesos或YARN群集URL或特殊的“local”字符串,以本地模式运行。实际上,当在集群上运行时,您将不希望master在程序中进行硬编码,而是在其中启动应用程序spark-submit并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark。


关于spark rdd可以参考这篇文章:

https://blog.csdn.net/weixin_45366499/article/details/108676602


三、Spark RDD五大特性


spark rdd的五大特性:

官方文档给出的解释


A list of partitions

A function for computing each split

A list of dependencies on other RDDs

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

中文翻译


一个数据集被拆分成partition,这是进行并行计算的基础

partition 是 RDD 的基本组成单位,对于 RDD 来说,每个 partition 都会被一个Task处理,并决定并行计算的粒度,用户可以在创建 RDD 时指定 RDD 的 partition 个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU core的数目每个 partition 的存储是由BlockManager 实现的,每个 partition 都会被逻辑映射成BlockManager 的一个 Block ,而这个 Block 会被一个 Task 负责计算。


作用在 RDD 的函数,会作用到每一个 partition

Spark中的 RDD 的计算是以 partition 为单位的,每个 RDD 都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。


RDD之间的依赖,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。

RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。


重点要掌握前面三个特性,可以一句话总结:spark以RDD为核心的抽象弹性分布式数据集,这个数据集是以分区的方式运行在cluster node节点上,并且每一个分区都有一个计算任务对数据进行计算,每一个RDD数据之间相互依赖,这样依赖关系能够保证我们的数据RDD故障自动恢复。


当我们的rdd是key-value的这种结构的时候进行partitioner的时候,所用到的算法是HashPartitioner

key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的paritioner接口,控制key分到哪个reduce。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDDShuffle输出时的分片数量。


每一分片的优先计算位置,比如HDFS的block的所在位置应该是优先计算的位置。

对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

四、Spark RDD操作方式及使用

关于DRR的详细操作请看这篇博客,这里只做大概讲解:

https://blog.csdn.net/weixin_45366499/article/details/108676602

image.pngimage.pngimage.pngimage.png

Spark RDD操作方式的综合使用

准备数据集

val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
val rdd2 = sc.textFile("file:///opt/datas/stu1.txt")

合并数据集操作

val allRdd = rdd1.union(rdd2)

将数据按空格分开

val lines = allRdd.flatMap(x => x.split(" ")).collect
lines: Array[String] = Array(java, python, hadoop, spring, python, hadoop, java, c, c++, hbase, spark, scala, scala, python, java, linux, unix, java, php, mysql, hive, hue, java, python, hadoop, spring, python, hadoop, java, c, c++, hbase, spark, scala, scala, python, java, linux, unix, java, php, mysql, hive, hue, java, python, spring, javascirpt, mapreduce, java, hello, world, python, aikfk, caizhengjie)

将数据分成元祖对

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).collect
lines: Array[(String, Int)] = Array((java,1), (python,1), (hadoop,1), (spring,1), (python,1), (hadoop,1), (java,1), (c,1), (c++,1), (hbase,1), (spark,1), (scala,1), (scala,1), (python,1), (java,1), (linux,1), (unix,1), (java,1), (php,1), (mysql,1), (hive,1), (hue,1), (java,1), (python,1), (hadoop,1), (spring,1), (python,1), (hadoop,1), (java,1), (c,1), (c++,1), (hbase,1), (spark,1), (scala,1), (scala,1), (python,1), (java,1), (linux,1), (unix,1), (java,1), (php,1), (mysql,1), (hive,1), (hue,1), (java,1), (python,1), (spring,1), (javascirpt,1), (mapreduce,1), (java,1), (hello,1), (world,1), (python,1), (aikfk,1), (caizhengjie,1))

将数据按key合并

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).collect
lines: Array[(String, Int)] = Array((hive,2), (php,2), (python,8), (aikfk,1), (linux,2), (hue,2), (unix,2), (spark,2), (caizhengjie,1), (hadoop,4), (spring,3), (hbase,2), (scala,4), (mapreduce,1), (mysql,2), (hello,1), (java,10), (world,1), (c++,2), (javascirpt,1), (c,2))

筛选数据,保留value>1的

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).collect
lines: Array[(String, Int)] = Array((hive,2), (php,2), (python,8), (linux,2), (hue,2), (unix,2), (spark,2), (hadoop,4), (spring,3), (hbase,2), (scala,4), (mysql,2), (java,10), (c++,2), (c,2))

key value掉换位置

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =>
lines: Array[(Int, String)] = Array((2,hive), (2,php), (8,python), (2,linux), (2,hue), (2,unix), (2,spark), (4,hadoop), (3,spring), (2,hbase), (4,scala), (2,mysql), (10,java), (2,c++), (2,c))

按key的降序排序

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =>
lines: Array[(Int, String)] = Array((10,java), (8,python), (4,hadoop), (4,scala), (3,spring), (2,hive), (2,php), (2,linux), (2,hue), (2,unix), (2,spark), (2,hbase), (2,mysql), (2,c++), (2,c))

再次掉换key value掉换位置

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =
lines: Array[(String, Int)] = Array((java,10), (python,8), (hadoop,4), (scala,4), (spring,3), (hive,2), (php,2), (linux,2), (hue,2), (unix,2), (spark,2), (hbase,2), (mysql,2), (c++,2), (c,2))

输出打印

scala> lines.foreach(println)
(java,10)
(python,8)
(hadoop,4)
(scala,4)
(spring,3)
(hive,2)
(php,2)
(linux,2)
(hue,2)
(unix,2)
(spark,2)
(hbase,2)
(mysql,2)
(c++,2)
(c,2)

将结果写入hdfs中

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x

查看结果

bin/hdfs dfs -text /user/datas/wordcount/par*
20/09/20 09:04:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(java,10)
(python,8)
(hadoop,4)
(scala,4)
(spring,3)
(hive,2)
(php,2)
(linux,2)
(hue,2)
(unix,2)
(spark,2)
(hbase,2)
(mysql,2)
(c++,2)
(c,2)

通过查看网页,我们可知rdd被分成了四个区

14.png

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x => (x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1))
scala> lines.getNumPartitions
res11: Int = 4

我们也可以自己指定分区

lines.repartitions(3)


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
5月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
696 1
|
2月前
|
存储 分布式计算 数据处理
解释弹性分布式数据集(RDD)的概念
【8月更文挑战第13天】
78 4
|
2月前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
55 0
|
5月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
166 0
|
5月前
|
SQL 分布式计算 Hadoop
Spark分布式内存计算框架
Spark分布式内存计算框架
124 0
|
5月前
|
机器学习/深度学习 负载均衡 PyTorch
PyTorch分布式训练:加速大规模数据集的处理
【4月更文挑战第18天】PyTorch分布式训练加速大规模数据集处理,通过数据并行和模型并行提升训练效率。`torch.distributed`提供底层IPC与同步,适合定制化需求;`DistributedDataParallel`则简化并行过程。实际应用注意数据划分、通信开销、负载均衡及错误处理。借助PyTorch分布式工具,可高效应对深度学习的计算挑战,未来潜力无限。
|
5月前
|
存储 缓存 分布式计算
Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
【2月更文挑战第13天】Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
69 1
|
5月前
|
分布式计算 大数据 数据处理
Spark RDD(弹性分布式数据集)
Spark RDD(弹性分布式数据集)
|
2月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
80 2
基于Redis的高可用分布式锁——RedLock
|
2月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】