Hive 之 UDF 运用(包会的)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
大数据开发治理平台 DataWorks,不限时长
简介: Hive的UDF允许用户自定义数据处理函数,扩展其功能。`reflect()`函数通过Java反射调用JDK中的方法,如静态或实例方法。例如,调用`MathUtils.addNumbers()`进行加法运算。要创建自定义UDF,可以继承`GenericUDF`,实现`initialize`、`evaluate`和`getDisplayString`方法。在`initialize`中检查参数类型,在`evaluate`中执行业务逻辑。最后,打包项目成JAR,上传到HDFS,并在Hive中注册以供使用。

@[toc]

UDF 是什么?

Hive 中的 UDF 其实就是用户自定义函数,允许用户注册使用自定义的逻辑对数据进行处理,丰富了Hive 对数据处理的能力。

UDF 负责完成对数据一进一出处理的操作,和 Hive 中存在的函数 yearmonthday 等相同。

reflect

在 Hive 中,可以使用 reflect() 方法通过 Java 反射机制调用 Java 类的方法。

通俗来说,它可以调用 Hive 中不存在,但是 JDK 中拥有的方法。

语法

  • reflect() 函数的语法为:reflect(class,method[,arg1[,arg2..]])

静态方法调用

假设当前在 Java 中存在类如下:

package com.example;

public class MathUtils {
   
   
    public static int addNumbers(int a, int b) {
   
   
        return a + b;
    }
}

那么使用 reflect() 方法调用时,如下所示:

SELECT reflect("com.example.MathUtils", "addNumbers", 3, 5) AS result;

注意! 这里的类 "com.example.MathUtils" 并不是在 JDK 中真实存在的,只是我作为说明的一个案例, reflect() 方法只能调用 JDK 中(原生内置)存在的方法。

所以当你需要使用 reflect() 方法时,需要先去查找调用的目标方法全类名、方法名以及是否需要传递参数。

实例方法调用

当我们需要调用 Java 中的实例方法时,先创建 Java 对象,然后再调用其方法。

例如:将乱码的字符串进行解析。

SELECT reflect('java.net.URLDecoder', 'decode', "Mozilla/5.0%20(compatible;%20MJ12bot/v1.4.7;%20http://www.majestic12.co.uk/bot.php?+)
" ,'utf-8') as result;

结果输出如下:

image.png

自定义 UDF(GenericUDF)

Hive 支持两种 UDF 函数自定义操作,分别是:

  • GenericUDF(通用UDF):用于实现那些可以处理任意数据类型的函数。它们的输入和输出类型可以是任意的,但需要在函数内部处理类型转换和逻辑,可以实现更复杂的逻辑处理。

  • UDF:用于实现那些只能处理特定数据类型的函数。每个 UDF 都明确指定了输入参数的类型和返回值类型,使用更为简单。

本文采用的是通用 UDF —— GenericUDF 实现方法

这里通过一个在 Hive 中实现两数相加的自定义 UDF 案例来进行说明,看完你就会啦,轻松拿捏。

1.创建项目

在 IDEA 中创建一个 Maven 项目,引入 Hive 依赖,如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.jsu</groupId>
    <artifactId>MyUDF</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- hive-exec依赖无需打到jar包,故scope使用provided-->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <!--将依赖编译到jar包中-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <!--配置执行器-->
                    <execution>
                        <id>make-assembly</id>
                        <!--绑定到package执行周期上-->
                        <phase>package</phase>
                        <goals>
                            <!--只运行一次-->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

注意,引入的 Hive 依赖版本请保持和你集群中使用的版本一致。

2.创建类继承 UDF

创建一个类,我这里取名为 AddTest,继承 Hive UDF 父类 GenericUDF,需要重写三个方法,如下所示:

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class AddTest extends GenericUDF {
   
   

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
   
   
        return null;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
   
   
        return null;
    }

    @Override
    public String getDisplayString(String[] strings) {
   
   
        return null;
    }
}
  • initialize(ObjectInspector[] objectInspectors) 方法
    这个方法是在 UDF 初始化时调用的。它用于执行一些初始化操作,并且可以用来验证 UDF 的输入参数类型是否正确。参数 objectInspectors 是一个包含输入参数的 ObjectInspector 数组,它描述了每个输入参数的类型和结构。
    一般在这个方法中检查输入参数的数量和类型是否满足你的函数的要求。如果输入参数不符合预期,你可以抛出 UDFArgumentException 异常。如果一切正常,你需要返回一个合适的 ObjectInspector 对象,它描述了你的函数返回值的类型。

  • evaluate(DeferredObject[] deferredObjects) 方法
    在这个方法中定义真正执行 UDF 逻辑的地方,获取输入的参数,并且根据输入参数执行相应的计算或操作。参数 deferredObjects 是一个包含输入参数的 DeferredObject 数组,你可以通过它来获取实际的输入值。

  • getDisplayString(String[] strings) 方法
    这个方法用于描述 UDF 的信息,用于生成可读的查询执行计划(Explain),以便用户了解查询的结构和执行过程。

3.数据类型判断

实现 UDF 的第一步操作就是在 initialize 方法中,判断用户输入的参数是否合法,出现错误时,进行反馈。

在这里主要分为三个步骤:

  1. 检验参数个数

  2. 检查参数类型

  3. 定义函数返回值类型

一般情况下,可以使用下面的模板:

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
   
   

        // 1.校验参数个数
        if (objectInspectors.length != 2) {
   
   
            throw new UDFArgumentException("参数个数有误!");
        }

        // 2.检查第1个参数是否是int类型
        // 判断第1个参数的基本类型
        ObjectInspector num1 = objectInspectors[0];
        if (num1.getCategory() != ObjectInspector.Category.PRIMITIVE) {
   
   
            throw new UDFArgumentException("第1个参数不是基本数据类型");
        }
        // 第1个参数类型判断
        PrimitiveObjectInspector temp = (PrimitiveObjectInspector) num1;
        if (PrimitiveObjectInspector.PrimitiveCategory.INT != temp.getPrimitiveCategory()) {
   
   
            throw new UDFArgumentException("第1个参数应为INT类型");
        }

        // 2.检查第2个参数是否是int类型
        // 判断第2个参数的基本类型
        ObjectInspector num2 = objectInspectors[1];
        if (num2.getCategory() != ObjectInspector.Category.PRIMITIVE) {
   
   
            throw new UDFArgumentException("第2个参数不是基本数据类型");
        }
        // 第2个参数类型判断
        PrimitiveObjectInspector temp2 = (PrimitiveObjectInspector) num2;
        if (PrimitiveObjectInspector.PrimitiveCategory.INT != temp2.getPrimitiveCategory()) {
   
   
            throw new UDFArgumentException("第2个参数应为INT类型");
        }

        // 3.设置函数返回值类型(返回一个整型数据)
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

4.编写业务逻辑

evaluate 方法中定义业务逻辑,这里比较简单,就是实现两数相加。

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
   
   
        // 完成两数相加的逻辑计算
        int num1 = Integer.parseInt(deferredObjects[0].get().toString());
        int num2 = Integer.parseInt(deferredObjects[1].get().toString());

        return num1 + num2;
    }

5.定义函数描述信息

getDisplayString 方法中定义函数在 Explain 中的描述信息,一般都是固定写法,如下所示:

    @Override
    public String getDisplayString(String[] strings) {
   
   
        return getStandardDisplayString("AddTest", strings);
    }

把对应的函数名称进行替换即可。

6.打包与上传

对编写的项目进行打包,并上传到 HDFS 上

image.png

本案例的完整代码如下所示:

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class AddTest extends GenericUDF {
   
   

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
   
   

        // 1.校验参数个数
        if (objectInspectors.length != 2) {
   
   
            throw new UDFArgumentException("参数个数有误!");
        }

        // 2.检查第1个参数是否是int类型
        // 判断第1个参数的基本类型
        ObjectInspector num1 = objectInspectors[0];
        if (num1.getCategory() != ObjectInspector.Category.PRIMITIVE) {
   
   
            throw new UDFArgumentException("第1个参数不是基本数据类型");
        }
        // 第1个参数类型判断
        PrimitiveObjectInspector temp = (PrimitiveObjectInspector) num1;
        if (PrimitiveObjectInspector.PrimitiveCategory.INT != temp.getPrimitiveCategory()) {
   
   
            throw new UDFArgumentException("第1个参数应为INT类型");
        }

        // 2.检查第2个参数是否是int类型
        // 判断第2个参数的基本类型
        ObjectInspector num2 = objectInspectors[1];
        if (num2.getCategory() != ObjectInspector.Category.PRIMITIVE) {
   
   
            throw new UDFArgumentException("第2个参数不是基本数据类型");
        }
        // 第2个参数类型判断
        PrimitiveObjectInspector temp2 = (PrimitiveObjectInspector) num2;
        if (PrimitiveObjectInspector.PrimitiveCategory.INT != temp2.getPrimitiveCategory()) {
   
   
            throw new UDFArgumentException("第2个参数应为INT类型");
        }

        // 3.设置函数返回值类型(返回一个整型数据)
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
   
   
        // 完成两数相加的逻辑计算
        int num1 = Integer.parseInt(deferredObjects[0].get().toString());
        int num2 = Integer.parseInt(deferredObjects[1].get().toString());

        return num1 + num2;
    }

    @Override
    public String getDisplayString(String[] strings) {
   
   
        return getStandardDisplayString("AddTest", strings);
    }

}

7.注册 UDF 函数并测试

进入 Hive 中对创建的 UDF 函数进行注册。

如果你期间修改了 JAR 包并重新上传,则需要重启与 Hive 的连接,建立新的会话才会生效。
-- 永久注册
create function testAdd as 'AddTest' using jar 'hdfs://hadoop201:8020/test/MyUDF-1.0-SNAPSHOT-jar-with-dependencies.jar';

-- 删除注册的函数
drop function if exists testAdd;
  • testAdd:注册的 UDF 函数名称。

  • as 'AddTest':编写的 UDF 函数全类名。

  • using jar:指定 JAR 包的全路径。

注册成功后,如下所示:

image.png

测试

select testAdd(1,2);

image.png

如果输入错误的数据类型,会进行报错提示:

image.png

返回复杂的数据类型

在更多的场景下,我们可能有多个返回值,那么该如何定义与配置呢?

这里还是通过上面的两数相加的案例来进行说明,套下面的模板使用:

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

public class AddTestReturnList extends GenericUDF {
   
   

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
   
   

        // 1.校验参数个数
        if (objectInspectors.length != 2) {
   
   
            throw new UDFArgumentException("参数个数有误!");
        }

        // 2.检查第1个参数是否是int类型
        // 判断第1个参数的基本类型
        ObjectInspector num1 = objectInspectors[0];
        if (num1.getCategory() != ObjectInspector.Category.PRIMITIVE) {
   
   
            throw new UDFArgumentException("第1个参数不是基本数据类型");
        }
        // 第1个参数类型判断
        PrimitiveObjectInspector temp = (PrimitiveObjectInspector) num1;
        if (PrimitiveObjectInspector.PrimitiveCategory.INT != temp.getPrimitiveCategory()) {
   
   
            throw new UDFArgumentException("第1个参数应为INT类型");
        }

        // 2.检查第2个参数是否是int类型
        // 判断第2个参数的基本类型
        ObjectInspector num2 = objectInspectors[1];
        if (num2.getCategory() != ObjectInspector.Category.PRIMITIVE) {
   
   
            throw new UDFArgumentException("第2个参数不是基本数据类型");
        }
        // 第2个参数类型判断
        PrimitiveObjectInspector temp2 = (PrimitiveObjectInspector) num2;
        if (PrimitiveObjectInspector.PrimitiveCategory.INT != temp2.getPrimitiveCategory()) {
   
   
            throw new UDFArgumentException("第2个参数应为INT类型");
        }

        // 3.设置函数返回值类型(返回一个键值对数据)
        ArrayList<String> structFieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
        structFieldNames.add("result");
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
   
   
        // 完成两数相加的逻辑计算
        ArrayList<Integer> arrayList = new ArrayList<>();
        int num1 = Integer.parseInt(deferredObjects[0].get().toString());
        int num2 = Integer.parseInt(deferredObjects[1].get().toString());
        arrayList.add(num1 + num2);
        return arrayList;
    }

    @Override
    public String getDisplayString(String[] strings) {
   
   
        return getStandardDisplayString("AddTestReturnList", strings);
    }

}

(退出当前与 Hive 的连接,建立新的连接,刷新缓存)

同样的,打包上传到 HDFS 上进行注册:

create function AddTestReturnList as 'AddTestReturnList' using jar 'hdfs://hadoop201:8020/test/MyUDF-1.0-SNAPSHOT-jar-with-dependencies.jar';

此时,可能会发生报错,这是由于我们之前已经加载过该 JAR 包了,再次加载时 Hive 会抛出异常,我们可以通过下面的语句进行调整:

-- 关闭向量化查询
set hive.vectorized.execution.enabled=false;

重新注册即可。

进行测试:

select AddTestReturnList(1,2);

计算结果如下:

image.png

是不是轻松拿捏了~

相关文章
|
1月前
|
SQL 存储 Java
Hive UDF UDTF UDAF 自定义函数详解
Hive UDF UDTF UDAF 自定义函数详解
43 2
Hive UDF UDTF UDAF 自定义函数详解
|
19天前
|
SQL Java 程序员
Hive反射函数的使用-程序员是怎么学UDF函数的
Hive反射函数的使用-程序员是怎么学UDF函数的
5 0
|
1月前
|
SQL Java 数据处理
【Hive】Hive的函数:UDF、UDAF、UDTF的区别?
【4月更文挑战第17天】【Hive】Hive的函数:UDF、UDAF、UDTF的区别?
|
1月前
|
SQL 消息中间件 Apache
Flink报错问题之使用hive udf函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
1月前
|
SQL 搜索推荐 Java
Hive中的UDF是什么?请解释其作用和使用方法。
Hive中的UDF是什么?请解释其作用和使用方法。
54 0
|
1月前
|
SQL 分布式计算 Java
Hive自定义函数UDF编写
Hive自定义函数UDF编写
42 2
|
8月前
|
SQL 分布式计算 Java
阿里云MaxCompute-Hive UDF(Java)迁移上云实践
阿里云MaxCompute-Hive UDF(Java)迁移上云实践
|
8月前
|
SQL 分布式计算 Java
浅析 hive udf 的正确编写和使用方式- 论姿势的重要性 - 系列三 - hdfs 相对路径与静态代码块引起的问题
浅析 hive udf 的正确编写和使用方式- 论姿势的重要性 - 系列三 - hdfs 相对路径与静态代码块引起的问题
|
8月前
|
SQL 分布式计算 Java
如何在 hive udf 中访问配置数据-踩坑记录,方案汇总与对比-udf中可以写sql吗?
如何在 hive udf 中访问配置数据-踩坑记录,方案汇总与对比-udf中可以写sql吗?
|
11月前
|
SQL Java Maven
Hive UDF开发流程到调用
Hive UDF开发流程到调用
95 0

热门文章

最新文章