前言
大家好,我是明哥。
HIVE 作为大数据生态的数仓解决方案,因为历史的原因在很多行业很多公司都有着广泛的应用。对于比较复杂的业务逻辑,HIVE SQL 往往比较难以表达,此时大家在开发中往往会辅以 HIVE UDF。所以充分理解和掌握 HIVE UDF正确的表写和使用方式,是大数据从业人员必不可少的一项技能。
对于 HIVE UDF 编写使用过程中常见的问题,明哥编写了一个系列 - “浅析 hive udf 的正确编写和使用方式 - 论姿势的重要性“,并陆续发布了三篇博文:
- “如何在 hive udf 中访问配置数据-方案汇总与对比”
- “浅析 hive udaf 的正确编写方式- 论姿势的重要性"
- “浅析 hive udf 的正确编写和使用方式- 论姿势的重要性 - 系列三 - hdfs 相对路径与静态代码块引起的问题”
以下是该系列的第四篇博文:“浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service”
问题概述
在编写UDF时,有时我们需要直接访问 hive metastore service 以获取某些库表或分区的元数据信息,比如最近遇到某客户有个需求,需要直接访问 metastore service 以获取给定分区表的最大分区。该客户已经实现了基本的代码,且在没有开启 kerberos 认证的 hive 中也通过了测试,但在开启了 kerberos 认证的 hive 中执行该 UDF 时却会报错。 该有 BUG 的 UDF 示例代码如下:
该有 BUG 的 UDF 在开启了 kerberos 认证的 hive 中执行时报错如下:
问题原因分析
在本系列文章中,笔者不断提到,“Hive SQL 和 UDF 的解析编译和优化是在 hiveserver2 中进行的,解析编译和优化的结果一般是生成 mr/tez/spark 任务,这些 mr/tez/spark 任务是在向 yarn 申请获得的 container 容器对应的 jvm 中执行的;但并不是所有的 sql 和 udf 都会生成 mr/tez/spark 任务,此时其真正的执行就是直接在 hiveserver2 这个已经存在的 jvm 中执行的,该 hiveserver2 这个 jvm 的生命周期跟 udf 的执行无关,如果涉及到配置环境变量,系统参数,或加载类及执行静态代码块,要尤其小心“.
其实这里原来的代码有问题的原因,就跟上面不断强调的这句话有关: hive 的 udf 是在 hiveserver2 这个 Jvm 进程中编译执行的(有时会生成mr作业有时不会,在我们这个场景下不会生成 mr 作业),而 hiveserver2 这个 jvm 进程已经通过 kerberos 认证了,已经能够正常访问 hive metastore service 了,所以 udf 代码中不需要再次执行 UserGroupInformation.loginUserFromKeytab(principal,keytab)进行 kerberos 认证!事实上,由于再次尝试认证时涉及到配置环境变量和系统参数(如示例代码中 java.security.krb5.conf, HiveConf 等),稍有不慎就会污染已经启动的 hiveserver2 这个 jvm s 实例,而该实例是全局的给多个客户端使用的 hive 服务端,一旦被污染会影响其它客户端的执行,危害比较大,往往需要重启 hiveserver2 实例才能修复。
问题解决方案
知道了问题发生的根本原因,问题解决思路就有了:
- 不要再次执行 kerberos 认证代码 UserGroupInformation.loginUserFromKeytab(principal,keytab);
- 不要重新创建全新的 org.apache.hadoop.hive.conf.HiveConf 实例,而是复用已有的 HiveConf 实例;
- 已有的 HiveConf 实例,可以通过 HiveConf hiveConf = SessionState.get().getConf() 获得;
最后还有必要指出,hive udf 和 udaf 有两种接口,一种是旧的 simple Udf/udaf,一种是新的GenericUDF/GenericUDAF:
- org.apache.hadoop.hive.ql.exec.UDF/org.apache.hadoop.hive.ql.exec.UDAF
- org.apache.hadoop.hive.ql.udf.generic.GenericUDF/org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
因为支持更多特性和执行时的性能问题,hive 社区推荐我们使用后者。
示例代码 (已经通过none/ldap/kerberos认证环境下的测试,可以直接使用)
simple Udf 示例代码:
package com.xxx; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.service.server.HiveServer2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; @Description(name = "max_pt", value = "_FUNC_(db,table) - return the max partition for a hive partitioned table") //note that hive udf are executed in the same jvm as hiveserver2 public class UdfMaxPt extends UDF { // this is the same logger as the one created in org.apache.hive.service.server.HiveServer2 private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class); public String evaluate(String db, String table) { try { // note that you can get the HiveConf from org.apache.hadoop.hive.ql.session.SessionState HiveConf hiveConf = SessionState.get().getConf(); HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf); List<Partition> partitions = hiveMetaStoreClient.listPartitions(db, table, Short.MAX_VALUE); // there is no need to call UserGroupInformation.loginUserFromKeytab(principal,keytab) to log into kdc, // as the udf is executed in hiveserver2 and hiveserver2 has already logged into kdc; // this can be verified by check below outputs // String principal = "hive/_HOST@CDH.COM"; // String keytab= "hive.keytab"; // UserGroupInformation.loginUserFromKeytab(principal,keytab); LOG.info("UserGroupInformation.getCurrentUser(): " + UserGroupInformation.getCurrentUser().toString()); LOG.info("UserGroupInformation.getLoginUser(): " + UserGroupInformation.getLoginUser().toString()); LOG.info("principal:" + hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL)); LOG.info("keytab:" + hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB)); LOG.info("java.security.krb5.conf" + System.getProperty("java.security.krb5.conf")); LOG.info("java.security.krb5.realm" + System.getProperty("java.security.krb5.realm")); LOG.info("java.security.krb5.kdc" + System.getProperty("java.security.krb5.kdc")); LOG.info("HADOOP_USER_NAME" + System.getenv("HADOOP_USER_NAME")); LOG.info("HADOOP_OPTS" + System.getenv("HADOOP_OPTS")); // return the max partition value return String.valueOf(Collections.max(partitions).getValues().get(0)); } catch (Exception e) { LOG.warn(e.getMessage()); LOG.warn(e.toString()); return e.getMessage(); } } }
GenericUDF 示例代码:
package com.xxxx; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hive.service.server.HiveServer2; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; @Description(name = "max_pt", value = "_FUNC_(db,table) - return the max partition for a hive partitioned table") //note that hive udf are executed in the same jvm as hiveserver2 public class UdfMaxPtNew extends GenericUDF { // this is the same logger as the one created in org.apache.hive.service.server.HiveServer2 private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class); PrimitiveObjectInspector inputDbOI; PrimitiveObjectInspector inputTableOI; PrimitiveObjectInspector outputOI; @Override public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { // This UDF accepts one argument assert (args.length == 2); // The first argument is a primitive type assert (args[0].getCategory() == ObjectInspector.Category.PRIMITIVE); assert (args[1].getCategory() == ObjectInspector.Category.PRIMITIVE); inputDbOI = (PrimitiveObjectInspector) args[0]; inputTableOI = (PrimitiveObjectInspector) args[1]; /* We only support String type */ assert (inputDbOI.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING); assert (inputTableOI.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING); /* And we'll return a type string, so let's return the corresponding object inspector */ outputOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector; return outputOI; } @Override public Object evaluate(DeferredObject[] args) throws HiveException { if (args.length != 2) { return ""; } // Access the deferred value. Hive passes the arguments as "deferred" objects // to avoid some computations if we don't actually need some of the values Object oin1 = args[0].get(); Object oin2 = args[1].get(); if (oin1 == null || oin2 == null) { return ""; } String inputDb = (String) inputDbOI.getPrimitiveJavaObject(oin1); String inputTable = (String) inputTableOI.getPrimitiveJavaObject(oin2); if (StringUtils.isEmpty(inputDb) || StringUtils.isEmpty(inputTable)) { return ""; } HiveConf hiveConf = SessionState.get().getConf(); HiveMetaStoreClient hiveMetaStoreClient = null; try { hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf); List<Partition> partitions = hiveMetaStoreClient.listPartitions(inputDb, inputTable, Short.MAX_VALUE); return String.valueOf(Collections.max(partitions).getValues().get(0)); } catch (TException e) { LOG.warn(e.getMessage()); LOG.warn(e.toString()); return e.getMessage(); } } @Override public String getDisplayString(String[] children) { return "return the max partition for a hive partitioned table"; } }
来自客户的认可才是最大的认可,以上示例代码交付给客户后,明哥收到了客户的赞赏,允许我小小的得瑟一下,哈哈。