开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:聚合操作_groupBy_数据读取】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12086
聚合操作_groupBy_数据读取
聚合操作
一般进行数据处理时,会进行数据的读取,读取这个步骤也叫 Extra;读取数据后,要对这个数据来进行一系列的操作,这步是数据操作,也叫做 Transformation 数据转换;数据操作完成后,要把数据落地,这个环节称之为 Load。
这就是一个 ETL 的过程。但是,在企业当中的数据处理,会比这个环境要复杂很多,大致步骤还是 ETL,但可能是从一个临时的数据集中去读取,也有可能落地到一个临时数据集中;这个数据操作,有可能是一个很大的操作,也有可能只是进行一些数据转换,数据分析和清洗。这种情况是比较多的。
所以,在数据操作这个环节,主要关注:首先,对数据进行类型转换,然后进行一些数据清洗,缺失值处理也算数据清洗的一部分。
做完这些之后,谈需求,一般情况下,需求是:比如进行词频统计,在进行词频统计时,需求是统计这个数据集当中有多少个单词,每个单词出现了多少次。这是一个聚合操作,把所有单词聚在一起,合计起来进行操作。
比如,求 2019 年每个月的销售额,这也是一个聚合操作。按月统计订单以后,求得销售额,用每个月的销售额,去绘制一张报表,这个报表可能是一个图表,可以看到每个月的销售额曲线。假如想看哪个月销售额最多,那这种报表的操作就要进行一些聚合,所以大家能看到所有的最终结果的获取,基本上都是聚合操作。
聚合操作关乎于最终结果的获取,所以,聚合操作非常的重要,而且因为聚合操作涉及到聚合,涉及到各种这种操作,所以往往聚合操作是一些比较重的操作,大家对于聚合操作,这些算子的细节呢,要有一些了解,了解到算子的细节后,才能够在进行优化时,不会显得无从下手。
聚合操作,大致分为如下几个算子:
第一个算子,groupby 为数据进行分组;
第二个 rollup,比 groupby 更高级一些;
第三个 cube 算子;
第四个 RealationalGroupedDataset
groupby 也分两个小部分,第一部分,数据读取,数据读取中要读取,然后清洗;接下来进行分组操作,这个操作又分为两个不同的 API。
下面通过讲解让大家了解数据读取和清洗怎么完成。
1.groupby
groupBy 算子会按照列将 Dataset 分组,并返回一个 RelationalGroupedDataset 对象,通过
RelationalGroupedDataset 可以对分组进行聚合。
创建 Scala Class,名称为 AggProcessor
在 AggProcessor 当中第一步创建方法,命名为 groupby()。
整个数据读取的步骤,第一步是要创建 sparksession;之后进行数据读取;第三步数据去掉空值;第四步,使用 function 函数来完成聚合;最后使用 GroupedDataset 的 API 完成聚合。Groupedataset 是 RelationalGroupedDataset 这样的一个对象。
代码:
import org.apache.spark.sql.Sparksession
import org.junit.Test
class AggProcessor {
//1.创建 sparkSession
val spark = Sparksession.builder()
.master( master = "local[6]")
.appName( name = "agg processor"")
.getorcreate()
注:在进行使用 dollar 符,使用单引号时都需要 spark 当中的 implicits
import spark . implicits._
@Test
def groupBy( ): unit = {
要将下面的几行代码写在外面,让其他函数都可进行访问。
//1.创建 sparkSession
val spark = Sparksession.builder()
.master( master = "local[6]")
.appName( name = "agg processor"")
.getorcreate()
import spark . implicits
在读取数据之前,将 schema 拷贝进来,schema 是
beijingpm_with_nan.cn 数据集的 schema
//2.数据读取
private val schema = StructType(
List(
StructField( "id"", IntegerType) ,
StructField ( " year", IntegerType) ,
StructFieldA "month" , IntegerType) ,
StructField("day" , IntegerType) ,
StructField( "hour" , IntegerType) ,
StructField( "season" , IntegerType) ,
StructField( "pm" , DoubleType)
)
)
打开数据集,这个数据集在 pm 列都是 NaN,这一列值是没有意义的,故要去掉。
val pmDF = spark.read
.schema(schema)
. option ( "header" , value = true)
.csv ( path="dataset/beijingpm_without_null.csv")
/进行清洗,将值为 NaN 的 pm 行清洗掉
sourceDF.where( condition = " pm =!= Double.NaN)
注:这里的 Double.NaN,是因为前面代码中的 DoubleType。在读 Doubletype 时,假如数据是 NaN,读出的是 double.NaN 类型,这是一个特殊类型,不要理解为字符串。
show(
运行代码,得出结果如下图:
发现:pm 一列不存在 NaN,说明 where 起了作用
接下来要对这样一个数据集来进行聚合,为什么在前面没有主动处理 NaN,而在聚合之前要进行处理:是因为你不至于 NaN 的数据聚合进去,所以得到的数据是不准确的。
以往会在聚合之前先清洗,保证数据没有问题再进行聚合,一方面是因为聚合操作是一个比较重要的操作。
如果能通过 where 去掉一些多余的数据,那么在进行聚合时,数据越少,效率就越高。当然也可以不这样做,sparkSQL 的 catalyst 会来进行优化。