文章目录
一、Spark 架构原理
1.1 Spark架构核心组件
1.2 各部分功能图
二、RDD概述
2.1 什么是RDD?
2.2 RDD具体包含了一些什么东西?
2.3 RDD的五大特性
2.4 RDD可以从哪来
2.5 WordCount粗图解RDD
三、RDD的创建方式
3.1 通过读取文件生成的
3.2 通过并行化的方式创建RDD
3.3 其他方式
四、RDD编程API
4.1 Transformation
4.2 Action
4.3 Spark WordCount代码编写
(1)使用scala进行编写
4.4 WordCount执行过程图
五、RDD的宽依赖和窄依赖
5.1 RDD依赖关系的本质内幕
5.2 依赖关系下的数据流视图
一、Spark 架构原理
SparkContext 主导应用执行
Cluster Manager 节点管理器
Cache : Worker Node 之间共享信息、通信
Executor 虚拟机 容器启动 接任务 Task(core数 一次处理一个RDD分区)
1.1 Spark架构核心组件
1.2 各部分功能图
Driver 注册了一些 Executor后,就可以开始正式执行 spark 应用程序了。第一步是创建 RDD,读取数据源;
HDFS 文件被读取到多个 Worker节点,形成内存中的分布式数据集,也就是初始RDD;
Driver会根据程序对RDD的定义的操作,提交 Task 到 Executor;
Task会对RDD的partition数据执行指定的算子操作,形成新的RDD的partition;
二、RDD概述
2.1 什么是RDD?
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
2.2 RDD具体包含了一些什么东西?
RDD是一个类,它包含了数据应该在哪算,具体该怎么算,算完了放在哪个地方。它是能被序列化,也能被反序列化。在开发的时候,RDD给人的感觉就是一个只读的数据。但是不是,RDD存储的不是数据,而是数据的位置,数据的类型,获取数据的方法,分区的方法等等。
2.3 RDD的五大特性
(1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
(2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
(3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
(4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
(5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
2.4 RDD可以从哪来
通过序列化集合的方式
通过读取文件的方式
scala> sc.textFile("hdfs://wc/e.txt") res0: org.apache.spark.rdd.RDD[String] = hdfs://wc/e.txt MapPartitionsRDD[1] at textFile at <console>:25 scala> val rdd = sc.textFile("hdfs://192.168.56.137:9000/wc/e.txt") rdd: org.apache.spark.rdd.RDD[String] = hdfs://192.168.56.137:9000/wc/e.txt MapPartitionsRDD[21] at textFile at <console>:24
通过其他的RDD进行transformation转换而来
2.5 WordCount粗图解RDD
其中hello.txt
三、RDD的创建方式
3.1 通过读取文件生成的
由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
scala> val file = sc.textFile("/spark/hello.txt")
3.2 通过并行化的方式创建RDD
由一个已经存在的Scala集合创建。
scala> val array = Array(1,2,3,4,5) array: Array[Int] = Array(1, 2, 3, 4, 5) scala> val rdd = sc.parallelize(array) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26 scala>
3.3 其他方式
读取数据库等等其他的操作。也可以生成RDD。
RDD可以通过其他的RDD转换而来的。