开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:有类型转换_map】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12066
有类型转换_map
内容介绍:
一、回顾总结
二、新课介绍
三、有类型算子与无类型算子
四、有类型的转换算子
一、回顾总结
通过之前的学习我们已经对数据读写有了较为深入的了解,可以发现 Spark 程序应有一下特点:
1、首先,应读出 sourceDF,读取方式是通过 DateFrame reader 来进行相应的读取。读取从文件当中读,而文件又支持多种格式,可以从各种各样的地方去读取数据,如从 JDBC 的关系型数据库、Hive、HBase 等中读取。
val sourceDF = spark.read
2、读完数据后,可以使用 sourceDF 进行一系列的操作。如:
val result = sourceDF.map
.flatMap
.groupBy
//聚合
.join
//连接
最终通过中间的处理得到 result 的 DateFrame 或 Dateset。
3、接下来也可以对 Dateset 进行一些小的操作,如:
.catch
//catch 在前面的 RDD 部分也进行了详细的说明,其中的细节不作赘述。
4、整个过程中一定会对数据集进行一些处理。处理完之后,可以使用 result write 来将处理好的数据写入到某个地方,如写入到 HDFS 中的某个文件或写入到某个数据库中。
二、新课介绍
到现在为止,我们对于使用 sourceDF 进行操作的部分了解较少,包括 groupBy、join 到底有什么用等。这节课就通过数据操作介绍如何进行聚合操作、连接操作,怎么进行控制处理等一些细节。在数据操作部分内容较多,我们大致分为两个部分:其一,对整个的内容进行概述性的讲解,这一部分会介绍有类型的算子、无类型的算子,这两部分会作为总的章节来学习,其中包含了很多算子的使用。
其二,以专题的形式来学习如何聚合、如何控制处理。以上就是整个的课程思路。本节课我们首先学习有类型的算子,了解一下有类型和无类型的区别。
三、有类型算子与无类型算子
其实无论是有类型算子,还是无类型算子,指的都是转换操作。
举一个非常常见的例子,在 df 当中对其进行 select,先 select name 列,再 select age 列,
df.select(‘name ‘age)
//该操作是无类型的,因为只是针对于整个数据集的形式来进行转换操作,只是去取出 name 列、age 列。
在 ds 中进行 map 操作,在 map 当中接受 item,在转换完成之后,再返回其他类型的 item,
ds.map(item => ...)
//这种操作是有类型的,因为最终返回了 item,它是编译时安全的、一个有类型的对象。它的类型可能是 person、people,或者是 order(订单)、payment(支付信息)等等,它是有对象类型的。而对象类型不是具体的列,没有叫做 name 和 age 的对象类型。
四、有类型的转换算子
1、有类型转换算子的简单分类
根据转换类别,分为 flatMap、map、mapPartitions 等。这里我们将学习 flatMap、map、mapPartitions 之间及与之前 RDD 算子的区别。通过本章内容即可了解 Dayset 中的一些算子。
2、flatMap、map 与 mapPartitions
三者的关系我们可以通过如下表格的方式呈现,在使用时可以通过表格进行查询。
进入到 idea,创建 scala class,编写代码,命名为 TypeTransformation(有类型转换)
package cn.itcast.spark.sql
import org.apache.spark.sql.SparkSession
import org.junit.Test
class TypeTransformation {
//1、创建 SparkSession
在转换部分,要学习 flatMap、map、mapPartitions,因此要
先创建 SparkSession。这一部分应在根层级,因为方法不止要写
一个,因此要在所有的操作方法使用,故而放在根层级。
val spark = new Spark/SparkSession.builder().master(local[6]).appname(“typed”).getorceated
//导入包,并设置 master。
//这一部分可以放在本地或集群中运行。为了方便复习,直接写在了测试方法中,而不是 SparkShell 中。在生产环境中,是针对于某场景来进行代码编写,编写完成后,放到集群当中去运行。
//设置 appname 为 typed,通过 getorcreate 得到 SparkSession 对象。
创建@Test 方法,名字定义为转换。
@Test
def trans():unit = {
//2、创建数据集
由于对每个算子创建的数据集会略有区别,因此该部分不作为单独
的步骤了。
//3、flatMap
val ds1 = Seq( “hellospark”,“hellohadoop”).toDS
//直接创建 ds,使用 Seq 存放两个字符串吧,可以通过 toDS 进行转换.
import spark.implicites
//要想使用 toDS,需要引入 spark.implicites
ds1 flatMap(item =>item.split(‘ ’)).show()
//通过想要把两个字符串拆成四个单词
//该操作明显是有类型的算子,item 是字符串类型
//4、map
val ds2 = Seq(Person(“zhangsan”,15),Person(“lisi”,2
0)).toDS()
//创建 ds2,在创建的数据集中放一些对象,使它具有结构信息。如之前学习的创建类 Person,在 Person 当中传入两个参数,名字和年龄。
//若要通过 map 操作将年龄乘以 2
ds2.map(Person => Person(person.name,person.age*2)).sh
ow()
//ds2.map 取到 Person 对象,并生成新的 Person 对象的 name 和 age*2
//此时即可查看数据集。而 map 也是有类型的操作,取到 Person 其实就是前面所创建的 Person 类型的对象。
//5、mapPartitions
(1)作用:可以增进执行效率,是非常常见的操作。但是它和 map 所做的事情相同,只是 mapPartitions 作用于每一个分区的所有数据。在 map 当中是一条一条的数据,而 mapPartitions 是一次性生成一个分区的集合。
值得注意的是该分区的数据要能放到内存里,才能使用 mapPartitions 操作,反之,mapPartitions 会报错。
(2)操作
//mapPartitions 与其他的算子不同,它接收的是集合,而这个集合是整个分区的所有数据。因此,mapPartitions 可以直接获取该集合进行操作。
ds2.mapPartitions(
//分区数据必须要能放到内存里,才能使用 mapPartitions 操作
//iter 不能大到每个 Executor(分区)的内存放不下,否则就会 OOM(out of memory)堆栈溢出。
//mapPartitions 的操作方式可能不容易理解,这里的 iter 是 iterable 类型,也就是集合的类型,要进行转换,就要把 iterable 里的每个元素进行转换,并返回新的元素,进而形成新的集合,放入到对应的分区中作为该分区的所有数据。
iter => {
//此处的 map 是 scala map,因为 iterable 是 scala 的
对象。
val iter.map(person => person(person.name,person.
age*2))
//map 接收到 person,根据 person 进行相同的转换,获取到 person.name 和 person.age*2。在 scala 中,对 iterable 进行 map 转换后,会生成新的 iterable。
result
//将这个新的 iterable 定义为 result,运行 result。
}
}.show()
}
}
运行代码,第一个运行结果对应 flatMap 操作数据 ds1 的结果,如下:
第一次操作的结果中应注意两个问题,其一,在 toDS 时,得到的是单独的、基础数据类型的列,且此列没有 命名,因此输出结果中显示为 value;其二,通过 flatmap 是把每个字符串拆开并且生成了两条数据,即一对多。
第二个其实我们 map 的结果。
第二个运行结果对应的是 map 操作数据 ds2 的结果,如下:
map 是把年龄乘以 2,则 Person zhangsan 由 15 岁变成了 30 岁,Person lisi 由 20 岁变成了 40 岁。
最后一个运行结果对应 mapPartitions 操作数据的结果,如下:
同样则 Person zhangsan 由 15 岁变成了 30 岁,Person lisi 由 20 岁变成了 40 岁。
三个数据都输出了,且都无误。