Spark SQL中自定义函数详解

简介: 笔记

一、自定义函数UDF


数据源:

{"deptName":"dept-1", "name":"Michael", "salary":3000}
{"deptName":"dept-2", "name":"Andy", "salary":5000}
{"deptName":"dept-1", "name":"Alex", "salary":4500}
{"deptName":"dept-2", "name":"Justin", "salary":6700}
{"deptName":"dept-2", "name":"Cherry", "salary":3400}
{"deptName":"dept-1", "name":"Jack", "salary":5500}
{"deptName":"dept-2", "name":"Jone", "salary":12000}
{"deptName":"dept-1", "name":"Lucy", "salary":8000}
{"deptName":"dept-2", "name":"LiLi", "salary":7600}
{"deptName":"dept-2", "name":"Pony", "salary":4200}

初始化SparkSession

package com.kfk.spark.common
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 下午
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession ={
        val spark = SparkSession
                .builder
                .appName("CommSparkSessionScala")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate
        return spark
    }
}

自定义UDF将内容改为大写:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 4:46 下午
 */
object UDFScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        val userPath = Comm.fileDirPath + "users.json"
        spark.read.json(userPath).createOrReplaceTempView("user")
        // 自定义UDF将内容改为大写
        spark.udf.register("strUpper",(str:String) => str.toUpperCase)
        spark.sql("select deptName,strUpper(name) from user").show()
        /**
         * +--------+------------------+
         * |deptName|UDF:strUpper(name)|
         * +--------+------------------+
         * |  dept-1|           MICHAEL|
         * |  dept-2|              ANDY|
         * |  dept-1|              ALEX|
         * |  dept-2|            JUSTIN|
         * |  dept-2|            CHERRY|
         * |  dept-1|              JACK|
         * |  dept-2|              JONE|
         * |  dept-1|              LUCY|
         * |  dept-2|              LILI|
         * |  dept-2|              PONY|
         * +--------+------------------+
         */
    }
}


二、自定义聚合函数UDAF


package com.kfk.spark.sql
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 7:13 下午
 */
class MyCount extends UserDefinedAggregateFunction{
    /**
     * 指的是输入数据的类型
     * @return
     */
    override def inputSchema: StructType = {
         StructType(Array(StructField("str",StringType,true)))
    }
    /**
     * 中间进行聚合的时候所处理的数据类型
     * @return
     */
    override def bufferSchema: StructType = {
        StructType(Array(StructField("count",IntegerType,true)))
    }
    /**
     * 返回类型
     * @return
     */
    override def dataType: DataType = {
        IntegerType
    }
    /**
     * 校验返回值
     * @return
     */
    override def deterministic: Boolean = {
        true
    }
    /**
     * 为每个分组的数据执行初始化操作
     * @param buffer
     */
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0
    }
    /**
     * 每个分组有新的数据进来的时候,如何进行分组对应的聚合值的计算
     * @param buffer
     * @param input
     */
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getAs[Int](0) + 1
    }
    /**
     * 在每一个节点上的集合值要进行最后的merge
     * @param buffer1
     * @param buffer2
     */
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
    }
    /**
     * 返回最终结果
     * @param buffer
     * @return
     */
    override def evaluate(buffer: Row): Any = {
        buffer.getAs(0)
    }
}

自定义聚合函数UDAF,count

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 8:06 下午
 */
object UDAFScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        val userPath = Comm.fileDirPath + "users.json"
        spark.read.json(userPath).createOrReplaceTempView("user")
        // 自定义UDAF,聚合函数
        spark.udf.register("count",new MyCount)
        spark.sql("select deptName,count(deptName) count from user group by deptName").show()
        /**
         * +--------+-----+
         * |deptName|count|
         * +--------+-----+
         * |  dept-1|    4|
         * |  dept-2|    6|
         * +--------+-----+
         */
    }
}


相关文章
|
12天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
45 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
79 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
54 0
|
SQL 消息中间件 分布式计算
通过Spark SQL实时归档SLS数据
我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。
2563 0
|
2月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
4月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
115 13
|
4月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
4月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
61 6
下一篇
无影云桌面