Spark 【Spark SQL(一)DataFrame的创建、保存与基本操作】

简介: Spark 【Spark SQL(一)DataFrame的创建、保存与基本操作】

前言

       今天学习Spark SQL,前面的RDD编程要想熟练还是得通过项目来熟练,所以先把Spark过一遍,后期针对不足的地方再加强,这样效率会更高一些。

简介

       在RDD编程中,我们使用的是SparkContext接口,接下来的Spark SQL中,我们使用到的是SparkSession接口。Spark2.0 出现的 SparkSession 接口替代了 Spark 1.6版本中的 SQLContext 和 HiveContext接口,来实现对数据的加载、转换、处理等功能。此外,SparkSession 封装了SparkContext、SparkConf 和 StreamingContext 等。


       也就是说,在Spark1.0 中,需要创建 SparkContext 对象用于 RDD编程 ,创建 SQLContext 对象用于 SQL 编程。而在Spqrk 2.x和3.x版本下,只需要创建一个 SparkSession 对象,就可以执行各种 Spark 操作。


       其实在我们的 spark-shell 中默认已经为我们提供了一个 SparkContext 对象(“sc”)和一个SparkSession 对象(“spark”)了。


       从Spark 2.x 开始,RDD被降级为底层的API,所有通过高层的 DataFrame API 表达的计算,都会被分解,生成优化好的底层的 RDD 操作,然后转化为Scala 字节码,交给执行器的JVM虚拟机。

结构化数据 DataFrame

       Spark SQL 所使用的数据抽象并非 RDD,而是 DataFrame。DataFrame 的推出,让 Spark具备了处理大规模结构化数据的能力。

DataFrame 概述

       DataFrame 是一种以 RDD 为基础的表格型的数据结构,提供了详细的结构信息,就相当于关系数据库中的一张表。


       和 RDD 一样,DataFrame 的操作也分为转换和行动操作,DataFrame 的计算过程也是“惰性”的,只有触发行动操作,Spark才会真正从头到尾进行一次计算。

入门案例

给定一组键值对(书名,销量),现在求每个键对应的平均值,也就是图书的平均销量。

def main(args: Array[String]): Unit = {
    //创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("test01")
      .getOrCreate()
    val df: DataFrame = spark.createDataFrame(Array(("spark", 2), ("hadoop", 5), ("spark", 3), ("hadoop", 6)))
      .toDF("book", "amount")
    val df2: DataFrame = df.groupBy("book") agg(avg("amount"))
    df2.show()
    spark.stop()
  }

运行结果:

+------+-----------+
|  book|avg(amount)|
+------+-----------+
| spark|        2.5|
|hadoop|        5.5|
+------+-----------+

DataFrame 的创建与保存

Spark SQL 支持多种数据源创建 DataFrame,也支持把 DataFrame 保存成各种数据格式。

1、Parquet

读取
//1.第一种创建方式
val df1 = spark.read.foramt("parquet").load("文件路径")
//2.第二种创建方式
val df2 = spark.read.parquet("文件路径")
保存
//1.使用 Snappy 压缩算法压缩后输出
df.write.foramt("parquet").mode("overwrite").option("compression","snappy").save("输出路径")
//2.
df.write.parquet("输出路径")

2、JSON

//1.第一种创建方式
val df1 = spark.read.foramt("json").load("文件路径")
//2.第二种创建方式
val df2 = spark.read.json("文件路径")
保存
df.write.format("json").mode("overwrite").save("输出路径")
df.write.json("输出路径")

3、CSV

//两种创建方式都需要定义数据模式
val schema = "name:STRING,age INT,sex STRING"
//1.第一种创建方式
val df1 = spark.read.foramt("csv").schema(schema).option("header","true").option("seq",";").load("文件路径")
//2.第二种创建方式
val df2 = spark.read.schema(schema).option("header","true").option("seq",";").csv("文件地")
保存
//1.
df.write().format("csv").mode("overwrite").save("输出路径")
//2.
df.write.csv("输出路径")

4、文本文件

//1.第一种创建方式
val df1 = spark.read.foramt("text").load("文件路径")
//2.第二种创建方式
val df2 = spark.read.text("文件路径")
保存
//1.
df.write.text("输出路径")
//2.
df.write.foramt("text").save("输出路径")

集合类型

通过 SparkSession 对象调用 createDataFrame(集合) 方法。

  val df: DataFrame = spark.createDataFrame(Array(("spark", 2), ("hadoop", 5), ("spark", 3), ("hadoop", 6)))
      .toDF("book", "amount")

DataFrame 基本操作

我们将这个JSON文件作为输入源进行数据分析:

{"name":"Michael", "age":30, "sex": "男"}
{"name":"Andy", "age":19, "sex": "女"}
{"name":"Justin", "age":19, "sex": "男"}
{"name":"Bernadette", "age":20, "sex": "男"}
{"name":"Gretchen", "age":23,"sex": "女"}
{"name":"David", "age":27, "sex": "男"}
{"name":"Joseph", "age":33,"sex": "女"}
{"name":"Trish", "age":27,"sex": "女"}
{"name":"Alex", "age":33,"sex": "女"}
{"name":"Ben", "age":25, "sex": "男"}

生成DataFrame对象:

val df: DataFrame = spark.read.json("data/sql/people.json")

在操作 DataFrame 时,有两种不同风格的语句,即DSL和SQL语句。但无论是执行 DSL 语句还是 SQL 语句,本质上都会被转换为对 RDD 的操作 。

DSL 语法

1、printSchema()

输出DataFrame对象的模式信息。

df.printSchema()

运行结果:

root
 |-- _corrupt_record: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
2、show()

显示一个DataFrame的二维表格。

//Scala中如果方法没有参数 括号可省略
df.show()

运行结果:

+----------+----+
|      name| age|
+----------+----+
|      null|null|
|   Michael|  30|
|      Andy|  19|
|    Justin|  19|
|Bernadette|  25|
|  Gretchen|  23|
|     David|  27|
|    Joseph|  33|
|     Trish|  27|
|      Alex|  33|
|       Ben|  25|
|      null|null|
+----------+----+
3、select()

从DataFrame中选取部分列的数据,还可以对列进行重命名,对某一列的值也可以统一进行操作(比如age都+1)。

  df.select(df("name").as("username"),df("age")+1).show()  //显示出DataFrame的name和age字段并将age字段的值都+1,将name用username代替

运行结果:

+----------+---------+
|  username|(age + 1)|
+----------+---------+
|      null|     null|
|   Michael|       31|
|      Andy|       20|
|    Justin|       20|
|Bernadette|       26|
|  Gretchen|       24|
|     David|       28|
|    Joseph|       34|
|     Trish|       28|
|      Alex|       34|
|       Ben|       26|
|      null|     null|
+----------+---------+
4、filter()

进行条件查询,找到满足条件要求的数据。

df.filter(df("age")>30).show()    //输出所有30岁以上的人的信息

运行结果:

+------+---+
|  name|age|
+------+---+
|Joseph| 33|
|  Alex| 33|
+------+---+
5、groupBy()

对记录进行分组。

df.groupBy(df("sex")).count().show()

运行结果:

+----+-----+
| sex|count|
+----+-----+
|  男|    3|
|  女|    4|
+----+-----+
6、sort()

根据某一字段进行升序(asc)或降序排列(desc)。

  df.select(df("name"),df("age"),df("sex")).sort(df("age").desc,df("name").asc).show()  //先根据age降序排列 age相同根据name升序排列

运行结果:

+----------+----+----+
|      name| age| sex|
+----------+----+----+
|      Alex|  33|  女|
|    Joseph|  33|  女|
|   Michael|  30|  男|
|     David|  27|  男|
|     Trish|  27|  女|
|       Ben|  25|  男|
|  Gretchen|  23|  女|
|Bernadette|  20|  男|
|      Andy|  19|  女|
|    Justin|  19|  男|
|      null|null|null|
|      null|null|null|
+----------+----+----+
7、withColumn()

用于为 DataFrame 增加一个新的列。

//新增一列 isYoung 如果age>25 为young 否则为 old 
df.select(df("name"),df("age"),df("sex")).withColumn("isYoung",when(df("age")>25,"young").otherwise("old")).show()

运行结果:

+----------+----+----+-------+
|      name| age| sex|isYoung|
+----------+----+----+-------+
|      null|null|null|    old|
|   Michael|  30|  男|    old|
|      Andy|  19|  女|  young|
|    Justin|  19|  男|  young|
|Bernadette|  20|  男|  young|
|  Gretchen|  23|  女|  young|
|     David|  27|  男|    old|
|    Joseph|  33|  女|    old|
|     Trish|  27|  女|    old|
|      Alex|  33|  女|    old|
|       Ben|  25|  男|    old|
|      null|null|null|    old|
+----------+----+----+-------+
8、drop()

       可以删除DataFrame中的一列,上面我们是直接在 DataFrame对象的基础上进行查询并展示,show() 方法并不会有返回对象,但其实其它操作(比如select、withColumn、filter、sort等)都会返回一个新的 DataFrame对象,相当于一张新的二维表格。同样,drop() 后会返回一个新的 DataFrame 对象,相当于删除某列后的新表。

val df: DataFrame = spark.read.json("data/sql/people.json")
    val df2: DataFrame = df.select(df("name"), df("age"), df("sex")).withColumn("isYoung", when(df("age") < 25, "young").otherwise("old"))
    val df3: DataFrame = df2.drop(df("isYoung"))
df3.show()
9、其它操作

       除此之外,还有其它一些操作比如min()、max()、sum()和avg()等,比较简单,用的时候再学。

SQL 语法

       相比较 DSL 语句,SQL 语句徐需要在执行 SQL 语句之前先创建一张临时表,因为毕竟SQL语句本来就是对关系型表进行操作的语句,所以我们的数据源需要先通过createTempView()或createOrReplaceTempView()方法转换为临时表。


       这两个方法没太大区别,只不过createOrReplaceTempView()会判断是否已存在这么张表,如果存在同名的表,就用新表替换掉。而createTempView()的话,如果已经存在同名的表,它就会报错。


我们继续使用上面的 people.json 文件进行操作。

SQL 案例1
//通过JSON文件创建 DataFrame 对象
    val df = spark.read.format("json").load("data/sql/people.json")
    //创建临时表 不需要返回值 
    df.createTempView("people")
    spark.sql("SELECT * FROM PEOPLE").show()

运行结果:

+---+----------+---+
|age|      name|sex|
+---+----------+---+
| 30|   Michael| 男|
| 19|      Andy| 女|
| 19|    Justin| 男|
| 20|Bernadette| 男|
| 23|  Gretchen| 女|
| 27|     David| 男|
| 33|    Joseph| 女|
| 27|     Trish| 女|
| 33|      Alex| 女|
| 25|       Ben| 男|
+---+----------+---+

SQL 案例2

统计男女人数。

spark.sql("SELECT sex,COUNT(*) AS nums FROM people group by sex").show()

注意AS 后面的新字段名不能带引号,不能是中文!

运行结果:

+---+----+
|sex|nums|
+---+----+
| 男|   4|
| 女|   6|
+---+----+
SQL 函数

Spark SQL 提供了200多个函数供用户选择,涵盖了大部分的日常应用场景。此外,用户也可以自定义函数。

案例:

       假设一张用户信息表中有 name、age、create_time 这3列数据,这里要求使用Spark的系统函数 from_unixtime(),将时间戳类型的 create_time 格式化成时间字符串,然后使用自定义的函数将用户名转为大写英文字母。

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("spark func")
      .getOrCreate()
    val schema: StructType = StructType(List(StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("create_time", LongType, true)
    ))
    val javaList:util.ArrayList[Row] = new util.ArrayList[Row]()
    javaList.add(Row("XiaoMei",24,System.currentTimeMillis()/1000))
    javaList.add(Row("XiaoShuai",23,System.currentTimeMillis()/1000))
    javaList.add(Row("XiaoLiu",21,System.currentTimeMillis()/1000))
    javaList.add(Row("XiaoMa",21,System.currentTimeMillis()/1000))
    val df = spark.createDataFrame(javaList, schema)
    df.show()
    df.createTempView("student")
    spark.sql("SELECT name,age,from_unixtime(create_time,'yyyy-MM-dd HH:mm:ss') FROM student").show()
    //注册一个新的用户自定义函数
    spark.udf.register("toUpperCaseUDF",(column:String)=>column.toUpperCase)
    //调用自定义函数
    spark.sql("SELECT toUpperCaseUDF(name) AS name,age,from_unixtime(create_time,'yyyy-MM-dd HH:mm:ss') AS create_time FROM student").show()
    spark.stop()
  }

运行结果:

默认查询结果

+---------+---+-----------+
|     name|age|create_time|
+---------+---+-----------+
|  XiaoMei| 24| 1694256797|
|XiaoShuai| 23| 1694256797|
|  XiaoLiu| 21| 1694256797|
|   XiaoMa| 21| 1694256797|
+---------+---+-----------+

调用from_unixtime()函数:

+---------+---+-----------------------------------------------+
|     name|age|from_unixtime(create_time, yyyy-MM-dd HH:mm:ss)|
+---------+---+-----------------------------------------------+
|  XiaoMei| 24|                            2023-09-09 18:53:17|
|XiaoShuai| 23|                            2023-09-09 18:53:17|
|  XiaoLiu| 21|                            2023-09-09 18:53:17|
|   XiaoMa| 21|                            2023-09-09 18:53:17|
+---------+---+-----------------------------------------------+

使用自定义函数:

+---------+---+-------------------+
|     name|age|        create_time|
+---------+---+-------------------+
|  XIAOMEI| 24|2023-09-09 18:53:17|
|XIAOSHUAI| 23|2023-09-09 18:53:17|
|  XIAOLIU| 21|2023-09-09 18:53:17|
|   XIAOMA| 21|2023-09-09 18:53:17|
+---------+---+-------------------+

总结

       今天就写到这里,明天周日继续努力,今天我新开了章节-Spark SQL,我学习了Spark SQL中一个重要的抽象数据结构-DataFrame,学习了DataFrame的成绩以及保存,还有DataFrame的两张操作方式:DSL语句和SQL语句。

   至于书上提到的 StructType、StructFeild 明天好好研究一下。

相关文章
|
20天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
48 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
83 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
41 0
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
58 0
|
2月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
43 0
|
4月前
|
SQL 存储 分布式计算
|
7月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
277 0
|
7月前
|
SQL 分布式计算 大数据
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
172 0
|
7月前
|
SQL 分布式计算 数据挖掘
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
174 0