本章介绍数据处理。数据处理是机器学习(Machine Learning)流程中的关键步骤,因为我们需要对数据进行清理、过滤、合并,并将其转换为所需的形式。
快速入门
- 读取
>>> from pyspark.sql import SparkSession
>>> spark=SparkSession.builder.appName('data_processing').getOrCreate()
>>> df=spark.read.csv('sample_data.csv',inferSchema=True, header=True)
>>> df.columns
['ratings', 'age', 'experience', 'family', 'mobile']
>>> len(df.columns)
5
>>> df.count()
33
>>> df.printSchema()
root
|-- ratings: integer (nullable = true)
|-- age: integer (nullable = true)
|-- experience: double (nullable = true)
|-- family: integer (nullable = true)
|-- mobile: string (nullable = true)
>>> df.show(3)
+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
| 3| 32| 9.0| 3| Vivo|
| 3| 27| 13.0| 3| Apple|
| 4| 22| 2.5| 0|Samsung|
+-------+---+----------+------+-------+
only showing top 3 rows
>>> df.select('age','mobile').show(5)
+---+-------+
|age| mobile|
+---+-------+
| 32| Vivo|
| 27| Apple|
| 22|Samsung|
| 37| Apple|
| 27| MI|
+---+-------+
only showing top 5 rows
>>> df.describe().show()
+-------+------------------+------------------+------------------+------------------+------+
|summary| ratings| age| experience| family|mobile|
+-------+------------------+------------------+------------------+------------------+------+
| count| 33| 33| 33| 33| 33|
| mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181| null|
| stddev|1.1188806636071336| 6.18527087180309| 6.770731351213326|1.8448330794164254| null|
| min| 1| 22| 2.5| 0| Apple|
| max| 5| 42| 23.0| 5| Vivo|
+-------+------------------+------------------+------------------+------------------+------+
- 添加列
>>> df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)
+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family|mobile |age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|3 |32 |9.0 |3 |Vivo |42 |
|3 |27 |13.0 |3 |Apple |37 |
|4 |22 |2.5 |0 |Samsung|32 |
|4 |37 |16.5 |4 |Apple |47 |
|5 |27 |9.0 |1 |MI |37 |
|4 |27 |9.0 |0 |Oppo |37 |
|5 |37 |23.0 |5 |Vivo |47 |
|5 |37 |23.0 |5 |Samsung|47 |
|3 |22 |2.5 |0 |Apple |32 |
|3 |27 |6.0 |0 |MI |37 |
+-------+---+----------+------+-------+----------------+
only showing top 10 rows
>>> from pyspark.sql.types import StringType,DoubleType
>>> df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)
+-------+---+----------+------+-------+----------+
|ratings|age|experience|family|mobile |age_double|
+-------+---+----------+------+-------+----------+
|3 |32 |9.0 |3 |Vivo |32.0 |
|3 |27 |13.0 |3 |Apple |27.0 |
|4 |22 |2.5 |0 |Samsung|22.0 |
|4 |37 |16.5 |4 |Apple |37.0 |
|5 |27 |9.0 |1 |MI |27.0 |
|4 |27 |9.0 |0 |Oppo |27.0 |
|5 |37 |23.0 |5 |Vivo |37.0 |
|5 |37 |23.0 |5 |Samsung|37.0 |
|3 |22 |2.5 |0 |Apple |22.0 |
|3 |27 |6.0 |0 |MI |27.0 |
+-------+---+----------+------+-------+----------+
only showing top 10 rows
上面 show(10,False) 中的第二个参数 False(即 truncate=False)表示:即使某列的字符串超过20个字符,也会完整显示而不会被截断。
- 数据过滤
>>> df.filter(df['mobile']=='Vivo').show()
+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 3| 32| 9.0| 3| Vivo|
| 5| 37| 23.0| 5| Vivo|
| 4| 37| 6.0| 0| Vivo|
| 5| 37| 13.0| 1| Vivo|
| 4| 37| 6.0| 0| Vivo|
+-------+---+----------+------+------+
>>> df.filter(df['mobile']=='Vivo').select('age','ratings', 'mobile').show()
+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32| 3| Vivo|
| 37| 5| Vivo|
| 37| 4| Vivo|
| 37| 5| Vivo|
| 37| 4| Vivo|
+---+-------+------+
>>> df.filter(df['mobile']=='Vivo').filter(df['experience']>10).show()
+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 13.0| 1| Vivo|
+-------+---+----------+------+------+
>>> df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()
+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
| 5| 37| 23.0| 5| Vivo|
| 5| 37| 13.0| 1| Vivo|
+-------+---+----------+------+------+
- 唯一值
>>> df.select('mobile').distinct().show()
+-------+
| mobile|
+-------+
| MI|
| Oppo|
|Samsung|
| Vivo|
| Apple|
+-------+
>>> df.select('mobile').distinct().count()
5
- 分组和排序
>>> df.groupBy('mobile').count().show(5,False)
+-------+-----+
|mobile |count|
+-------+-----+
|MI |8 |
|Oppo |7 |
|Samsung|6 |
|Vivo |5 |
|Apple |7 |
+-------+-----+
>>> df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
+-------+-----+
|mobile |count|
+-------+-----+
|MI |8 |
|Oppo |7 |
|Apple |7 |
|Samsung|6 |
|Vivo |5 |
+-------+-----+
>>> df.groupBy('mobile').mean().show(5,False)
+-------+------------------+------------------+------------------+------------------+
|mobile |avg(ratings) |avg(age) |avg(experience) |avg(family) |
+-------+------------------+------------------+------------------+------------------+
|MI |3.5 |30.125 |10.1875 |1.375 |
|Oppo |2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286|
|Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.8333333333333333|
|Vivo |4.2 |36.0 |11.4 |1.8 |
|Apple |3.4285714285714284|30.571428571428573|11.0 |2.7142857142857144|
+-------+------------------+------------------+------------------+------------------+
>>> df.groupBy('mobile').sum().show(5,False)
+-------+------------+--------+---------------+-----------+
|mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)|
+-------+------------+--------+---------------+-----------+
|MI |28 |241 |81.5 |11 |
|Oppo |20 |199 |72.5 |10 |
|Samsung|25 |172 |52.0 |11 |
|Vivo |21 |180 |57.0 |9 |
|Apple |24 |214 |77.0 |19 |
+-------+------------+--------+---------------+-----------+
>>> df.groupBy('mobile').max().show(5,False)
+-------+------------+--------+---------------+-----------+
|mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|MI |5 |42 |23.0 |5 |
|Oppo |4 |42 |23.0 |2 |
|Samsung|5 |37 |23.0 |5 |
|Vivo |5 |37 |23.0 |5 |
|Apple |4 |37 |16.5 |5 |
+-------+------------+--------+---------------+-----------+
>>> df.groupBy('mobile').max().show(3,False)
+-------+------------+--------+---------------+-----------+
|mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|MI |5 |42 |23.0 |5 |
|Oppo |4 |42 |23.0 |2 |
|Samsung|5 |37 |23.0 |5 |
+-------+------------+--------+---------------+-----------+
only showing top 3 rows
>>> df.groupBy('mobile').min().show(5,False)
+-------+------------+--------+---------------+-----------+
|mobile |min(ratings)|min(age)|min(experience)|min(family)|
+-------+------------+--------+---------------+-----------+
|MI |1 |27 |2.5 |0 |
|Oppo |2 |22 |6.0 |0 |
|Samsung|2 |22 |2.5 |0 |
|Vivo |3 |32 |6.0 |0 |
|Apple |3 |22 |2.5 |0 |
+-------+------------+--------+---------------+-----------+
- 聚合
>>> df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)
+-------+---------------+
|mobile |sum(experience)|
+-------+---------------+
|MI |81.5 |
|Oppo |72.5 |
|Samsung|52.0 |
|Vivo |57.0 |
|Apple |77.0 |
+-------+---------------+