开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:聚合操作_多维聚合_编写代码】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12089
聚合操作_多维聚合_编写代码
多维聚合
简单概括,多维聚合就是在一个结果集当中既要呈现合计,小计,又要按照不同维度进行小计。是在一个结果集中完成不同维度的操作,一个维度可能是年、月或者类别,假如按照年月进行统计,又按照类别进行统计,最终得到的合计就是多维聚合。
1.编写代码
import org.apache.spark.sql.functions._
//需求 1:不同年,不同来源,PM 值的平均数
// select source,year,avg(pm)from ... group by source,year
pmFinal.groupBy( cols = 'source,'year)
根据 select 语句,avg 最终再结果集中呈现出的也是 avg(pm),这时需要 as pm,代码如下:
import org.apache.spark.sql.functions._
//需求 1:不同年,不同来源,PM 值的平均数
// select source,year,avg(pm) as pm from ... group by source,year
pmFinal.groupBy( cols = 'source,'year)
注:为了整体将 source 写在 year 前面
.agg(avg( ‘pm) as“pm”)
注:如果要使用 avg,首先要导入 org.apache.spark.sql.functions
//需求 2:在整个数据集中,按照不同的来源来统计 PM 值的平均数
// select source,avg(pm) as pm from ... group by source
pmFinal.groupBy( cols ='source)
.agg(avg( " pm) as "pm")
根据 select 语句返现需求 2 得出的结果集只有两列,而需求 1 得出的结果集有三列,如何将这两个结果集合并:
将转换操作生成的 dataframe 拿到
val postAndYearDF = pmFinal.groupBy( cols = 'source,'year)
.agg(avg( " pm) as "pm")
val postDF = pmFinal.groupBy( cols = "source)
.agg(avg( " pm) as "pm")
第一个 dataframe 为 postAndYearDF,第二个为 postDF,之后将两个 dataframe 合并到一个结果集中。
//合并在同一个结果集中
postAndYearDF.union(postDF)
现在不能进行 union,一个当中有三列,一个是两列,union 之后会很奇怪。可以在需求 2 下加上一列。
val postDF = pmFinal.groupBy( cols = "source)
.agg(avg( " pm) as "pm")
.select( cols = "source,"year," pm)
但是不存在 year 列,就需要造出这一列
.select( cols ='source, lit( literal = nul1) as "year","pm)
}
只要结果集维度相同就可以进行 union:
//合并在同一个结果集中
postAndYearDF.union(postDF)
.sort( sortExprs = "source,"year,"pm)
先按照来源进行排序,看不同来源统计的 pm 值有什么不同,source 相同按照 year 进行排序,如果 source 和 year 都相同,按照 pm 值进行排序。
.show()
}
运行查看结果:
在下图结果集当中统计了多种维度的结果,比如 source、year 和 pm 维度,这就是多维聚合。但是显示的结果集有些奇怪,合计应该在小计后面,所以在按照 year 列进行排序时注意是升序,null 要放在后面。
代码: postAndYearDF . union(postDF)
.sort( sortExprs = "source,"year.asc_nulls_last,"pm)
.show( )
}
再次运行查看结果:
在代码当中,多维聚合就要聚合多次,然后将多次聚合的结果再进行合并。