开发者学堂课程【大数据 Spark2020版(知识精讲与实战演练)第三阶段:聚合操作_groupBy_聚合操作】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12087
聚合操作_groupBy_聚合操作
聚合操作
在读取数据集之后,进行具体操作,通过这节课了解如何使用 groupBy 及 groupBy 分组结果进行聚合操作
第一个问题是大家为自己设计需求,
为什么让大家自己设计需求:原因是如果要把需求设计好,首先要了解什么是 groupBy,什么是分组,以及分组之后要做什么。只有理解这些,才能知道哪些操作可能和 groupBy 有关,这个需求才能设计好。所以,个人认为,如果要检测自己对一个知识的掌握好坏,就可以为自己设计一些需求。如果需求自己能设计出来,能用自己所学习的东西完成所设计的需求,就表明知识掌握的很好。
指定需求:统计每个月 PM 值的平均数
1.每个月的 PM 值->分组
2.这些 PM 值的均数->把每个分组数据聚合
groupBy->分组
函数、api->聚合
按照什么分组——年和月
聚合什么——PM 值
将需求拆分为两部分:第一个是每个月的 PM 值,第二个是这些 PM 值的均数。这两个步骤还可以去继续往下拆分:第一个分组每个月的 PM 值,在求这些 PM 值的均数的过程就是一个把每个分组数据聚合的一个过程,所以要用 groupBy 来做,groupBy 是分组,分组之后是聚合,聚合要使用一些函数或者使用 API。函数和 API 指的是 groupBy 所返回的 RelationalGroupedDataset 对象当中的函数和 API。
整个过程是先分组,要按照年和月进行分组,同一年同一月的数据放在一个组中;聚合的是 PM 值的均数。
接下来进入 AggProcessor.scala 中编写代码:
// 3.数据去掉空值
val cleanDF = sourceDF.where( condition = 'pm =l= Double.NaN)
//分组
val groupedDF= cleanDF.groupBy( cols = 'year,$"month")
注:如果要使用单引号和 dollar 符要先导入 spark 下的 implicits 隐式转换。
分组结果是 RelationalGroupedDataset,下面两种方式都是针对 RelationalGroupedDataset 进行聚合。
点开 agg,在 RelationalGroupedDataset.scala 当中 agg 代码中的几种算子重载,第一个是接受 map
def agg(exprs: Map[string,string]): DataFrame = {
toDl( exprs.map { case ( colName,expr) =>
strTOExpr(expr)(df(colName ).expr)
}.toseq)
}
第二是接受 JAVA 的 map
def agg(exprs: java.util.Map[string,string]): DataFrame = {
agg(exprs.asscala.toMap)
}
第三是接收 column,可以使用 column api 进行聚合
def agg(expr: column,exprs: column*): DataFrame = {
toDF((expr +: exprs ).map {
case typed: Typedcolumn[_,_] =>
typed.withInputType(df. exprEnc,df.LogicalPLan.output ).expr
case c => c.expr
})
使用 column api 进行聚合,但 column 中没有 avg,聚合方式如下:
// 4.使用 functions 函数来完成聚合
import org.apache. spark.sql.functions.
//本质上,avg 这个函数定义了一个操作,把表达式设置给 pm 列
//select avg( pm) from ... group by
注:这个 sql 语句生成的聚合列叫 avg(pm),groupedDF.agg(avg ( 'pm))和 sql 语句一样,所以生成的聚合列的名字叫 avg(pm),故要 as。
groupedDF.agg(avg( " pm) as "pm_avg")
聚合完成之后,按照 pm 平均数进行 orderby,一般是降序排列
.orderBy( sortExprs ="pm_avg.desc)
注:如果 desc 没有参数,在调用此方法时直接.desc
.show( )
运行代码,查看结果:
结果如上图,数据是由高到低排序的。
声明:分组是使用 groupBy,聚合是对分组的结果进行聚合
接下来讲的是分组结果的第二种聚合方式
// 5.使用 GroupedDataset 的 API 来完成聚合
groupedDF.avg(colNames = "pm")
.orderBy (...)
注:OrderBy 中传入的内容如果不确定,可以使用 select()得到;
还可以使用 select()更改结果列的名称
.show()
代码如下:
groupedDF.avg( colNames ="pm")
.select( cols = $"avg(pm)" as “pm_avg)
.orderBy( sortCol ="avg(pm)")
.show()
以上这段代码除了可以求 avg 外,还可以求 min、max、sum;还可以使用如下代码求方差:
groupedDF.agg(stddev( columnName = " "))
.orderBy (‘ pm_avg.desc)
.show()
总结:
以上知识点包括如何进行分组;聚合的两种方式,一种是使用 agg 加上函数的方式求聚合,一种是使用 groupDF 中的 api 求聚合结果。