前言
UDTF 这玩意对数仓同学来讲,熟悉又陌生,主要一方面是大量接触,另一方面是理解上有误导,还一个就是不是太明白里头到底咋回事。
场景切入
关于UDTF面试场景大概有以下的问题:
1、hive的udf你了解么,常用都有哪些类型
2、行专列操作了解么,里头是怎么实现的
3、比较直白的问法,udtf你了解么
4、关于hive的优化方式,udtf其实是考察之一
背后的原因:
1、首先实际线上滥用很多,数据膨胀、倾斜等,导数很严重的问题,实际点的例子,线上碰到因为udtf膨胀4-5个小时的运行时间,优化之后直接提到2分钟,是很吐槽的一点
2、期望对udf了解的全面性,因为实际考察的时候大家觉得知道了解的时候,还以为面试官为啥问那么简单的问题,还有点暗喜
3、相对来说是比较实用的etl中的操作
进场
UDTF其实在源码中就有直接的描述:
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
/** * A Generic User-defined Table Generating Function (UDTF) * Generates a variable number of output rows for a single input row. Useful for * explode(array)... */
其实字面上的含义就是用户定义表的一个函数,这个函数实现的是单行输入的情况下输出多行的结果,udtf中有两个最基础的接口定义,那就是定义输出表中的的字段,输出的数据内容
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException public void process(Object[] args) throws HiveException
我们期望udf直接帮我们产生这么一个表格
为了封装表格的列信息,我们进行封装
public Schema(String column, JavaStringObjectInspector type) { this.column = column; this.type = type; }
这样一来,我们就可以对输出列进行定义了
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { Schema[] schema={ new Schema("student_id",PrimitiveObjectInspectorFactory.javaStringObjectInspector), new Schema("student_name",PrimitiveObjectInspectorFactory.javaStringObjectInspector), new Schema("age",PrimitiveObjectInspectorFactory.javaStringObjectInspector), new Schema("gender",PrimitiveObjectInspectorFactory.javaStringObjectInspector) }; ArrayList<String> fieldNames = new ArrayList<String>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); for(int i = 0 ; i < schema.length; i++){ fieldNames.add(schema[i].column); fieldOIs.add(schema[i].type); } return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); }
其实就是定义出结构的对象。下一步,我们进行数据输出
public void process(Object[] args) throws HiveException { String[][] results={ {"S001","李强","12","男"}, {"S002","李强","15","男"}, {"S003","李军","13","男"}, {"S004","王倩雪","12","女"}, {"S005","汪玉珍","12","女"}, }; for(int i = 0; i < results.length; i ++){ forward(results[i]); } }
来点简单又不失风度的小代码测试一下
create temporary function show as 'org.apache.spark.udf.UDTF01'; select show();
结果如下:
+----------+------------+---+------+ |student_id|student_name|age|gender| +----------+------------+---+------+ | S001| 李强| 12| 男| | S002| 李强| 15| 男| | S003| 李军| 13| 男| | S004| 王倩雪| 12| 女| | S005| 汪玉珍| 12| 女| +----------+------------+---+------+
可不是咋滴,这玩意不就是一个表么,这样也可以表达出这类函数本来的功能了。
进击
当然,前面为了表达式udtf的本来作用,直接数据上定死了,这样的做法就是永远都是输出5行一样的数据,这是表示一行输入之后的结果,当我们有多行查询的时候,结果就不断重复了。
我们再来点小测试:
drop table if exists t; create table if not exists t (id int); insert into t select 1; insert into t select 2; select id,show() from t;
结果如下:
+---+----------+------------+---+------+ | id|student_id|student_name|age|gender| +---+----------+------------+---+------+ | 2| S001| 李强| 12| 男| | 2| S002| 李强| 15| 男| | 2| S003| 李军| 13| 男| | 2| S004| 王倩雪| 12| 女| | 2| S005| 汪玉珍| 12| 女| | 1| S001| 李强| 12| 男| | 1| S002| 李强| 15| 男| | 1| S003| 李军| 13| 男| | 1| S004| 王倩雪| 12| 女| | 1| S005| 汪玉珍| 12| 女| +---+----------+------------+---+------+
这个结果就是为每一行输出了udtf中的数据。
实际的结果我们是基于原有的数据进行加工,再输出,由于加工的结果是一行输出多行的,所以这类的需求底层就得udtf的实现。
我们看一下hive源码中为我们提供了哪些udtf函数,位于:org.apache.hadoop.hive.ql.exec.FunctionRegistry 中,我们定位到这么几行 Generic UDTF’s:
// Generic UDTF's system.registerGenericUDTF("explode", GenericUDTFExplode.class); system.registerGenericUDTF("replicate_rows", GenericUDTFReplicateRows.class); system.registerGenericUDTF("inline", GenericUDTFInline.class); system.registerGenericUDTF("json_tuple", GenericUDTFJSONTuple.class); system.registerGenericUDTF("parse_url_tuple", GenericUDTFParseUrlTuple.class); system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class); system.registerGenericUDTF("stack", GenericUDTFStack.class); system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class);
这部分就是我们内置的udtf定义了,我们很常见的explode也在里面,我们进行一下explode源码阅读
initialize部分如下:
if (args.length != 1) { throw new UDFArgumentException("explode() takes only one argument"); } ArrayList<String> fieldNames = new ArrayList<String>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); switch (args[0].getCategory()) { case LIST: inputOI = args[0]; fieldNames.add("col"); fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector()); break; case MAP: inputOI = args[0]; fieldNames.add("key"); fieldNames.add("value"); fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector()); fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector()); break; default: throw new UDFArgumentException("explode() takes an array or a map as a parameter");
这里我们可以了解两个信息:
1、explode可以接收两种类型,一种其实是list,还一种是map,这里可以解答为什么我们使用explode的时候常常看到的是组合的使用,因为我们实际的数据类型比较少是直接定义成LIST或者Map类型的,例如和字符串的切割组合 hexplode(split(…))
2、传递LIST的时候我们的结果列名就是col,Map类型的则是key和value两个列
我们再看看process的逻辑,其实相对比较简单,因为输入的是LIST或者MAP,输出结果便是把结果打平输出
switch (inputOI.getCategory()) { case LIST: ListObjectInspector listOI = (ListObjectInspector)inputOI; List<?> list = listOI.getList(o[0]); if (list == null) { return; } for (Object r : list) { forwardListObj[0] = r; forward(forwardListObj); } break; case MAP: MapObjectInspector mapOI = (MapObjectInspector)inputOI; Map<?,?> map = mapOI.getMap(o[0]); if (map == null) { return; } for (Entry<?,?> r : map.entrySet()) { forwardMapObj[0] = r.getKey(); forwardMapObj[1] = r.getValue(); forward(forwardMapObj); } break; default: throw new TaskExecutionException("explode() can only operate on an array or a map"); }
到此为止,我们了解了explode工作的全部过程了。
进一步的思考
udtf本质上就是为我们每一行生成一个表,这个数据膨胀是完全是N*M的等级,在大量数据规模下运算,如果一行有10行输出的话,整个表的数据会变成10倍,另外如果一行中的LITS过长的话,势必是更加严重的情形和扩展。
怎么去规避呢?
其实了解底层原理就很好说了,减少数据规模,过滤无用的膨胀,LIST切割逻辑进行分段,办法总比困难多,只是底层跑的内容真正了解的情况下,办法也才是有效的 ~