浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)

简介: 浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)

前言

大家好,我是明哥。

HIVE 作为大数据生态的数仓解决方案,因为历史的原因在很多行业很多公司都有着广泛的应用。对于比较复杂的业务逻辑,HIVE SQL 往往比较难以表达,此时大家在开发中往往会辅以 HIVE UDF。所以充分理解和掌握 HIVE UDF正确的表写和使用方式,是大数据从业人员必不可少的一项技能。

对于 HIVE UDF 编写使用过程中常见的问题,明哥编写了一个系列 - “浅析 hive udf 的正确编写和使用方式 - 论姿势的重要性“,并陆续发布了三篇博文:

  • “如何在 hive udf 中访问配置数据-方案汇总与对比”
  • “浅析 hive udaf 的正确编写方式- 论姿势的重要性"
  • “浅析 hive udf 的正确编写和使用方式- 论姿势的重要性 - 系列三 - hdfs 相对路径与静态代码块引起的问题”

以下是该系列的第四篇博文:“浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service”

问题概述

在编写UDF时,有时我们需要直接访问 hive metastore service 以获取某些库表或分区的元数据信息,比如最近遇到某客户有个需求,需要直接访问 metastore service 以获取给定分区表的最大分区。该客户已经实现了基本的代码,且在没有开启 kerberos 认证的 hive 中也通过了测试,但在开启了 kerberos 认证的 hive 中执行该 UDF 时却会报错。 该有 BUG 的 UDF 示例代码如下:

image.png

该有 BUG 的 UDF 在开启了 kerberos 认证的 hive 中执行时报错如下:

image.png


问题原因分析

在本系列文章中,笔者不断提到,“Hive SQL 和 UDF 的解析编译和优化是在 hiveserver2 中进行的,解析编译和优化的结果一般是生成 mr/tez/spark 任务,这些 mr/tez/spark 任务是在向 yarn 申请获得的 container 容器对应的 jvm 中执行的;但并不是所有的 sql 和 udf 都会生成 mr/tez/spark 任务,此时其真正的执行就是直接在 hiveserver2 这个已经存在的 jvm 中执行的,该 hiveserver2 这个 jvm 的生命周期跟 udf 的执行无关,如果涉及到配置环境变量,系统参数,或加载类及执行静态代码块,要尤其小心“.

其实这里原来的代码有问题的原因,就跟上面不断强调的这句话有关: hive 的 udf 是在 hiveserver2 这个 Jvm 进程中编译执行的(有时会生成mr作业有时不会,在我们这个场景下不会生成 mr 作业),而 hiveserver2 这个 jvm 进程已经通过 kerberos 认证了,已经能够正常访问 hive metastore service 了,所以 udf 代码中不需要再次执行 UserGroupInformation.loginUserFromKeytab(principal,keytab)进行 kerberos 认证!事实上,由于再次尝试认证时涉及到配置环境变量和系统参数(如示例代码中 java.security.krb5.conf, HiveConf 等),稍有不慎就会污染已经启动的 hiveserver2 这个 jvm s 实例,而该实例是全局的给多个客户端使用的 hive 服务端,一旦被污染会影响其它客户端的执行,危害比较大,往往需要重启 hiveserver2 实例才能修复。

问题解决方案

知道了问题发生的根本原因,问题解决思路就有了:

  • 不要再次执行 kerberos 认证代码 UserGroupInformation.loginUserFromKeytab(principal,keytab);
  • 不要重新创建全新的 org.apache.hadoop.hive.conf.HiveConf 实例,而是复用已有的 HiveConf 实例;
  • 已有的 HiveConf 实例,可以通过 HiveConf hiveConf = SessionState.get().getConf() 获得;

最后还有必要指出,hive udf 和 udaf 有两种接口,一种是旧的 simple Udf/udaf,一种是新的GenericUDF/GenericUDAF:

  • org.apache.hadoop.hive.ql.exec.UDF/org.apache.hadoop.hive.ql.exec.UDAF
  • org.apache.hadoop.hive.ql.udf.generic.GenericUDF/org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

因为支持更多特性和执行时的性能问题,hive 社区推荐我们使用后者。

示例代码 (已经通过none/ldap/kerberos认证环境下的测试,可以直接使用)

simple Udf 示例代码:

package com.xxx;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.service.server.HiveServer2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
@Description(name = "max_pt", value = "_FUNC_(db,table) - return the max partition for a hive partitioned table")
//note that hive udf are executed in the same jvm as hiveserver2
public class UdfMaxPt extends UDF {
    // this is the same logger as the one created in org.apache.hive.service.server.HiveServer2
    private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
    public String evaluate(String db, String table) {
        try {
            // note that you can get the HiveConf from org.apache.hadoop.hive.ql.session.SessionState
            HiveConf hiveConf = SessionState.get().getConf();
            HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
            List<Partition> partitions = hiveMetaStoreClient.listPartitions(db, table, Short.MAX_VALUE);
            // there is no need to call UserGroupInformation.loginUserFromKeytab(principal,keytab) to log into kdc,
            // as the udf is executed in hiveserver2 and hiveserver2 has already logged into kdc;
            // this can be verified by check below outputs
//          String principal = "hive/_HOST@CDH.COM";
//          String keytab= "hive.keytab";
//          UserGroupInformation.loginUserFromKeytab(principal,keytab);
            LOG.info("UserGroupInformation.getCurrentUser(): " + UserGroupInformation.getCurrentUser().toString());
            LOG.info("UserGroupInformation.getLoginUser(): " + UserGroupInformation.getLoginUser().toString());
            LOG.info("principal:" + hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL));
            LOG.info("keytab:" + hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB));
            LOG.info("java.security.krb5.conf" + System.getProperty("java.security.krb5.conf"));
            LOG.info("java.security.krb5.realm" + System.getProperty("java.security.krb5.realm"));
            LOG.info("java.security.krb5.kdc" + System.getProperty("java.security.krb5.kdc"));
            LOG.info("HADOOP_USER_NAME" + System.getenv("HADOOP_USER_NAME"));
            LOG.info("HADOOP_OPTS" + System.getenv("HADOOP_OPTS"));
            // return the max partition value
            return String.valueOf(Collections.max(partitions).getValues().get(0));
        } catch (Exception e) {
            LOG.warn(e.getMessage());
            LOG.warn(e.toString());
            return e.getMessage();
        }
    }
}

GenericUDF 示例代码:

package com.xxxx;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hive.service.server.HiveServer2;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
@Description(name = "max_pt", value = "_FUNC_(db,table) - return the max partition for a hive partitioned table")
//note that hive udf are executed in the same jvm as hiveserver2
public class UdfMaxPtNew extends GenericUDF {
    // this is the same logger as the one created in org.apache.hive.service.server.HiveServer2
    private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
    PrimitiveObjectInspector inputDbOI;
    PrimitiveObjectInspector inputTableOI;
    PrimitiveObjectInspector outputOI;
    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        // This UDF accepts one argument
        assert (args.length == 2);
        // The first argument is a primitive type
        assert (args[0].getCategory() == ObjectInspector.Category.PRIMITIVE);
        assert (args[1].getCategory() == ObjectInspector.Category.PRIMITIVE);
        inputDbOI = (PrimitiveObjectInspector) args[0];
        inputTableOI = (PrimitiveObjectInspector) args[1];
        /* We only support String type */
        assert (inputDbOI.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING);
        assert (inputTableOI.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING);
        /* And we'll return a type string, so let's return the corresponding object inspector */
        outputOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        return outputOI;
    }
    @Override
    public Object evaluate(DeferredObject[] args) throws HiveException {
        if (args.length != 2) {
            return "";
        }
// Access the deferred value. Hive passes the arguments as "deferred" objects
// to avoid some computations if we don't actually need some of the values
        Object oin1 = args[0].get();
        Object oin2 = args[1].get();
        if (oin1 == null || oin2 == null) {
            return "";
        }
        String inputDb = (String) inputDbOI.getPrimitiveJavaObject(oin1);
        String inputTable = (String) inputTableOI.getPrimitiveJavaObject(oin2);
        if (StringUtils.isEmpty(inputDb) || StringUtils.isEmpty(inputTable)) {
            return "";
        }
        HiveConf hiveConf = SessionState.get().getConf();
        HiveMetaStoreClient hiveMetaStoreClient = null;
        try {
            hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
            List<Partition> partitions = hiveMetaStoreClient.listPartitions(inputDb, inputTable, Short.MAX_VALUE);
            return String.valueOf(Collections.max(partitions).getValues().get(0));
        } catch (TException e) {
            LOG.warn(e.getMessage());
            LOG.warn(e.toString());
            return e.getMessage();
        }
    }
    @Override
    public String getDisplayString(String[] children) {
        return "return the max partition for a hive partitioned table";
    }
}

来自客户的认可才是最大的认可,以上示例代码交付给客户后,明哥收到了客户的赞赏,允许我小小的得瑟一下,哈哈。


相关文章
|
4月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
102 0
|
8天前
|
SQL Java 数据处理
【Hive】Hive的函数:UDF、UDAF、UDTF的区别?
【4月更文挑战第17天】【Hive】Hive的函数:UDF、UDAF、UDTF的区别?
|
6月前
|
SQL 存储 Java
浅析 hive udaf 的正确编写方式- 论姿势的重要性
浅析 hive udaf 的正确编写方式- 论姿势的重要性
|
6月前
|
SQL 运维 大数据
如何获取大数据平台 CDH 中 hive metastore db 的用户名和密码?
如何获取大数据平台 CDH 中 hive metastore db 的用户名和密码?
如何获取大数据平台 CDH 中 hive metastore db 的用户名和密码?
|
8月前
|
SQL 存储 大数据
关于数据仓库的Hive的Hive架构的MetaStore元数据服务
随着大数据技术的不断发展,数据仓库成为了企业中不可或缺的一部分。而Hive作为一种开源的数据仓库系统,因其易于使用和高效处理等特点,成为了许多企业的首选。然而,对于普通用户来说,直接使用Hive的命令行工具进行操作并不方便。因此,开发者社区中涌现出了大量的Hive GUI工具,其中最为流行的就是Web GUI工具。
209 2
|
SQL 存储 分布式计算
Hive 2.1.1 MetaException(在metastore中找不到消息:版本信息)
Hive 2.1.1 MetaException(在metastore中找不到消息:版本信息)
218 0
|
SQL HIVE
解决启动Hive报错Hive Schema version 2.3.0 does not match metastore‘s schema version 1.2.0 Metastore is not
解决启动Hive报错Hive Schema version 2.3.0 does not match metastore‘s schema version 1.2.0 Metastore is not
146 0
|
SQL Java 数据挖掘
【Hive】(十二)Hive自定义函数详解(UDF、UDAF、UDTF)
【Hive】(十二)Hive自定义函数详解(UDF、UDAF、UDTF)
602 0
|
SQL 分布式计算 Java
spark 对于hive metastore的兼容性随笔--通过classloader实现
spark 对于hive metastore的兼容性随笔--通过classloader实现
415 0
|
SQL 存储 分布式计算
Hive简介及源码编译
Hive是一个基于Hadoop的数据仓库,可以将结构化数据映射成一张表,并提供类SQL的功能,最初由Facebook提供,使用HQL作为查询接口、HDFS作为存储底层、MapReduce作为执行层,设计目的是让SQL技能良好,但Java技能较弱的分析师可以查询海量数据,2008年facebook把Hive项目贡献给Apache。Hive提供了比较完整的SQL功能(本质是将SQL转换为MapReduce),自身最大的缺点就是执行速度慢。Hive有自身的元数据结构描述,可以使用MySql\ProstgreSql\oracle 等关系型数据库来进行存储,但请注意Hive中的所有数据都存储在HDFS中
361 0
Hive简介及源码编译