聚合操作_groupBy_数据读取 | 学习笔记

简介: 快速学习聚合操作_groupBy_数据读取

开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段聚合操作_groupBy_数据读取】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/690/detail/12086


聚合操作_groupBy_数据读取

 

聚合操作

一般进行数据处理时,会进行数据的读取,读取这个步骤也叫 Extra;读取数据后,要对这个数据来进行一系列的操作,这步是数据操作,也叫做 Transformation 数据转换;数据操作完成后,要把数据落地,这个环节称之为 Load。

这就是一个 ETL 的过程。但是,在企业当中的数据处理,会比这个环境要复杂很多,大致步骤还是 ETL,但可能是从一个临时的数据集中去读取,也有可能落地到一个临时数据集中;这个数据操作,有可能是一个很大的操作,也有可能只是进行一些数据转换,数据分析和清洗。这种情况是比较多的。

所以,在数据操作这个环节,主要关注:首先,对数据进行类型转换,然后进行一些数据清洗,缺失值处理也算数据清洗的一部分。

做完这些之后,谈需求,一般情况下,需求是:比如进行词频统计,在进行词频统计时,需求是统计这个数据集当中有多少个单词,每个单词出现了多少次。这是一个聚合操作,把所有单词聚在一起,合计起来进行操作。

比如,求 2019 年每个月的销售额,这也是一个聚合操作。按月统计订单以后,求得销售额,用每个月的销售额,去绘制一张报表,这个报表可能是一个图表,可以看到每个月的销售额曲线。假如想看哪个月销售额最多,那这种报表的操作就要进行一些聚合,所以大家能看到所有的最终结果的获取,基本上都是聚合操作。

聚合操作关乎于最终结果的获取,所以,聚合操作非常的重要,而且因为聚合操作涉及到聚合,涉及到各种这种操作,所以往往聚合操作是一些比较重的操作,大家对于聚合操作,这些算子的细节呢,要有一些了解,了解到算子的细节后,才能够在进行优化时,不会显得无从下手。

聚合操作,大致分为如下几个算子:

第一个算子,groupby 为数据进行分组;

第二个 rollup,比 groupby 更高级一些;

第三个 cube 算子;

第四个 RealationalGroupedDataset

groupby 也分两个小部分,第一部分,数据读取,数据读取中要读取,然后清洗;接下来进行分组操作,这个操作又分为两个不同的 API。

下面通过讲解让大家了解数据读取和清洗怎么完成。

1.groupby

groupBy 算子会按照列将 Dataset 分组,并返回一个 RelationalGroupedDataset 对象,通过

RelationalGroupedDataset 可以对分组进行聚合。

创建 Scala Class,名称为 AggProcessor

在 AggProcessor 当中第一步创建方法,命名为 groupby()。

整个数据读取的步骤,第一步是要创建 sparksession;之后进行数据读取;第三步数据去掉空值;第四步,使用 function 函数来完成聚合;最后使用 GroupedDataset 的 API 完成聚合。Groupedataset 是 RelationalGroupedDataset 这样的一个对象。

代码:

import org.apache.spark.sql.Sparksession

import org.junit.Test

class AggProcessor {

//1.创建 sparkSession

val spark = Sparksession.builder()

.master( master = "local[6]")

.appName( name = "agg processor"")

.getorcreate()

注:在进行使用 dollar 符,使用单引号时都需要 spark 当中的 implicits

import spark . implicits._

@Test

def groupBy( ): unit = {

要将下面的几行代码写在外面,让其他函数都可进行访问。

//1.创建 sparkSession

val spark = Sparksession.builder()

.master( master = "local[6]")

.appName( name = "agg processor"")

.getorcreate()

import spark . implicits

在读取数据之前,将 schema 拷贝进来,schema 是

beijingpm_with_nan.cn 数据集的 schema

//2.数据读取

private val schema = StructType(

List(

StructField( "id"", IntegerType) ,

StructField ( " year", IntegerType) ,

StructFieldA "month" , IntegerType) ,

StructField("day" , IntegerType) ,

StructField( "hour" , IntegerType) ,

StructField( "season" , IntegerType) ,

StructField( "pm" , DoubleType)

打开数据集,这个数据集在 pm 列都是 NaN,这一列值是没有意义的,故要去掉。

val pmDF = spark.read

.schema(schema)

. option ( "header" , value = true)

.csv ( path="dataset/beijingpm_without_null.csv")

/进行清洗,将值为 NaN 的 pm 行清洗掉

sourceDF.where( condition = " pm =!= Double.NaN)

image.png

注:这里的 Double.NaN,是因为前面代码中的 DoubleType。在读 Doubletype 时,假如数据是 NaN,读出的是 double.NaN 类型,这是一个特殊类型,不要理解为字符串。

show(

运行代码,得出结果如下图:

image.png

发现:pm 一列不存在 NaN,说明 where 起了作用

接下来要对这样一个数据集来进行聚合,为什么在前面没有主动处理 NaN,而在聚合之前要进行处理:是因为你不至于 NaN 的数据聚合进去,所以得到的数据是不准确的。

以往会在聚合之前先清洗,保证数据没有问题再进行聚合,一方面是因为聚合操作是一个比较重要的操作。

如果能通过 where 去掉一些多余的数据,那么在进行聚合时,数据越少,效率就越高。当然也可以不这样做,sparkSQL 的 catalyst 会来进行优化。

相关文章
|
1月前
|
Serverless Python
分组和聚合DataFrame信息案例解析
该文介绍了如何使用pandas对DataFrame进行分组和聚合操作。首先,通过创建字典并转换为DataFrame,展示了基础数据结构。接着,利用`groupby()`方法按城市字段进行数据分组,然后应用`mean()`函数计算各城市平均年龄,显示了聚合功能。此外,文中指出还可使用`sum()`、`count()`等其他聚合函数处理分组数据。
13 0
使用mongo聚合分组查询获取每一组的时间最大的一条数据
使用mongo聚合分组查询获取每一组的时间最大的一条数据
953 0
|
6天前
|
SQL 存储 大数据
Hive的查询、数据加载和交换、聚合、排序、优化
Hive的查询、数据加载和交换、聚合、排序、优化
23 2
|
20天前
|
数据可视化 Python
如何在Pandas中对数据集进行多级分组并进行聚合计算?
在Pandas中进行多级分组与聚合计算的步骤包括导入库(如pandas和matplotlib),准备数据集,使用`groupby()`方法分组,应用聚合函数(如`sum()`、`mean()`)及可视化结果。
25 11
|
4月前
|
缓存 分布式计算 Java
MapReduce编程:join操作和聚合操作
MapReduce编程:join操作和聚合操作
35 0
|
5月前
在pyodps中,`groupby`方法用于对数据进行分组
在pyodps中,`groupby`方法用于对数据进行分组
52 1
|
8月前
|
Python
条件选取数据dataframe
在pandas中,可以使用`merge`函数将两个dataframe合并在一起,然后使用`query`函数根据指定的条件选取数据。以下是一个例子:
62 0
|
8月前
|
Python
dataframe操作查询
Pandas提供了多种查询方法,以下是一些常见的方法: 使用df.loc方法,根据行、列的标签值查询。 使用df.iloc方法,根据行、列的数字位置查询。 使用df.where方法,根据条件过滤数据。 使用df.query方法,根据字符串表达式查询数据。
484 0
|
9月前
|
索引 Python
pandas数据分组与聚合
pandas数据分组与聚合
61 0
|
12月前
|
SQL JSON 分布式计算