Spark2.X弹性分布式数据集(一)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 笔记

一、三大弹性分布式数据集介绍


RDD、DataFrame、DataSet是Spark平台下的弹性分布式数据集,为高效处理超大型数据集提供便利。


RDD

优点:


编译时类型安全,编译时就能检查出类型错误

面向对象的编程风格,直接通过类名点的方式来操作数据

缺点:


序列化和反序列化的性能开销,无论是集群间的通信、还是IO操作都需要对对象的结构和数据进行序列化和反序列化

GC的性能开销,频繁的创建和销毁对象,势必会增加GC

DataFrame

DataFrame引入了schema和off-heap


schema:RDD中每一个元素的结构都是一致的,在DataFrame中这个结构就存储在schema中. 因此,在通信和IO中,就只用序列化和反序列化数据,而结构的部分就可以省略了

off-heap:意味着DataFrame使用了「JVM堆以外」的内存,这些内存直接受操作系统管理(而不是JVM),Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中,当要操作数据时,就直接操作off-heap内存

通过off-heap,spark一定程度上脱离了JVM的控制,也就极大程度上免于GC的困扰. 通过schema和off-heap,DataFrame解决了RDD的缺点,但是却丢了RDD的优点, DataFrame不是类型安全的, API也不是面向对象风格的。


DataSet

DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder


当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。


二、Spark RDD概述与创建方式


学习之前可以先了解RDD编程指南:

http://spark.apache.org/docs/2.4.6/rdd-programming-guide.html


在较高级别上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的main功能并在集群上执行各种并行操作。Spark提供的主要抽象是弹性分布式数据集(RDD),它是跨集群节点划分的元素的集合,可以并行操作。 通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或驱动程序中现有的Scala集合开始并进行转换来创建RDD。用户还可以要求Spark将RDD 保留在内存中,以使其能够在并行操作中有效地重用。最后,RDD会自动从节点故障中恢复。


(1)连接Spark

默认情况下,Spark 2.4.6已构建并分发为可与Scala 2.12一起使用。(可以将Spark构建为与其他版本的Scala一起使用。)要在Scala中编写应用程序,您将需要使用兼容的Scala版本(例如2.12.X)。


要编写Spark应用程序,您需要在Spark上添加Maven依赖项。可通过Maven Central在以下位置获得Spark:

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 2.4.6

另外,如果您想访问HDFS群集,则需要hadoop-client为您的HDFS版本添加依赖项 。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>


最后,您需要将一些Spark类导入程序。添加以下行:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


(2)初始化Spark

Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建一个,SparkContext您首先需要构建一个SparkConf对象,其中包含有关您的应用程序的信息。在spark2.0以后也可以通过初始化SparkSession来创建rdd和dataset。

每个JVM只能激活一个SparkContext。stop()在创建新的SparkContext之前,您必须先激活它。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

该appName参数是您的应用程序显示在集群UI上的名称。 master是Spark,Mesos或YARN群集URL或特殊的“local”字符串,以本地模式运行。实际上,当在集群上运行时,您将不希望master在程序中进行硬编码,而是在其中启动应用程序spark-submit并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark。


关于spark rdd可以参考这篇文章:

https://blog.csdn.net/weixin_45366499/article/details/108676602


三、Spark RDD五大特性


spark rdd的五大特性:

官方文档给出的解释


A list of partitions

A function for computing each split

A list of dependencies on other RDDs

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

中文翻译


一个数据集被拆分成partition,这是进行并行计算的基础

partition 是 RDD 的基本组成单位,对于 RDD 来说,每个 partition 都会被一个Task处理,并决定并行计算的粒度,用户可以在创建 RDD 时指定 RDD 的 partition 个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU core的数目每个 partition 的存储是由BlockManager 实现的,每个 partition 都会被逻辑映射成BlockManager 的一个 Block ,而这个 Block 会被一个 Task 负责计算。


作用在 RDD 的函数,会作用到每一个 partition

Spark中的 RDD 的计算是以 partition 为单位的,每个 RDD 都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。


RDD之间的依赖,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。

RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。


重点要掌握前面三个特性,可以一句话总结:spark以RDD为核心的抽象弹性分布式数据集,这个数据集是以分区的方式运行在cluster node节点上,并且每一个分区都有一个计算任务对数据进行计算,每一个RDD数据之间相互依赖,这样依赖关系能够保证我们的数据RDD故障自动恢复。


当我们的rdd是key-value的这种结构的时候进行partitioner的时候,所用到的算法是HashPartitioner

key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的paritioner接口,控制key分到哪个reduce。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDDShuffle输出时的分片数量。


每一分片的优先计算位置,比如HDFS的block的所在位置应该是优先计算的位置。

对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

四、Spark RDD操作方式及使用

关于DRR的详细操作请看这篇博客,这里只做大概讲解:

https://blog.csdn.net/weixin_45366499/article/details/108676602

image.pngimage.pngimage.pngimage.png

Spark RDD操作方式的综合使用

准备数据集

val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
val rdd2 = sc.textFile("file:///opt/datas/stu1.txt")

合并数据集操作

val allRdd = rdd1.union(rdd2)

将数据按空格分开

val lines = allRdd.flatMap(x => x.split(" ")).collect
lines: Array[String] = Array(java, python, hadoop, spring, python, hadoop, java, c, c++, hbase, spark, scala, scala, python, java, linux, unix, java, php, mysql, hive, hue, java, python, hadoop, spring, python, hadoop, java, c, c++, hbase, spark, scala, scala, python, java, linux, unix, java, php, mysql, hive, hue, java, python, spring, javascirpt, mapreduce, java, hello, world, python, aikfk, caizhengjie)

将数据分成元祖对

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).collect
lines: Array[(String, Int)] = Array((java,1), (python,1), (hadoop,1), (spring,1), (python,1), (hadoop,1), (java,1), (c,1), (c++,1), (hbase,1), (spark,1), (scala,1), (scala,1), (python,1), (java,1), (linux,1), (unix,1), (java,1), (php,1), (mysql,1), (hive,1), (hue,1), (java,1), (python,1), (hadoop,1), (spring,1), (python,1), (hadoop,1), (java,1), (c,1), (c++,1), (hbase,1), (spark,1), (scala,1), (scala,1), (python,1), (java,1), (linux,1), (unix,1), (java,1), (php,1), (mysql,1), (hive,1), (hue,1), (java,1), (python,1), (spring,1), (javascirpt,1), (mapreduce,1), (java,1), (hello,1), (world,1), (python,1), (aikfk,1), (caizhengjie,1))

将数据按key合并

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).collect
lines: Array[(String, Int)] = Array((hive,2), (php,2), (python,8), (aikfk,1), (linux,2), (hue,2), (unix,2), (spark,2), (caizhengjie,1), (hadoop,4), (spring,3), (hbase,2), (scala,4), (mapreduce,1), (mysql,2), (hello,1), (java,10), (world,1), (c++,2), (javascirpt,1), (c,2))

筛选数据,保留value>1的

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).collect
lines: Array[(String, Int)] = Array((hive,2), (php,2), (python,8), (linux,2), (hue,2), (unix,2), (spark,2), (hadoop,4), (spring,3), (hbase,2), (scala,4), (mysql,2), (java,10), (c++,2), (c,2))

key value掉换位置

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =>
lines: Array[(Int, String)] = Array((2,hive), (2,php), (8,python), (2,linux), (2,hue), (2,unix), (2,spark), (4,hadoop), (3,spring), (2,hbase), (4,scala), (2,mysql), (10,java), (2,c++), (2,c))

按key的降序排序

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =>
lines: Array[(Int, String)] = Array((10,java), (8,python), (4,hadoop), (4,scala), (3,spring), (2,hive), (2,php), (2,linux), (2,hue), (2,unix), (2,spark), (2,hbase), (2,mysql), (2,c++), (2,c))

再次掉换key value掉换位置

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =
lines: Array[(String, Int)] = Array((java,10), (python,8), (hadoop,4), (scala,4), (spring,3), (hive,2), (php,2), (linux,2), (hue,2), (unix,2), (spark,2), (hbase,2), (mysql,2), (c++,2), (c,2))

输出打印

scala> lines.foreach(println)
(java,10)
(python,8)
(hadoop,4)
(scala,4)
(spring,3)
(hive,2)
(php,2)
(linux,2)
(hue,2)
(unix,2)
(spark,2)
(hbase,2)
(mysql,2)
(c++,2)
(c,2)

将结果写入hdfs中

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x

查看结果

bin/hdfs dfs -text /user/datas/wordcount/par*
20/09/20 09:04:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(java,10)
(python,8)
(hadoop,4)
(scala,4)
(spring,3)
(hive,2)
(php,2)
(linux,2)
(hue,2)
(unix,2)
(spark,2)
(hbase,2)
(mysql,2)
(c++,2)
(c,2)

通过查看网页,我们可知rdd被分成了四个区

14.png

val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x => (x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1))
scala> lines.getNumPartitions
res11: Int = 4

我们也可以自己指定分区

lines.repartitions(3)


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
710 1
|
3月前
|
存储 分布式计算 数据处理
解释弹性分布式数据集(RDD)的概念
【8月更文挑战第13天】
120 4
|
3月前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
76 0
|
6月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
186 0
|
6月前
|
SQL 分布式计算 Hadoop
Spark分布式内存计算框架
Spark分布式内存计算框架
152 0
|
6月前
|
机器学习/深度学习 负载均衡 PyTorch
PyTorch分布式训练:加速大规模数据集的处理
【4月更文挑战第18天】PyTorch分布式训练加速大规模数据集处理,通过数据并行和模型并行提升训练效率。`torch.distributed`提供底层IPC与同步,适合定制化需求;`DistributedDataParallel`则简化并行过程。实际应用注意数据划分、通信开销、负载均衡及错误处理。借助PyTorch分布式工具,可高效应对深度学习的计算挑战,未来潜力无限。
|
6月前
|
存储 缓存 分布式计算
Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
【2月更文挑战第13天】Spark【基础知识 02】【弹性式数据集RDDs】(部分图片来源于网络)
73 1
|
6月前
|
分布式计算 大数据 数据处理
Spark RDD(弹性分布式数据集)
Spark RDD(弹性分布式数据集)
|
15天前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
3月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
99 2
基于Redis的高可用分布式锁——RedLock