一、数据统计
1.1 读取文件
步骤1:读取文件https://cdn.coggle.club/Pokemon.csv
import pandas as pd from pyspark.sql import SparkSession # 创建spark应用 spark = SparkSession.builder.appName('mypyspark').getOrCreate() # 用python链接spark环境 from pyspark import SparkFiles spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv') df = spark.read.csv(SparkFiles.get('Pokemon.csv'), header = True, inferSchema = True) df.show(5)
并且我们将表头属性有.
的部分进行更名:
#重命名 df = df.withColumnRenamed('Sp. Atk', 'Sp Atk') df = df.withColumnRenamed('Sp. Def', 'Sp Def') df.show(5)
1.2 保存读取的信息
步骤2:将读取的进行保存,表头也需要保存,这里可保存为csv或者json格式文件。
#df.write df.write.csv(path='Pokemon.csv',mode='overwrite', header=True ) df.write.json(path='Pokemon.json',mode='overwrite' )
1.3 分析每列的类型,取值个数
步骤3:分析每列的类型,取值个数,这里就可以使用df.describe分析每列的最大值、最小值、平均值、方差等特性了(下面我只贴出HP的结果)。
for column in df.columns: df.describe(column).show()
1.4 分析每列是否包含缺失值
步骤4:分析每列是否包含缺失值,打印出有缺失值的样本数,占总数的比例。
df.filter(df['Type 2'].isNull()).count() # 386 # 转换成pandas,打印出每一列的缺失值个数 df.toPandas().isnull().sum() # 结果: Name 0 Type 1 0 Type 2 386 Total 0 HP 0 Attack 0 Defense 0 Sp Atk 0 Sp Def 0 Speed 0 Generation 0 Legendary 0 dtype: int64
打印出有缺失值的样本数,占总数的比例:
for col in df.columns: print(col, df.filter(df[col].isNull()).count()/df.count()) Name 0.0 Type 1 0.0 Type 2 0.4825 Total 0.0 HP 0.0 Attack 0.0 Defense 0.0 Sp Atk 0.0 Sp Def 0.0 Speed 0.0 Generation 0.0 Legendary 0.0
二、分组聚合
读取文件https://cdn.coggle.club/Pokemon.csv
和刚才的步骤一是一样的。
import pandas as pd from pyspark.sql import SparkSession # 创建spark应用 spark = SparkSession.builder.appName('mypyspark').getOrCreate() # 用python链接spark环境 from pyspark import SparkFiles spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv') df = spark.read.csv(SparkFiles.get('Pokemon.csv'), header = True, inferSchema = True) df.show(5)
2.1 学习groupby分组聚合的使用
和sql的group by一样意思,分组聚合,下面找到Type 1
对应数据进行分组和计数。
# Type 1 各种数量 df.groupby('Type 1').count().show(5)
还可以分组后(按照Type 1
分组)计算每组内的HP
平均值:
df.groupBy(['Type 1']).avg('HP').show(5) +------+-----------------+ |Type 1| avg(HP)| +------+-----------------+ | Water| 72.0625| |Poison| 67.25| | Steel|65.22222222222223| | Rock|65.36363636363636| | Ice| 72.0| +------+-----------------+ only showing top 5 rows
而如果是计算每组的个数:
# Type 1 各种数量 df.groupby('Type 1').count().show(5) +------+-----+ |Type 1|count| +------+-----+ | Water| 112| |Poison| 28| | Steel| 27| | Rock| 44| | Ice| 24| +------+-----+ only showing top 5 rows
2.2 学习agg分组聚合的使用
和2.1一样实现分组后(按照Type 1
分组)计算每组内的HP
平均值,我们也可以用agg
分组聚合,{'HP': 'mean'}
为参数。
df.groupBy('Type 1').agg({'HP': 'mean'}).show(5) +------+-----------------+ |Type 1| avg(HP)| +------+-----------------+ | Water| 72.0625| |Poison| 67.25| | Steel|65.22222222222223| | Rock|65.36363636363636| | Ice| 72.0| +------+-----------------+ only showing top 5 rows
当数据量不是很多时,可以使用toPandas
转成pandas的dataframe,然后使用pandas的各种函数:
df.toPandas().groupby('Type 1', as_index=False)['HP'].agg({'hp_mean': 'mean'})
2.3 transform的使用
transform的参数如下,可以看到需要传入一个函数(用于转换数据)。
DataFrame.transform(func: Callable[[…], Series], axis: Union[int, str] = 0, *args: Any, **kwargs: Any) → DataFrame
如果我们要按照Type 1
分组后,求组内关于HP
的平均值,可以将np.mean
函数传入transform
,具体介绍可以参考pyspark官方文档。
df.toPandas().groupby('Type 1')['HP'].transform(np.mean) 0 67.271429 1 67.271429 2 67.271429 3 67.271429 4 69.903846 ... 795 65.363636 796 65.363636 797 70.631579 798 70.631579 799 69.903846 Name: HP, Length: 800, dtype: float64