RDD和DataFrame
1.SparkSession 介绍
SparkSession 本质上是SparkConf、SparkContext、SQLContext、HiveContext和StreamingContext这些环境的集合,避免使用这些来分别执行配置、Spark环境、SQL环境、Hive环境和Streaming环境。SparkSession现在是读取数据、处理元数据、配置会话和管理集群资源的入口。
2.SparkSession创建RDD
from pyspark.sql.session import SparkSession if __name__ == "__main__": spark = SparkSession.builder.master("local") \ .appName("My test") \ .getOrCreate() sc = spark.sparkContext data = [1, 2, 3, 4, 5, 6, 7, 8, 9] rdd = sc.parallelize(data)
SparkSession实例化参数:通过静态类Builder来实例化。Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置。可以通SparkSession.builder 来创建一个 SparkSession 的实例,并通过 stop 函数来停止 SparkSession。
Builder 的主要方法如下:
(1)appName函数 appName(String name) 用来设置应用程序名字,会显示在Spark web UI中 (2)master函数 master(String master) 设置Spark master URL 连接,比如"local" 设置本地运行,"local[4]"本地运行4cores,或则"spark://master:7077"运行在spark standalone 集群。 (3)config函数 (4)getOrCreate函数 getOrCreate() 获取已经得到的 SparkSession,或则如果不存在则创建一个新的基于builder选项的SparkSession (5)enableHiveSupport函数 表示支持Hive,包括 链接持久化Hive metastore, 支持Hive serdes, 和Hive用户自定义函数
3.直接创建DataFrame
# 直接创建Dataframe df = spark.createDataFrame([ (1, 144.5, 5.9, 33, 'M'), (2, 167.2, 5.4, 45, 'M'), (3, 124.1, 5.2, 23, 'F'), ], ['id', 'weight', 'height', 'age', 'gender'])
4.从字典创建DataFrame
df = spark.createDataFrame([{'name':'Alice','age':1}, {'name':'Polo','age':1}])
4.指定schema创建DataFrame
schema = StructType([ StructField("id", LongType(), True), StructField("name", StringType(), True), StructField("age", LongType(), True), StructField("eyeColor", StringType(), True) ]) df = spark.createDataFrame(csvRDD, schema)
5.读文件创建DataFrame
testDF = spark.read.csv(FilePath, header='true', inferSchema='true', sep='\t')
6.从pandas dataframe创建DataFrame
import pandas as pd from pyspark.sql import SparkSession colors = ['white','green','yellow','red','brown','pink'] color_df=pd.DataFrame(colors,columns=['color']) color_df['length']=color_df['color'].apply(len) color_df=spark.createDataFrame(color_df) color_df.show()
7.RDD与DataFrame的转换
RDD转变成DataFrame df.toDF(['col1','col2']) DataFrame转变成RDD df.rdd.map(lambda x: (x.001,x.002))
DataFrames常用
Row
DataFrame 中的一行。可以访问其中的字段:
- 类似属性(row.key)
- 像字典值(row[key])
查看列名/行数
# 查看有哪些列 ,同pandas df.columns # ['color', 'length'] # 行数 df.count() # 列数 len(df.columns)
统计频繁项目
# 查找每列出现次数占总的30%以上频繁项目 df.stat.freqItems(["id", "gender"], 0.3).show() +------------+----------------+ |id_freqItems|gender_freqItems| +------------+----------------+ | [5, 3]| [M, F]| +------------+----------------+
select选择和切片筛选
选择几列
color_df.select('length','color').show()
多列选择和切片
color_df.select('length','color') .select(color_df['length']>4).show()
between 范围选择
color_df.filter(color_df.length.between(4,5) ) .select(color_df.color.alias('mid_length')).show()
联合筛选
# 这里使用一种是 color_df.length, 另一种是color_df[0] color_df.filter(color_df.length>4) .filter(color_df[0]!='white').show()
filter运行类SQL
color_df.filter("color='green'").show() color_df.filter("color like 'b%'").show()
where方法的SQL
color_df.where("color like '%yellow%'").show()
直接使用SQL语法
# 首先dataframe注册为临时表,然后执行SQL查询 color_df.createOrReplaceTempView("color_df") spark.sql("select count(1) from color_df").show()
新增、修改列
lit新增一列常量
import pyspark.sql.functions as F df = df.withColumn('mark', F.lit(1))
聚合后修改
# 重新命名聚合后结果的列名(需要修改多个列名就跟多个:withColumnRenamed) # 聚合之后不修改列名则会显示:count(member_name) df_res.agg({'member_name': 'count', 'income': 'sum', 'num': 'sum'}) .withColumnRenamed("count(member_name)", "member_num").show() from pyspark.sql import functions as F df_res.agg( F.count('member_name').alias('mem_num'), F.sum('num').alias('order_num'), F.sum("income").alias('total_income') ).show()
cast修改列数据类型
from pyspark.sql.types import IntegerType # 下面两种修改方式等价 df = df.withColumn("height", df["height"].cast(IntegerType())) df = df.withColumn("weight", df.weight.cast('int')) print(df.dtypes)
排序
混合排序
color_df.sort(color_df.length.desc(),color_df.color.asc()) .show()
orderBy排序
color_df.orderBy('length','color').show()
缺失值
计算列中的空值数目
# 计算一列空值数目 df.filter(df['col_name'].isNull()).count() # 计算每列空值数目 for col in df.columns: print(col, "\t", "with null values: ", df.filter(df[col].isNull()).count())
平均值填充缺失值
from pyspark.sql.functions import when import pyspark.sql.functions as F # 计算各个数值列的平均值 def mean_of_pyspark_columns(df, numeric_cols): col_with_mean = [] for col in numeric_cols: mean_value = df.select(F.avg(df[col])) avg_col = mean_value.columns[0] res = mean_value.rdd.map(lambda row: row[avg_col]).collect() col_with_mean.append([col, res[0]]) return col_with_mean # 用平均值填充缺失值 def fill_missing_with_mean(df, numeric_cols): col_with_mean = mean_of_pyspark_columns(df, numeric_cols) for col, mean in col_with_mean: df = df.withColumn(col, when(df[col].isNull() == True, F.lit(mean)).otherwise(df[col])) return df if __name__ == '__main__': # df需要自行创建 numeric_cols = ['age2', 'height2'] # 需要填充空值的列 df = fill_missing_with_mean(df, numeric_cols) # 空值填充 df.show()
替换值
replace 全量替换
# 替换pyspark dataframe中的任何值,而无需选择特定列 df = df.replace('?',None) df = df.replace('ckd \t','ckd')
functions 部分替换
# 只替换特定列中的值,则不能使用replace.而使用pyspark.sql.functions # 用classck的notckd替换no import pyspark.sql.functions as F df = df.withColumn('class', F.when(df['class'] == 'no', F.lit('notckd')) .otherwise(df['class']))
groupBy + agg 聚合
作为聚合函数agg,通常是和分组函数groupby一起使用,表示对分组后的数据进行聚合操作;如果没有分组函数,默认是对整个dataframe进行聚合操作。
explode分割
# 为给定数组或映射中的每个元素返回一个新行 from pyspark.sql.functions import split, explode df = sc.parallelize([(1, 2, 3, 'a b c'), (4, 5, 6, 'd e f'), (7, 8, 9, 'g h i')]) .toDF(['col1', 'col2', 'col3', 'col4']) df.withColumn('col4', explode(split('col4', ' '))).show() +----+----+----+----+ |col1|col2|col3|col4| +----+----+----+----+ | 1| 2| 3| a| | 1| 2| 3| b| | 1| 2| 3| c| | 4| 5| 6| d| | 4| 5| 6| e| | 4| 5| 6| f| | 7| 8| 9| g| | 7| 8| 9| h| | 7| 8| 9| i| +----+----+----+----+ # 示例二 from pyspark.sql import Row from pyspark.sql.functions import explode eDF = spark.createDataFrame([Row( a=1, intlist=[1, 2, 3], mapfield={"a": "b"})]) eDF.select(explode(eDF.intlist).alias("anInt")).show() +-----+ |anInt| +-----+ | 1| | 2| | 3| +-----+
isin
# 如果自变量的求值包含该表达式的值,则该表达式为true df[df.name.isin("Bob", "Mike")].collect() # [Row(age=5, name='Bob')] df[df.age.isin([1, 2, 3])].collect() # [Row(age=2, name='Alice')]
读取
从hive中读取数据
from pyspark.sql import SparkSession myspark = SparkSession.builder \ .appName('compute_customer_age') \ .config('spark.executor.memory','2g') \ .enableHiveSupport() \ .getOrCreate() sql = """ SELECT id as customer_id,name, register_date FROM [db_name].[hive_table_name] limit 100 """ df = myspark.sql(sql) df.show(20)
将数据保存到数据库中
DataFrame.write.mode("overwrite").saveAsTable("test_db.test_table2")
读写csv/json
from pyspark import SparkContext from pyspark.sql import SQLContext sc = SparkContext() sqlContext = SQLContext(sc) csv_content = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(r'./test.csv') csv_content.show(10) #读取 df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv",header="true") #保存
df_sparksession_read = spark.read.csv(r"E: \数据\欺诈数据集\PS_7_log.csv",header=True) df_sparksession_read.show(10) 或: df_sparksession_read = spark.read.json(r"E: \数据\欺诈json数据集\PS_7_log.json",header=True) df_sparksession_read.show(10)
pyspark.sql.functions常见内置函数
1.pyspark.sql.functions.abs(col)
计算绝对值。
2.pyspark.sql.functions.acos(col)
计算给定值的反余弦值; 返回的角度在0到π的范围内。
3.pyspark.sql.functions.add_months(start, months)
返回start后months个月的日期。
df=sqlContext.createDataFrame([('2015-04-08',)],['d']) df.select(add_months(df.d,1).alias('d')).collect() [Row(d=datetime.date(2015, 5, 8))]
4.pyspark.sql.functions.array_contains(col, value)
集合函数:如果数组包含给定值,则返回True。收集元素和值必须是相同的类型。
>>> df = sqlContext.createDataFrame([(["a", "b", "c"],), ([],)], ['data']) >>> df.select(array_contains(df.data, "a")).collect() [Row(array_contains(data,a)=True), Row(array_contains(data,a)=False)]
5.pyspark.sql.functions.ascii(col)
计算字符串列的第一个字符的数值。
6.pyspark.sql.functions.avg(col)
聚合函数:返回组中的值的平均值。
7.pyspark.sql.functions.cbrt(col)
计算给定值的立方根。
9.pyspark.sql.functions.coalesce(*cols)
返回不为空的第一列。
10.pyspark.sql.functions.col(col)
根据给定的列名返回一个列。
col函数的作用相当于python中的dataframe格式的提取data[‘id’]
11.pyspark.sql.functions.collect_list(col)
聚合函数:返回重复对象的列表。
12.pyspark.sql.functions.collect_set(col)
聚合函数:返回一组消除重复元素的对象。
13.pyspark.sql.functions.concat(*cols)
将多个输入字符串列连接成一个字符串列。
>>> df = sqlContext.createDataFrame([('abcd','123')], ['s', 'd']) >>> df.select(concat(df.s, df.d).alias('s')).collect() [Row(s=u'abcd123')]
14.pyspark.sql.functions.concat_ws(sep, *cols)
使用给定的分隔符将多个输入字符串列连接到一个字符串列中。
>>> df = sqlContext.createDataFrame([('abcd','123')], ['s', 'd']) >>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect() [Row(s=u'abcd-123')]
15.pyspark.sql.functions.corr(col1, col2)
返回col1和col2的皮尔森相关系数的新列。
16.pyspark.sql.functions.cos(col)
计算给定值的余弦。
17.pyspark.sql.functions.cosh(col)
计算给定值的双曲余弦。
18.pyspark.sql.functions.count(col)
聚合函数:返回组中的项数量。
19.pyspark.sql.functions.countDistinct(col, *cols)
返回一列或多列的去重计数的新列。
>>> l=[('Alice',2),('Bob',5)] >>> df = sqlContext.createDataFrame(l,['name','age']) >>> df.agg(countDistinct(df.age, df.name).alias('c')).collect() [Row(c=2)] >>> df.agg(countDistinct("age", "name").alias('c')).collect() [Row(c=2)]
20.pyspark.sql.functions.current_date()
以日期列的形式返回当前日期。
21.pyspark.sql.functions.current_timestamp()
将当前时间戳作为时间戳列返回。
22.pyspark.sql.functions.date_add(start, days)
返回start后days天的日期
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['d']) >>> df.select(date_add(df.d, 1).alias('d')).collect() [Row(d=datetime.date(2015, 4, 9))]