Hive UDF UDTF UDAF 自定义函数详解

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Hive UDF UDTF UDAF 自定义函数详解

Hive笔记05 -- Hive UDF UDTF UDAF

UDF

UDF在Hive中的实现

在这里插入图片描述

UDF的创建与配置

类名定义规则
示例:com.ybg.hive.ql.func.udf.UDFDateDiffByUnit
规则:反向域名+模块名+功能分类(ql.func.udf:hive查询语言中的UDF函数)+具体功能|类名
基本配置
New Project - Maven模板
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>
UDF核心

1.参数类型和参数值分开管理
2.将共性的校验写在接口中

UDF示例:

主类:UDFDateDiffByUnit extends GenericUDF implements UDFCom,DateCom
目的:计算两个日期之间的差异,可以按年、季、月、周或日计算。
方法:
initialize:参数验证并定义UDF的返回类型

@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    validateArgs(arguments,3);
    validateAllPrimitiveArgs(arguments,3);
    return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}

evaluate: 具体的数据校验
核心计算方法,计算两个日期之间的差异。

@Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        validateArgs(arguments,3);
        final String strDateSmall = arguments[0].get().toString();
        final String strDateBig = arguments[1].get().toString();
        validateDateFormat(strDateSmall,strDateBig);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Calendar dateSmall = Calendar.getInstance();
        Calendar dateBig = Calendar.getInstance();
        try {
            dateSmall.setTime(sdf.parse(strDateSmall));
            dateBig.setTime(sdf.parse(strDateBig));
        } catch (ParseException e) {
            throw new HiveException(e);
        }
        if(dateSmall.after(dateBig)){
            throw new HiveException("dateSmall by arg1 > dateBig by arg2");
        }
        final String unit = arguments[2].get().toString().toLowerCase();
        int intUnit = 0;
        switch(unit){
            case "y":
                intUnit = Calendar.YEAR;
                break;
            case "q": case "m":
                intUnit = Calendar.MONTH;
                break;
            case "w": case "d":
                intUnit = Calendar.DATE;
                break;
            default:
                throw new HiveException("Unsupported unit by arg3 :"+unit);
        }
        int diff = -1;
        while(true){
            diff++;
            dateSmall.add(intUnit,1);
            if(dateSmall.after(dateBig)){
                break;
            }
        }
        switch(unit){
            case "q":
                diff/=3;
                break;
            case "w":
                diff/=7;
                break;
        }
        return diff;
    }

getDisplayString:提供函数及其参数的描述(并不重要)

@Override
public String getDisplayString(String[] children) {
    return Objects.isNull(children) || children.length == 0 || null == children[0] ? null : children[0];
}

eg:对于"两数相加的UDF"

@Override
public String getDisplayString(String[] children){
    if(children == null || children.length<2){
        // 重述函数用法
        return "Usage:MyAddFunction(int,int)";    
    }
    // 查询解释
    return "MyAddFunction(" + children[0] + ", " + children[1] + ")";
}

接口一:DateCom
功能:提供日期相关的辅助方法。
validateDateFormat:验证日期格式是否符合 yyyy-MM-dd。

default void validateDateFormat(String...dateStrArr) throws HiveException {
    for (String dateStr : dateStrArr) {
        // dateStr.matches("\\d{4}-(0?[1-9]|1[0-2])-(0?[1-9]|[1-2][0-9]|3[0-1])")
        if (!dateStr.matches("\\d{4}-(0?[1-9]|1[0-2])-(0?[1-9]|[1-2][0-9]|3[0-1])")) {
            throw new HiveException("date format illegal : " + dateStr);
        }
    }
}

接口二:UDFCom
功能:提供通用的参数验证方法。
validateArgs:检查传入的参数数量是否正确,以及是否有空参数。
validateAllPrimitiveArgs:确保所有参数都是原始类型。
原始类型指的是最基本的数据类型,直接包含了数据的值。(byte short int long float double char boolean string✔)

/**
 * @param args  实际参数数组
 * @param size  预期参数个数
 * @throws HiveException
 */
// 实现非具体的通用校验
default void validateArgs(Object[] args,int size) throws UDFArgumentException {
    if (size>0 && (Objects.isNull(args) || args.length < size)) {
        // 检验提供的参数数量是否满足size个
        throw new UDFArgumentException(size+" args must be provided.");
    }
    for (int i = 0; i < size; i++) {
        // 检测某一参数是否为空
        if (Objects.isNull(args[i])) {
            throw new UDFArgumentException("type of args["+i+"] null");
        }
    }
}

// 验证参数的类型是否为原始类型
default void validateAllPrimitiveArgs(Object[] args, int size) throws UDFArgumentException{
    for (int i = 0; i < size; i++) {
        if (((ObjectInspector)args[i]).getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("only support primitive type");
        }
    }
}

打jar包上传至 HDFS 的两种方式:
-- install => 打资源jar包,直接将jar包打入到 maven localRepository
-- package => 打执行jar包,直接将jar包打入到 project target ✔

Hive UDF集成到Hive查询环境的四步骤:

  1. 打包(Package)
    package打架包
  2. 找包(Locate Package)
    架包位于target处,show in explorer显示物理路径
  3. 上传(Upload)
    复制架包路径并上传到HDFS上。
  4. 创建Hive UDF映射至HDFS上的JAR文件,并且指定了UDF实现的完整类名。
create function FUNC_NAME as 'com.ybg.hive.ql.func.udf.UDFDateDiffByUnit'(主类的全包路径)
using jar 'hdfs://single01:9000/hive_data/udf/hiveudf2-1.0-SNAPSHOT.jar';(hdfs://single01:9000+HDFS中架包存放路径)

问题与解决方法:

如果架包删除后重新上传会出现"UDF按照前一个架包方式继续运行"的情况
解决方式是:close project之后重新打开project(重新连接)。

UDTF

UDTF的创建与配置

与UDF相同

UDTF示例

package com.ybg.hive.ql.func.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class MyExplode extends GenericUDTF {
   
   
    private static Logger logger = LoggerFactory.getLogger(MyExplode.class);
    private ObjectInspector oi;
    private Object[] params;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
   
   
        oi = argOIs[0];
        final ObjectInspector.Category category = oi.getCategory();
        // names 和 types分别存储数据的名称和类型
        List<String> names = new ArrayList<>(2);
        List<ObjectInspector> types = new ArrayList<>(2);
        switch (category){
   
   
            // 默认的名字
            case MAP:
                logger.info("receive explode category : Map");
                names.add("key");
                names.add("value");
                final MapObjectInspector moi = (MapObjectInspector) this.oi;
                types.add(moi.getMapKeyObjectInspector());
                types.add(moi.getMapValueObjectInspector());
                params = new Object[2];
                break;
            case LIST:
                logger.info("receive explode category : List");
                names.add("value");
                final ListObjectInspector loi = (ListObjectInspector) oi;
                types.add(loi.getListElementObjectInspector());
                params = new Object[1];
                break;
            default:
                throw new UDFArgumentException("not supported category for function explode : " + category);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(names,types);
    }

    @Override
    public void process(Object[] args) throws HiveException {
   
   
        if (args.length != 1 || Objects.isNull(args[0])){
   
   
            throw new HiveException("Only 1 nonnull arg supported for function explode, but got " + args.length);
        }
        ObjectInspector.Category category = oi.getCategory();
        switch(category){
   
   
            case MAP:
                final Map<?, ?> map = ((MapObjectInspector) oi).getMap(args[0]);
             // map.entrySet().forEach(entry -> {
   
   
             //     params[0] = entry.getKey();
             //     params[1] = entry.getValue();
             //     try {
   
   
             //         forward(params);
             //     } catch (HiveException e) {
   
   
             //         throw new RuntimeException(e);
             //     }
             // });
                final Iterator<? extends Map.Entry<?, ?>> it = map.entrySet().iterator();
                while(it.hasNext()){
   
   
                    final Map.Entry<?, ?> entry = it.next();
                    params[0] = entry.getKey();
                    params[1] = entry.getValue();
                    forward(params);
                }
                break;
            case LIST:
                final List<?> list = ((ListObjectInspector) oi).getList(args[0]);
                final Iterator<?> itl = list.iterator();
                while (itl.hasNext()) {
   
   
                    params[0] = itl.next();
                    forward(params);
                }
                break;
        }
    }

    @Override
    public void close() throws HiveException {
   
   
        oi = null;
        /**
         * 将数组置空
         * 1.数组中存储的是对象的引用。将数组中每个元素设置为null,断开数组和这些对象的链接。使这些对象没有引用指向它们,便于垃圾回收。
         * 2.将数组引用本身置空,告诉垃圾回收器数组本身可以被回收。
         */
        for (int i = 0; i < params.length; i++) {
   
   
            params[i] = null;
        }
        params = null;
    }
}

代码注意点

  1. public class MyExplode extends GenericUDTF 继承GenericUDTF抽象类之后,会自动重构两个方法process()close(),但是,我们需要手动重构另一个方法initialize(ObjectInspector[] argOIs)(ps:只能用这个过期方法才能处理结构化数据类型,用initialize(StructObjectInspector[] argOIs))无法实现。

  2. 根据处理异常的方式选择循环方式

// 1.发生异常后继续执行
//  map.entrySet().forEach(entry -> {
   
   
//      params[0] = entry.getKey();
//      params[1] = entry.getValue();
//      try {
   
   
//          forward(params);
//      } catch (HiveException e) {
   
   
//          ...
//      }
//  });
// 2.发生异常后终止while循环
  final Iterator<? extends Map.Entry<?, ?>> it = map.entrySet().iterator();
  while(it.hasNext()){
   
   
      final Map.Entry<?, ?> entry = it.next();
      params[0] = entry.getKey();
      params[1] = entry.getValue();
      forward(params);
  }
  break;

注释部分的lambda表达式中foreach期望一个consumer接口,而该接口不允许抛出检查型异常,只能尝试在lambda内捕获异常并进行处理。
而改为iterator迭代则可以选择抛出异常。

  1. 将数组置空
    a.数组中存储的是对象的引用。将数组中每个元素设置为null,断开数组和这些对象的链接。使这些对象没有引用指向它们,便于垃圾回收。
    b.将数组引用本身置空,告诉垃圾回收器数组本身可以被回收。

UDAF

UDAF Mode

        PARTITIAL1
            -- 对原始数据进行部分聚合
            -- iterate() & teriminatePartitial() 会被调用
            -- Mapper

        PARTITIAL2
            -- 将部分聚合进行聚合
            -- merge() & teriminatePartitial() 会被调用
            -- Combiner

        FINAL
            -- 将所有的部分聚合进行完全聚合
            -- merge() & terminate() 会被调用
            -- Reducer

        COMPLETE
            -- 直接对原始数据进行全量聚合
            -- iterate() & terminate() 会被调用
            -- Mapper -> Reducer

代码

package cn.ybg.hive.ql.func.udaf;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class MySum extends AbstractGenericUDAFResolver {
   
   
    private static Logger logger = LoggerFactory.getLogger(MySum.class);

    // 检查参数类型是否非空且长度为1(是否传入参数都是同一类型)
    private static void checkParam(String content, Object...params) throws SemanticException{
   
   
        if(Objects.isNull(params) || params.length!=1 || Objects.isNull(params[0])){
   
   
            throw new SemanticException(content);
        }
    }

    // getEvaluator():根据输入参数的类型,选择并返回合适的UDAF计算器
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
   
   
        // 检查参数info是否非空
        if(Objects.isNull(info)){
   
   
            throw new SemanticException("From YB12211 : MySum getEvaluator(info) : info NullPointerException");
        }
        // 获取参数的类型信息
        ObjectInspector[] params = info.getParameterObjectInspectors();

        // 子类向上类型转换:自动转换
        checkParam("From YB12211 : MySum getEvaluator(info) : only support one nonnull param",params);

        // 验证参数类型的种类是否为基本数据类型
        ObjectInspector.Category category = params[0].getCategory();
        if(category != ObjectInspector.Category.PRIMITIVE){
   
   
            throw new SemanticException("From YB12211 : MySum getEvaluator(info) : only support primitive type");
        }

        PrimitiveObjectInspector inputIO = (PrimitiveObjectInspector) params[0];
        AbstractSumEvaluator evaluator;
        // 根据参数类型选择相应的计算器
        switch (inputIO.getPrimitiveCategory()){
   
   
            case BYTE: case SHORT: case INT: case LONG:
                evaluator = new SumLong();
                break;
            case FLOAT: case DOUBLE:
                evaluator = new SumDouble();
                break;
            case DECIMAL:
                evaluator = new SumDecimal();
                break;
            default:
                throw new SemanticException("From YB12211 : MySum getEvaluator(info) : doesn't support type of "
                        +inputIO.getPrimitiveCategory());
        }

        // 根据参数设置计算器的”是否开窗“和"是否去重"
        evaluator.setWindowing(info.isWindowing());
        evaluator.setDistinct(info.isDistinct());
        return evaluator;
    }

    // 去除私有
    // AbstractSumEvaluator:实现UDAF中SUM函数的通用逻辑
    // T表示SUM的结果数据类型,通常是 DoubleWritable、LongWritable 或 HiveDecimalWritable。
    static abstract class AbstractSumEvaluator<T extends Writable> extends GenericUDAFEvaluator{
   
   
        // AbstractSumAgg是SUM函数的聚合缓冲区
        // E表示SUM的中间结果类型
        abstract class AbstractSumAgg<E> extends AbstractAggregationBuffer{
   
   
            // 标识聚合缓冲区是否为空
            boolean empty;
            E agg;
            // 类型差异,不一定是 E
            // 如果使用DISTINCT关键字进行聚合计算,会用它检测唯一性
            Set<Object> unique;

            // 去构造器
            /*public AbstractSumAgg() {
                reset();
            }*/

            public boolean isEmpty() {
   
   
                return empty;
            }

            // 添加类型差异
            boolean add(Object parameter){
   
   
                if(empty){
   
   
                    empty = false;
                }
                // 类型转换
                if (isWindowingAndDistinct()) {
   
   
                    // 将参数值parameter转化为java对象obj,便于后续进行唯一性检查
                    Object obj = parameter instanceof ObjectInspectorObject ?
                            (ObjectInspectorObject) parameter :
                            ObjectInspectorUtils.copyToStandardJavaObject(parameter,inputIO);
                    if(unique.contains(obj)){
   
   
                        return false;
                    }else{
   
    // 忘了半个逻辑
                        unique.add(obj);
                    }
                }
                return true;
            }

            // 重置聚合缓冲区的状态,将`empty`置为`true`,并清空`unique`集合(如果使用DISTINCT关键字的话)
            void reset(){
   
   
                empty = true;
                if (isWindowingAndDistinct()) {
   
   
                    if(Objects.nonNull(unique)){
   
   
                        if(!unique.isEmpty()){
   
   
                            unique.clear();
                        }
                    }else{
   
   
                        unique = new HashSet<>();
                    }
                }
            }
        }

        // 属性迁移
        PrimitiveObjectInspector inputIO;
        PrimitiveObjectInspector outputIO;

        boolean isWindowing;
        boolean isDistinct;

        // 去除构造器
        /*public AbstractSumEvaluator(boolean isWindowing, boolean isDistinct) {
            this.isWindowing = isWindowing;
            this.isDistinct = isDistinct;
        }*/

        // 新增 setter
        void setWindowing(boolean windowing) {
   
   
            isWindowing = windowing;
        }

        void setDistinct(boolean distinct) {
   
   
            isDistinct = distinct;
        }

        // willInit 放回
        void checkParamsAndInit(ObjectInspector[] params,Mode mode, boolean willInit, String content)
                throws HiveException {
   
   
            checkParam(content,params);
            // 初始化UDAF计算器的模式和参数
            super.init(mode, params);
            // 根据需要初始化输入输出类型
            if (willInit) {
   
   
                inputIO = (PrimitiveObjectInspector) params[0];
                // 将输入的ObjectInspector转化为输出的标准Java ObjectInspector
                outputIO = (PrimitiveObjectInspector) ObjectInspectorUtils
                        .getStandardObjectInspector(inputIO, ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
            }
        }

        boolean isWindowingAndDistinct(){
   
   
            return isWindowing && isDistinct;
        }

        // terminatePartial 方法用于计算部分聚合结果,但如果启用了DISTINCT属性,则会抛出异常,因为DISTINCT不支持部分聚合。
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
   
   
            if (isWindowingAndDistinct()) {
   
   
                throw new HiveException("From YB12211 : distinct sum doesn't support terminatePartial(AggregationBuffer agg)");
            }
            return terminate(agg);
        }
    }

    static class SumDecimal extends AbstractSumEvaluator<HiveDecimalWritable>{
   
   
        class SumAggDecimal extends AbstractSumAgg<HiveDecimalWritable>{
   
   
            // 调用reset()将其重置为初始状态
            public SumAggDecimal() {
   
   
                // 变动
                reset();
            }

            // 父类负责对`待加数`进行校验,子类负责实现真正的添加
            @Override
            boolean add(Object parameter) {
   
   
                HiveDecimal value = PrimitiveObjectInspectorUtils.getHiveDecimal(parameter, inputIO);
                if (super.add(value)) {
   
   
                    agg.mutateAdd(value);
                    return true;
                }
                return false;
            }

            @Override
            void reset() {
   
   
                super.reset();
                // 空指针异常
                if(Objects.isNull(agg)){
   
   
                    agg = new HiveDecimalWritable(HiveDecimal.ZERO);
                }else{
   
   
                    agg.set(HiveDecimal.ZERO);
                }
            }
        }

        // 用于初始化计算器的状态
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
   
   
            checkParamsAndInit(parameters,m,false,"From YB12211 : SumDecimal.init(Mode m, ObjectInspector[] parameters) parameters can't be NULL");

            // 考虑到取值范围:将长度放大
            inputIO = (PrimitiveObjectInspector) parameters[0];

            int precision = inputIO.precision();
            int scale = inputIO.scale();
            switch (m){
   
   
                // 在部分聚合(PARTIAL1)和最终聚合(COMPLETE)阶段,SUM 函数需要对输入的 DECIMAL 类型数据进行累加,并且可能会产生更大精度的结果。为了确保计算不会丢失精度,需要在这些阶段增加精度。
                case PARTIAL1: case COMPLETE:
                    precision = Math.min(precision+10, HiveDecimal.MAX_PRECISION);
                    break;
            }
            DecimalTypeInfo decimalTypeInfo = TypeInfoFactory.getDecimalTypeInfo(precision, scale);
            /**
             * PrimitiveObjectInspector
             *      AbstractPrimitiveObjectInspector
             *          AbstractPrimitiveWritableObjectInspector
             */
            outputIO = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(decimalTypeInfo);
            outputIO = (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(outputIO);
            return inputIO;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
   
   
            return new SumAggDecimal();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
   
   
            ((SumAggDecimal)agg).reset();
        }

        /*private HiveDecimalWritable toHiveDecimalWritable(Object...value) throws HiveException {
            checkParam(value,"From YB12211 : SumDecimal.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
            return new HiveDecimalWritable(PrimitiveObjectInspectorUtils.getHiveDecimal(value, inputIO));
        }*/

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
   
   
            //((SumAggDecimal)agg).add(toHiveDecimalWritable(parameters[0]));
            checkParam("From YB12211 : SumDecimal.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null",parameters);
            ((SumAggDecimal)agg).add(parameters[0]);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
   
   
            //((SumAggDecimal)agg).add(toHiveDecimalWritable(partial));
            checkParam("From YB12211 : SumDecimal.merge(AggregationBuffer agg, Object partial) partial can't be null",partial);
            ((SumAggDecimal)agg).add(partial);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
   
   
            SumAggDecimal sumAgg = (SumAggDecimal) agg;
            if(sumAgg.isEmpty()){
   
   
                return null;
            }
            return sumAgg.agg;
        }
    }

    static class SumDouble extends AbstractSumEvaluator<DoubleWritable>{
   
   
        class SumAggDouble extends AbstractSumAgg<DoubleWritable>{
   
   
            public SumAggDouble() {
   
   
                // 变动
                reset();
            }

            @Override
            boolean add(Object parameter) {
   
   
                double value = PrimitiveObjectInspectorUtils.getDouble(parameter, inputIO);
                if (super.add(value)) {
   
   
                    agg.set(agg.get()+value);
                    return true;
                }
                return false;
            }

            @Override
            void reset() {
   
   
                super.reset();
                // 空指针异常
                if(Objects.isNull(agg)){
   
   
                    agg = new DoubleWritable(0.0);
                }else{
   
   
                    agg.set(0.0);
                }
            }
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
   
   
            checkParamsAndInit(parameters,m,true,"From YB12211 : SumDouble.init(Mode m, ObjectInspector[] parameters) parameters can't be null");
            return inputIO;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
   
   
            return new SumAggDouble();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
   
   
            ((SumAggDouble)agg).reset();
        }

        /*private DoubleWritable toDouble(Object...value) throws HiveException {
            checkParam(value,"From YB12211 : SumDouble.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
            return new DoubleWritable(PrimitiveObjectInspectorUtils.getDouble(value[0], inputIO));
        }*/

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
   
   
            //((SumAggDouble)agg).add(toDouble(parameters[0]));
            checkParam("From YB12211 : SumDouble.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null",parameters);
            ((SumAggDouble)agg).add(parameters[0]);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
   
   
            //((SumAggDouble)agg).add(toDouble(partial));
            checkParam("From YB12211 : SumDouble.merge(AggregationBuffer agg, Object partial) partial can't be null",partial);
            ((SumAggDouble)agg).add(partial);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
   
   
            SumAggDouble sumAgg = (SumAggDouble) agg;
            if(sumAgg.isEmpty()){
   
   
                return null;
            }
            return sumAgg.agg;
        }
    }

    static class SumLong extends AbstractSumEvaluator<LongWritable>{
   
   
        class SumAggLong extends AbstractSumAgg<LongWritable>{
   
   
            public SumAggLong() {
   
   
                // 变动
                reset();
            }

            @Override
            boolean add(Object parameter) {
   
   
                long value = PrimitiveObjectInspectorUtils.getLong(parameter, inputIO);
                if (super.add(value)) {
   
   
                    agg.set(agg.get()+value);
                    return true;
                }
                return false;
            }

            @Override
            void reset() {
   
   
                super.reset();
                // 空指针异常
                if(Objects.isNull(agg)){
   
   
                    agg = new LongWritable(0);
                }else{
   
   
                    agg.set(0);
                }
            }
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
   
   
            checkParamsAndInit(parameters,m,true,"From YB12211 : SumLong.init(Mode m, ObjectInspector[] parameters) parameters can't be null");
            return inputIO;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
   
   
            return new SumAggLong();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
   
   
            ((SumAggLong)agg).reset();
        }

        /*private LongWritable toLong(Object...value) throws HiveException {
            checkParam(value,"From YB12211 : SumLong.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
            return new LongWritable(PrimitiveObjectInspectorUtils.getLong(value[0], inputIO));
        }*/

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
   
   
            //((SumAggLong)agg).add(toLong(parameters[0]));
            checkParam("From YB12211 : SumLong.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null",parameters);
            ((SumAggLong)agg).add(parameters[0]);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
   
   
            //((SumAggLong)agg).add(toLong(partial));
            checkParam("From YB12211 : SumLong.merge(AggregationBuffer agg, Object partial) partial can't be null",partial);
            ((SumAggLong)agg).add(partial);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
   
   
            SumAggLong sumAgg = (SumAggLong) agg;
            if(sumAgg.isEmpty()){
   
   
                return null;
            }
            return sumAgg.agg;
        }
    }
}

UDAF的创建与配置

同上

UDAF流程

  1. 创建UDAF的Resolver类(继承于AbstractGenericUDAFResolver)
  2. 在Resolver类中实现getEvaluator方法:返回合适类型的UDAF计算器
  3. 创建计算器类(Evaluator类,通常继承于GenericUDAFEvaluator)
    Evaluator类中实现具体的UDAF函数逻辑和其他和新方法(reset,interate,merge...)
  4. 创建Evaluator类的具体子类

UDF UDTF UDAF的区别

UDF:一进一出
特殊的多进一出(进的部分并列,如:concat)
UDTF:一进多出(列转行,如:explode)
UDAF:多进一出

目录
相关文章
|
3月前
|
SQL JavaScript 前端开发
Hive根据用户自定义函数、reflect函数和窗口分析函数
Hive根据用户自定义函数、reflect函数和窗口分析函数
39 6
|
7月前
|
SQL 缓存 Java
Hive 之 UDF 运用(包会的)
Hive的UDF允许用户自定义数据处理函数,扩展其功能。`reflect()`函数通过Java反射调用JDK中的方法,如静态或实例方法。例如,调用`MathUtils.addNumbers()`进行加法运算。要创建自定义UDF,可以继承`GenericUDF`,实现`initialize`、`evaluate`和`getDisplayString`方法。在`initialize`中检查参数类型,在`evaluate`中执行业务逻辑。最后,打包项目成JAR,上传到HDFS,并在Hive中注册以供使用。
201 2
|
7月前
|
SQL Java HIVE
Hive高频面试题之UDTF实现多行输出
Hive高频面试题之UDTF实现多行输出
57 0
|
7月前
|
SQL Java 程序员
Hive反射函数的使用-程序员是怎么学UDF函数的
Hive反射函数的使用-程序员是怎么学UDF函数的
43 0
|
7月前
|
SQL Java 数据处理
【Hive】Hive的函数:UDF、UDAF、UDTF的区别?
【4月更文挑战第17天】【Hive】Hive的函数:UDF、UDAF、UDTF的区别?
|
7月前
|
SQL 消息中间件 Apache
Flink报错问题之使用hive udf函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
7月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
198 1
|
7月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
278 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
47 0
|
5月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。