Flink on zeppelin第三弹UDF的使用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 用户自定义函数是非常重要的一个特征,因为它极大地扩展了查询的表达能力。不管是在spark sql或者hive中都支持用户自定义UDF的使用,虽然Flink提供了很多内置的UDF可以直接使用,但是一些特定的场景可能需要我们自定义UDF去实现,这篇文章会主要介绍一下在Flink on zeppelin里面使用UDF的3种方法.1, 开发scala代码并注册UDF

用户自定义函数是非常重要的一个特征,因为它极大地扩展了查询的表达能力。不管是在spark sql或者hive中都支持用户自定义UDF的使用,虽然Flink提供了很多内置的UDF可以直接使用,但是一些特定的场景可能需要我们自定义UDF去实现,这篇文章会主要介绍一下在Flink on zeppelin里面使用UDF的3种方法.


1, 开发scala代码并注册UDF


因为目前zeppelin支持Scala,Python,SQL 三种语言,还不支持java.所以可以编写scala或者Python代码来开发UDF.具体的实现和在代码里面是一样的.


%flink
 class MyScalaUdfDemo extends ScalarFunction {
            def eval(str: String) = str.toUpperCase
        }


btenv.createTemporarySystemFunction("MyScalaUdfDemo",new MyScalaUdfDemo)

上面实现了一个非常简单的UDF,把字符串转为大写,这里用的是Flink1.11.0新的方法createTemporarySystemFunction ,registerFunction 方法已经被标记为废弃状态了.


2, flink.execution.jars加载指定的包


flink.execution.jars 所有的jar包都会load到flink interpreter的classpath里,而且会被发送到Task Manager。这个配置主要是用来指定你的flink job所依赖的普通jar包.这种方式用起来很麻烦,因为你需要提前知道你有哪些UDF,还要知道UDF的包名,类名,如果你有多个UDF的话,还需要一个一个注册,使用起来很不方便.


package flink.udf
import org.apache.flink.table.functions.ScalarFunction
class MyScalaUdfDemo extends ScalarFunction {
    def eval(str: String) = str.toUpperCase
}


然后把项目打包上传到服务器上,这里需要注意的是打包的时候不要别的依赖打进去否则可能会遇到jar包冲突的问题.


在zeppelin里面加载jar包,然后注册.


%flink.conf
// 刚才打包上传到服务器的路径
flink.execution.jars /home/jason/bigdata/jar/flink-1.10.0-2020-07-22.jar
%flink
// 需要传入包名.类名
btenv.createTemporarySystemFunction("MyScalaUdfDemo1",new flink.udf.MyScalaUdfDemo)


这个感觉比第一种方式还麻烦,如果UDF比较多的情况下,前面两种方法都比较麻烦,需要一个个去注册,不要着急,接着看第三种方式.


3,flink.udf.jars自动注册所有UDF


%flink.conf
flink.udf.jars /home/jason/bigdata/jar/flink-1.10.0-2020-07-22.jar


flink.udf.jars  这个配置和flink.execution.jars非常像, 不同的地方在于Zeppelin会检测这些jar包中所包含的UDF class,而且会把他们注册到TableEnvironment中。UDF的名字就是这个class name。


上面的三种方法都可以实现注册自定义UDF,推荐使用第三种方法,如果UDF比较多的时候会自动注册,但是打包的时候需要注意jar包冲突问题.注册完了,那有没有成功了,下面来检测一下.


%flink.ssql(type=update)
show functions



我这里用的是第三种方式,在代码里面写了3个UDF,两个java写的2,一个scala写的,虽然在zeppelin里面不支持java语言,但是可以用java编写UDF加载到Flink集群里面使用.


最后再来测试一下这些UDF能不能用呢? 我就随便拿一个测试了.


网络异常,图片无法展示
|


创建一个kafka的表,然后查询一下,看下打印的结果.


网络异常,图片无法展示
|


可以看到字符串都变成大写的了,如果不想这么麻烦的话,直接执行下面的语句也可以测试,非常的方便.



你可能会发现最近的几篇文章出现了flink.execution.jars, flink.udf.jars


flink.execution.packages 他们都是用来添加第三方依赖包的,这三个的区别是什么呢? 最后再来总结一下:


flink.execution.jars 所有的jar包都会load到flink interpreter的classpath里,而且会被发送到Task Manager。这个配置主要是用来指定你的flink job所依赖的普通jar包.


flink.udf.jars  这个配置和flink.execution.jars非常像, 不同的地方在于Zeppelin会检测这些jar包中所包含的UDF class,而且会把他们注册到TableEnvironment中。UDF的名字就是这个class name。


flink.execution.packages  这个配置也类似flink.execution.jars,但它不是用来指定jar包,而是用来指定package的。Zeppelin会下载这个package以及这个package的依赖,并且放到flink interpreter的classpath上。比如你想使用kafka connector,那么你需要如下配置 flink.exection.packages成下面的样子

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
关系型数据库 MySQL Java
实时计算 Flink版操作报错合集之udf是怎么定义接收和返回的数据类型的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
8月前
|
SQL Java 流计算
Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
【1月更文挑战第1天】【1月更文挑战第2篇】Flink SQL UDF(用户自定义函数)需要打包成JAR文件并上传到Flink集群中
354 0
|
8月前
|
关系型数据库 Java 分布式数据库
实时计算 Flink版操作报错合集之在使用 Python UDF 时遇到 requests 包的导入问题,提示 OpenSSL 版本不兼容如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
137 5
|
8月前
|
SQL Java 数据处理
实时计算 Flink版产品使用合集之怎么热加载Java和Python的UDF
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
105 1
|
8月前
|
SQL Java 数据处理
实时计算 Flink版产品使用合集之如何动态加载Java udf的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
消息中间件 SQL Java
Flink报错问题之调用udf时报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
8月前
|
SQL 消息中间件 Apache
Flink报错问题之使用hive udf函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
运维 Java 开发工具
Flink-Java自建UDF使用案例
Flink-Java自建UDF使用案例
|
SQL 前端开发 Java
迄今为止最好用的Flink SQL教程:Flink SQL Cookbook on Zeppelin
无需写任何代码,只要照着这篇文章轻松几步就能跑各种类型的 Flink SQL 语句。
迄今为止最好用的Flink SQL教程:Flink SQL Cookbook on Zeppelin
|
SQL 消息中间件 Java
【Flink-API】Table API & SQL 以及自定义UDF函数
【Flink-API】Table API & SQL 以及自定义UDF函数
256 0
【Flink-API】Table API & SQL 以及自定义UDF函数