开发者社区 问答 正文

MaxCompute用户指南:SQL:UDF:Java UDF



MaxCompute 的 UDF 包括:UDF,UDAF 和 UDTF 三种函数,本文将重点介绍如何通过 Java 实现这三种函数。

参数与返回值类型


MaxCompute2.0 版本升级后,Java UDF 支持的数据类型从原来的 Bigint,String,Double,Boolean 扩展了更多基本的数据类型,同时还扩展支持了 ARRAY,MAP,STRUCT 等复杂类型。


  • Java UDF 使用新基本类型的方法,如下所示:
    UDTF 通过 @Resolve 注解来获取 signature,如:@Resolve("smallint->varchar(10)")。

  • UDF 通过反射分析 evaluate 来获取 signature,此时 MaxCompute 内置类型与 Java 类型符合一一映射关系。

  • UDAF暂时还不支持新数据类型。

JAVA UDF 使用复杂类型的方法,如下所示:

  • UDTF 通过 @Resolve annotation 来指定 sinature,在 MaxCompute2.0 上线后,您即可在 Resolve annotation 中。如:@Resolve("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>")。

  • UDF 通过 evaluate 方法的 signature 来映射 UDF 的输入输出类型,此时参考MaxCompute 类型与 Java 类型的映射关系。其中 array 对应 java.util.List,map 对应java.util.Map,struct 对应 com.aliyun.odps.data.Struct。

  • UDAF暂时还不支持。

    注意:
    com.aliyun.odps.data.Struct 从反射看不出 field name 和 field type,所以需要用@Resolve annotation 来辅助。即如果需要在 UDF 中使用 struct,要求在 UDF class 上也标注上@Resolve 注解,这个注解只会影响参数或返回值中包含 com.aliyun.odps.data.Struct 的重载。

  • 目前 class 上只能提供一个 @Resolve annotation,因此一个 UDF 中带有 struct 参数或返回值的重载只能有一个。

MaxCompute 数据类型与 Java 类型的对应关系,如下所示:
MaxCompute TypeJava Type
Tinyintjava.lang.Byte
Smallintjava.lang.Short
Intjava.lang.Integer
Bigintjava.lang.Long
Floatjava.lang.Float
Doublejava.lang.Double
Decimaljava.math.BigDecimal
Booleanjava.lang.Boolean
Stringjava.lang.String
Varcharcom.aliyun.odps.data.Varchar
Binarycom.aliyun.odps.data.Binary
Datetimejava.util.Date
Timestampjava.sql.Timestamp
Arrayjava.util.List
Mapjava.util.Map
Structcom.aliyun.odps.data.Struct

注意:
  • Java 中对应的数据类型以及返回值数据类型是对象,首字母请务必大写。
  • SQL 中的 NULL 值通过 Java 中的 NULL 引用表示,因此 Java primitive type 是不允许使用的,因为无法表示 SQL 中的 NULL 值。
  • 此处 Array 类型对应的 Java 类型是 List,而不是数组。


UDF


实现UDF 需要继承 com.aliyun.odps.udf.UDF 类,并实现 evaluate 方法。evaluate 方法必须是非static 的 public 方法 。Evaluate 方法的参数和返回值类型将作为 SQL 中 UDF 的函数签名。这意味着您可以在 UDF中实现多个 evaluate 方法,在调用 UDF 时,框架会依据 UDF 调用的参数类型匹配正确的 evaluate 方法 。
UDF 的示例如下:
  1.      package org.alidata.odps.udf.examples;
  2.      import com.aliyun.odps.udf.UDF;
  3.      public final class Lower extends UDF {
  4.        public String evaluate(String s) {
  5.          if (s == null) { return null; }
  6.          return s.toLowerCase();
  7.        }
  8.      }

可以通过实现void setup(ExecutionContext ctx)和void close()来分别实现 UDF 的初始化和结束代码。
UDF 的使用方式与 MaxCompute SQL 中普通的内建函数相同,详情请参见 内建函数

UDAF


实现 Java UDAF 类需要继承 com.aliyun.odps.udf.Aggregator,并实现如下几个接口:
  1. public abstract class Aggregator implements ContextFunction {
  2.   @Override
  3.   public void setup(ExecutionContext ctx) throws UDFException {                                                                  
  4.   }
  5.   @Override
  6.   public void close() throws UDFException {    
  7.   }
  8.   /**
  9.    * 创建聚合Buffer
  10.    * @return Writable 聚合buffer
  11.    */
  12.   abstract public Writable newBuffer();
  13.   /**
  14.    * @param buffer 聚合buffer
  15.    * @param args SQL中调用UDAF时指定的参数
  16.    * @throws UDFException
  17.    */
  18.   abstract public void iterate(Writable buffer, Writable[] args) throws UDFException;
  19.   /**
  20.    * 生成最终结果
  21.    * @param buffer
  22.    * @return Object UDAF的最终结果
  23.    * @throws UDFException
  24.    */
  25.   abstract public Writable terminate(Writable buffer) throws UDFException;
  26.   abstract public void merge(Writable buffer, Writable partial) throws UDFException;
  27. }

其中最重要的是 iterate,merge 和 terminate 三个接口,UDAF 的主要逻辑依赖于这三个接口的实现。此外,还需要您实现自定义的 Writable buffer。
以实现求平均值 avg 为例,下图简要说明了在 MaxCompute UDAF 中这一函数的实现逻辑及计算流程:

在上图中,输入数据被按照一定的大小进行分片(有关分片的描述请参见 MapReduce),每片的大小适合一个 worker 在适当的时间内完成。这个分片大小的设置需要您手动配置完成。
UDAF 的计算过程分为两个阶段:

  • 第一阶段:每个 worker 统计分片内数据的个数及汇总值,您可以将每个分片内的数据个数及汇总值视为一个中间结果。

  • 第二阶段:worker 汇总上一个阶段中每个分片内的信息。在最终输出时,r.sum / r.count 即是所有输入数据的平均值。

计算平均值的 UDAF 的代码示例,如下所示:
  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import com.aliyun.odps.io.DoubleWritable;
  5. import com.aliyun.odps.io.Writable;
  6. import com.aliyun.odps.udf.Aggregator;
  7. import com.aliyun.odps.udf.UDFException;
  8. import com.aliyun.odps.udf.annotation.Resolve;
  9. @Resolve({"double->double"})
  10. public class AggrAvg extends Aggregator {
  11.   private static class AvgBuffer implements Writable {
  12.     private double sum = 0;
  13.     private long count = 0;
  14.     @Override
  15.     public void write(DataOutput out) throws IOException {
  16.       out.writeDouble(sum);
  17.       out.writeLong(count);
  18.     }
  19.     @Override
  20.     public void readFields(DataInput in) throws IOException {
  21.       sum = in.readDouble();
  22.       count = in.readLong();
  23.     }
  24.   }
  25.   private DoubleWritable ret = new DoubleWritable();
  26.   @Override
  27.   public Writable newBuffer() {
  28.     return new AvgBuffer();
  29.   }
  30.   @Override
  31.   public void iterate(Writable buffer, Writable[] args) throws UDFException {
  32.     DoubleWritable arg = (DoubleWritable) args[0];
  33.     AvgBuffer buf = (AvgBuffer) buffer;
  34.     if (arg != null) {
  35.       buf.count += 1;
  36.       buf.sum += arg.get();
  37.     }
  38.   }
  39.   @Override
  40.   public Writable terminate(Writable buffer) throws UDFException {
  41.     AvgBuffer buf = (AvgBuffer) buffer;
  42.     if (buf.count == 0) {
  43.       ret.set(0);
  44.     } else {
  45.       ret.set(buf.sum / buf.count);
  46.     }
  47.     return ret;
  48.   }
  49.   @Override
  50.   public void merge(Writable buffer, Writable partial) throws UDFException {
  51.     AvgBuffer buf = (AvgBuffer) buffer;
  52.     AvgBuffer p = (AvgBuffer) partial;
  53.     buf.sum += p.sum;
  54.     buf.count += p.count;
  55.   }
  56. }

注意:
  • UDAF 在 SQL 中的使用语法与普通的内建聚合函数相同,详情请参见 聚合函数
  • 关于如何运行 UDTF 的方法与 UDF 类似,详情请参见 运行 UDF


UDTF


Java UDTF 需要继承 com.aliyun.odps.udf.UDTF 类。这个类需要实现 4 个接口,如下表所示:
接口定义描述
public void setup(ExecutionContext ctx) throws UDFException初始化方法,在UDTF处理输入数据前,调用用户自定义的初始化行为。在每个Worker内setup会被先调用一次。
public void process(Object[] args) throws UDFException这个方法由框架调用,SQL中每一条记录都会对应调用一次process,process的参数为SQL语句中指定的UDTF输入参数。输入参数以Object[]的形式传入,输出结果通过forward函数输出。用户需要在process函数内自行调用forward,以决定输出数据。
public void close() throws UDFExceptionUDTF的结束方法,此方法由框架调用,并且只会被调用一次,即在处理完最后一条记录之后。
public void forward(Object …o) throws UDFException用户调用forward方法输出数据,每次forward代表输出一条记录。对应SQL语句UDTF的as子句指定的列。

UDTF 的程序示例,如下所示:
  1.      package org.alidata.odps.udtf.examples;
  2.      import com.aliyun.odps.udf.UDTF;
  3.      import com.aliyun.odps.udf.UDTFCollector;
  4.      import com.aliyun.odps.udf.annotation.Resolve;
  5.      import com.aliyun.odps.udf.UDFException;
  6.      // TODO define input and output types, e.g., "string,string->string,bigint".
  7.      @Resolve({"string,bigint->string,bigint"})
  8.      public class MyUDTF extends UDTF {
  9.        @Override
  10.        public void process(Object[] args) throws UDFException {
  11.          String a = (String) args[0];
  12.          Long b = (Long) args[1];
  13.          for (String t: a.split("\\s+")) {
  14.            forward(t, b);
  15.          }
  16.        }
  17.      }

注意:
以上只是程序示例,关于如何在 MaxCompute 中运行 UDTF 的方法与 UDF 类似,详情请参见: 运行 UDF

在 SQL 中可以这样使用这个 UDTF,假设在 MaxCompute 上创建 UDTF 时注册函数名为 user_udtf:
  1.       select user_udtf(col0, col1) as (c0, c1) from my_table;

假设 my_table 的 col0,col1 的值如下所示:
  1.        +------+------+
  2.        | col0 | col1 |
  3.        +------+------+
  4.        | A B  | 1    |
  5.        | C D  | 2    |
  6.        +------+------+

则 select 出的结果,如下所示:
  1.        +----+----+
  2.        | c0 | c1 |
  3.        +----+----+
  4.        | A  | 1  |
  5.        | B  | 1  |
  6.        | C  | 2  |
  7.        | D  | 2  |
  8.        +----+----+


使用说明


UDTF 在 SQL 中的常用方式如下:
  1.       select user_udtf(col0, col1, col2) as (c0, c1) from my_table;
  2.       select user_udtf(col0, col1, col2) as (c0, c1) from
  3.           (select * from my_table distribute by key sort by key) t;
  4.       select reduce_udtf(col0, col1, col2) as (c0, c1) from
  5.           (select col0, col1, col2 from
  6.               (select map_udtf(a0, a1, a2, a3) as (col0, col1, col2) from my_table) t1
  7.            distribute by col0 sort by col0, col1) t2;

但使用 UDTF 有如下使用限制:

  • 同一个 SELECT 子句中不允许有其他表达式。
    1.    select value, user_udtf(key) as mycol ...

  • UDTF 不能嵌套使用。
    1.    select user_udtf1(user_udtf2(key)) as mycol...

  • 不支持在同一个 select 子句中与 group by / distribute by / sort by 联用。
    1.   select user_udtf(key) as mycol ... group by mycol


其他 UDTF 示例


在 UDTF 中,您可以读取 MaxCompute 的 资源。利用 UDTF 读取 MaxCompute 资源的示例,如下所示:

  1. 编写 UDTF 程序,编译成功后导出 jar 包(udtfexample1.jar)。  package com.aliyun.odps.examples.udf;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.io.InputStreamReader;
  6. import java.util.Iterator;
  7. import com.aliyun.odps.udf.ExecutionContext;
  8. import com.aliyun.odps.udf.UDFException;
  9. import com.aliyun.odps.udf.UDTF;
  10. import com.aliyun.odps.udf.annotation.Resolve;
  11. /**
  12. * project: example_project
  13. * table: wc_in2
  14. * partitions: p2=1,p1=2
  15. * columns: colc,colb
  16. */
  17. @Resolve({ "string,string->string,bigint,string" })
  18. public class UDTFResource extends UDTF {
  19. ExecutionContext ctx;
  20. long fileResourceLineCount;
  21. long tableResource1RecordCount;
  22. long tableResource2RecordCount;
  23. @Override
  24. public void setup(ExecutionContext ctx) throws UDFException {
  25. this.ctx = ctx;
  26. try {
  27.    InputStream in = ctx.readResourceFileAsStream("file_resource.txt");
  28.    BufferedReader br = new BufferedReader(new InputStreamReader(in));
  29.    String line;
  30.    fileResourceLineCount = 0;
  31.    while ((line = br.readLine()) != null) {
  32.      fileResourceLineCount++;
  33.    }
  34.    br.close();
  35.    Iterator<Object[]> iterator = ctx.readResourceTable("table_resource1").iterator();
  36.    tableResource1RecordCount = 0;
  37.    while (iterator.hasNext()) {
  38.      tableResource1RecordCount++;
  39.      iterator.next();
  40.    }
  41.    iterator = ctx.readResourceTable("table_resource2").iterator();
  42.    tableResource2RecordCount = 0;
  43.    while (iterator.hasNext()) {
  44.      tableResource2RecordCount++;
  45.      iterator.next();
  46.    }
  47. } catch (IOException e) {
  48.    throw new UDFException(e);
  49. }
  50. }
  51. @Override
  52. public void process(Object[] args) throws UDFException {
  53. String a = (String) args[0];
  54. long b = args[1] == null ? 0 : ((String) args[1]).length();
  55. forward(a, b, "fileResourceLineCount=" + fileResourceLineCount + "|tableResource1RecordCount="
  56.      + tableResource1RecordCount + "|tableResource2RecordCount=" + tableResource2RecordCount);
  57. }
  58. }

添加资源到 MaxCompute。
  1. Add file file_resource.txt;
  2. Add jar udtfexample1.jar;
  3. Add table table_resource1 as table_resource1;
  4. Add table table_resource2 as table_resource2;

在 MaxCompute 中创建 UDTF 函数(my_udtf)。
  1. create function mp_udtf as com.aliyun.odps.examples.udf.UDTFResource using 'udtfexample1.jar, file_resource.txt, table_resource1, table_resource2';

在 MaxCompute 创建资源表 table_resource1、table_resource2 和物理表 tmp1,并插入相应的数据。
运行该 UDTF。
  1. select mp_udtf("10","20") as (a, b, fileResourceLineCount) from table_resource1;  
  2. 返回:
  3. +-------+------------+-------+
  4. | a | b      | fileResourceLineCount |
  5. +-------+------------+-------+
  6. | 10    | 2          | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
  7. | 10    | 2          | fileResourceLineCount=3|tableResource1RecordCount=0|tableResource2RecordCount=0 |
  8. +-------+------------+-------+


复杂数据类型示例


如以下代码,定义了一个有三个overloads 的 UDF,其中第一个用了 array 作为参数,第二个用了 map 作为参数,第三个用了 struct。由于第三个overloads 用了 struct 作为参数或者返回值,因此要求必须要对 UDF class 打上 @Resolveannotation,来指定 struct 的具体类型。
  1. @Resolve("struct<a:bigint>,string->string")
  2. public class UdfArray extends UDF {
  3.   public String evaluate(List<String> vals, Long len) {
  4.     return vals.get(len.intValue());
  5.   }
  6.   public String evaluate(Map<String,String> map, String key) {
  7.     return map.get(key);
  8.   }
  9.   public String evaluate(Struct struct, String key) {
  10.     return struct.getFieldValue("a") + key;
  11.   }
  12. }

您可以直接将复杂类型传入 UDF 中,如下所示:
  1. create function my_index as 'UdfArray' using 'myjar.jar';
  2. select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;

展开
收起
行者武松 2017-10-23 17:31:57 2607 分享
分享
版权
举报
0 条回答
写回答
取消 提交回答
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等