pyspark笔记(RDD,DataFrame和Spark SQL)1

简介: pyspark笔记(RDD,DataFrame和Spark SQL)

RDD和DataFrame

1.SparkSession 介绍

SparkSession 本质上是SparkConf、SparkContext、SQLContext、HiveContext和StreamingContext这些环境的集合,避免使用这些来分别执行配置、Spark环境、SQL环境、Hive环境和Streaming环境。SparkSession现在是读取数据、处理元数据、配置会话和管理集群资源的入口。

2.SparkSession创建RDD

from pyspark.sql.session import SparkSession
if __name__ == "__main__":
    spark = SparkSession.builder.master("local") \
        .appName("My test") \
        .getOrCreate()
    sc = spark.sparkContext
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    rdd = sc.parallelize(data)

SparkSession实例化参数:通过静态类Builder来实例化。Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置。可以通SparkSession.builder 来创建一个 SparkSession 的实例,并通过 stop 函数来停止 SparkSession。

Builder 的主要方法如下

(1)appName函数
appName(String name)
用来设置应用程序名字,会显示在Spark web UI中
(2)master函数
master(String master)
设置Spark master URL 连接,比如"local" 设置本地运行,"local[4]"本地运行4cores,或则"spark://master:7077"运行在spark standalone 集群。
(3)config函数
(4)getOrCreate函数
getOrCreate()
获取已经得到的 SparkSession,或则如果不存在则创建一个新的基于builder选项的SparkSession
(5)enableHiveSupport函数
表示支持Hive,包括 链接持久化Hive metastore, 支持Hive serdes, 和Hive用户自定义函数

3.直接创建DataFrame

# 直接创建Dataframe
df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
    ], ['id', 'weight', 'height', 'age', 'gender'])

4.从字典创建DataFrame

df = spark.createDataFrame([{'name':'Alice','age':1}, {'name':'Polo','age':1}])

4.指定schema创建DataFrame

schema = StructType([
    StructField("id", LongType(), True),   
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
df = spark.createDataFrame(csvRDD, schema)

5.读文件创建DataFrame

testDF = spark.read.csv(FilePath, header='true', inferSchema='true', sep='\t')

6.从pandas dataframe创建DataFrame

import pandas as pd
from pyspark.sql import SparkSession
colors = ['white','green','yellow','red','brown','pink']
color_df=pd.DataFrame(colors,columns=['color'])
color_df['length']=color_df['color'].apply(len)
color_df=spark.createDataFrame(color_df)
color_df.show()

7.RDD与DataFrame的转换

RDD转变成DataFrame df.toDF(['col1','col2'])
DataFrame转变成RDD df.rdd.map(lambda x: (x.001,x.002))

DataFrames常用

Row

DataFrame 中的一行。可以访问其中的字段:

  • 类似属性(row.key)
  • 像字典值(row[key])


1698846305604.jpg

查看列名/行数

# 查看有哪些列 ,同pandas
df.columns
# ['color', 'length']
# 行数
df.count()
# 列数
len(df.columns)

统计频繁项目

# 查找每列出现次数占总的30%以上频繁项目
df.stat.freqItems(["id", "gender"], 0.3).show()
+------------+----------------+
|id_freqItems|gender_freqItems|
+------------+----------------+
|      [5, 3]|          [M, F]|
+------------+----------------+

select选择和切片筛选

选择几列

color_df.select('length','color').show()

多列选择和切片

color_df.select('length','color')
        .select(color_df['length']>4).show()

between 范围选择

color_df.filter(color_df.length.between(4,5) )
        .select(color_df.color.alias('mid_length')).show()

联合筛选

# 这里使用一种是 color_df.length, 另一种是color_df[0]
color_df.filter(color_df.length>4)
        .filter(color_df[0]!='white').show()

filter运行类SQL

color_df.filter("color='green'").show()
color_df.filter("color like 'b%'").show()

where方法的SQL

color_df.where("color like '%yellow%'").show()

直接使用SQL语法

# 首先dataframe注册为临时表,然后执行SQL查询
color_df.createOrReplaceTempView("color_df")
spark.sql("select count(1) from color_df").show()

新增、修改列

lit新增一列常量

import pyspark.sql.functions as F
df = df.withColumn('mark', F.lit(1))

聚合后修改

# 重新命名聚合后结果的列名(需要修改多个列名就跟多个:withColumnRenamed)
# 聚合之后不修改列名则会显示:count(member_name)
df_res.agg({'member_name': 'count', 'income': 'sum', 'num': 'sum'})
      .withColumnRenamed("count(member_name)", "member_num").show()
from pyspark.sql import functions as F
df_res.agg(
    F.count('member_name').alias('mem_num'),
    F.sum('num').alias('order_num'),
    F.sum("income").alias('total_income')
).show()

cast修改列数据类型

from pyspark.sql.types import IntegerType
# 下面两种修改方式等价
df = df.withColumn("height", df["height"].cast(IntegerType()))
df = df.withColumn("weight", df.weight.cast('int'))
print(df.dtypes)

排序

混合排序

color_df.sort(color_df.length.desc(),color_df.color.asc())                               
        .show()

orderBy排序

color_df.orderBy('length','color').show()

缺失值

计算列中的空值数目

# 计算一列空值数目
df.filter(df['col_name'].isNull()).count()
# 计算每列空值数目
for col in df.columns:
    print(col, "\t", "with null values: ", 
          df.filter(df[col].isNull()).count())

平均值填充缺失值

from pyspark.sql.functions import when
import pyspark.sql.functions as F
# 计算各个数值列的平均值
def mean_of_pyspark_columns(df, numeric_cols):
    col_with_mean = []
    for col in numeric_cols:
        mean_value = df.select(F.avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row: row[avg_col]).collect()
        col_with_mean.append([col, res[0]])
    return col_with_mean
# 用平均值填充缺失值
def fill_missing_with_mean(df, numeric_cols):
    col_with_mean = mean_of_pyspark_columns(df, numeric_cols)
    for col, mean in col_with_mean:
        df = df.withColumn(col, when(df[col].isNull() == True, F.lit(mean)).otherwise(df[col]))
    return df
if __name__ == '__main__':
    # df需要自行创建
    numeric_cols = ['age2', 'height2']  # 需要填充空值的列
    df = fill_missing_with_mean(df, numeric_cols)  # 空值填充
    df.show()

替换值

replace 全量替换

# 替换pyspark dataframe中的任何值,而无需选择特定列
df = df.replace('?',None)
df = df.replace('ckd \t','ckd')

functions 部分替换

# 只替换特定列中的值,则不能使用replace.而使用pyspark.sql.functions
# 用classck的notckd替换no
import pyspark.sql.functions as F
df = df.withColumn('class',
                   F.when(df['class'] == 'no', F.lit('notckd'))
                    .otherwise(df['class']))

groupBy + agg 聚合

作为聚合函数agg,通常是和分组函数groupby一起使用,表示对分组后的数据进行聚合操作;如果没有分组函数,默认是对整个dataframe进行聚合操作。

explode分割

# 为给定数组或映射中的每个元素返回一个新行
from pyspark.sql.functions import split, explode
df = sc.parallelize([(1, 2, 3, 'a b c'),
                     (4, 5, 6, 'd e f'),
                     (7, 8, 9, 'g h i')])
        .toDF(['col1', 'col2', 'col3', 'col4'])
df.withColumn('col4', explode(split('col4', ' '))).show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   a|
|   1|   2|   3|   b|
|   1|   2|   3|   c|
|   4|   5|   6|   d|
|   4|   5|   6|   e|
|   4|   5|   6|   f|
|   7|   8|   9|   g|
|   7|   8|   9|   h|
|   7|   8|   9|   i|
+----+----+----+----+
# 示例二
from pyspark.sql import Row
from pyspark.sql.functions import explode
eDF = spark.createDataFrame([Row(
    a=1, 
    intlist=[1, 2, 3], 
    mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).show()
+-----+
|anInt|
+-----+
|    1|
|    2|
|    3|
+-----+

isin

# 如果自变量的求值包含该表达式的值,则该表达式为true
df[df.name.isin("Bob", "Mike")].collect()
# [Row(age=5, name='Bob')]
df[df.age.isin([1, 2, 3])].collect()
# [Row(age=2, name='Alice')]

读取

从hive中读取数据

from pyspark.sql import SparkSession
myspark = SparkSession.builder \
    .appName('compute_customer_age') \
    .config('spark.executor.memory','2g') \
    .enableHiveSupport() \
    .getOrCreate()
sql = """
      SELECT id as customer_id,name, register_date
      FROM [db_name].[hive_table_name]
      limit 100
      """
df = myspark.sql(sql)
df.show(20)

将数据保存到数据库中

DataFrame.write.mode("overwrite").saveAsTable("test_db.test_table2")

读写csv/json

from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
csv_content = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(r'./test.csv')
csv_content.show(10)  #读取
df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv",header="true") #保存


df_sparksession_read = spark.read.csv(r"E: \数据\欺诈数据集\PS_7_log.csv",header=True)
df_sparksession_read.show(10)
或:
df_sparksession_read = spark.read.json(r"E: \数据\欺诈json数据集\PS_7_log.json",header=True)
df_sparksession_read.show(10)

pyspark.sql.functions常见内置函数

1.pyspark.sql.functions.abs(col)

计算绝对值。

2.pyspark.sql.functions.acos(col)

计算给定值的反余弦值; 返回的角度在0到π的范围内。

3.pyspark.sql.functions.add_months(start, months)

返回start后months个月的日期。

df=sqlContext.createDataFrame([('2015-04-08',)],['d'])
df.select(add_months(df.d,1).alias('d')).collect()
[Row(d=datetime.date(2015, 5, 8))]

4.pyspark.sql.functions.array_contains(col, value)

集合函数:如果数组包含给定值,则返回True。收集元素和值必须是相同的类型。

>>> df = sqlContext.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>>> df.select(array_contains(df.data, "a")).collect()
[Row(array_contains(data,a)=True), Row(array_contains(data,a)=False)]

5.pyspark.sql.functions.ascii(col)

计算字符串列的第一个字符的数值。

6.pyspark.sql.functions.avg(col)

聚合函数:返回组中的值的平均值。

7.pyspark.sql.functions.cbrt(col)

计算给定值的立方根。

9.pyspark.sql.functions.coalesce(*cols)

返回不为空的第一列。

10.pyspark.sql.functions.col(col)

根据给定的列名返回一个列。

col函数的作用相当于python中的dataframe格式的提取data[‘id’]

11.pyspark.sql.functions.collect_list(col)

聚合函数:返回重复对象的列表。

12.pyspark.sql.functions.collect_set(col)

聚合函数:返回一组消除重复元素的对象。

13.pyspark.sql.functions.concat(*cols)

将多个输入字符串列连接成一个字符串列。

>>> df = sqlContext.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(concat(df.s, df.d).alias('s')).collect()
[Row(s=u'abcd123')]

14.pyspark.sql.functions.concat_ws(sep, *cols)

使用给定的分隔符将多个输入字符串列连接到一个字符串列中。

>>> df = sqlContext.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect()
[Row(s=u'abcd-123')]

15.pyspark.sql.functions.corr(col1, col2)

返回col1和col2的皮尔森相关系数的新列。

16.pyspark.sql.functions.cos(col)

计算给定值的余弦。

17.pyspark.sql.functions.cosh(col)

计算给定值的双曲余弦。

18.pyspark.sql.functions.count(col)

聚合函数:返回组中的项数量。

19.pyspark.sql.functions.countDistinct(col, *cols)

返回一列或多列的去重计数的新列。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.agg(countDistinct(df.age, df.name).alias('c')).collect()
[Row(c=2)]
>>> df.agg(countDistinct("age", "name").alias('c')).collect()
[Row(c=2)]

20.pyspark.sql.functions.current_date()

以日期列的形式返回当前日期。

21.pyspark.sql.functions.current_timestamp()

将当前时间戳作为时间戳列返回。

22.pyspark.sql.functions.date_add(start, days)

返回start后days天的日期

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_add(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 9))]


相关文章
|
2月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
2月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
43 0
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
96 0
|
3月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
113 0
|
3月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
79 0
|
4月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
6月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
147 13
|
6月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
6月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
82 6