Spark SQL中自定义函数详解

简介: 笔记

一、自定义函数UDF


数据源:

{"deptName":"dept-1", "name":"Michael", "salary":3000}
{"deptName":"dept-2", "name":"Andy", "salary":5000}
{"deptName":"dept-1", "name":"Alex", "salary":4500}
{"deptName":"dept-2", "name":"Justin", "salary":6700}
{"deptName":"dept-2", "name":"Cherry", "salary":3400}
{"deptName":"dept-1", "name":"Jack", "salary":5500}
{"deptName":"dept-2", "name":"Jone", "salary":12000}
{"deptName":"dept-1", "name":"Lucy", "salary":8000}
{"deptName":"dept-2", "name":"LiLi", "salary":7600}
{"deptName":"dept-2", "name":"Pony", "salary":4200}

初始化SparkSession

package com.kfk.spark.common
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 下午
 */
object CommSparkSessionScala {
    def getSparkSession(): SparkSession ={
        val spark = SparkSession
                .builder
                .appName("CommSparkSessionScala")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate
        return spark
    }
}

自定义UDF将内容改为大写:

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 4:46 下午
 */
object UDFScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        val userPath = Comm.fileDirPath + "users.json"
        spark.read.json(userPath).createOrReplaceTempView("user")
        // 自定义UDF将内容改为大写
        spark.udf.register("strUpper",(str:String) => str.toUpperCase)
        spark.sql("select deptName,strUpper(name) from user").show()
        /**
         * +--------+------------------+
         * |deptName|UDF:strUpper(name)|
         * +--------+------------------+
         * |  dept-1|           MICHAEL|
         * |  dept-2|              ANDY|
         * |  dept-1|              ALEX|
         * |  dept-2|            JUSTIN|
         * |  dept-2|            CHERRY|
         * |  dept-1|              JACK|
         * |  dept-2|              JONE|
         * |  dept-1|              LUCY|
         * |  dept-2|              LILI|
         * |  dept-2|              PONY|
         * +--------+------------------+
         */
    }
}


二、自定义聚合函数UDAF


package com.kfk.spark.sql
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 7:13 下午
 */
class MyCount extends UserDefinedAggregateFunction{
    /**
     * 指的是输入数据的类型
     * @return
     */
    override def inputSchema: StructType = {
         StructType(Array(StructField("str",StringType,true)))
    }
    /**
     * 中间进行聚合的时候所处理的数据类型
     * @return
     */
    override def bufferSchema: StructType = {
        StructType(Array(StructField("count",IntegerType,true)))
    }
    /**
     * 返回类型
     * @return
     */
    override def dataType: DataType = {
        IntegerType
    }
    /**
     * 校验返回值
     * @return
     */
    override def deterministic: Boolean = {
        true
    }
    /**
     * 为每个分组的数据执行初始化操作
     * @param buffer
     */
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0
    }
    /**
     * 每个分组有新的数据进来的时候,如何进行分组对应的聚合值的计算
     * @param buffer
     * @param input
     */
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getAs[Int](0) + 1
    }
    /**
     * 在每一个节点上的集合值要进行最后的merge
     * @param buffer1
     * @param buffer2
     */
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
    }
    /**
     * 返回最终结果
     * @param buffer
     * @return
     */
    override def evaluate(buffer: Row): Any = {
        buffer.getAs(0)
    }
}

自定义聚合函数UDAF,count

package com.kfk.spark.sql
import com.kfk.spark.common.{Comm, CommSparkSessionScala}
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 8:06 下午
 */
object UDAFScala {
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        val userPath = Comm.fileDirPath + "users.json"
        spark.read.json(userPath).createOrReplaceTempView("user")
        // 自定义UDAF,聚合函数
        spark.udf.register("count",new MyCount)
        spark.sql("select deptName,count(deptName) count from user group by deptName").show()
        /**
         * +--------+-----+
         * |deptName|count|
         * +--------+-----+
         * |  dept-1|    4|
         * |  dept-2|    6|
         * +--------+-----+
         */
    }
}


相关文章
|
4天前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
4月前
|
SQL Java 流计算
Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
【1月更文挑战第1天】【1月更文挑战第2篇】Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
97 0
|
4月前
|
存储 SQL 分布式计算
性能优化:Spark SQL中的谓词下推和列式存储
性能优化:Spark SQL中的谓词下推和列式存储
|
4月前
|
SQL 分布式计算 测试技术
使用UDF扩展Spark SQL
使用UDF扩展Spark SQL
|
4月前
|
SQL 数据采集 分布式计算
Spark SQL中的聚合与窗口函数
Spark SQL中的聚合与窗口函数
|
4月前
|
SQL JSON 分布式计算
Spark SQL简介与基本用法
Spark SQL简介与基本用法
|
4月前
|
SQL 分布式计算 数据处理
Spark的生态系统概览:Spark SQL、Spark Streaming
Spark的生态系统概览:Spark SQL、Spark Streaming
|
5月前
|
SQL 分布式计算 Java
Spark 基础教程:wordcount+Spark SQL
Spark 基础教程:wordcount+Spark SQL
34 0
|
SQL 消息中间件 分布式计算
通过Spark SQL实时归档SLS数据
我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。
2502 0
|
1天前
|
SQL Windows
安装SQL Server 2005时出现对性能监视器计数器注册表值执行系统配置检查失败的解决办法...
安装SQL Server 2005时出现对性能监视器计数器注册表值执行系统配置检查失败的解决办法...