聚合操作_groupBy_聚合操作 | 学习笔记

简介: 快速学习聚合操作_groupBy_聚合操作

开发者学堂课程【大数据 Spark2020版(知识精讲与实战演练)第三阶段聚合操作_groupBy_聚合操作】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/690/detail/12087


聚合操作_groupBy_聚合操作

 

聚合操作

在读取数据集之后,进行具体操作,通过这节课了解如何使用 groupBy 及 groupBy 分组结果进行聚合操作image.png

第一个问题是大家为自己设计需求,

为什么让大家自己设计需求:原因是如果要把需求设计好,首先要了解什么是 groupBy,什么是分组,以及分组之后要做什么。只有理解这些,才能知道哪些操作可能和 groupBy 有关,这个需求才能设计好。所以,个人认为,如果要检测自己对一个知识的掌握好坏,就可以为自己设计一些需求。如果需求自己能设计出来,能用自己所学习的东西完成所设计的需求,就表明知识掌握的很好。

指定需求:统计每个月 PM 值的平均数

1.每个月的 PM 值->分组

2.这些 PM 值的均数->把每个分组数据聚合

groupBy->分组

函数、api->聚合

按照什么分组——年和月

聚合什么——PM 值

将需求拆分为两部分:第一个是每个月的 PM 值,第二个是这些 PM 值的均数。这两个步骤还可以去继续往下拆分:第一个分组每个月的 PM 值,在求这些 PM 值的均数的过程就是一个把每个分组数据聚合的一个过程,所以要用 groupBy 来做,groupBy 是分组,分组之后是聚合,聚合要使用一些函数或者使用 API。函数和 API 指的是 groupBy 所返回的 RelationalGroupedDataset 对象当中的函数和 API。

整个过程是先分组,要按照年和月进行分组,同一年同一月的数据放在一个组中;聚合的是 PM 值的均数。

接下来进入 AggProcessor.scala 中编写代码:

// 3.数据去掉空值

val cleanDF = sourceDF.where( condition = 'pm =l= Double.NaN)

//分组

val groupedDF= cleanDF.groupBy( cols = 'year,$"month")

注:如果要使用单引号和 dollar 符要先导入 spark 下的 implicits 隐式转换。

分组结果是 RelationalGroupedDataset,下面两种方式都是针对 RelationalGroupedDataset 进行聚合。

点开 agg,在 RelationalGroupedDataset.scala 当中 agg 代码中的几种算子重载,第一个是接受 map

def agg(exprs: Map[string,string]): DataFrame = {

toDl( exprs.map { case ( colName,expr) =>

strTOExpr(expr)(df(colName ).expr)

}.toseq)

}

第二是接受 JAVA 的 map

def agg(exprs: java.util.Map[string,string]): DataFrame = {

agg(exprs.asscala.toMap)

}

第三是接收 column,可以使用 column api 进行聚合

def agg(expr: column,exprs: column*): DataFrame = {

toDF((expr +: exprs ).map {

case typed: Typedcolumn[_,_] =>

typed.withInputType(df. exprEnc,df.LogicalPLan.output ).expr

case c => c.expr

})

使用 column api 进行聚合,但 column 中没有 avg,聚合方式如下:

// 4.使用 functions 函数来完成聚合

import org.apache. spark.sql.functions.

//本质上,avg 这个函数定义了一个操作,把表达式设置给 pm 列

//select avg( pm) from ... group by

注:这个 sql 语句生成的聚合列叫 avg(pm),groupedDF.agg(avg ( 'pm))和 sql 语句一样,所以生成的聚合列的名字叫 avg(pm),故要 as。

groupedDF.agg(avg( " pm) as "pm_avg")

聚合完成之后,按照 pm 平均数进行 orderby,一般是降序排列

.orderBy( sortExprs ="pm_avg.desc)

注:如果 desc 没有参数,在调用此方法时直接.desc

.show( ) 

运行代码,查看结果:

image.png

结果如上图,数据是由高到低排序的。

声明:分组是使用 groupBy,聚合是对分组的结果进行聚合

接下来讲的是分组结果的第二种聚合方式

// 5.使用 GroupedDataset 的 API 来完成聚合

groupedDF.avg(colNames = "pm")

.orderBy (...)

注:OrderBy 中传入的内容如果不确定,可以使用 select()得到;

还可以使用 select()更改结果列的名称

.show()

代码如下:

groupedDF.avg( colNames ="pm")

.select( cols = $"avg(pm)" as “pm_avg)

.orderBy( sortCol ="avg(pm)")

.show()

以上这段代码除了可以求 avg 外,还可以求 min、max、sum;还可以使用如下代码求方差:

groupedDF.agg(stddev( columnName = " "))

.orderBy (‘ pm_avg.desc)

.show()

总结:

以上知识点包括如何进行分组;聚合的两种方式,一种是使用 agg 加上函数的方式求聚合,一种是使用 groupDF 中的 api 求聚合结果。

相关文章
|
18天前
|
Serverless Python
分组和聚合DataFrame信息案例解析
该文介绍了如何使用pandas对DataFrame进行分组和聚合操作。首先,通过创建字典并转换为DataFrame,展示了基础数据结构。接着,利用`groupby()`方法按城市字段进行数据分组,然后应用`mean()`函数计算各城市平均年龄,显示了聚合功能。此外,文中指出还可使用`sum()`、`count()`等其他聚合函数处理分组数据。
13 0
使用mongo聚合分组查询获取每一组的时间最大的一条数据
使用mongo聚合分组查询获取每一组的时间最大的一条数据
944 0
|
5天前
|
数据可视化 Python
如何在Pandas中对数据集进行多级分组并进行聚合计算?
在Pandas中进行多级分组与聚合计算的步骤包括导入库(如pandas和matplotlib),准备数据集,使用`groupby()`方法分组,应用聚合函数(如`sum()`、`mean()`)及可视化结果。
20 11
|
3月前
|
缓存 分布式计算 Java
MapReduce编程:join操作和聚合操作
MapReduce编程:join操作和聚合操作
32 0
|
4月前
在pyodps中,`groupby`方法用于对数据进行分组
在pyodps中,`groupby`方法用于对数据进行分组
51 1
|
5月前
DQL-分组聚合
DQL-分组聚合
13 0
|
数据采集 分布式计算 数据挖掘
聚合操作_groupBy_数据读取 | 学习笔记
快速学习聚合操作_groupBy_数据读取
110 0
聚合操作_groupBy_数据读取 | 学习笔记
|
大数据 开发者
聚合操作_多维聚合_rollup | 学习笔记
快速学习聚合操作_多维聚合_rollup
71 0
聚合操作_多维聚合_rollup | 学习笔记
|
大数据 开发者
聚合操作_多维聚合_cube | 学习笔记
快速学习聚合操作_多维聚合_cube
59 0
聚合操作_多维聚合_cube | 学习笔记
|
分布式计算 大数据 Spark
聚合操作_多维聚合_rollup 案例 | 学习笔记
快速学习聚合操作_多维聚合_rollup 案例
58 0
聚合操作_多维聚合_rollup 案例 | 学习笔记