准备环境
1. 安装JDK
建议安装JDK 1.8以上版本
2. 安装开发工具
本文档测试使用IDEA工具,具体安装方式可以参考在线资料。
开发流程
1. 创建项目
可以直接导入项目代码 ASI_UDX_Demo 到本地,本篇以下面为例测试整个流程
packageASI_UDF; importorg.apache.flink.table.functions.ScalarFunction; publicclassASI_UDFextendsScalarFunction { publicStringeval(Strings, Integerbegin, Integerend) { returns.substring(begin, end); } }
2. 开发UDF
packageASI_UDF; importorg.apache.flink.table.functions.ScalarFunction; publicclassASI_UDFextendsScalarFunction { publicStringeval(Strings, Integerbegin, Integerend) { returns.substring(begin, end); } }
3. 本地测试
创建测试类在本地测试,并验证结果符合预期,如下所示:
//<dependency>//<groupId>junit</groupId>//<artifactId>junit</artifactId>//<version>4.12</version>//</dependency>publicclassTEST_ASI { publicvoidtestUdf() { ASI_UDFudf=newASI_UDF(); System.out.println(udf.eval("HELLO",1,2)); } }
4. 本地打包
双击执行Maven命令打jar包,如下所示:
打包后这里可以看到对应jar包,如下所示:
5. 上传资源
📎ASI_UDX-TEST-1.0-SNAPSHOT.jar
6. 验证UDF
CREATE TEMPORARY TABLE ASI_UDF_Source ( a VARCHAR, b INT, c INT) WITH ('connector'='datagen');CREATE TEMPORARY TABLE ASI_UDF_Sink ( a VARCHAR) WITH ('connector'='blackhole');INSERTINTO ASI_UDF_Sink SELECT ASI_UDF(a,2,4)FROM ASI_UDF_Source;
上线查询输出结果, 在作业运维页面,单击目标作业名称操作列的启动。启动成功后,ASI_UDF_Sink表每行会被插入ASI_UDF_Source表中a字段每行字符串的第2~4位字符。