开发者学堂课程【大数据 Spark2020最新课程(知识精讲与实战演练)第一阶段:RDD 入门_创建 RDD 的三种方式】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/688/detail/11952
RDD 入门_创建 RDD 的三种方式
课题引入:
通过上节课的内容,已经了解到 RDD 创建中必备的知识点,即: sparkcontext, 本节内容着重于如何创建 RDD 。
通过本节内容, 做到可以快速创建 RDD 以及清楚其创建原理,创建 RDD 的方式有几种。
具体到 RDD 的三种创建方式即:
通过本地文件集合创建RDD 用于测试、学习 ;
通过外部数据创建 RDD 常用于公司或生产环境 RDD 通过读取外部文件生成处理,最终加载到数据仓库,该过程被称为 ETL ,此步骤有助于了解 spark 如何编写 ETL ;
通过 RDD 衍生新的 RDD 该步骤是 spark 的核心所在,通过此步骤可以了解到 RDD 调用其算子的过程同样是创建新的 RDD 。
内容介绍:
一、从本地集合创建
二、从文件创建
三、从 RDD 衍生创建
一、从本地集合创建
先创建三个方法,编写如下:
//1.从本地集合创建
注意此处需要注解(@Tset)以此类推,同下所示,第一种方式使用到的 sparkcontext 是常用的 API ,不妨直接放入 lead 的初始代码块中,利用 scala 的特点, lead 加载直接执行。
此处利用本地集合直接进创建,存在相关的两个 API ,第一个叫做 sc.parallelize ,其中存在两个参数,存在两参数:
seq 指集合, scala 当中一个著名的数据结构 sequence 类似于 Java 中的 list。
思考若从本地集合创建 RDD ,其本地集合具有分区信息吗?
可以确定不存在,spark 是一个并行计算框架,该计算需要并行,若要并行计算,则数据源需切开,而 numslices 具有切开的概念 numslices 具体是指传入的 sequence 再次使用会被拆分为多份,相当于 rdd 的一个分区数。
parallelize 当中传入 sequence ,其可以接收任意有限元素,例如 Hello1、Hello2、Hello3 。
通过该方式创建 sequence ,为使其更加清楚如上所写。
通过第二个参数可以确定并行数,即本地集合的区分数,例如 “2”,命名为 rdd1,类型为 RDD 内在类型 [String] ,若将 Hello1、Hello2、Hello3 改为1、2、3, 执行报错,其原因是此处范型错误, RDD 中放有何种数据,对应的外部便需何种类型,放数字对应的 “Int” 。
@Test
def rddCreationLocal( ):Unit ={
val seq = seq(“Hello1”, “Hello2”, “Hello3”)
val rdd1:RDD[Int]=sc.parallelize(seq, numslices=2)
val rdd2:RDD[Int]= sc.make RDD( seq,numslices=2)
(如图所示编码,存在两个 API 即 makeRDD 和 numSlice)
spark 提供的另一种 API ,命名为 make RDD ,与 parallelize 基本相同,比如它也可以通过 sequence 来创造 rdd ,指定 numslices 分区,指定分区的调用的是 parallelize 。
通过 makeRDD 创建,传递 seq ,指定分区2,命名为 val rdd2:RDD[Int]= sc.makeRDD( seq,numslices=2)
rdd1 与 rdd2 的区别在于 parallelize API 可以不指定分区数,直接创建 rdd ,makeRDD 若不指定分区数,其接收的范型类型不同。
二、从文件创建
//2.从文件创建
通过文件创建 API 是 sc.textFile() ,需明白
1.括号中可以填写哪些内容,
2.是否支持分区,
3.支持什么平台。
首先点击 textFile 发现传入的是 path ,则括号中可填 ( path= hdfs://node01 或/.../... 自动从 HDFS 中读取,若要在集群中运行,指定 file:///...) 传入的是一个路径,读取路径。
写此方式不仅可以从 HDFS 中创建也可以从本地文件中创建
@Test
def rddCreationFiles( ):Unit ={
Sc.textFile(path= “file:///...”)
//1.textFile 传入的是什么
//传入的是一个路径,读取路径
//hdfs://file:// /.../...(这种方式分为在集群中执行;在本地中执行;若在集群中,读取 hdfs ,本地读取文件系统)
//2.是否支持分区?
//支持分区,若传入的 path 是 hdfs:///...
//分区是由 HDFS 中文件的 block 决定的,注意 textFile 中也可以指定分区数,不一定必须由外部数据源决定,前面需要存在 min说明,即最少是。
//3.支持什么平台
//能读取 HDFS 或支持 AWS 和阿里云,通过阿里云和 AWS 提供的 API 支持。
}
三、从 RDD 衍生创建
//3.从 RDD 衍生创建
不妨创建新的 RDD 即 val rdd1 =sc.parallelize(seq(1,2,3)) 得到 rdd1 ,通过 rdd1.map 的方法接收 item=>item 函数,返回的 rdd2 ,即通过在rdd 上执行算子操作,会生成新的 rdd 。
思考在 Java 中 str.substring 返回新的字符串,称为非原地计算,和字符串中的方式相似,字符串不可变, rdd 也不可变。
@Test
def rddCreateFromRDD( ):Unit ={
val rdd1 =sc.parallelize(seq(1,2,3))
//通过在 rdd 上执行算子操作,会生成新的 rdd
//原地计算
//str.substr 返回新的字符串,非原地计算
//和字符串中的方式很像,字符串是不可变的
//RDD 不可变
val rdd2:RDD[Int]=rdd1.map(item=>item)
}
注意在知识网图中,通过本地集合创建 rdd 和通过外部数据创建 rdd 方式的分区数,通过本地集合创建分区数可以指定,若通过外部数据源创建,分区数取决于外部数据源的分区。通过 RDD 衍生新的 RDD 的方式理解 rdd 不可变。