Spark【Spark SQL(三)DataSet】

简介: Spark【Spark SQL(三)DataSet】

DataSet

        DataFrame 的出现,让 Spark 可以更好地处理结构化数据的计算,但存在一个问题:编译时的类型安全问题,为了解决它,Spark 引入了 DataSet API(DataFrame API 的扩展)。DataSet 是分布式的数据集合,它提供了强类型支持,也就是给 RDD 的每行数据都添加了类型约束。


       在 Spark 2.0 中,DataFrame 和 DataSet 被合并为 DataSet 。DataSet包含 DataFrame 的功能,DataFrame 表示为 DataSet[Row] ,即DataSet 的子集。


三种 API 的选择

       RDD 是DataFrame 和 DataSet 的底层,如果需要更多的控制功能(比如精确控制Spark 怎么执行一条查询),尽量使用 RDD。


       如果希望在编译时获得更高的类型安全,建议使用 DataSet。


       如果想统一简化 Spark 的API ,则使用 DataFrame 和 DataSet。


       基于 DataFrame API 和 DataSet API 开发的程序会被自动优化,开发人员不需要操作底层的RDD API 来手动优化,大大提高了开发效率。但是RDD API 对于非结构化数据的处理有独特的优势(比如文本数据流),而且更方便我们做底层的操作。


DataSet 的创建

1、使用createDataset()方法创建

def main(args: Array[String]): Unit = {
    //local代表本地单线程模式 local[*]代表本地多线程模式
    val spark = SparkSession.builder()
      .appName("create dataset")
      .master("local[*]")
      .getOrCreate()
    //一定要导入它 不然无法创建DataSet对象
    import spark.implicits._
    val ds1 = spark.createDataset(1 to 5)
    ds1.show()
    val ds2 = spark.createDataset(spark.sparkContext.textFile("data/sql/people.txt"))
    ds2.show()
    spark.stop()
  }

运行结果:

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+
+--------+
|   value|
+--------+
| Tom, 21|
|Mike, 25|
|Andy, 18|
+--------+

2、通过 toDS 方法生成

import org.apache.spark.sql.{Dataset, SparkSession}
object DataSetCreate {
  //case类一定要写到main方法之外
  case class Person(name:String,age:Int)
  def main(args: Array[String]): Unit = {
    //local代表本地单线程模式 local[*]代表本地多线程模式
    val spark = SparkSession.builder()
      .appName("create dataset")
      .master("local[*]")
      .getOrCreate()
    //一定要导入 SparkSession对象下的implicits
    import spark.implicits._
    val data = List(Person("Tom",21),Person("Andy",22))
    val ds: Dataset[Person] = data.toDS()
    ds.show()
    spark.stop()
  }
}

运行结果:

+----+---+
|name|age|
+----+---+
| Tom| 21|
|Andy| 22|
+----+---+

3、通过DataFrame 转换生成

需要注意:json中的数

object DataSetCreate{
 case class Person(name:String,age:Long,sex:String)
def main(args: Array[String]): Unit = {
    //local代表本地单线程模式 local[*]代表本地多线程模式
    val spark = SparkSession.builder()
      .appName("create dataset")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    val df = spark.read.json("data/sql/people.json")
    val ds = df.as[Person]
    ds.show()
    spark.stop()
    }
}

RDD、DataFrame 和 DataSet 之间的相互转换

RDD <=> DataFrame

  1. RDD 转 DataFrame ,也就是上一篇博客中介绍的两种方法


  1. 能创建case类,就直接映射出一个RDD[Person],然后调用toDF方法利用反射机制推断RDD模式。
  2. 无法创建case类,就使用编程方式定义RDD模式,使用

createDataFrame(rowRDD,schema) 指定rowRDD:RDD[Row]和schema:StructType。

3.如果RDD是像:RDD[(Long, String)] 这样保存的是一个元组类型的RDD,那么也可以直接使用 toDF 方法转为 DataFrame 对象,因为元组的 k,v 数据类型是已知的,就相当于有了创建 DataFrame 的模式信息(schema)。


  1. DataFrame 转 RDD,直接使用 rdd() 方法。
package com.study.spark.core.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
object TransForm {
  case class Person(name:String,age:Int)  //txt文件age字段可以用Int,但json文件尽量用Long
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("transform")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    //1.RDD和DataFrame之间互相转换
    //1.1 创建RDD对象
    val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
      .map(_.split(","))
      .map(attr => Person(attr(0), attr(1).trim.toInt))
    rdd.foreach(println)
    /*
      Person(Andy,18)
      Person(Tom,21)
      Person(Mike,25)
     */
    //1.2 RDD转DataFrame
    val df = rdd.toDF()
    df.show()
    /*
      +----+---+
      |name|age|
      +----+---+
      | Tom| 21|
      |Mike| 25|
      |Andy| 18|
      +----+---+
  */
    //1.3 DataFrame转RDD
    val res: RDD[Row] = df.rdd
    /*
      [Andy,18]
      [Tom,21]
      [Mike,25]
     */
    res.foreach(println)
    spark.stop()
  }
}

可以看到,RDD[Person]转为DataFrame后,再从DataFrame转回RDD就变成了RDD[Row] 类型了。


RDD <=> DataSet

RDD 和 DataSet 之间的转换比较简单:

  1. RDD 转 DataSet 直接使用case 类(比如Person),然后映射出 RDD[Person] ,直接调用 toDS方法。
  2. DataSet 转 RDD 直接调用 rdd方法即可。
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
object TransForm {
  case class Person(name:String,age:Int)  //txt文件age字段可以用Int,但json文件尽量用Long
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("transform")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    //1.RDD和DataSet之间互相转换
    //1.1 创建RDD对象
    val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
      .map(_.split(","))
      .map(attr => Person(attr(0), attr(1).trim.toInt))
    rdd.foreach(println)
    /*
      Person(Andy,18)
      Person(Tom,21)
      Person(Mike,25)
     */
    //1.2 RDD转DataSet
    val ds = rdd.toDS()
    ds.show()
    /*
      +----+---+
      |name|age|
      +----+---+
      | Tom| 21|
      |Mike| 25|
      |Andy| 18|
      +----+---+
  */
    //1.3 DataFrame转RDD
    val res: RDD[Person] = ds.rdd
    res.foreach(println)
    /*
      Person(Andy,18)
      Person(Tom,21)
      Person(Mike,25)
     */
    spark.stop()
  }
}

可以看到,相比RDD和DataFrame互相转换,RDD和DataSet转换的过程中,不会有数据类型的变化,而DataFrame转RDD的过程就会把我们定义的case类转为Row对象。

DataFrame <=> DataSet

  1. DataFrame 转 DataSet 先使用case类,然后直接使用 as[case 类] 方法。
  2. DataSet 转 DataFrame 直接使用 toDF 方法。
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object TransForm {
  case class Person(name:String,age:Long,sex:String)  //txt文件age字段可以用Int,但json文件尽量用Long
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("transform")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    //1.DataFrame和DataSet之间互相转换
    //1.1 创建DataFrame对象
    val df = spark.read.json("data/sql/people.json")
    df.show()
    /*
      +---+----------+---+
      |age|      name|sex|
      +---+----------+---+
      | 30|   Michael| 男|
      | 19|      Andy| 女|
      | 19|    Justin| 男|
      | 20|Bernadette| 女|
      | 23|  Gretchen| 女|
      | 27|     David| 男|
      | 33|    Joseph| 女|
      | 27|     Trish| 女|
      | 33|      Alex| 女|
      | 25|       Ben| 男|
      +---+----------+---+
    */
    //1.2 DataFrame转DataSet
    val ds = df.as[Person]
    ds.show()
    /*
      +---+----------+---+
      |age|      name|sex|
      +---+----------+---+
      | 30|   Michael| 男|
      | 19|      Andy| 女|
      | 19|    Justin| 男|
      | 20|Bernadette| 女|
      | 23|  Gretchen| 女|
      | 27|     David| 男|
      | 33|    Joseph| 女|
      | 27|     Trish| 女|
      | 33|      Alex| 女|
      | 25|       Ben| 男|
      +---+----------+---+
  */
    //1.3 DataSet转DataFrame
    val res: DataFrame = ds.toDF()
    res.show()
    /*
      +---+----------+---+
      |age|      name|sex|
      +---+----------+---+
      | 30|   Michael| 男|
      | 19|      Andy| 女|
      | 19|    Justin| 男|
      | 20|Bernadette| 女|
      | 23|  Gretchen| 女|
      | 27|     David| 男|
      | 33|    Joseph| 女|
      | 27|     Trish| 女|
      | 33|      Alex| 女|
      | 25|       Ben| 男|
      +---+----------+---+
    */
    spark.stop()
  }
}

DataSet 实现 WordCount

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("create dataset")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    val res: Dataset[(String, Long)] = spark.read.text("data/word.txt").as[String]
      .flatMap(_.split(" "))
      .groupByKey(_.toLowerCase)
      .count()
    res.show()
    spark.stop()
  }

运行结果:

|   key|count(1)|
+------+--------+
|  fast|       1|
|    is|       3|
| spark|       2|
|better|       1|
|  good|       1|
|hadoop|       1|
+------+--------+

总结

       剩下来就是不断练习各种DataFrame和DataSet的操作、熟悉各种转换和行动操作。

相关文章
|
1月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
70 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
88 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
58 0
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
75 0
|
2月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
46 0
|
SQL 消息中间件 分布式计算
通过Spark SQL实时归档SLS数据
我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。
2570 0
|
3月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
5月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
134 13
|
5月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
下一篇
DataWorks