MaxCompute 的 UDF 包括:UDF,UDAF,UDTF 三种函数。通常情况下,此三种函数被统称为 UDF。如果您使用 Maven,可以从
Maven 库 中搜索 odps-sdk-udf 获取不同版本的 Java SDK。
相关配置信息如下所示:
- <dependency>
- <groupId>com.aliyun.odps</groupId>
- <artifactId>odps-sdk-udf</artifactId>
- <version>0.20.7-public</version>
- </dependency>
注意:
- groupId,artifactId,version 信息请以在 Maven 库中查询到的信息为准 。
UDF 目前只支持 Java 语言接口,如果您想编写 UDF 程序,可以通过 添加资源 的方式将 UDF 代码上传到项目空间中,使用 注册函数 语句创建 UDF。
本章节中会分别给出 UDF,UDAF,UDTF 的代码示例,运行 UDF 的示例请参见 UDF 开发插件介绍。
Java 和 MaxCompute 的数据类型对应关系,请参见 参数与返回值类型。
UDF 示例
下面将为您介绍实现字符小写转换功能的 UDF 完整流程。
操作步骤
编写代码。
按照 MaxCompute UDF 框架的规定,实现函数功能,并进行编译。代码实现,示例如下:package org.alidata.odps.udf.examples;- import com.aliyun.odps.udf.UDF;
- public final class Lower extends UDF {
- public String evaluate(String s) {
- if (s == null) { return null; }
- return s.toLowerCase();
- }
- }
将这个 Jar 包命名为 my_lower.jar 。
添加资源。
在运行 UDF 之前,必须指定引用的 UDF 代码。您编写的代码通过资源的形式添加到 MaxCompute 中。Java UDF必须被打成 Jar 包,以 Jar 资源添加到 MaxCompute 中,UDF 框架会自动加载 Jar 包,运行用户自定义的 UDF。MaxCompute MapReduce 也用到了资源这一特有概念,详情请参见
MapReduce。
执行如下命令:
- add jar my_lower.jar;
- -- 如果存在同名的资源请将这个 jar 包重命名,
- -- 并注意修改下面示例命令中相关 jar 包的名字;
- -- 又或者直接使用-f选项覆盖原有的 jar 资源
注册 UDF 函数。
您的 Jar 包被上传后,使得 MaxCompute 有条件自动获取代码并运行。但此时仍然无法使用这个 UDF,因为 MaxCompute中并没有关于这个 UDF 的任何信息。因此需要您在 MaxCompute 中注册一个唯一的函数名,并指定这个函数名与哪个 Jar资源的哪个函数对应。关于如何注册 UDF,请参见
注册函数。
运行如下命令:
- CREATE FUNCTION test_lower AS org.alidata.odps.udf.examples.Lower USING my_lower.jar;
在 SQL 中使用此函数:
- select test_lower('A') from my_test_table;
UDAF 示例
UDAF 的注册方式与 UDF 基本相同,使用方式与内建函数中的
聚合函数 相同。计算平均值的 UDAF 的代码示例,如下所示:
- package org.alidata.odps.udf.examples;
- import com.aliyun.odps.io.LongWritable;
- import com.aliyun.odps.io.Text;
- import com.aliyun.odps.io.Writable;
- import com.aliyun.odps.udf.Aggregator;
- import com.aliyun.odps.udf.UDFException;
- /**
- * project: example_project
- * table: wc_in2
- * partitions: p2=1,p1=2
- * columns: colc,colb,cola
- */
- public class UDAFExample extends Aggregator {
- @Override
- public void iterate(Writable arg0, Writable[] arg1) throws UDFException {
- LongWritable result = (LongWritable) arg0;
- for (Writable item : arg1) {
- Text txt = (Text) item;
- result.set(result.get() + txt.getLength());
- }
- }
- @Override
- public void merge(Writable arg0, Writable arg1) throws UDFException {
- LongWritable result = (LongWritable) arg0;
- LongWritable partial = (LongWritable) arg1;
- result.set(result.get() + partial.get());
- }
- @Override
- public Writable newBuffer() {
- return new LongWritable(0L);
- }
- @Override
- public Writable terminate(Writable arg0) throws UDFException {
- return arg0;
- }
- }
UDTF 示例
UDTF 的注册和使用方式与 UDF 相同。代码示例如下:
- package org.alidata.odps.udtf.examples;
- import com.aliyun.odps.udf.UDTF;
- import com.aliyun.odps.udf.UDTFCollector;
- import com.aliyun.odps.udf.annotation.Resolve;
- import com.aliyun.odps.udf.UDFException;
- // TODO define input and output types, e.g., "string,string->string,bigint".
- @Resolve({"string,bigint->string,bigint"})
- public class MyUDTF extends UDTF {
- @Override
- public void process(Object[] args) throws UDFException {
- String a = (String) args[0];
- Long b = (Long) args[1];
- for (String t: a.split("\\s+")) {
- forward(t, b);
- }
- }
- }
MaxCompute 提供了很多内建函数来满足您的计算需求,同时您还可以通过创建自定义函数来满足不同的计算需求。详情请参见
创建自定义函数。