(4)SparkSQL中如何定义UDF和使用UDF

简介: Spark SQL中用户自定义函数,用法和Spark SQL中的内置函数类似;是saprk SQL中内置函数无法满足要求,用户根据业务需求自定义的函数。首先定义一个UDF函数:

Spark SQL中用户自定义函数,用法和Spark SQL中的内置函数类似;是saprk SQL中内置函数无法满足要求,用户根据业务需求自定义的函数。
首先定义一个UDF函数:

package com.udf;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import scala.collection.mutable.WrappedArray;


/**
 * Created by lj on 2022-07-25.
 */
public class TestUDF  implements UDF1<String, String> {
    @Override
    public String call(String s) throws Exception {
        return s+"_udf";
    }
}

使用UDF函数:

package com.examples;

import com.pojo.WaterSensor;
import com.udf.TestUDF;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-25.
 */
public class SparkSql_Socket_UDF  {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        //初始化sparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

        //获得JavaStreamingContext
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(3));

        /**
         * 设置日志的级别: 避免日志重复
         */
        ssc.sparkContext().setLogLevel("ERROR");

        //从socket源获取数据
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
            private static final long serialVersionUID = 1L;

            public WaterSensor call(String s) throws Exception {
                String[] cols = s.split(",");
                WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
                return waterSensor;
            }
        }).window(Durations.minutes(6), Durations.minutes(9));      //指定窗口大小 和 滑动频率 必须是批处理时间的整数倍

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());

                spark.udf().register("TestUDF", new TestUDF(), DataTypes.StringType);

                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // 创建临时表
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select *,TestUDF(id) as udftest from log");
                System.out.println("========= " + time + "=========");
                //输出前20条数据
                result.show();
            }
        });


        //开始作业
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}

代码说明:
image.png

应用效果展示:
image.png

相关文章
|
Linux
Linux tmp目录自动清理总结
在Linux系统中/tmp文件夹下的文件是会被清理、删除的,文件清理的规则是如何设定的呢? 以Redhat为例,这个主要是因为作业里面会调用tmpwatch命令删除那些一段时间没有访问的文件。   那么什么是tmpwatch呢?其实tmpwatch是一个命令或者说是一个包。
3794 0
|
SQL Linux
Cannot connect to discovery server for announce: Announcement failed for http://hadoop102:8881
linux下启动Presto报错:Cannot connect to discovery server for announce: Announcement failed for http://hadoop102:8881
Cannot connect to discovery server for announce: Announcement failed for http://hadoop102:8881
|
11月前
|
SQL 监控 Java
Java性能优化:提升应用效率与响应速度的全面指南
【10月更文挑战第21】Java性能优化:提升应用效率与响应速度的全面指南
|
SQL 分布式计算 Spark
SPARK Expand问题的解决(由count distinct、group sets、cube、rollup引起的)
SPARK Expand问题的解决(由count distinct、group sets、cube、rollup引起的)
893 0
SPARK Expand问题的解决(由count distinct、group sets、cube、rollup引起的)
|
SQL 存储 分布式计算
Iceberg原理和项目使用技巧
Iceberg原理和项目使用技巧
1402 0
|
JavaScript
vue element plus Color 色彩
vue element plus Color 色彩
308 0
|
人工智能 弹性计算 缓存
带你读《弹性计算技术指导及场景应用》——2. 技术改变AI发展:RDMA能优化吗?GDR性能提升方案
带你读《弹性计算技术指导及场景应用》——2. 技术改变AI发展:RDMA能优化吗?GDR性能提升方案
476 1
|
Cloud Native 架构师 云计算
秒杀高并发场景实战 | 在线直播
秒杀等高并发场景下,同一时刻会有大量的用户请求到达服务器,在保证系统整体稳定的前提下,如何通过有限的服务器资源,尽可能快速处理更多的请求,是用户需要解决的核心问题。本次直播以典型的秒杀场景为例,为您深入剖析在高并发场景下,阿里云相关解决方案和实战经验。
7209 49
秒杀高并发场景实战 | 在线直播
|
机器学习/深度学习 算法 Python
【阿旭机器学习实战】【10】朴素贝叶斯模型原理及3种贝叶斯模型对比:高斯分布朴素贝叶斯、多项式分布朴素贝叶斯、伯努利分布朴素贝叶斯
【阿旭机器学习实战】【10】朴素贝叶斯模型原理及3种贝叶斯模型对比:高斯分布朴素贝叶斯、多项式分布朴素贝叶斯、伯努利分布朴素贝叶斯
【阿旭机器学习实战】【10】朴素贝叶斯模型原理及3种贝叶斯模型对比:高斯分布朴素贝叶斯、多项式分布朴素贝叶斯、伯努利分布朴素贝叶斯
|
程序员 开发工具 Windows
编程必备,程序员应该都知道的7款文本编辑器
正如一个作家需要一个文字处理器来写故事,一个艺术家需要画布来创作,同样的,如果想编程,你会需要一个地方来写代码。程序员在哪里编写代码?最常见的就是使用文本编辑器了吧。下文列出了 7 个主流的文本编辑器,不出意外的话,开发人员应该都有所了解,至少听说过。7款文本编辑器,总有一款会适合你。
9195 0