MaxCompute中Struct复杂数据类型的UDF编写、兼容HIVE的GenericUDF编写

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: MaxCompute 2.0版本升级后,Java UDF支持的数据类型从原来的BIGINT、STRING、DOUBLE、BOOLEAN扩展了更多基本的数据类型,同时还扩展支持了ARRAY、MAP、STRUCT等复杂类型,以及Writable参数。

一、背景介绍:
MaxCompute 2.0版本升级后,Java UDF支持的数据类型从原来的BIGINT、STRING、DOUBLE、BOOLEAN扩展了更多基本的数据类型,同时还扩展支持了ARRAY、MAP、STRUCT等复杂类型,以及Writable参数。Java UDF使用复杂数据类型的方法,STRUCT对应com.aliyun.odps.data.Struct。com.aliyun.odps.data.Struct从反射看不出Field Name和Field Type,所以需要用@Resolve注解来辅助。即如果需要在UDF中使用STRUCT,要求在UDF Class上也标注上@Resolve注解。但是当我们Struct类型中的field有很多字段的时候,这个时候需要我们去手动的添加@Resolve注解就不是那么的友好。针对这一个问题,我们可以使用Hive 中的GenericUDF去实现。MaxCompute 2.0支持Hive风格的UDF,部分Hive UDF、UDTF可以直接在MaxCompute上使用。
二、复杂数据类型UDF示例
示例定义了一个有三个复杂数据类型的UDF,其中第一个用ARRAY作为参数,第二个用MAP作为参数,第三个用STRUCT作为参数。由于第三个Overloads用了STRUCT作为参数或者返回值,因此要求必须对UDF Class添加@Resolve注解,指定STRUCT的具体类型。
1.代码编写

@Resolve("struct<a:bigint>,string->string")
public class UdfArray extends UDF {
public String evaluate(List<String> vals, Long len) {
    return vals.get(len.intValue());
}
public String evaluate(Map<String,String> map, String key) {
    return map.get(key);
}
public String evaluate(Struct struct, String key) {
    return struct.getFieldValue("a") + key;
}
}

2.打jar包添加资源

add jar UdfArray.jar

3.创建函数

create function my_index as 'UdfArray' using 'UdfArray.jar';

4.使用UDF函数

select id, my_index(array('red', 'yellow', 'green'), colorOrdinal) as color_name from colors;

三、使用Hive的GenericUDF
这里我们使用Struct复杂数据类型作为示例,主要处理的逻辑是当我们结构体中两个字段前后没有差异时不返回,如果前后有差异将新的字段及其值组成新的结构体返回。示例中Struct的Field为3个。使用GenericUDF方式可以解决需要手动添加@Resolve注解。
1.创建一个MaxCompute表

CREATE TABLE IF NOT EXISTS `tmp_ab_struct_type_1` (
`a1` struct<a:STRING,b:STRING,c:string>,
`b1` struct<a:STRING,b:STRING,c:string>
);

2.表中数据结构如下

insert into table tmp_ab_struct_type_1 SELECT named_struct('a',1,'b',3,'c','2019-12-17 16:27:00'), named_struct('a',5,'b',6,'c','2019-12-18 16:30:00');

查询数据如下所示:

1576811346298_FEB20147-DD74-4a10-8D6E-780D91DCBC93.png

3.编写GenericUDF处理逻辑
(1)QSC_DEMOO类

package com.aliyun.udf.struct;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import java.util.ArrayList;
import java.util.List;

/**
* Created by ljw on 2019-12-17
* Description:
*/
@SuppressWarnings("Duplicates")
public class QSC_DEMOO extends GenericUDF {
    StructObjectInspector soi1;
    StructObjectInspector soi2;

    /**
    * 避免频繁Struct对象
    */
    private PubSimpleStruct resultStruct = new PubSimpleStruct();
    private List<? extends StructField> allStructFieldRefs;

    //1. 这个方法只调用一次,并且在evaluate()方法之前调用。该方法接受的参数是一个arguments数组。该方法检查接受正确的参数类型和参数个数。
    //2. 输出类型的定义
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        String error = "";
        //检验参数个数是否正确
        if (arguments.length != 2) {
            throw new UDFArgumentException("需要两个参数");
        }
        //判断参数类型是否正确-struct
        ObjectInspector.Category arg1 = arguments[0].getCategory();
        ObjectInspector.Category arg2 = arguments[1].getCategory();
        if (!(arg1.equals(ObjectInspector.Category.STRUCT))) {
            error += arguments[0].getClass().getSimpleName();
            throw new UDFArgumentTypeException(0, "\"array\" expected at function STRUCT_CONTAINS, but \"" +
                    arg1.name() + "\" " + "is found" + "\n" + error);
        }
        if (!(arg2.equals(ObjectInspector.Category.STRUCT))) {
            error += arguments[1].getClass().getSimpleName();
            throw new UDFArgumentTypeException(0, "\"array\" expected at function STRUCT_CONTAINS, but \""
                    + arg2.name() + "\" " + "is found" + "\n" + error);
        }
        //输出结构体定义
        ArrayList<String> structFieldNames = new ArrayList();
        ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList();
        soi1 = (StructObjectInspector) arguments[0];
        soi2 = (StructObjectInspector) arguments[1];
        StructObjectInspector toValid = null;
        if (soi1 == null)
            toValid = soi2;
        else toValid = soi1;

        //设置返回类型
        allStructFieldRefs = toValid.getAllStructFieldRefs();
        for (StructField structField : allStructFieldRefs) {
            structFieldNames.add(structField.getFieldName());
            structFieldObjectInspectors.add(structField.getFieldObjectInspector());
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    //这个方法类似UDF的evaluate()方法。它处理真实的参数,并返回最终结果。
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        //将hive中的struct类型转换成com.aliyun.odps.data.Struct, 如果有错误,请调试,查看deferredObjects的数据是什么样子的
        //然后自己进行重新封装 !!!

        ArrayList list1 = (ArrayList) deferredObjects[0].get();
        ArrayList list2 = (ArrayList) deferredObjects[1].get();
        int len = list1.size();
        ArrayList fieldNames = new ArrayList<>();
        ArrayList fieldValues = new ArrayList<>();

        for (int i = 0; i < len ; i++) {
            if (!list1.get(i).equals(list2.get(i))) {
                fieldNames.add(allStructFieldRefs.get(i).getFieldName());
                fieldValues.add(list2.get(i));
            }
        }
        if (fieldValues.size() == 0) return null;
        return fieldValues;
    }

    //这个方法用于当实现的GenericUDF出错的时候,打印出提示信息。而提示信息就是你实现该方法最后返回的字符串。
    @Override
    public String getDisplayString(String[] strings) {
        return "Usage:" + this.getClass().getName() + "(" + strings[0] + ")";
    }
}

(2)PubSimpleStruct类

package com.aliyun.udf.struct;
import com.aliyun.odps.data.Struct;
import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import java.util.List;

public class PubSimpleStruct implements Struct {

    private StructTypeInfo typeInfo;
    private List<Object> fieldValues;

    public StructTypeInfo getTypeInfo() {
        return typeInfo;
    }

    public void setTypeInfo(StructTypeInfo typeInfo) {
        this.typeInfo = typeInfo;
    }

    public void setFieldValues(List<Object> fieldValues) {
        this.fieldValues = fieldValues;
    }

    public int getFieldCount() {
        return fieldValues.size();
    }

    public String getFieldName(int index) {
        return typeInfo.getFieldNames().get(index);
    }

    public TypeInfo getFieldTypeInfo(int index) {
        return typeInfo.getFieldTypeInfos().get(index);
    }

    public Object getFieldValue(int index) {
        return fieldValues.get(index);
    }

    public TypeInfo getFieldTypeInfo(String fieldName) {
        for (int i = 0; i < typeInfo.getFieldCount(); ++i) {
            if (typeInfo.getFieldNames().get(i).equalsIgnoreCase(fieldName)) {
                return typeInfo.getFieldTypeInfos().get(i);
            }
        }
        return null;
    }

    public Object getFieldValue(String fieldName) {
        for (int i = 0; i < typeInfo.getFieldCount(); ++i) {
            if (typeInfo.getFieldNames().get(i).equalsIgnoreCase(fieldName)) {
                return fieldValues.get(i);
            }
        }
        return null;
    }

    public List<Object> getFieldValues() {
        return fieldValues;
    }

    @Override
    public String toString() {
        return "PubSimpleStruct{" +
                "typeInfo=" + typeInfo +
                ", fieldValues=" + fieldValues +
                '}';
    }
}

3、打jar包,添加资源

add jar test.jar;

4、创建函数

CREATE FUNCTION UDF_DEMO as 'com.aliyun.udf.test.UDF_DEMOO' using 'test.jar';

5、测试使用UDF函数

set odps.sql.hive.compatible=true;
select UDF_DEMO(a1,b1) from tmp_ab_struct_type_1;

查询结果如下所示:

1576811361785_5BC15482-A394-4353-9E17-D6A53AB54960.png


注意:
(1)在使用兼容的Hive UDF的时候,需要在SQL前加set odps.sql.hive.compatible=true;语句,set语句和SQL语句一起提交执行。

(2)目前支持兼容的Hive版本为2.1.0,对应Hadoop版本为2.7.2。如果UDF是在其他版本的Hive/Hadoop开发的,则可能需要使用此Hive/Hadoop版本重新编译。
有疑问可以咨询阿里云MaxCompute技术支持:刘建伟

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.0</version>
    </dependency>

欢迎加入“MaxCompute开发者社区2群”,点击链接申请加入或扫描二维码
https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
6766293bc74543c99e7c493dc15cd39b.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
存储 SQL 分布式计算
大数据-135 - ClickHouse 集群 - 数据类型 实际测试
大数据-135 - ClickHouse 集群 - 数据类型 实际测试
38 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
3月前
|
存储 SQL 分布式计算
Hive 中有多少种数据类型?
【8月更文挑战第12天】
335 4
|
4月前
|
分布式计算 DataWorks 大数据
MaxCompute产品使用合集之如何使用UDF来使用Protocol Buffers
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
65 15
|
3月前
|
存储 分布式计算 关系型数据库
实时数仓 Hologres产品使用合集之创建外部表时提示不支持ODPS的datetime数据类型,该怎么解决
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
4月前
|
分布式计算 DataWorks 数据处理
MaxCompute操作报错合集之UDF访问OSS,配置白名单后出现报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。
|
4月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之编写UDF(用户自定义函数)时,报错:找不到主类,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
121 1
|
4月前
|
SQL 分布式计算 资源调度
MaxCompute操作报错合集之执行SQL Union All操作时,数据类型产生报错,该怎么解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
134 1
|
4月前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用合集之是否可以将5个资源包统一写到同一个python UDF脚本
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 下一篇
    无影云桌面