Hive笔记05 -- Hive UDF UDTF UDAF
UDF
UDF在Hive中的实现
UDF的创建与配置
类名定义规则
示例:com.ybg.hive.ql.func.udf.UDFDateDiffByUnit
规则:反向域名+模块名+功能分类(ql.func.udf:hive查询语言中的UDF函数)+具体功能|类名
基本配置
New Project - Maven模板
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
UDF核心
1.参数类型和参数值分开管理
2.将共性的校验写在接口中
UDF示例:
主类:UDFDateDiffByUnit extends GenericUDF implements UDFCom,DateCom
目的:计算两个日期之间的差异,可以按年、季、月、周或日计算。
方法:initialize
:参数验证并定义UDF的返回类型
/**
 * Validates the call-site arguments and declares the return type of the UDF.
 *
 * @param arguments object inspectors describing the arguments used in the query
 * @return the Java string object inspector
 * @throws UDFArgumentException propagated from validateArgs / validateAllPrimitiveArgs
 */
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    // The function takes (earlier date, later date, unit).
    final int expectedArgCount = 3;
    validateArgs(arguments, expectedArgCount);
    validateAllPrimitiveArgs(arguments, expectedArgCount);
    return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}
evaluate
: 具体的数据校验
核心计算方法,计算两个日期之间的差异。
/**
 * Counts the whole units (years, quarters, months, weeks or days) between two dates.
 *
 * <p>Arguments: [0] the earlier date, [1] the later date, [2] the unit
 * ("y", "q", "m", "w" or "d", case-insensitive). Quarters are whole months divided
 * by 3 and weeks are whole days divided by 7.
 *
 * @param arguments the three deferred argument values
 * @return the difference rendered as a decimal string (initialize declares a string
 *         result), or {@code null} when any argument is SQL NULL
 * @throws HiveException if a date is malformed, the first date is after the second,
 *                       or the unit is not supported
 */
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
    validateArgs(arguments, 3);
    final Object rawSmall = arguments[0].get();
    final Object rawBig = arguments[1].get();
    final Object rawUnit = arguments[2].get();
    // A NULL column value must produce NULL instead of a NullPointerException.
    if (rawSmall == null || rawBig == null || rawUnit == null) {
        return null;
    }

    final String strDateSmall = rawSmall.toString();
    final String strDateBig = rawBig.toString();
    validateDateFormat(strDateSmall, strDateBig);

    final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    final Calendar dateSmall = Calendar.getInstance();
    final Calendar dateBig = Calendar.getInstance();
    try {
        dateSmall.setTime(sdf.parse(strDateSmall));
        dateBig.setTime(sdf.parse(strDateBig));
    } catch (ParseException e) {
        throw new HiveException(e);
    }
    if (dateSmall.after(dateBig)) {
        throw new HiveException("dateSmall by arg1 > dateBig by arg2");
    }

    final String unit = rawUnit.toString().toLowerCase();
    final int intUnit;
    switch (unit) {
        case "y":
            intUnit = Calendar.YEAR;
            break;
        case "q": case "m":
            // quarters are derived from whole months
            intUnit = Calendar.MONTH;
            break;
        case "w": case "d":
            // weeks are derived from whole days
            intUnit = Calendar.DATE;
            break;
        default:
            throw new HiveException("Unsupported unit by arg3 :" + unit);
    }

    // Always step from the original start date: chaining add() calls would keep a
    // day-of-month that was clamped at a short month (Jan 31 -> Feb 28 -> Mar 28)
    // and overstate the number of elapsed months.
    final long startMillis = dateSmall.getTimeInMillis();
    int diff = 0;
    while (true) {
        dateSmall.setTimeInMillis(startMillis);
        dateSmall.add(intUnit, diff + 1);
        if (dateSmall.after(dateBig)) {
            break;
        }
        diff++;
    }

    if ("q".equals(unit)) {
        diff /= 3;
    } else if ("w".equals(unit)) {
        diff /= 7;
    }
    // Return a String so the value matches the string inspector declared by initialize.
    return String.valueOf(diff);
}
getDisplayString
:提供函数及其参数的描述(并不重要)
/**
 * Returns the text shown for this call in query plans.
 *
 * @param children textual form of each argument expression
 * @return the first child string, or {@code null} when there is none
 */
@Override
public String getDisplayString(String[] children) {
    if (children == null || children.length == 0) {
        return null;
    }
    // A null first element is passed through unchanged.
    return children[0];
}
eg:对于"两数相加的UDF"
/**
 * Display string for a two-operand add function.
 *
 * @param children textual form of each argument expression
 * @return a usage hint when fewer than two children are available, otherwise
 *         the call rendered with its first two operands
 */
@Override
public String getDisplayString(String[] children){
    final boolean hasBothOperands = children != null && children.length >= 2;
    if (!hasBothOperands) {
        // Restate how the function is meant to be called.
        return "Usage:MyAddFunction(int,int)";
    }
    // Render the call the way it appears in an explain plan.
    return "MyAddFunction(" + String.join(", ", children[0], children[1]) + ")";
}
接口一:DateCom
功能:提供日期相关的辅助方法。validateDateFormat
:验证日期格式是否符合 yyyy-MM-dd。
/**
 * Rejects any string that does not look like a year-month-day date.
 *
 * <p>The pattern requires a four-digit year, accepts one- or two-digit month (1-12)
 * and day (1-31) fields, and does not check that the day exists in the given month.
 *
 * @param dateStrArr the date strings to check
 * @throws HiveException for the first string that does not match the pattern
 */
default void validateDateFormat(String...dateStrArr) throws HiveException {
    final String datePattern = "\\d{4}-(0?[1-9]|1[0-2])-(0?[1-9]|[1-2][0-9]|3[0-1])";
    for (int i = 0; i < dateStrArr.length; i++) {
        final String candidate = dateStrArr[i];
        if (candidate.matches(datePattern)) {
            continue;
        }
        throw new HiveException("date format illegal : " + candidate);
    }
}
接口二:UDFCom
功能:提供通用的参数验证方法。validateArgs
:检查传入的参数数量是否正确,以及是否有空参数。validateAllPrimitiveArgs
:确保所有参数都是原始类型。
原始类型指的是最基本的数据类型,直接包含了数据的值。(byte short int long float double char boolean string✔)
/**
 * Generic arity check shared by UDF implementations: verifies that at least
 * {@code size} arguments were supplied and that none of the first {@code size}
 * entries is null. Nothing is checked when {@code size} is not positive.
 *
 * @param args actual argument array
 * @param size expected number of arguments
 * @throws UDFArgumentException if too few arguments are given or one of them is null
 */
default void validateArgs(Object[] args,int size) throws UDFArgumentException {
    if (size <= 0) {
        return;
    }
    // Enough arguments supplied?
    final boolean tooFew = args == null || args.length < size;
    if (tooFew) {
        throw new UDFArgumentException(size+" args must be provided.");
    }
    // Every expected slot must be filled.
    int index = 0;
    while (index < size) {
        if (args[index] == null) {
            throw new UDFArgumentException("type of args["+index+"] null");
        }
        index++;
    }
}
/**
 * Verifies that each of the first {@code size} entries is an ObjectInspector of
 * category PRIMITIVE.
 *
 * @param args argument inspectors; each checked entry is cast to ObjectInspector
 * @param size number of leading entries to check
 * @throws UDFArgumentException if any checked entry is not a primitive type
 */
default void validateAllPrimitiveArgs(Object[] args, int size) throws UDFArgumentException{
    int index = 0;
    while (index < size) {
        final ObjectInspector inspector = (ObjectInspector) args[index];
        final boolean primitive = inspector.getCategory() == ObjectInspector.Category.PRIMITIVE;
        if (!primitive) {
            throw new UDFArgumentException("only support primitive type");
        }
        index++;
    }
}
打jar包上传至 HDFS 的两种方式:
-- install => 打资源jar包,直接将jar包打入到 maven localRepository
-- package => 打执行jar包,直接将jar包打入到 project target ✔
Hive UDF集成到Hive查询环境的四步骤:
- 打包(Package)
使用Maven的package命令打jar包 - 找包(Locate Package)
jar包位于target目录下,show in explorer可显示其物理路径 - 上传(Upload)
复制jar包路径并上传到HDFS上。 - 创建函数(Create Function):创建Hive UDF并映射至HDFS上的JAR文件,同时指定UDF实现类的完整类名。
create function FUNC_NAME as 'com.ybg.hive.ql.func.udf.UDFDateDiffByUnit'(主类的全包路径)
using jar 'hdfs://single01:9000/hive_data/udf/hiveudf2-1.0-SNAPSHOT.jar';(hdfs://single01:9000+HDFS中jar包存放路径)
问题与解决方法:
如果jar包删除后重新上传,可能会出现"UDF仍按照前一个jar包的逻辑继续运行"的情况
解决方式是:close project
之后重新打开project(重新连接)。
UDTF
UDTF的创建与配置
与UDF相同
UDTF示例
package com.ybg.hive.ql.func.udtf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
 * Table-generating function that turns a MAP argument into (key, value) rows and a
 * LIST argument into single-column (value) rows.
 */
public class MyExplode extends GenericUDTF {
    private static final Logger logger = LoggerFactory.getLogger(MyExplode.class);

    /** Inspector of the single argument, captured by initialize. */
    private ObjectInspector oi;
    /** Reusable output row passed to forward(); sized 2 for MAP, 1 for LIST. */
    private Object[] params;

    /**
     * Records the argument inspector and declares the output columns.
     *
     * @param argOIs inspectors of the call arguments; exactly one is accepted
     * @return a struct inspector with columns (key, value) for MAP or (value) for LIST
     * @throws UDFArgumentException if the argument count is not 1 or the argument is
     *                              neither a MAP nor a LIST
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // Fail with a descriptive error instead of ArrayIndexOutOfBoundsException.
        if (Objects.isNull(argOIs) || argOIs.length != 1 || Objects.isNull(argOIs[0])) {
            throw new UDFArgumentException("function explode takes exactly 1 nonnull argument");
        }
        oi = argOIs[0];
        final ObjectInspector.Category category = oi.getCategory();
        // names and types hold the output column names and their inspectors
        final List<String> names = new ArrayList<>(2);
        final List<ObjectInspector> types = new ArrayList<>(2);
        switch (category) {
            case MAP:
                logger.info("receive explode category : Map");
                names.add("key");
                names.add("value");
                final MapObjectInspector moi = (MapObjectInspector) oi;
                types.add(moi.getMapKeyObjectInspector());
                types.add(moi.getMapValueObjectInspector());
                params = new Object[2];
                break;
            case LIST:
                logger.info("receive explode category : List");
                names.add("value");
                final ListObjectInspector loi = (ListObjectInspector) oi;
                types.add(loi.getListElementObjectInspector());
                params = new Object[1];
                break;
            default:
                throw new UDFArgumentException("not supported category for function explode : " + category);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(names, types);
    }

    /**
     * Emits one output row per map entry or list element of the input value.
     *
     * @param args the argument values of the current input row
     * @throws HiveException if there is not exactly one non-null argument, or if
     *                       forwarding a row fails
     */
    @Override
    public void process(Object[] args) throws HiveException {
        if (args.length != 1 || Objects.isNull(args[0])) {
            throw new HiveException("Only 1 nonnull arg supported for function explode, but got " + args.length);
        }
        // Plain loops are used instead of forEach(lambda) so that a HiveException
        // thrown by forward() propagates and stops the iteration.
        switch (oi.getCategory()) {
            case MAP:
                final Map<?, ?> map = ((MapObjectInspector) oi).getMap(args[0]);
                for (Map.Entry<?, ?> entry : map.entrySet()) {
                    params[0] = entry.getKey();
                    params[1] = entry.getValue();
                    forward(params);
                }
                break;
            case LIST:
                final List<?> list = ((ListObjectInspector) oi).getList(args[0]);
                for (Object element : list) {
                    params[0] = element;
                    forward(params);
                }
                break;
            default:
                break;
        }
    }

    /**
     * Drops the references held by this instance. Safe to call more than once.
     *
     * @throws HiveException never thrown by this implementation
     */
    @Override
    public void close() throws HiveException {
        oi = null;
        if (params != null) {
            // Clear the slots first so the row objects are no longer referenced by
            // the array, then drop the array itself.
            Arrays.fill(params, null);
            params = null;
        }
    }
}
代码注意点
public class MyExplode extends GenericUDTF
继承GenericUDTF
抽象类之后,IDE会自动生成两个必须重写(Override)的抽象方法process()
和close()
,但是,我们需要手动重写另一个方法initialize(ObjectInspector[] argOIs)
(ps:只能用这个已过期(deprecated)的方法才能处理结构化数据类型,用initialize(StructObjectInspector argOIs))
无法实现。根据处理异常的方式选择循环方式
// 1.发生异常后继续执行
// map.entrySet().forEach(entry -> {
// params[0] = entry.getKey();
// params[1] = entry.getValue();
// try {
// forward(params);
// } catch (HiveException e) {
// ...
// }
// });
// 2.发生异常后终止while循环
final Iterator<? extends Map.Entry<?, ?>> it = map.entrySet().iterator();
while(it.hasNext()){
final Map.Entry<?, ?> entry = it.next();
params[0] = entry.getKey();
params[1] = entry.getValue();
forward(params);
}
break;
注释部分的lambda表达式中,forEach期望一个Consumer接口,而该接口的accept方法不允许抛出受检异常(checked exception),只能在lambda内捕获异常并进行处理。
而改为iterator迭代则可以选择抛出异常。
- 将数组置空
a.数组中存储的是对象的引用。将数组中每个元素设置为null,断开数组和这些对象的链接。使这些对象没有引用指向它们,便于垃圾回收。
b.将数组引用本身置空,告诉垃圾回收器数组本身可以被回收。
UDAF
UDAF Mode
PARTIAL1
-- 对原始数据进行部分聚合
-- iterate() & terminatePartial() 会被调用
-- Mapper
PARTIAL2
-- 将部分聚合进行聚合
-- merge() & terminatePartial() 会被调用
-- Combiner
FINAL
-- 将所有的部分聚合进行完全聚合
-- merge() & terminate() 会被调用
-- Reducer
COMPLETE
-- 直接对原始数据进行全量聚合
-- iterate() & terminate() 会被调用
-- Mapper -> Reducer
代码
package cn.ybg.hive.ql.func.udaf;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class MySum extends AbstractGenericUDAFResolver {
private static Logger logger = LoggerFactory.getLogger(MySum.class);
/**
 * Requires {@code params} to be a non-null array holding exactly one non-null element.
 *
 * @param content message of the exception thrown when the check fails
 * @param params  the values to check
 * @throws SemanticException if params is null, its length is not 1, or its only
 *                           element is null
 */
private static void checkParam(String content, Object...params) throws SemanticException{
    final boolean exactlyOneNonNull = params != null && params.length == 1 && params[0] != null;
    if (!exactlyOneNonNull) {
        throw new SemanticException(content);
    }
}
/**
 * Picks the evaluator that matches the primitive type of the single argument.
 *
 * @param info describes the call: argument inspectors plus the windowing and
 *             distinct flags
 * @return SumLong for BYTE/SHORT/INT/LONG, SumDouble for FLOAT/DOUBLE, SumDecimal
 *         for DECIMAL, configured with the flags taken from {@code info}
 * @throws SemanticException if info is null, there is not exactly one non-null
 *                           argument, or the argument type is not supported
 */
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
    if (info == null) {
        throw new SemanticException("From YB12211 : MySum getEvaluator(info) : info NullPointerException");
    }
    // The ObjectInspector[] is passed as the varargs array itself.
    final ObjectInspector[] argInspectors = info.getParameterObjectInspectors();
    checkParam("From YB12211 : MySum getEvaluator(info) : only support one nonnull param", argInspectors);

    if (ObjectInspector.Category.PRIMITIVE != argInspectors[0].getCategory()) {
        throw new SemanticException("From YB12211 : MySum getEvaluator(info) : only support primitive type");
    }
    final PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector) argInspectors[0];
    final PrimitiveObjectInspector.PrimitiveCategory kind = primitiveInspector.getPrimitiveCategory();

    final AbstractSumEvaluator<?> chosen;
    switch (kind) {
        case BYTE:
        case SHORT:
        case INT:
        case LONG:
            chosen = new SumLong();
            break;
        case FLOAT:
        case DOUBLE:
            chosen = new SumDouble();
            break;
        case DECIMAL:
            chosen = new SumDecimal();
            break;
        default:
            throw new SemanticException("From YB12211 : MySum getEvaluator(info) : doesn't support type of "
                    + kind);
    }
    // Carry the windowing and distinct flags of the call over to the evaluator.
    chosen.setWindowing(info.isWindowing());
    chosen.setDistinct(info.isDistinct());
    return chosen;
}
/**
 * Logic shared by the SUM evaluators: the aggregation buffer base class, the
 * windowing/distinct flags and the common part of init().
 *
 * @param <T> writable type of the final result (not referenced inside this class)
 */
static abstract class AbstractSumEvaluator<T extends Writable> extends GenericUDAFEvaluator {

    /**
     * Base aggregation buffer for SUM.
     *
     * @param <E> type that holds the running total
     */
    abstract class AbstractSumAgg<E> extends AbstractAggregationBuffer {
        /** True until the first value has been offered to the buffer. */
        boolean empty;
        /** Running total; created and zeroed by the subclass reset(). */
        E agg;
        /** Values already counted; only maintained for windowing + distinct. */
        Set<Object> unique;

        public boolean isEmpty() {
            return empty;
        }

        /**
         * Decides whether a value should be added to the total. Subclasses call this
         * with the value already converted to a plain Java object and perform the
         * actual addition when it returns true.
         *
         * @param parameter the converted value about to be added
         * @return false when distinct windowing is active and the value was seen
         *         before, true otherwise
         */
        boolean add(Object parameter) {
            empty = false;
            if (isWindowingAndDistinct()) {
                // The value is already a plain Java object (the subclasses convert
                // before calling), so it is used as the set key directly. Running it
                // through inputIO again would hand a Java object to an inspector that
                // may expect the raw writable/lazy representation.
                return unique.add(parameter);
            }
            return true;
        }

        /**
         * Marks the buffer as empty and, for distinct windowing, clears (or lazily
         * creates) the set of seen values.
         */
        void reset() {
            empty = true;
            if (isWindowingAndDistinct()) {
                if (Objects.isNull(unique)) {
                    unique = new HashSet<>();
                } else {
                    unique.clear();
                }
            }
        }
    }

    /** Inspector of the values passed to iterate()/merge(). */
    PrimitiveObjectInspector inputIO;
    /** Inspector derived for the output; assigned during init. */
    PrimitiveObjectInspector outputIO;
    boolean isWindowing;
    boolean isDistinct;

    void setWindowing(boolean windowing) {
        isWindowing = windowing;
    }

    void setDistinct(boolean distinct) {
        isDistinct = distinct;
    }

    /**
     * Common part of init(): validates the parameter array, delegates to the
     * superclass init and optionally derives the input/output inspectors.
     *
     * @param params   inspectors passed to init(); exactly one non-null entry required
     * @param mode     aggregation mode passed to init()
     * @param willInit when true, inputIO is taken from params[0] and outputIO is the
     *                 standard Java inspector derived from it
     * @param content  message used when the parameter check fails
     * @throws HiveException if the parameter check or the superclass init fails
     */
    void checkParamsAndInit(ObjectInspector[] params, Mode mode, boolean willInit, String content)
            throws HiveException {
        checkParam(content, params);
        super.init(mode, params);
        if (willInit) {
            inputIO = (PrimitiveObjectInspector) params[0];
            outputIO = (PrimitiveObjectInspector) ObjectInspectorUtils
                    .getStandardObjectInspector(inputIO, ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
        }
    }

    boolean isWindowingAndDistinct() {
        return isWindowing && isDistinct;
    }

    /**
     * Returns the partial result, which for SUM is the same value as the final one.
     *
     * @throws HiveException when distinct windowing is active, because a distinct
     *                       sum cannot be split into partial aggregations
     */
    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        if (isWindowingAndDistinct()) {
            throw new HiveException("From YB12211 : distinct sum doesn't support terminatePartial(AggregationBuffer agg)");
        }
        return terminate(agg);
    }
}
/**
 * SUM evaluator for DECIMAL input; the running total is a HiveDecimalWritable.
 */
static class SumDecimal extends AbstractSumEvaluator<HiveDecimalWritable> {

    /** Aggregation buffer holding the decimal running total. */
    class SumAggDecimal extends AbstractSumAgg<HiveDecimalWritable> {
        public SumAggDecimal() {
            // Start from a zeroed, empty buffer.
            reset();
        }

        /**
         * Converts the raw value to a HiveDecimal and adds it, unless the parent
         * class rejects it as a duplicate.
         */
        @Override
        boolean add(Object parameter) {
            HiveDecimal value = PrimitiveObjectInspectorUtils.getHiveDecimal(parameter, inputIO);
            if (super.add(value)) {
                agg.mutateAdd(value);
                return true;
            }
            return false;
        }

        @Override
        void reset() {
            super.reset();
            // agg is still null when reset() runs from the constructor.
            if (Objects.isNull(agg)) {
                agg = new HiveDecimalWritable(HiveDecimal.ZERO);
            } else {
                agg.set(HiveDecimal.ZERO);
            }
        }
    }

    /**
     * Initializes the evaluator and derives the result type.
     *
     * @return the inspector of the values produced by terminate()/terminatePartial():
     *         a writable decimal inspector whose precision is widened by 10 (capped
     *         at HiveDecimal.MAX_PRECISION) when raw rows are consumed
     */
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        checkParamsAndInit(parameters, m, false, "From YB12211 : SumDecimal.init(Mode m, ObjectInspector[] parameters) parameters can't be NULL");
        inputIO = (PrimitiveObjectInspector) parameters[0];
        int precision = inputIO.precision();
        final int scale = inputIO.scale();
        switch (m) {
            // PARTIAL1 and COMPLETE read the original rows, so the total may need more
            // digits than a single input value; the other modes read partial totals
            // produced by an earlier stage.
            case PARTIAL1: case COMPLETE:
                precision = Math.min(precision + 10, HiveDecimal.MAX_PRECISION);
                break;
            default:
                break;
        }
        DecimalTypeInfo decimalTypeInfo = TypeInfoFactory.getDecimalTypeInfo(precision, scale);
        outputIO = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(decimalTypeInfo);
        outputIO = (PrimitiveObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(outputIO);
        // Return the inspector describing the HiveDecimalWritable result. Returning
        // inputIO would advertise the narrower input type for a widened value.
        return outputIO;
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new SumAggDecimal();
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((SumAggDecimal) agg).reset();
    }

    /**
     * Adds one input row to the buffer; a NULL value is skipped.
     *
     * @throws HiveException if parameters is null or does not hold exactly one entry
     */
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if (Objects.isNull(parameters) || parameters.length != 1) {
            throw new SemanticException("From YB12211 : SumDecimal.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
        }
        // SQL NULL does not contribute to a sum.
        if (Objects.nonNull(parameters[0])) {
            ((SumAggDecimal) agg).add(parameters[0]);
        }
    }

    /**
     * Folds a partial total into the buffer; a NULL partial (an upstream buffer that
     * saw no values) is skipped.
     */
    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        if (Objects.nonNull(partial)) {
            ((SumAggDecimal) agg).add(partial);
        }
    }

    /**
     * @return the running total, or null when no value was ever added
     */
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        SumAggDecimal sumAgg = (SumAggDecimal) agg;
        if (sumAgg.isEmpty()) {
            return null;
        }
        return sumAgg.agg;
    }
}
/**
 * SUM evaluator for FLOAT/DOUBLE input; the running total is a DoubleWritable.
 */
static class SumDouble extends AbstractSumEvaluator<DoubleWritable> {

    /** Aggregation buffer holding the double running total. */
    class SumAggDouble extends AbstractSumAgg<DoubleWritable> {
        public SumAggDouble() {
            // Start from a zeroed, empty buffer.
            reset();
        }

        /**
         * Converts the raw value to a double and adds it, unless the parent class
         * rejects it as a duplicate.
         */
        @Override
        boolean add(Object parameter) {
            double value = PrimitiveObjectInspectorUtils.getDouble(parameter, inputIO);
            if (super.add(value)) {
                agg.set(agg.get() + value);
                return true;
            }
            return false;
        }

        @Override
        void reset() {
            super.reset();
            // agg is still null when reset() runs from the constructor.
            if (Objects.isNull(agg)) {
                agg = new DoubleWritable(0.0);
            } else {
                agg.set(0.0);
            }
        }
    }

    /**
     * Initializes the evaluator.
     *
     * @return the writable double inspector, matching the DoubleWritable returned by
     *         terminate()/terminatePartial()
     */
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        checkParamsAndInit(parameters, m, true, "From YB12211 : SumDouble.init(Mode m, ObjectInspector[] parameters) parameters can't be null");
        // The result is always a DoubleWritable, whatever the input representation
        // is (e.g. float), so the input inspector must not be returned here.
        outputIO = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
        return outputIO;
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new SumAggDouble();
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((SumAggDouble) agg).reset();
    }

    /**
     * Adds one input row to the buffer; a NULL value is skipped.
     *
     * @throws HiveException if parameters is null or does not hold exactly one entry
     */
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if (Objects.isNull(parameters) || parameters.length != 1) {
            throw new SemanticException("From YB12211 : SumDouble.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
        }
        // SQL NULL does not contribute to a sum.
        if (Objects.nonNull(parameters[0])) {
            ((SumAggDouble) agg).add(parameters[0]);
        }
    }

    /**
     * Folds a partial total into the buffer; a NULL partial (an upstream buffer that
     * saw no values) is skipped.
     */
    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        if (Objects.nonNull(partial)) {
            ((SumAggDouble) agg).add(partial);
        }
    }

    /**
     * @return the running total, or null when no value was ever added
     */
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        SumAggDouble sumAgg = (SumAggDouble) agg;
        if (sumAgg.isEmpty()) {
            return null;
        }
        return sumAgg.agg;
    }
}
/**
 * SUM evaluator for BYTE/SHORT/INT/LONG input; the running total is a LongWritable.
 */
static class SumLong extends AbstractSumEvaluator<LongWritable> {

    /** Aggregation buffer holding the long running total. */
    class SumAggLong extends AbstractSumAgg<LongWritable> {
        public SumAggLong() {
            // Start from a zeroed, empty buffer.
            reset();
        }

        /**
         * Converts the raw value to a long and adds it, unless the parent class
         * rejects it as a duplicate.
         */
        @Override
        boolean add(Object parameter) {
            long value = PrimitiveObjectInspectorUtils.getLong(parameter, inputIO);
            if (super.add(value)) {
                agg.set(agg.get() + value);
                return true;
            }
            return false;
        }

        @Override
        void reset() {
            super.reset();
            // agg is still null when reset() runs from the constructor.
            if (Objects.isNull(agg)) {
                agg = new LongWritable(0);
            } else {
                agg.set(0);
            }
        }
    }

    /**
     * Initializes the evaluator.
     *
     * @return the writable long inspector, matching the LongWritable returned by
     *         terminate()/terminatePartial()
     */
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        checkParamsAndInit(parameters, m, true, "From YB12211 : SumLong.init(Mode m, ObjectInspector[] parameters) parameters can't be null");
        // The result is always a LongWritable, even for byte/short/int input, so the
        // input inspector must not be returned here.
        outputIO = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
        return outputIO;
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new SumAggLong();
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((SumAggLong) agg).reset();
    }

    /**
     * Adds one input row to the buffer; a NULL value is skipped.
     *
     * @throws HiveException if parameters is null or does not hold exactly one entry
     */
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if (Objects.isNull(parameters) || parameters.length != 1) {
            throw new SemanticException("From YB12211 : SumLong.iterate(AggregationBuffer agg, Object[] parameters) parameters can't be null");
        }
        // SQL NULL does not contribute to a sum.
        if (Objects.nonNull(parameters[0])) {
            ((SumAggLong) agg).add(parameters[0]);
        }
    }

    /**
     * Folds a partial total into the buffer; a NULL partial (an upstream buffer that
     * saw no values) is skipped.
     */
    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        if (Objects.nonNull(partial)) {
            ((SumAggLong) agg).add(partial);
        }
    }

    /**
     * @return the running total, or null when no value was ever added
     */
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        SumAggLong sumAgg = (SumAggLong) agg;
        if (sumAgg.isEmpty()) {
            return null;
        }
        return sumAgg.agg;
    }
}
}
UDAF的创建与配置
同上
UDAF流程
- 创建UDAF的Resolver类(继承于
AbstractGenericUDAFResolver
) - 在Resolver类中实现
getEvaluator
方法:返回合适类型的UDAF计算器 - 创建计算器类(Evaluator类,通常继承于
GenericUDAFEvaluator
)
在Evaluator
类中实现具体的UDAF函数逻辑和其他核心方法(reset
,iterate
,merge
...) - 创建
Evaluator
类的具体子类
UDF UDTF UDAF的区别
UDF:一进一出
特殊的多进一出(进的部分并列,如:concat)
UDTF:一进多出(列转行,如:explode)
UDAF:多进一出