Spark SQL中DataSet函数操作

简介: 笔记

一、DataSet中常见函数详解


(1)重分区函数:coalesce / repartition


coalesce:只能用于减少分区的数据,而且可以选择不发生shuffle。

repartition:可以增加分区的数据,也可以减少分区的数据,必须会发生shuffle,相当于进行了一次重新分区操作。

(2)去重函数:distinct / dropDuplicates


distinct:是根据每一条数据进行完整内容的对比和去重。

dropDuplicates:可以根据指定的字段进行去重。

(3)过滤函数:filter / except / intersect


filter:根据我们自己的业务逻辑,如果返回true,就保留该元素,如果是false,就不保留该元素。

except:获取在当前的dataset中有,但是在另一个dataset中没有的元素。

intersect:获取两个dataset中的交集。

(4)Map函数:map / flatMap / mapPartition


map:将数据集中的每一条数据做一个映射,返回一条映射。

flatMap:数据集中的每条数据都可以返回多条数据。

mapPartition:一次性对一个partition中的数据进行处理。

(5)Join函数:join


join:根据某一个字段,关联合并两个数据集。

(6)Sort函数:sort


sort:对数据表排序。

(7)随机切分函数:sample / randomSplit


randomSplit:根据Array中的元素的数量进行切分,然后再给定每个元素的值来保证对切分数据的权重。

sample:根据我们设定的比例进行随机抽取。

示例代码及运行演示:

初始化SparkSession

package com.kfk.spark.common
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 下午
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession ={
        val spark = SparkSession
                .builder
                .appName("CommSparkSessionScala")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate
        return spark
    }
}

案例中用到的数据

people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"Justin", "age":19}
{"name":"Justin", "age":32}

people1.json

{"name":"Justin", "age":32}
package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/4
 * @time : 2:45 下午
 */
object TypeOperationScala {
    // case class Person(name:String, age:Option[Long])
    case class Person(name:String, age:Long)
    case class Employee(name1:String, salary:Long)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        /**
         * coalesce / repartition
         */
        coalesceAndRepartitions(spark)
        /**
         * distinct / dropDuplicates
         */
        distinctAndDropDuplicates(spark)
        /**
         * filter / except / intersect
         */
        filterAndExceptAndIntersect(spark)
        /**
         * map / flatMap / mapPartition
         */
        mapAndFlatMap(spark)
        /**
         * join
         */
        joinWith(spark)
        /**
         * sort
         */
        sort(spark)
        /**
         * sample / randomSplit
         */
        sampleAndRandomSplit(spark)
    }
    /**
     * coalesce 和 repartition 操作
     * 都是用来进行重分区的
     * 区别在于:coalesce只能用于减少分区的数据,而且可以选择不发生shuffle
     * repartition可以增加分区的数据,也可以减少分区的数据,必须会发生shuffle,相当于进行了一次重新分区操作
     * @param spark
     */
    def coalesceAndRepartitions(spark : SparkSession): Unit ={
        val personDF = spark.read.json(Comm.fileDirPath + "people.json")
        // 1
        println(personDF.rdd.partitions.size)
        // repartition
        val personDFRepartition = personDF.repartition(4)
        // 4
        println(personDFRepartition.rdd.partitions.size)
        // coalesce
        val personDFCoalesce = personDFRepartition.coalesce(3)
        // 3
        println(personDFCoalesce.rdd.partitions.size)
    }
    /**
     * distinct:是根据每一条数据进行完整内容的对比和去重
     * DropDuplicates:可以根据指定的字段进行去重
     * @param spark
     */
    def distinctAndDropDuplicates(spark : SparkSession): Unit ={
        val personDF = spark.read.json(Comm.fileDirPath + "people.json")
        personDF.show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * |  19| Justin|
         * |  32| Justin|
         * +----+-------+
         */
        // distinct
        personDF.distinct().show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * |  32| Justin|
         * +----+-------+
         */
        // dropDuplicates
        personDF.dropDuplicates(Seq("name")).show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * +----+-------+
         */
    }
    /**
     * filter:根据我们自己的业务逻辑,如果返回true,就保留该元素,如果是false,就不保留该元素
     * except:获取在当前的dataset中有,但是在另一个dataset中没有的元素
     * intersect:获取两个dataset中的交集
     * @param spark
     */
    def filterAndExceptAndIntersect(spark : SparkSession): Unit ={
        import spark.implicits._
        val personDS = spark.read.json(Comm.fileDirPath + "people.json").as[Person]
        val personDS1 = spark.read.json(Comm.fileDirPath + "people1.json").as[Person]
        // filter方式一
        personDS.filter(line => line.age != None).show()
        /**
         * +---+------+
         * |age|  name|
         * +---+------+
         * | 30|  Andy|
         * | 19|Justin|
         * | 19|Justin|
         * | 32|Justin|
         * +---+------+
         */
        // filter方式二
        val personDF = spark.read.json(Comm.fileDirPath + "people.json")
        val rdd = personDF.rdd.filter(line => line.getAs("age") != null)
        for (elem <- rdd.collect()) {
            System.out.println(elem)
        }
        /**
         * [30,Andy]
         * [19,Justin]
         * [19,Justin]
         * [32,Justin]
         */
        // except
        personDS.except(personDS1).show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * +----+-------+
         */
        // intersect
        personDS.intersect(personDS1).show()
        /**
         * +---+------+
         * |age|  name|
         * +---+------+
         * | 32|Justin|
         * +---+------+
         */
    }
    /**
     * map:将数据集中的每一条数据做一个映射,返回一条映射
     * flatMap:数据集中的每条数据都可以返回多条数据
     * mapPartition:一次性对一个partition中的数据进行处理
     * @param spark
     */
    def mapAndFlatMap(spark : SparkSession): Unit ={
        import spark.implicits._
        val personDS = spark.read.json(Comm.fileDirPath + "people.json").as[Person]
        // map
        personDS.filter("age is not null").map(line => (line.name,line.age+10)).show()
        /**
         * +------+---+
         * |    _1| _2|
         * +------+---+
         * |  Andy| 40|
         * |Justin| 29|
         * |Justin| 29|
         * |Justin| 42|
         * +------+---+
         */
        // flatMap
        personDS.filter("age is not null").flatMap(line => {
            Seq(Person(line.name + "_1",line.age + 1000),Person(line.name + "_2",line.age + 1000))
        }).show()
        /**
         * +--------+----+
         * |    name| age|
         * +--------+----+
         * |  Andy_1|1030|
         * |  Andy_2|1030|
         * |Justin_1|1019|
         * |Justin_2|1019|
         * |Justin_1|1019|
         * |Justin_2|1019|
         * |Justin_1|1032|
         * |Justin_2|1032|
         * +--------+----+
         */
        // mapPartition
        personDS.filter("age is not null").mapPartitions(line => {
            var result = scala.collection.mutable.ArrayBuffer[(String, Long)]()
            while (line.hasNext){
                var person = line.next()
                result += ((person.name,person.age + 1000))
            }
            result.iterator
        }).show()
        /**
         * +------+----+
         * |    _1|  _2|
         * +------+----+
         * |  Andy|1030|
         * |Justin|1019|
         * |Justin|1019|
         * |Justin|1032|
         * +------+----+
         */
    }
    /**
     * join:根据某一个字段,关联合并两个数据集
     * @param spark
     */
    def joinWith(spark : SparkSession): Unit ={
        import spark.implicits._
        val peopleFilePath = Comm.fileDirPath + "people.json"
        val employeeFilePath = Comm.fileDirPath + "employees.json"
        val personDS = spark.read.json(peopleFilePath).as[Person]
        val employeeDS = spark.read.json(employeeFilePath).as[Employee]
        personDS.joinWith(employeeDS,$"name" === $"name1").show()
        /**
         * +------------+---------------+
         * |          _1|             _2|
         * +------------+---------------+
         * | [, Michael]|[Michael, 3000]|
         * |  [30, Andy]|   [Andy, 4500]|
         * |[32, Justin]| [Justin, 3500]|
         * |[19, Justin]| [Justin, 3500]|
         * |[19, Justin]| [Justin, 3500]|
         * +------------+---------------+
         */
    }
    /**
     * sort:对数据表排序
     * @param spark
     */
    def sort(spark : SparkSession): Unit ={
        import spark.implicits._
        val employeeFilePath = Comm.fileDirPath + "employees.json"
        val employeeDS = spark.read.json(employeeFilePath).as[Employee]
        employeeDS.show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |Michael|  3000|
         * |   Andy|  4500|
         * | Justin|  3500|
         * |  Berta|  4000|
         * +-------+------+
         */
        employeeDS.sort($"salary".desc).show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |   Andy|  4500|
         * |  Berta|  4000|
         * | Justin|  3500|
         * |Michael|  3000|
         * +-------+------+
         */
        // 创建临时表,使用sql语句
        employeeDS.toDF().createOrReplaceTempView("employee")
        val result = spark.sql("select * from employee a where 1=1 order by salary desc")
        result.show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |   Andy|  4500|
         * |  Berta|  4000|
         * | Justin|  3500|
         * |Michael|  3000|
         * +-------+------+
         */
    }
    /**
     * randomSplit:根据Array中的元素的数量进行切分,然后再给定每个元素的值来保证对切分数据的权重
     * sample:根据我们设定的比例进行随机抽取
     * @param spark
     */
    def sampleAndRandomSplit(spark : SparkSession): Unit ={
        import spark.implicits._
        val employeeFilePath = Comm.fileDirPath + "employees.json"
        val employeeDS = spark.read.json(employeeFilePath).as[Employee]
        employeeDS.randomSplit(Array(2,5,1)).foreach(ds => ds.show())
        /**
         * +-----+------+
         * |name1|salary|
         * +-----+------+
         * |Berta|  4000|
         * +-----+------+
         *
         * +------+------+
         * | name1|salary|
         * +------+------+
         * |  Andy|  4500|
         * |Justin|  3500|
         * +------+------+
         *
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |Michael|  3000|
         * +-------+------+
         */
        employeeDS.sample(false,0.5).show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |Michael|  3000|
         * | Justin|  3500|
         * +-------+------+
         */
    }


二、DataSet中untype详解


案例中使用到的数据

employees.json

{"name1":"Michael", "salary":3000}
{"name1":"Andy", "salary":4500}
{"name1":"Justin", "salary":3500}
{"name1":"Berta", "salary":4000}

dept.json

{"id":"dept-1","deptName":"开发部"}
{"id":"dept-2","deptName":"市场部"}

案例代码:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/6
 * @time : 3:39 下午
 */
object UnTypeOperationScala {
    case class Employee(deptId:String,name:String, salary:Long)
    case class Dept(id:String,deptName:String)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._
        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]
        // join
        val joinDS = employeeDS.join(deptDS,$"deptId" === $"id")
        joinDS.show()
        /**
         * +------+-------+------+--------+------+
         * |deptId|   name|salary|deptName|    id|
         * +------+-------+------+--------+------+
         * |dept-1|Michael|  3000|  开发部|dept-1|
         * |dept-2|   Andy|  4500|  市场部|dept-2|
         * |dept-2| Justin|  3500|  市场部|dept-2|
         * |dept-1|  Berta|  4000|  开发部|dept-1|
         * +------+-------+------+--------+------+
         */
        // 使用算子
        joinDS.groupBy("deptId","deptName").sum("salary").select("deptId","deptName","sum(salary)").orderBy($"sum(salary)".desc).show()
        /**
         * +------+--------+-----------+
         * |deptId|deptName|sum(salary)|
         * +------+--------+-----------+
         * |dept-2|  市场部|       8000|
         * |dept-1|  开发部|       7000|
         * +------+--------+-----------+
         */
        // 创建临时表,使用sql语句
        joinDS.createOrReplaceTempView("deptSalary")
        val resultDF = spark.sql("select a.deptId, a.deptName ,sum(salary) as salary from deptSalary a group by a.deptId, a.deptName order by salary desc")
        resultDF.show()
        /**
         * +------+--------+------+
         * |deptId|deptName|salary|
         * +------+--------+------+
         * |dept-2|  市场部|  8000|
         * |dept-1|  开发部|  7000|
         * +------+--------+------+
         */
    }
}


三、DataSet中聚合函数详解


案例中使用到的数据

employees1.json

{"deptId":"dept-1", "name":"Michael", "salary":3000}
{"deptId":"dept-2", "name":"Andy", "salary":4500}
{"deptId":"dept-2", "name":"Justin", "salary":3500}
{"deptId":"dept-1", "name":"Berta", "salary":4000}

dept.json

{"id":"dept-1","deptName":"开发部"}
{"id":"dept-2","deptName":"市场部"}

案例代码:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/6
 * @time : 7:29 下午
 */
object AggOptionScala {
    case class Employee(deptId:String,name:String, salary:Long)
    case class Dept(id:String,deptName:String)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        agg1(spark)
        agg2(spark)
    }
    /**
     * avg
     * sum
     * max
     * min
     * count
     * @param spark
     */
    def agg1(spark : SparkSession): Unit ={
        import spark.implicits._
        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]
        // join
        val joinDS = employeeDS.join(deptDS,$"deptId" === $"id")
        joinDS.show()
        joinDS.groupBy("deptId","deptName").agg(avg("salary"),sum("salary"),max("salary"),min("salary"),count("name")).show()
        /**
         * +------+--------+-----------+-----------+-----------+-----------+-----------+
         * |deptId|deptName|avg(salary)|sum(salary)|max(salary)|min(salary)|count(name)|
         * +------+--------+-----------+-----------+-----------+-----------+-----------+
         * |dept-1|  开发部|     3500.0|       7000|       4000|       3000|          2|
         * |dept-2|  市场部|     4000.0|       8000|       4500|       3500|          2|
         * +------+--------+-----------+-----------+-----------+-----------+-----------+
         */
        joinDS.groupBy("deptId","deptName").agg("salary" -> "avg","salary" -> "sum","salary" -> "max","salary" -> "min","name" -> "count").show()
    }
    /**
     * collect_list(行转列)将一个分组内指定字段的值收集在一起,不会去重
     * collect_set(行转列)意思同上,唯一的区别就是它会去重
     * @param spark
     */
    def agg2(spark : SparkSession): Unit ={
        import spark.implicits._
        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]
        // join
        val joinDS = employeeDS.join(deptDS,$"deptId" === $"id")
        joinDS.show()
        joinDS.groupBy("deptId","deptName").agg(collect_list("name")).show()
        joinDS.groupBy("deptId","deptName").agg(collect_set("name")).show()
        /**
         * +------+--------+-----------------+
         * |deptId|deptName|collect_set(name)|
         * +------+--------+-----------------+
         * |dept-1|  开发部| [Michael, Berta]|
         * |dept-2|  市场部|   [Andy, Justin]|
         * +------+--------+-----------------+
         */
    }
}


四、DataSet中其他函数详解


案例中使用到的数据

employees1.json

{"deptId":"dept-1", "name":"Michael", "salary":3000}
{"deptId":"dept-2", "name":"Andy", "salary":4500}
{"deptId":"dept-2", "name":"Justin", "salary":3500}
{"deptId":"dept-1", "name":"Berta", "salary":4000}

dept.json

{"id":"dept-1","deptName":"开发部"}
{"id":"dept-2","deptName":"市场部"}

案例代码:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.functions._
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/6
 * @time : 8:13 下午
 */
object OtherOperationScala {
    case class Employee(deptId:String,name:String, salary:Long)
    case class Dept(id:String,deptName:String)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._
        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]
        // join
        val joinDS = employeeDS.join(deptDS,$"deptId" === $"id")
        joinDS.show()
        /**
         * 日期函数:current_date current_timestamp
         * 数学函数:round
         * 随机函数:rand
         * 字符串函数:concat
         */
        joinDS.select(current_date(),current_timestamp(),rand(),round($"salary",2),concat($"name",$"salary")).show()
        /**
         * +--------------+--------------------+-------------------------+----------------+--------------------+
         * |current_date()| current_timestamp()|rand(7003525560546994152)|round(salary, 2)|concat(name, salary)|
         * +--------------+--------------------+-------------------------+----------------+--------------------+
         * |    2020-12-06|2020-12-06 21:13:...|       0.8157198990085353|            3000|         Michael3000|
         * |    2020-12-06|2020-12-06 21:13:...|       0.8446053083722802|            4500|            Andy4500|
         * |    2020-12-06|2020-12-06 21:13:...|       0.5323697131017762|            3500|          Justin3500|
         * |    2020-12-06|2020-12-06 21:13:...|      0.10113425439292889|            4000|           Berta4000|
         * +--------------+--------------------+-------------------------+----------------+--------------------+
         */
    }
}


相关文章
|
1月前
|
SQL Oracle 关系型数据库
SQL优化-使用联合索引和函数索引
在一次例行巡检中,发现一条使用 `to_char` 函数将日期转换为字符串的 SQL 语句 CPU 利用率很高。为了优化该语句,首先分析了 where 条件中各列的选择性,并创建了不同类型的索引,包括普通索引、函数索引和虚拟列索引。通过对比不同索引的执行计划,最终确定了使用复合索引(包含函数表达式)能够显著降低查询成本,提高执行效率。
|
15天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
SQL 数据库 数据库管理
数据库SQL函数应用技巧与方法
在数据库管理中,SQL函数是处理和分析数据的强大工具
|
1月前
|
SQL 数据库 索引
SQL中COUNT函数结合条件使用的技巧与方法
在SQL查询中,COUNT函数是一个非常常用的聚合函数,用于计算表中满足特定条件的记录数
|
1月前
|
SQL 关系型数据库 MySQL
SQL日期函数
SQL日期函数
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
47 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
81 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
57 0
|
2月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
下一篇
无影云桌面