【Spark】(task2)PySpark数据统计和分组聚合

简介: 1.2 保存读取的信息步骤2:将读取的进行保存,表头也需要保存,这里可保存为csv或者json格式文件。

一、数据统计

1.1 读取文件

步骤1:读取文件https://cdn.coggle.club/Pokemon.csv

import pandas as pd
from pyspark.sql import SparkSession
# 创建spark应用
spark = SparkSession.builder.appName('mypyspark').getOrCreate()
# 用python链接spark环境
from pyspark import SparkFiles
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
df = spark.read.csv(SparkFiles.get('Pokemon.csv'), 
                    header = True,
                    inferSchema = True)
df.show(5)

image.png

并且我们将表头属性有.的部分进行更名:

#重命名
df = df.withColumnRenamed('Sp. Atk', 'Sp Atk')
df = df.withColumnRenamed('Sp. Def', 'Sp Def')
df.show(5)

image.png

1.2 保存读取的信息

步骤2:将读取的进行保存,表头也需要保存,这里可保存为csv或者json格式文件。

#df.write
df.write.csv(path='Pokemon.csv',mode='overwrite', header=True )
df.write.json(path='Pokemon.json',mode='overwrite' )

1.3 分析每列的类型,取值个数

步骤3:分析每列的类型,取值个数,这里就可以使用df.describe分析每列的最大值、最小值、平均值、方差等特性了(下面我只贴出HP的结果)。

for column in df.columns:
    df.describe(column).show()

image.png

1.4 分析每列是否包含缺失值

步骤4:分析每列是否包含缺失值,打印出有缺失值的样本数,占总数的比例。

df.filter(df['Type 2'].isNull()).count() # 386
# 转换成pandas,打印出每一列的缺失值个数
df.toPandas().isnull().sum()
# 结果:
Name            0
Type 1          0
Type 2        386
Total           0
HP              0
Attack          0
Defense         0
Sp Atk          0
Sp Def          0
Speed           0
Generation      0
Legendary       0
dtype: int64

打印出有缺失值的样本数,占总数的比例:

for col in df.columns:
    print(col, df.filter(df[col].isNull()).count()/df.count())
Name 0.0
Type 1 0.0
Type 2 0.4825
Total 0.0
HP 0.0
Attack 0.0
Defense 0.0
Sp Atk 0.0
Sp Def 0.0
Speed 0.0
Generation 0.0
Legendary 0.0

二、分组聚合

读取文件https://cdn.coggle.club/Pokemon.csv

和刚才的步骤一是一样的。

import pandas as pd
from pyspark.sql import SparkSession
# 创建spark应用
spark = SparkSession.builder.appName('mypyspark').getOrCreate()
# 用python链接spark环境
from pyspark import SparkFiles
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
df = spark.read.csv(SparkFiles.get('Pokemon.csv'), 
                    header = True,
                    inferSchema = True)
df.show(5)

2.1 学习groupby分组聚合的使用

和sql的group by一样意思,分组聚合,下面找到Type 1对应数据进行分组和计数。

# Type 1 各种数量
df.groupby('Type 1').count().show(5)

还可以分组后(按照Type 1分组)计算每组内的HP平均值:

df.groupBy(['Type 1']).avg('HP').show(5)
+------+-----------------+
|Type 1|          avg(HP)|
+------+-----------------+
| Water|          72.0625|
|Poison|            67.25|
| Steel|65.22222222222223|
|  Rock|65.36363636363636|
|   Ice|             72.0|
+------+-----------------+
only showing top 5 rows

而如果是计算每组的个数:

# Type 1 各种数量
df.groupby('Type 1').count().show(5)
+------+-----+
|Type 1|count|
+------+-----+
| Water|  112|
|Poison|   28|
| Steel|   27|
|  Rock|   44|
|   Ice|   24|
+------+-----+
only showing top 5 rows

2.2 学习agg分组聚合的使用

和2.1一样实现分组后(按照Type 1分组)计算每组内的HP平均值,我们也可以用agg分组聚合,{'HP': 'mean'}为参数。

df.groupBy('Type 1').agg({'HP': 'mean'}).show(5)
+------+-----------------+
|Type 1|          avg(HP)|
+------+-----------------+
| Water|          72.0625|
|Poison|            67.25|
| Steel|65.22222222222223|
|  Rock|65.36363636363636|
|   Ice|             72.0|
+------+-----------------+
only showing top 5 rows

当数据量不是很多时,可以使用toPandas转成pandas的dataframe,然后使用pandas的各种函数:

df.toPandas().groupby('Type 1', as_index=False)['HP'].agg({'hp_mean': 'mean'})

image.png

2.3 transform的使用

transform的参数如下,可以看到需要传入一个函数(用于转换数据)。

DataFrame.transform(func: Callable[[…], Series], axis: Union[int, str] = 0, *args: Any, **kwargs: Any) → DataFrame

如果我们要按照Type 1分组后,求组内关于HP的平均值,可以将np.mean函数传入transform,具体介绍可以参考pyspark官方文档

df.toPandas().groupby('Type 1')['HP'].transform(np.mean)
0      67.271429
1      67.271429
2      67.271429
3      67.271429
4      69.903846
         ...    
795    65.363636
796    65.363636
797    70.631579
798    70.631579
799    69.903846
Name: HP, Length: 800, dtype: float64
相关实践学习
Serverless极速搭建Hexo博客
本场景介绍如何使用阿里云函数计算服务命令行工具快速搭建一个Hexo博客。
相关文章
|
SQL 分布式计算 HIVE
pyspark笔记(RDD,DataFrame和Spark SQL)1
pyspark笔记(RDD,DataFrame和Spark SQL)
120 1
|
5月前
|
分布式计算 运维 Serverless
EMR Serverless Spark PySpark流任务体验报告
阿里云EMR Serverless Spark是一款全托管的云原生大数据计算服务,旨在简化数据处理流程,降低运维成本。测评者通过EMR Serverless Spark提交PySpark流任务,体验了从环境准备、集群创建、网络连接到任务管理的全过程。通过这次测评,可以看出阿里云EMR Serverless Spark适合有一定技术基础的企业,尤其是需要高效处理大规模数据的场景,但新用户需要投入时间和精力学习和适应。
7179 43
EMR Serverless Spark PySpark流任务体验报告
|
4月前
|
分布式计算 运维 Serverless
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。
265 7
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
|
5月前
spark3总结——分区数对带有初始值聚合操作的影响
spark3总结——分区数对带有初始值聚合操作的影响
31 4
|
5月前
|
分布式计算 运维 Serverless
通过Serverless Spark提交PySpark流任务的实践体验
EMR Serverless Spark服务是阿里云推出的一种全托管、一站式的数据计算平台,旨在简化大数据计算的工作流程,让用户更加专注于数据分析和价值提炼,而非基础设施的管理和运维。下面就跟我一起通过Serverless Spark提交PySpark流任务吧。
220 1
|
6月前
|
SQL 数据采集 分布式计算
Spark SQL中的聚合与窗口函数
Spark SQL中的聚合与窗口函数
|
6月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
239 0
|
SQL 存储 分布式计算
pyspark笔记(RDD,DataFrame和Spark SQL)2
pyspark笔记(RDD,DataFrame和Spark SQL)
88 2
|
分布式计算 资源调度 Java
Spark笔记(pyspark)2
Spark笔记(pyspark)
114 0
|
存储 分布式计算 资源调度
Spark笔记(pyspark)1
Spark笔记(pyspark)
108 0