数仓面试重灾区之-Generic User-defined Table Generating Function(UDTF)

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 数仓面试重灾区之-Generic User-defined Table Generating Function(UDTF)

前言

UDTF 这玩意对数仓同学来讲,熟悉又陌生,主要一方面是大量接触,另一方面是理解上有误导,还一个就是不是太明白里头到底咋回事。

场景切入

关于UDTF面试场景大概有以下的问题:

1、hive的udf你了解么,常用都有哪些类型

2、行专列操作了解么,里头是怎么实现的

3、比较直白的问法,udtf你了解么

4、关于hive的优化方式,udtf其实是考察之一

背后的原因:

1、首先实际线上滥用很多,数据膨胀、倾斜等,导数很严重的问题,实际点的例子,线上碰到因为udtf膨胀4-5个小时的运行时间,优化之后直接提到2分钟,是很吐槽的一点

2、期望对udf了解的全面性,因为实际考察的时候大家觉得知道了解的时候,还以为面试官为啥问那么简单的问题,还有点暗喜

3、相对来说是比较实用的etl中的操作

进场

UDTF其实在源码中就有直接的描述:

org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

/**
 * A Generic User-defined Table Generating Function (UDTF)
 * Generates a variable number of output rows for a single input row. Useful for
 * explode(array)...
 */

其实字面上的含义就是用户定义表的一个函数,这个函数实现的是单行输入的情况下输出多行的结果,udtf中有两个最基础的接口定义,那就是定义输出表中的的字段,输出的数据内容

public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException
    public void process(Object[] args) throws HiveException

我们期望udf直接帮我们产生这么一个表格

为了封装表格的列信息,我们进行封装

public Schema(String column, JavaStringObjectInspector type) {
        this.column = column;
        this.type = type;
    }

这样一来,我们就可以对输出列进行定义了

public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        Schema[] schema={
                new Schema("student_id",PrimitiveObjectInspectorFactory.javaStringObjectInspector),
                new Schema("student_name",PrimitiveObjectInspectorFactory.javaStringObjectInspector),
                new Schema("age",PrimitiveObjectInspectorFactory.javaStringObjectInspector),
                new Schema("gender",PrimitiveObjectInspectorFactory.javaStringObjectInspector)
        };
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        for(int i = 0 ; i < schema.length; i++){
            fieldNames.add(schema[i].column);
            fieldOIs.add(schema[i].type);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

其实就是定义出结构的对象。下一步,我们进行数据输出

public void process(Object[] args) throws HiveException {
         String[][] results={
                {"S001","李强","12","男"},
                {"S002","李强","15","男"},
                {"S003","李军","13","男"},
                {"S004","王倩雪","12","女"},
                {"S005","汪玉珍","12","女"},
        };
        for(int i = 0; i < results.length; i ++){
            forward(results[i]);
        }
    }

来点简单又不失风度的小代码测试一下

create temporary function  show as 'org.apache.spark.udf.UDTF01';
select show();

结果如下:

+----------+------------+---+------+
|student_id|student_name|age|gender|
+----------+------------+---+------+
|      S001|        李强| 12|    男|
|      S002|        李强| 15|    男|
|      S003|        李军| 13|    男|
|      S004|      王倩雪| 12|    女|
|      S005|      汪玉珍| 12|    女|
+----------+------------+---+------+

可不是咋滴,这玩意不就是一个表么,这样也可以表达出这类函数本来的功能了。

进击

当然,前面为了表达式udtf的本来作用,直接数据上定死了,这样的做法就是永远都是输出5行一样的数据,这是表示一行输入之后的结果,当我们有多行查询的时候,结果就不断重复了。

我们再来点小测试:

drop table if  exists t;
create table if not exists t  (id int);
insert into t select 1;
insert into t select 2;
select id,show() from t;

结果如下:

+---+----------+------------+---+------+
| id|student_id|student_name|age|gender|
+---+----------+------------+---+------+
|  2|      S001|        李强| 12|    男|
|  2|      S002|        李强| 15|    男|
|  2|      S003|        李军| 13|    男|
|  2|      S004|      王倩雪| 12|    女|
|  2|      S005|      汪玉珍| 12|    女|
|  1|      S001|        李强| 12|    男|
|  1|      S002|        李强| 15|    男|
|  1|      S003|        李军| 13|    男|
|  1|      S004|      王倩雪| 12|    女|
|  1|      S005|      汪玉珍| 12|    女|
+---+----------+------------+---+------+

这个结果就是为每一行输出了udtf中的数据。

实际的结果我们是基于原有的数据进行加工,再输出,由于加工的结果是一行输出多行的,所以这类的需求底层就得udtf的实现。

我们看一下hive源码中为我们提供了哪些udtf函数,位于:org.apache.hadoop.hive.ql.exec.FunctionRegistry 中,我们定位到这么几行 Generic UDTF’s:

// Generic UDTF's
    system.registerGenericUDTF("explode", GenericUDTFExplode.class);
    system.registerGenericUDTF("replicate_rows", GenericUDTFReplicateRows.class);
    system.registerGenericUDTF("inline", GenericUDTFInline.class);
    system.registerGenericUDTF("json_tuple", GenericUDTFJSONTuple.class);
    system.registerGenericUDTF("parse_url_tuple", GenericUDTFParseUrlTuple.class);
    system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class);
    system.registerGenericUDTF("stack", GenericUDTFStack.class);
    system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class);

这部分就是我们内置的udtf定义了,我们很常见的explode也在里面,我们进行一下explode源码阅读

initialize部分如下:

if (args.length != 1) {
      throw new UDFArgumentException("explode() takes only one argument");
    }
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    switch (args[0].getCategory()) {
    case LIST:
      inputOI = args[0];
      fieldNames.add("col");
      fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
      break;
    case MAP:
      inputOI = args[0];
      fieldNames.add("key");
      fieldNames.add("value");
      fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
      fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
      break;
    default:
      throw new UDFArgumentException("explode() takes an array or a map as a parameter");

这里我们可以了解两个信息:

1、explode可以接收两种类型,一种其实是list,还一种是map,这里可以解答为什么我们使用explode的时候常常看到的是组合的使用,因为我们实际的数据类型比较少是直接定义成LIST或者Map类型的,例如和字符串的切割组合 hexplode(split(…))

2、传递LIST的时候我们的结果列名就是col,Map类型的则是key和value两个列

我们再看看process的逻辑,其实相对比较简单,因为输入的是LIST或者MAP,输出结果便是把结果打平输出

switch (inputOI.getCategory()) {
    case LIST:
      ListObjectInspector listOI = (ListObjectInspector)inputOI;
      List<?> list = listOI.getList(o[0]);
      if (list == null) {
        return;
      }
      for (Object r : list) {
        forwardListObj[0] = r;
        forward(forwardListObj);
      }
      break;
    case MAP:
      MapObjectInspector mapOI = (MapObjectInspector)inputOI;
      Map<?,?> map = mapOI.getMap(o[0]);
      if (map == null) {
        return;
      }
      for (Entry<?,?> r : map.entrySet()) {
        forwardMapObj[0] = r.getKey();
        forwardMapObj[1] = r.getValue();
        forward(forwardMapObj);
      }
      break;
    default:
      throw new TaskExecutionException("explode() can only operate on an array or a map");
    }

到此为止,我们了解了explode工作的全部过程了。

进一步的思考

udtf本质上就是为我们每一行生成一个表,这个数据膨胀是完全是N*M的等级,在大量数据规模下运算,如果一行有10行输出的话,整个表的数据会变成10倍,另外如果一行中的LITS过长的话,势必是更加严重的情形和扩展。

怎么去规避呢?

其实了解底层原理就很好说了,减少数据规模,过滤无用的膨胀,LIST切割逻辑进行分段,办法总比困难多,只是底层跑的内容真正了解的情况下,办法也才是有效的 ~

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
7月前
|
存储 关系型数据库 MySQL
在阿里云的AnalyticDB MySQL版中使用CREATE TABLE语句来创建内表
在阿里云的AnalyticDB MySQL版中使用CREATE TABLE语句来创建内表【1月更文挑战第16天】【1月更文挑战第78篇】
370 3
|
7月前
|
SQL 分布式计算 Java
实时数仓 Hologres产品使用合集之ologres holostudio为什么不支持max_pt('table')取最大分区这个方法
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
103 4
|
7月前
|
SQL Java HIVE
Hive高频面试题之UDTF实现多行输出
Hive高频面试题之UDTF实现多行输出
64 0
|
7月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之在 DataWorks 中的 ODPS UDF(User-Defined Function,用户自定义函数)中,支持不定长参数如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
91 0
|
关系型数据库 MySQL 数据库
View ‘information_schema.SCHEMATA‘ references invalid table(s) or column(s) or function(s) or define
View ‘information_schema.SCHEMATA‘ references invalid table(s) or column(s) or function(s) or define
250 0
|
SQL 存储 分布式计算
数仓面试高频考点--解决hive小文件过多问题
小文件产生原因、小文件过多产生的影响以及怎么解决小文件过多问题
1068 0
|
SQL 关系型数据库 OLAP
AnalyticDB for PostgreSQL 6.0 新特性解析:Recursive CTE (Common Table Expressions)
Recursive CTE (Common Table Expressions) 能够实现SQL的递归查询功能,一般用于处理逻辑上为层次化或树状结构的数据(如查询组织结构、物料清单等),方便对该类数据进行多级递归查询。

热门文章

最新文章