Spark SQL中DataSet函数操作

简介: 笔记

一、DataSet中常见函数详解


(1)重分区函数:coalesce / repartition


coalesce:只能用于减少分区的数据,而且可以选择不发生shuffle。

repartition:可以增加分区的数据,也可以减少分区的数据,必须会发生shuffle,相当于进行了一次重新分区操作。

(2)去重函数:distinct / dropDuplicates


distinct:是根据每一条数据进行完整内容的对比和去重。

dropDuplicates:可以根据指定的字段进行去重。

(3)过滤函数:filter / except / intersect


filter:根据我们自己的业务逻辑,如果返回true,就保留该元素,如果是false,就不保留该元素。

except:获取在当前的dataset中有,但是在另一个dataset中没有的元素。

intersect:获取两个dataset中的交集。

(4)Map函数:map / flatMap / mapPartition


map:将数据集中的每一条数据做一个映射,返回一条映射。

flatMap:数据集中的每条数据都可以返回多条数据。

mapPartition:一次性对一个partition中的数据进行处理。

(5)Join函数:join


join:根据某一个字段,关联合并两个数据集。

(6)Sort函数:sort


sort:对数据表排序。

(7)随机切分函数:sample / randomSplit


randomSplit:根据Array中的元素的数量进行切分,然后再给定每个元素的值来保证对切分数据的权重。

sample:根据我们设定的比例进行随机抽取。

示例代码及运行演示:

初始化SparkSession

package com.kfk.spark.common
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 下午
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession ={
        val spark = SparkSession
                .builder
                .appName("CommSparkSessionScala")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate
        return spark
    }
}

案例中用到的数据

people.json

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"Justin", "age":19}
{"name":"Justin", "age":32}

people1.json

{"name":"Justin", "age":32}
package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/4
 * @time : 2:45 下午
 */
object TypeOperationScala {
    // case class Person(name:String, age:Option[Long])
    case class Person(name:String, age:Long)
    case class Employee(name1:String, salary:Long)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        /**
         * coalesce / repartition
         */
        coalesceAndRepartitions(spark)
        /**
         * distinct / dropDuplicates
         */
        distinctAndDropDuplicates(spark)
        /**
         * filter / except / intersect
         */
        filterAndExceptAndIntersect(spark)
        /**
         * map / flatMap / mapPartition
         */
        mapAndFlatMap(spark)
        /**
         * join
         */
        joinWith(spark)
        /**
         * sort
         */
        sort(spark)
        /**
         * sample / randomSplit
         */
        sampleAndRandomSplit(spark)
    }
    /**
     * coalesce 和 repartition 操作
     * 都是用来进行重分区的
     * 区别在于:coalesce只能用于减少分区的数据,而且可以选择不发生shuffle
     * repartition可以增加分区的数据,也可以减少分区的数据,必须会发生shuffle,相当于进行了一次重新分区操作
     * @param spark
     */
    def coalesceAndRepartitions(spark : SparkSession): Unit ={
        val personDF = spark.read.json(Comm.fileDirPath + "people.json")
        // 1
        println(personDF.rdd.partitions.size)
        // repartition
        val personDFRepartition = personDF.repartition(4)
        // 4
        println(personDFRepartition.rdd.partitions.size)
        // coalesce
        val personDFCoalesce = personDFRepartition.coalesce(3)
        // 3
        println(personDFCoalesce.rdd.partitions.size)
    }
    /**
     * distinct:是根据每一条数据进行完整内容的对比和去重
     * DropDuplicates:可以根据指定的字段进行去重
     * @param spark
     */
    def distinctAndDropDuplicates(spark : SparkSession): Unit ={
        val personDF = spark.read.json(Comm.fileDirPath + "people.json")
        personDF.show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * |  19| Justin|
         * |  32| Justin|
         * +----+-------+
         */
        // distinct
        personDF.distinct().show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * |  32| Justin|
         * +----+-------+
         */
        // dropDuplicates
        personDF.dropDuplicates(Seq("name")).show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * +----+-------+
         */
    }
    /**
     * filter:根据我们自己的业务逻辑,如果返回true,就保留该元素,如果是false,就不保留该元素
     * except:获取在当前的dataset中有,但是在另一个dataset中没有的元素
     * intersect:获取两个dataset中的交集
     * @param spark
     */
    def filterAndExceptAndIntersect(spark : SparkSession): Unit ={
        import spark.implicits._
        val personDS = spark.read.json(Comm.fileDirPath + "people.json").as[Person]
        val personDS1 = spark.read.json(Comm.fileDirPath + "people1.json").as[Person]
        // filter方式一
        personDS.filter(line => line.age != None).show()
        /**
         * +---+------+
         * |age|  name|
         * +---+------+
         * | 30|  Andy|
         * | 19|Justin|
         * | 19|Justin|
         * | 32|Justin|
         * +---+------+
         */
        // filter方式二
        val personDF = spark.read.json(Comm.fileDirPath + "people.json")
        val rdd = personDF.rdd.filter(line => line.getAs("age") != null)
        for (elem <- rdd.collect()) {
            System.out.println(elem)
        }
        /**
         * [30,Andy]
         * [19,Justin]
         * [19,Justin]
         * [32,Justin]
         */
        // except
        personDS.except(personDS1).show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * +----+-------+
         */
        // intersect
        personDS.intersect(personDS1).show()
        /**
         * +---+------+
         * |age|  name|
         * +---+------+
         * | 32|Justin|
         * +---+------+
         */
    }
    /**
     * map:将数据集中的每一条数据做一个映射,返回一条映射
     * flatMap:数据集中的每条数据都可以返回多条数据
     * mapPartition:一次性对一个partition中的数据进行处理
     * @param spark
     */
    def mapAndFlatMap(spark : SparkSession): Unit ={
        import spark.implicits._
        val personDS = spark.read.json(Comm.fileDirPath + "people.json").as[Person]
        // map
        personDS.filter("age is not null").map(line => (line.name,line.age+10)).show()
        /**
         * +------+---+
         * |    _1| _2|
         * +------+---+
         * |  Andy| 40|
         * |Justin| 29|
         * |Justin| 29|
         * |Justin| 42|
         * +------+---+
         */
        // flatMap
        personDS.filter("age is not null").flatMap(line => {
            Seq(Person(line.name + "_1",line.age + 1000),Person(line.name + "_2",line.age + 1000))
        }).show()
        /**
         * +--------+----+
         * |    name| age|
         * +--------+----+
         * |  Andy_1|1030|
         * |  Andy_2|1030|
         * |Justin_1|1019|
         * |Justin_2|1019|
         * |Justin_1|1019|
         * |Justin_2|1019|
         * |Justin_1|1032|
         * |Justin_2|1032|
         * +--------+----+
         */
        // mapPartition
        personDS.filter("age is not null").mapPartitions(line => {
            var result = scala.collection.mutable.ArrayBuffer[(String, Long)]()
            while (line.hasNext){
                var person = line.next()
                result += ((person.name,person.age + 1000))
            }
            result.iterator
        }).show()
        /**
         * +------+----+
         * |    _1|  _2|
         * +------+----+
         * |  Andy|1030|
         * |Justin|1019|
         * |Justin|1019|
         * |Justin|1032|
         * +------+----+
         */
    }
    /**
     * join:根据某一个字段,关联合并两个数据集
     * @param spark
     */
    def joinWith(spark : SparkSession): Unit ={
        import spark.implicits._
        val peopleFilePath = Comm.fileDirPath + "people.json"
        val employeeFilePath = Comm.fileDirPath + "employees.json"
        val personDS = spark.read.json(peopleFilePath).as[Person]
        val employeeDS = spark.read.json(employeeFilePath).as[Employee]
        personDS.joinWith(employeeDS,$"name" === $"name1").show()
        /**
         * +------------+---------------+
         * |          _1|             _2|
         * +------------+---------------+
         * | [, Michael]|[Michael, 3000]|
         * |  [30, Andy]|   [Andy, 4500]|
         * |[32, Justin]| [Justin, 3500]|
         * |[19, Justin]| [Justin, 3500]|
         * |[19, Justin]| [Justin, 3500]|
         * +------------+---------------+
         */
    }
    /**
     * sort:对数据表排序
     * @param spark
     */
    def sort(spark : SparkSession): Unit ={
        import spark.implicits._
        val employeeFilePath = Comm.fileDirPath + "employees.json"
        val employeeDS = spark.read.json(employeeFilePath).as[Employee]
        employeeDS.show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |Michael|  3000|
         * |   Andy|  4500|
         * | Justin|  3500|
         * |  Berta|  4000|
         * +-------+------+
         */
        employeeDS.sort($"salary".desc).show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |   Andy|  4500|
         * |  Berta|  4000|
         * | Justin|  3500|
         * |Michael|  3000|
         * +-------+------+
         */
        // 创建临时表,使用sql语句
        employeeDS.toDF().createOrReplaceTempView("employee")
        val result = spark.sql("select * from employee a where 1=1 order by salary desc")
        result.show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |   Andy|  4500|
         * |  Berta|  4000|
         * | Justin|  3500|
         * |Michael|  3000|
         * +-------+------+
         */
    }
    /**
     * randomSplit:根据Array中的元素的数量进行切分,然后再给定每个元素的值来保证对切分数据的权重
     * sample:根据我们设定的比例进行随机抽取
     * @param spark
     */
    def sampleAndRandomSplit(spark : SparkSession): Unit ={
        import spark.implicits._
        val employeeFilePath = Comm.fileDirPath + "employees.json"
        val employeeDS = spark.read.json(employeeFilePath).as[Employee]
        employeeDS.randomSplit(Array(2,5,1)).foreach(ds => ds.show())
        /**
         * +-----+------+
         * |name1|salary|
         * +-----+------+
         * |Berta|  4000|
         * +-----+------+
         *
         * +------+------+
         * | name1|salary|
         * +------+------+
         * |  Andy|  4500|
         * |Justin|  3500|
         * +------+------+
         *
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |Michael|  3000|
         * +-------+------+
         */
        employeeDS.sample(false,0.5).show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |Michael|  3000|
         * | Justin|  3500|
         * +-------+------+
         */
    }


二、DataSet中untype详解


案例中使用到的数据

employees.json

{"name1":"Michael", "salary":3000}
{"name1":"Andy", "salary":4500}
{"name1":"Justin", "salary":3500}
{"name1":"Berta", "salary":4000}

dept.json

{"id":"dept-1","deptName":"开发部"}
{"id":"dept-2","deptName":"市场部"}

案例代码:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/6
 * @time : 3:39 下午
 */
object UnTypeOperationScala {
    case class Employee(deptId:String,name:String, salary:Long)
    case class Dept(id:String,deptName:String)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._
        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]
        // join
        val joinDS = employeeDS.join(deptDS,$"deptId" === $"id")
        joinDS.show()
        /**
         * +------+-------+------+--------+------+
         * |deptId|   name|salary|deptName|    id|
         * +------+-------+------+--------+------+
         * |dept-1|Michael|  3000|  开发部|dept-1|
         * |dept-2|   Andy|  4500|  市场部|dept-2|
         * |dept-2| Justin|  3500|  市场部|dept-2|
         * |dept-1|  Berta|  4000|  开发部|dept-1|
         * +------+-------+------+--------+------+
         */
        // 使用算子
        joinDS.groupBy("deptId","deptName").sum("salary").select("deptId","deptName","sum(salary)").orderBy($"sum(salary)".desc).show()
        /**
         * +------+--------+-----------+
         * |deptId|deptName|sum(salary)|
         * +------+--------+-----------+
         * |dept-2|  市场部|       8000|
         * |dept-1|  开发部|       7000|
         * +------+--------+-----------+
         */
        // 创建临时表,使用sql语句
        joinDS.createOrReplaceTempView("deptSalary")
        val resultDF = spark.sql("select a.deptId, a.deptName ,sum(salary) as salary from deptSalary a group by a.deptId, a.deptName order by salary desc")
        resultDF.show()
        /**
         * +------+--------+------+
         * |deptId|deptName|salary|
         * +------+--------+------+
         * |dept-2|  市场部|  8000|
         * |dept-1|  开发部|  7000|
         * +------+--------+------+
         */
    }
}


三、DataSet中聚合函数详解


案例中使用到的数据

employees1.json

{"deptId":"dept-1", "name":"Michael", "salary":3000}
{"deptId":"dept-2", "name":"Andy", "salary":4500}
{"deptId":"dept-2", "name":"Justin", "salary":3500}
{"deptId":"dept-1", "name":"Berta", "salary":4000}

dept.json

{"id":"dept-1","deptName":"开发部"}
{"id":"dept-2","deptName":"市场部"}

案例代码:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/6
 * @time : 7:29 下午
 */
object AggOptionScala {
    case class Employee(deptId:String,name:String, salary:Long)
    case class Dept(id:String,deptName:String)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        agg1(spark)
        agg2(spark)
    }
    /**
     * avg
     * sum
     * max
     * min
     * count
     * @param spark
     */
    def agg1(spark : SparkSession): Unit ={
        import spark.implicits._
        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]
        // join
        val joinDS = employeeDS.join(deptDS,$"deptId" === $"id")
        joinDS.show()
        joinDS.groupBy("deptId","deptName").agg(avg("salary"),sum("salary"),max("salary"),min("salary"),count("name")).show()
        /**
         * +------+--------+-----------+-----------+-----------+-----------+-----------+
         * |deptId|deptName|avg(salary)|sum(salary)|max(salary)|min(salary)|count(name)|
         * +------+--------+-----------+-----------+-----------+-----------+-----------+
         * |dept-1|  开发部|     3500.0|       7000|       4000|       3000|          2|
         * |dept-2|  市场部|     4000.0|       8000|       4500|       3500|          2|
         * +------+--------+-----------+-----------+-----------+-----------+-----------+
         */
        joinDS.groupBy("deptId","deptName").agg("salary" -> "avg","salary" -> "sum","salary" -> "max","salary" -> "min","name" -> "count").show()
    }
    /**
     * collect_list(行转列)将一个分组内指定字段的值收集在一起,不会去重
     * collect_set(行转列)意思同上,唯一的区别就是它会去重
     * @param spark
     */
    def agg2(spark : SparkSession): Unit ={
        import spark.implicits._
        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]
        // join
        val joinDS = employeeDS.join(deptDS,$"deptId" === $"id")
        joinDS.show()
        joinDS.groupBy("deptId","deptName").agg(collect_list("name")).show()
        joinDS.groupBy("deptId","deptName").agg(collect_set("name")).show()
        /**
         * +------+--------+-----------------+
         * |deptId|deptName|collect_set(name)|
         * +------+--------+-----------------+
         * |dept-1|  开发部| [Michael, Berta]|
         * |dept-2|  市场部|   [Andy, Justin]|
         * +------+--------+-----------------+
         */
    }
}


四、DataSet中其他函数详解


案例中使用到的数据

employees1.json

{"deptId":"dept-1", "name":"Michael", "salary":3000}
{"deptId":"dept-2", "name":"Andy", "salary":4500}
{"deptId":"dept-2", "name":"Justin", "salary":3500}
{"deptId":"dept-1", "name":"Berta", "salary":4000}

dept.json

{"id":"dept-1","deptName":"开发部"}
{"id":"dept-2","deptName":"市场部"}

案例代码:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.functions._
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/6
 * @time : 8:13 下午
 */
object OtherOperationScala {
    case class Employee(deptId:String,name:String, salary:Long)
    case class Dept(id:String,deptName:String)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._
        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]
        // join
        val joinDS = employeeDS.join(deptDS,$"deptId" === $"id")
        joinDS.show()
        /**
         * 日期函数:current_date current_timestamp
         * 数学函数:round
         * 随机函数:rand
         * 字符串函数:concat
         */
        joinDS.select(current_date(),current_timestamp(),rand(),round($"salary",2),concat($"name",$"salary")).show()
        /**
         * +--------------+--------------------+-------------------------+----------------+--------------------+
         * |current_date()| current_timestamp()|rand(7003525560546994152)|round(salary, 2)|concat(name, salary)|
         * +--------------+--------------------+-------------------------+----------------+--------------------+
         * |    2020-12-06|2020-12-06 21:13:...|       0.8157198990085353|            3000|         Michael3000|
         * |    2020-12-06|2020-12-06 21:13:...|       0.8446053083722802|            4500|            Andy4500|
         * |    2020-12-06|2020-12-06 21:13:...|       0.5323697131017762|            3500|          Justin3500|
         * |    2020-12-06|2020-12-06 21:13:...|      0.10113425439292889|            4000|           Berta4000|
         * +--------------+--------------------+-------------------------+----------------+--------------------+
         */
    }
}


相关文章
|
1月前
|
SQL Oracle 关系型数据库
SQL优化-使用联合索引和函数索引
在一次例行巡检中,发现一条使用 `to_char` 函数将日期转换为字符串的 SQL 语句 CPU 利用率很高。为了优化该语句,首先分析了 where 条件中各列的选择性,并创建了不同类型的索引,包括普通索引、函数索引和虚拟列索引。通过对比不同索引的执行计划,最终确定了使用复合索引(包含函数表达式)能够显著降低查询成本,提高执行效率。
|
12天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
SQL 数据库 数据库管理
数据库SQL函数应用技巧与方法
在数据库管理中,SQL函数是处理和分析数据的强大工具
|
1月前
|
SQL 数据库 索引
SQL中COUNT函数结合条件使用的技巧与方法
在SQL查询中,COUNT函数是一个非常常用的聚合函数,用于计算表中满足特定条件的记录数
|
1月前
|
SQL 关系型数据库 MySQL
SQL日期函数
SQL日期函数
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
45 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
79 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
分布式计算 Spark
Spark2.4.0 Dataset head 源码分析
Dataset 如何转成RDD触发作业运行 Dataset head 是如何读到HDFS上文件前n行数据 head源码分析
10235 0
|
21天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
56 2
ClickHouse与大数据生态集成:Spark & Flink 实战
下一篇
无影云桌面