一、三大弹性分布式数据集介绍
RDD、DataFrame、DataSet是Spark平台下的弹性分布式数据集,为高效处理超大型数据集提供便利。
RDD
优点:
编译时类型安全,编译时就能检查出类型错误
面向对象的编程风格,直接通过类名点的方式来操作数据
缺点:
序列化和反序列化的性能开销,无论是集群间的通信、还是IO操作都需要对对象的结构和数据进行序列化和反序列化
GC的性能开销,频繁的创建和销毁对象,势必会增加GC
DataFrame
DataFrame引入了schema和off-heap
schema:RDD中每一个元素的结构都是一致的,在DataFrame中这个结构就存储在schema中. 因此,在通信和IO中,就只用序列化和反序列化数据,而结构的部分就可以省略了
off-heap:意味着DataFrame使用了「JVM堆以外」的内存,这些内存直接受操作系统管理(而不是JVM),Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中,当要操作数据时,就直接操作off-heap内存
通过off-heap,spark一定程度上脱离了JVM的控制,也就极大程度上免于GC的困扰. 通过schema和off-heap,DataFrame解决了RDD的缺点,但是却丢了RDD的优点, DataFrame不是类型安全的, API也不是面向对象风格的。
DataSet
DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder
当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。
二、Spark RDD概述与创建方式
学习之前可以先了解RDD编程指南:
http://spark.apache.org/docs/2.4.6/rdd-programming-guide.html
在较高级别上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的main功能并在集群上执行各种并行操作。Spark提供的主要抽象是弹性分布式数据集(RDD),它是跨集群节点划分的元素的集合,可以并行操作。 通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或驱动程序中现有的Scala集合开始并进行转换来创建RDD。用户还可以要求Spark将RDD 保留在内存中,以使其能够在并行操作中有效地重用。最后,RDD会自动从节点故障中恢复。
(1)连接Spark
默认情况下,Spark 2.4.6已构建并分发为可与Scala 2.12一起使用。(可以将Spark构建为与其他版本的Scala一起使用。)要在Scala中编写应用程序,您将需要使用兼容的Scala版本(例如2.12.X)。
要编写Spark应用程序,您需要在Spark上添加Maven依赖项。可通过Maven Central在以下位置获得Spark:
groupId = org.apache.spark artifactId = spark-core_2.12 version = 2.4.6
另外,如果您想访问HDFS群集,则需要hadoop-client为您的HDFS版本添加依赖项 。
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>
最后,您需要将一些Spark类导入程序。添加以下行:
import org.apache.spark.SparkContext import org.apache.spark.SparkConf
(2)初始化Spark
Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建一个,SparkContext您首先需要构建一个SparkConf对象,其中包含有关您的应用程序的信息。在spark2.0以后也可以通过初始化SparkSession来创建rdd和dataset。
每个JVM只能激活一个SparkContext。stop()在创建新的SparkContext之前,您必须先激活它。
val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf)
该appName参数是您的应用程序显示在集群UI上的名称。 master是Spark,Mesos或YARN群集URL或特殊的“local”字符串,以本地模式运行。实际上,当在集群上运行时,您将不希望master在程序中进行硬编码,而是在其中启动应用程序spark-submit并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark。
关于spark rdd可以参考这篇文章:
https://blog.csdn.net/weixin_45366499/article/details/108676602
三、Spark RDD五大特性
spark rdd的五大特性:
官方文档给出的解释
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
中文翻译
一个数据集被拆分成partition,这是进行并行计算的基础
partition 是 RDD 的基本组成单位,对于 RDD 来说,每个 partition 都会被一个Task处理,并决定并行计算的粒度,用户可以在创建 RDD 时指定 RDD 的 partition 个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU core的数目每个 partition 的存储是由BlockManager 实现的,每个 partition 都会被逻辑映射成BlockManager 的一个 Block ,而这个 Block 会被一个 Task 负责计算。
作用在 RDD 的函数,会作用到每一个 partition
Spark中的 RDD 的计算是以 partition 为单位的,每个 RDD 都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
RDD之间的依赖,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
重点要掌握前面三个特性,可以一句话总结:spark以RDD为核心的抽象弹性分布式数据集,这个数据集是以分区的方式运行在cluster node节点上,并且每一个分区都有一个计算任务对数据进行计算,每一个RDD数据之间相互依赖,这样依赖关系能够保证我们的数据RDD故障自动恢复。
当我们的rdd是key-value的这种结构的时候进行partitioner的时候,所用到的算法是HashPartitioner
key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的paritioner接口,控制key分到哪个reduce。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDDShuffle输出时的分片数量。
每一分片的优先计算位置,比如HDFS的block的所在位置应该是优先计算的位置。
对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
四、Spark RDD操作方式及使用
关于DRR的详细操作请看这篇博客,这里只做大概讲解:
https://blog.csdn.net/weixin_45366499/article/details/108676602
Spark RDD操作方式的综合使用
准备数据集
val rdd1 = sc.textFile("file:///opt/datas/stu.txt") val rdd2 = sc.textFile("file:///opt/datas/stu1.txt")
合并数据集操作
val allRdd = rdd1.union(rdd2)
将数据按空格分开
val lines = allRdd.flatMap(x => x.split(" ")).collect
lines: Array[String] = Array(java, python, hadoop, spring, python, hadoop, java, c, c++, hbase, spark, scala, scala, python, java, linux, unix, java, php, mysql, hive, hue, java, python, hadoop, spring, python, hadoop, java, c, c++, hbase, spark, scala, scala, python, java, linux, unix, java, php, mysql, hive, hue, java, python, spring, javascirpt, mapreduce, java, hello, world, python, aikfk, caizhengjie)
将数据分成元祖对
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).collect
lines: Array[(String, Int)] = Array((java,1), (python,1), (hadoop,1), (spring,1), (python,1), (hadoop,1), (java,1), (c,1), (c++,1), (hbase,1), (spark,1), (scala,1), (scala,1), (python,1), (java,1), (linux,1), (unix,1), (java,1), (php,1), (mysql,1), (hive,1), (hue,1), (java,1), (python,1), (hadoop,1), (spring,1), (python,1), (hadoop,1), (java,1), (c,1), (c++,1), (hbase,1), (spark,1), (scala,1), (scala,1), (python,1), (java,1), (linux,1), (unix,1), (java,1), (php,1), (mysql,1), (hive,1), (hue,1), (java,1), (python,1), (spring,1), (javascirpt,1), (mapreduce,1), (java,1), (hello,1), (world,1), (python,1), (aikfk,1), (caizhengjie,1))
将数据按key合并
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).collect
lines: Array[(String, Int)] = Array((hive,2), (php,2), (python,8), (aikfk,1), (linux,2), (hue,2), (unix,2), (spark,2), (caizhengjie,1), (hadoop,4), (spring,3), (hbase,2), (scala,4), (mapreduce,1), (mysql,2), (hello,1), (java,10), (world,1), (c++,2), (javascirpt,1), (c,2))
筛选数据,保留value>1的
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).collect
lines: Array[(String, Int)] = Array((hive,2), (php,2), (python,8), (linux,2), (hue,2), (unix,2), (spark,2), (hadoop,4), (spring,3), (hbase,2), (scala,4), (mysql,2), (java,10), (c++,2), (c,2))
key value掉换位置
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =>
lines: Array[(Int, String)] = Array((2,hive), (2,php), (8,python), (2,linux), (2,hue), (2,unix), (2,spark), (4,hadoop), (3,spring), (2,hbase), (4,scala), (2,mysql), (10,java), (2,c++), (2,c))
按key的降序排序
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =>
lines: Array[(Int, String)] = Array((10,java), (8,python), (4,hadoop), (4,scala), (3,spring), (2,hive), (2,php), (2,linux), (2,hue), (2,unix), (2,spark), (2,hbase), (2,mysql), (2,c++), (2,c))
再次掉换key value掉换位置
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =
lines: Array[(String, Int)] = Array((java,10), (python,8), (hadoop,4), (scala,4), (spring,3), (hive,2), (php,2), (linux,2), (hue,2), (unix,2), (spark,2), (hbase,2), (mysql,2), (c++,2), (c,2))
输出打印
scala> lines.foreach(println)
(java,10) (python,8) (hadoop,4) (scala,4) (spring,3) (hive,2) (php,2) (linux,2) (hue,2) (unix,2) (spark,2) (hbase,2) (mysql,2) (c++,2) (c,2)
将结果写入hdfs中
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x
查看结果
bin/hdfs dfs -text /user/datas/wordcount/par*
20/09/20 09:04:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (java,10) (python,8) (hadoop,4) (scala,4) (spring,3) (hive,2) (php,2) (linux,2) (hue,2) (unix,2) (spark,2) (hbase,2) (mysql,2) (c++,2) (c,2)
通过查看网页,我们可知rdd被分成了四个区
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x => (x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1))
scala> lines.getNumPartitions res11: Int = 4
我们也可以自己指定分区
lines.repartitions(3)