Hive Metastore: Overall Code Analysis and Walkthrough

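  Building on the previous post's brief look at the Hive metastore table schema, this post uses the entity objects behind that data design to summarize the overall code structure. Let's start by opening the metadata directory and looking at its layout:

  As you can see, the hivemeta directory contains metastore (the client/server call logic), events (listener implementations for checks, authorization, and similar steps in the table lifecycle), hooks (here only the interfaces related to the JDO connection), parser (parsing of expression trees), spec (proxy classes for partitions), tools (methods related to JDO execution), plus txn and model. Next we go through the metadata module piece by piece, analyzing and annotating the code:

  The packages are not even expanded yet, and there are already this many classes. Intimidating? I felt the same, but let's keep going. At first it all looks like a tangled mess. Once we calm down, we can start from the Hive class, because it is the entry point for metastore metadata calls. The lifecycle we will walk through is: creating and loading HiveMetaStoreClient on the client side, creating and loading HiveMetaStore on the server side, then createTable, dropTable, alterTable, createPartition, dropPartition, and alterPartition. Of course, this is only a small part of the full metadata module.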



  private HiveConf conf = null;
  private IMetaStoreClient metaStoreClient;
  private UserGroupInformation owner;

  // metastore calls timing information
  private final Map<String, Long> metaCallTimeMap = new HashMap<String, Long>();

  private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
    @Override
    protected synchronized Hive initialValue() {
      return null;
    }

    @Override
    public synchronized void remove() {
      if (this.get() != null) {
        this.get().close();
      }
      super.remove();
    }
  };

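  Declared here are the HiveConf object, the metaStoreClient, the owning user (UserGroupInformation), and a call-timing Map that records how long each metastore call takes. The class also keeps a thread-local hiveDB. When the current thread has no Hive instance, when a refresh is requested, or when the current user is not the owner of the cached instance, a new Hive object is created, as follows: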

  public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
    Hive db = hiveDB.get();
    if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
      if (db != null) {
        LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
          ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
      }
      closeCurrent();
      c.set("fs.scheme.class", "dfs");
      Hive newdb = new Hive(c);
      hiveDB.set(newdb);
      return newdb;
    }
    db.conf = c;
    return db;
  }
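
The caching rule in Hive.get can be shown in isolation. The following is a minimal sketch, not Hive's code: Session and SessionCache are hypothetical stand-ins for the Hive object and its thread-local holder, and the owner check is reduced to a string comparison.

```java
/** Hypothetical stand-in for the Hive object; it only tracks its owner and whether it was closed. */
final class Session {
    final String owner;
    boolean closed = false;

    Session(String owner) {
        this.owner = owner;
    }

    void close() {
        closed = true;
    }
}

/** Per-thread cache that replaces the instance when it is missing, stale, or owned by another user. */
final class SessionCache {
    private static final ThreadLocal<Session> CURRENT = new ThreadLocal<>();

    static Session get(String currentUser, boolean needsRefresh) {
        Session s = CURRENT.get();
        if (s == null || needsRefresh || !s.owner.equals(currentUser)) {
            if (s != null) {
                s.close(); // release the old instance before replacing it
            }
            s = new Session(currentUser);
            CURRENT.set(s);
        }
        return s;
    }
}
```

Because the cache is a ThreadLocal, two threads never share an instance, which is why the original code needs no locking around the metastore client it holds.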


  // register all permanent functions. need improvement
  static {
    try {
      reloadFunctions();
    } catch (Exception e) {
      LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
    }
  }

  public static void reloadFunctions() throws HiveException {
    // Get the Hive object used for the calls below
    Hive db = Hive.get();
    for (String dbName : db.getAllDatabases()) {
      for (String functionName : db.getFunctions(dbName, "*")) {
        Function function = db.getFunction(dbName, functionName);
        try {
          FunctionRegistry.registerPermanentFunction(
              FunctionUtils.qualifyFunctionName(functionName, dbName), function.getClassName(),
              false, FunctionTask.toFunctionResource(function.getResourceUris()));
        } catch (Exception e) {
          LOG.warn("Failed to register persistent function " +
              functionName + ":" + function.getClassName() + ". Ignore and continue.");
        }
      }
    }
  }


  private IMetaStoreClient createMetaStoreClient() throws MetaException {

    // Anonymous implementation of the HiveMetaHookLoader interface
    HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
        @Override
        public HiveMetaHook getHook(
          org.apache.hadoop.hive.metastore.api.Table tbl)
          throws MetaException {

          try {
            if (tbl == null) {
              return null;
            }
            // Load the storage handler named in the table's key/value parameters,
            // for example HBase, Redis, or another external storage backing the table
            HiveStorageHandler storageHandler =
              HiveUtils.getStorageHandler(conf,
                tbl.getParameters().get(META_TABLE_STORAGE));
            if (storageHandler == null) {
              return null;
            }
            return storageHandler.getMetaHook();
          } catch (HiveException ex) {
            LOG.error(StringUtils.stringifyException(ex));
            throw new MetaException(
              "Failed to load storage handler:  " + ex.getMessage());
          }
        }
      };
    return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
        SessionHiveMetaStoreClient.class.getName());
  }
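
The call to RetryingMetaStoreClient.getProxy returns a proxy around the real client and receives metaCallTimeMap. As a rough illustration of that idea (a sketch under assumptions, not the Hive implementation), a JDK dynamic proxy can retry failed calls and accumulate the elapsed time per method name. Client and RetryingProxy below are hypothetical names, and the retry policy is deliberately simplistic.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Map;

/** Hypothetical client interface, used only for this sketch. */
interface Client {
    String fetch(String key);
}

final class RetryingProxy {
    /** Wraps delegate so every call is retried up to maxAttempts times and timed per method name. */
    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> iface, T delegate, int maxAttempts, Map<String, Long> timings) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        InvocationHandler handler = (proxy, method, args) -> {
            long start = System.nanoTime();
            try {
                Throwable last = null;
                for (int attempt = 0; attempt < maxAttempts; attempt++) {
                    try {
                        return method.invoke(delegate, args);
                    } catch (InvocationTargetException e) {
                        last = e.getCause(); // unwrap the real failure, then try again
                    }
                }
                throw last; // every attempt failed: surface the most recent failure
            } finally {
                // accumulate the elapsed nanoseconds under the method name
                timings.merge(method.getName(), System.nanoTime() - start, Long::sum);
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

Callers keep programming against the interface; retrying and timing stay invisible to them, which is the main appeal of the proxy approach.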



  public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
    throws MetaException {

    this.hookLoader = hookLoader;
    if (conf == null) {
      conf = new HiveConf(HiveMetaStoreClient.class);
    }
    this.conf = conf;
    filterHook = loadFilterHooks();
    // Read hive.metastore.uris from hive-site.xml: if it is set, this is a remote
    // connection; otherwise the metastore is local (embedded)
    String msUri = conf.getVar(HiveConf.ConfVars.METASTOREURIS);
    localMetaStore = HiveConfUtil.isEmbeddedMetaStore(msUri);
    if (localMetaStore) {
      client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true);
      isConnected = true;
      snapshotActiveConf();
      return;
    }

    // Read the retry count and the delay between retries from the configuration
    retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
    retryDelaySeconds = conf.getTimeVar(
        ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);

    // Build the list of metastore URIs
    if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
      String metastoreUrisString[] = conf.getVar(
          HiveConf.ConfVars.METASTOREURIS).split(",");
      metastoreUris = new URI[metastoreUrisString.length];
      try {
        int i = 0;
        for (String s : metastoreUrisString) {
          URI tmpUri = new URI(s);
          if (tmpUri.getScheme() == null) {
            throw new IllegalArgumentException("URI: " + s
                + " does not have a scheme");
          }
          metastoreUris[i++] = tmpUri;
        }
      } catch (IllegalArgumentException e) {
        throw (e);
      } catch (Exception e) {
        MetaStoreUtils.logAndThrowMetaException(e);
      }
    } else {
      LOG.error("NOT getting uris from conf");
      throw new MetaException("MetaStoreURIs not found in conf file");
    }
    // Call open() to create the connection
    open();
  }
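
The constructor makes two decisions from one configuration value: embedded versus remote, and, for remote, which URIs to try. The sketch below reproduces both with the JDK only. MetastoreUris is a hypothetical helper, and treating a blank value as "embedded" is an assumption about what HiveConfUtil.isEmbeddedMetaStore checks.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

final class MetastoreUris {
    /** Assumption for this sketch: a missing or blank URI setting means the embedded (local) metastore. */
    static boolean isEmbedded(String uris) {
        return uris == null || uris.trim().isEmpty();
    }

    /** Splits a comma-separated list and rejects entries that have no scheme. */
    static List<URI> parse(String uris) {
        List<URI> result = new ArrayList<>();
        for (String s : uris.split(",")) {
            URI uri;
            try {
                uri = new URI(s.trim());
            } catch (URISyntaxException e) {
                throw new IllegalArgumentException("URI: " + s + " is malformed", e);
            }
            if (uri.getScheme() == null) {
                throw new IllegalArgumentException("URI: " + s + " does not have a scheme");
            }
            result.add(uri);
        }
        return result;
    }
}
```

For example, `MetastoreUris.parse("thrift://ms1:9083,thrift://ms2:9084")` yields two URIs whose host and port can then be handed to a socket transport, while a bare `ms1` is rejected because it has no scheme.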


  private void open() throws MetaException {
    isConnected = false;
    TTransportException tte = null;
    boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
    // If true, the metastore Thrift interface will use TFramedTransport.
    // When false (default) a standard TTransport is used.
    boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
    // If true, the metastore Thrift interface will use TCompactProtocol.
    // When false (default) TBinaryProtocol will be used.
    // The differences between the two are left for a later discussion.
    boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
    // Socket timeout
    int clientSocketTimeout = (int) conf.getTimeVar(
        ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);

    for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
      for (URI store : metastoreUris) {
        LOG.info("Trying to connect to metastore with URI " + store);
        try {
          transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
          if (useSasl) {
            // Wrap thrift connection with SASL for secure connection.
            try {
              // Create the HadoopThriftAuthBridge client
              HadoopThriftAuthBridge.Client authBridge =
                ShimLoader.getHadoopThriftAuthBridge().createClient();

              // Authentication
              // check if we should use delegation tokens to authenticate
              // the call below gets hold of the tokens if they are set up by hadoop
              // this should happen on the map/reduce tasks if the client added the
              // tokens into hadoop's credential store in the front end during job
              // submission.
              String tokenSig = conf.get("hive.metastore.token.signature");
              // tokenSig could be null
              tokenStrForm = Utils.getTokenStrForm(tokenSig);
              if(tokenStrForm != null) {
                // authenticate using delegation tokens via the "DIGEST" mechanism
                transport = authBridge.createClientTransport(null, store.getHost(),
                    "DIGEST", tokenStrForm, transport,
                    MetaStoreUtils.getMetaStoreSaslProperties(conf));
              } else {
                String principalConfig =
                    conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
                transport = authBridge.createClientTransport(
                    principalConfig, store.getHost(), "KERBEROS", null,
                    transport, MetaStoreUtils.getMetaStoreSaslProperties(conf));
              }
            } catch (IOException ioe) {
              LOG.error("Couldn't create client transport", ioe);
              throw new MetaException(ioe.toString());
            }
          } else if (useFramedTransport) {
            transport = new TFramedTransport(transport);
          }
          final TProtocol protocol;
          if (useCompactProtocol) {
            protocol = new TCompactProtocol(transport);
          } else {
            protocol = new TBinaryProtocol(transport);
          }
          // Create the ThriftHiveMetastore client
          client = new ThriftHiveMetastore.Client(protocol);
          try {
            transport.open();
            isConnected = true;
          } catch (TTransportException e) {
            tte = e;
            if (LOG.isDebugEnabled()) {
              LOG.warn("Failed to connect to the MetaStore Server...", e);
            } else {
              // Don't print full exception trace if DEBUG is not on.
              LOG.warn("Failed to connect to the MetaStore Server...");
            }
          }

          // Send the user and group information
          if (isConnected && !useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){
            // Call set_ugi, only in unsecure mode.
            try {
              UserGroupInformation ugi = Utils.getUGI();
              client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
            } catch (LoginException e) {
              LOG.warn("Failed to do login. set_ugi() is not successful, " +
                       "Continuing without it.", e);
            } catch (IOException e) {
              LOG.warn("Failed to find ugi of client set_ugi() is not successful, " +
                  "Continuing without it.", e);
            } catch (TException e) {
              LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
                  + "Continuing without it.", e);
            }
          }
        } catch (MetaException e) {
          LOG.error("Unable to connect to metastore with URI " + store
                    + " in attempt " + attempt, e);
        }
        if (isConnected) {
          break;
        }
      }
      // Wait before launching the next round of connection retries.
      if (!isConnected && retryDelaySeconds > 0) {
        try {
          LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
          Thread.sleep(retryDelaySeconds * 1000);
        } catch (InterruptedException ignore) {}
      }
    }

    if (!isConnected) {
      throw new MetaException("Could not connect to meta store using any of the URIs provided." +
        " Most recent failure: " + StringUtils.stringifyException(tte));
    }

    snapshotActiveConf();

    LOG.info("Connected to metastore.");
  }
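
Stripped of Thrift and SASL, open() is two nested loops: an outer loop over retry rounds and an inner loop over the configured URIs, with a pause between rounds. A minimal sketch of that control flow follows. ConnectLoop and the tryOpen predicate are hypothetical, and unlike the original, which ignores InterruptedException, this sketch restores the interrupt flag and gives up.

```java
import java.util.List;
import java.util.function.Predicate;

final class ConnectLoop {
    /**
     * Tries every endpoint once per round, for at most `retries` rounds.
     * Returns the first endpoint that tryOpen accepts, or null if none ever did.
     */
    static <E> E connect(List<E> endpoints, int retries, long delayMillis, Predicate<E> tryOpen) {
        for (int attempt = 0; attempt < retries; attempt++) {
            for (E endpoint : endpoints) {
                if (tryOpen.test(endpoint)) {
                    return endpoint; // connected: stop immediately
                }
            }
            // No endpoint answered in this round: wait before the next round.
            if (delayMillis > 0) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    return null;
                }
            }
        }
        return null;
    }
}
```

One consequence of this shape is that the total number of connection attempts is roughly retries multiplied by the number of URIs, so a long URI list combined with a large retry count can delay failure reporting considerably.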


    public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException {
      super(name);
      hiveConf = conf;
      if (init) {
        init();
      }
    }


    public void init() throws MetaException {
      rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
      initListeners = MetaStoreUtils.getMetaStoreListeners(
          MetaStoreInitListener.class, hiveConf,
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
      for (MetaStoreInitListener singleInitListener: initListeners) {
          MetaStoreInitContext context = new MetaStoreInitContext();
          singleInitListener.onInit(context);
      }

      // Instantiate the alter handler implementation
      String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
          HiveAlterHandler.class.getName());
      alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
          alterHandlerName), hiveConf);
      wh = new Warehouse(hiveConf);

      // Create the default database, roles, and admin users, and record currentUrl
      synchronized (HMSHandler.class) {
        if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(hiveConf))) {
          createDefaultDB();
          createDefaultRoles();
          addAdminUsers();
          currentUrl = MetaStoreInit.getConnectionURL(hiveConf);
        }
      }

      // Initialize metrics
      if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
        try {
          Metrics.init();
        } catch (Exception e) {
          // log exception, but ignore inability to start
          LOG.error("error in Metrics init: " + e.getClass().getName() + " "
              + e.getMessage(), e);
        }
      }

      // Initialize the pre-event listeners, event listeners, and end-function listeners
      preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
          hiveConf,
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS));
      listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
      listeners.add(new SessionPropertiesListener(hiveConf));
      endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
          MetaStoreEndFunctionListener.class, hiveConf,
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));

      // Regex check for partition names; the pattern is configurable
      String partitionValidationRegex =
          hiveConf.getVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN);
      if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
        partitionValidationPattern = Pattern.compile(partitionValidationRegex);
      } else {
        partitionValidationPattern = null;
      }

      long cleanFreq = hiveConf.getTimeVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ, TimeUnit.MILLISECONDS);
      if (cleanFreq > 0) {
        // In default config, there is no timer.
        Timer cleaner = new Timer("Metastore Events Cleaner Thread", true);
        cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq);
      }
    }
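
init() compiles the partition-name whitelist into a Pattern only when a regex is configured and otherwise leaves it null. How such a pattern could then be applied is sketched below. PartitionNameValidator is a hypothetical class, and requiring a full match of each value is an assumption for the example, not something the excerpt shows.

```java
import java.util.List;
import java.util.regex.Pattern;

final class PartitionNameValidator {
    private final Pattern pattern; // null means "no restriction configured"

    PartitionNameValidator(String regex) {
        this.pattern = (regex == null || regex.isEmpty()) ? null : Pattern.compile(regex);
    }

    /** Returns the first value that does not fully match the whitelist, or null when all values pass. */
    String firstInvalid(List<String> partitionValues) {
        if (pattern == null) {
            return null; // nothing configured, so everything is accepted
        }
        for (String value : partitionValues) {
            if (!pattern.matcher(value).matches()) {
                return value;
            }
        }
        return null;
    }
}
```

With the whitelist `[0-9a-zA-Z_-]+`, a value such as `2020-01-01` passes, while a value containing a space is reported as invalid.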




  public void createTable(String tableName, List<String> columns, List<String> partCols,
                          Class<? extends InputFormat> fileInputFormat,
                          Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols,
                          Map<String, String> parameters) throws HiveException {
    if (columns == null) {
      throw new HiveException("columns not specified for table " + tableName);
    }

    Table tbl = newTable(tableName);
    // SD attributes: set the table's input and output format class names. The compute
    // engine later looks up these names and loads the classes through reflection
    tbl.setInputFormatClass(fileInputFormat.getName());
    tbl.setOutputFormatClass(fileOutputFormat.getName());

    // Wrap each column's name and type in a FieldSchema and add it to the SD's columns
    for (String col : columns) {
      FieldSchema field = new FieldSchema(col, STRING_TYPE_NAME, "default");
      tbl.getCols().add(field);
    }

    // If partition columns were given at creation time (for example dt), record them
    // as partition keys; they are eventually written to the partition tables
    if (partCols != null) {
      for (String partCol : partCols) {
        FieldSchema part = new FieldSchema();
        part.setName(partCol);
        part.setType(STRING_TYPE_NAME); // default partition key
        tbl.getPartCols().add(part);
      }
    }
    tbl.setSerializationLib(LazySimpleSerDe.class.getName());
    tbl.setNumBuckets(bucketCount);
    tbl.setBucketCols(bucketCols);
    if (parameters != null) {
      tbl.setParamters(parameters);
    }
    createTable(tbl);
  }

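  As the code shows, Hive builds a Table object that can be treated as a model: it carries nearly all the attributes of TBLS as the main table, together with the attributes of the tables that reference it through the table_id foreign key (see the post on the Hive metastore table schema for details). Once the object is assembled, createTable is called, and the call continues as follows: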

  public void createTable(Table tbl, boolean ifNotExists) throws HiveException {
    try {
      if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) {
        tbl.setDbName(SessionState.get().getCurrentDatabase());
      }
      if (tbl.getCols().size() == 0 || tbl.getSd().getColsSize() == 0) {
        tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(),
            tbl.getDeserializer()));
      }
      // Validate the table's input format, output format, and column attributes
      tbl.checkValidity();
      if (tbl.getParameters() != null) {
        tbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
      }
      org.apache.hadoop.hive.metastore.api.Table tTbl = tbl.getTTable();
      // Privilege handling starts here. It involves the grant-related parameters
      // configured in Hive and Hive's own notions of user, role, and group
      PrincipalPrivilegeSet principalPrivs = new PrincipalPrivilegeSet();
      SessionState ss = SessionState.get();
      if (ss != null) {
        CreateTableAutomaticGrant grants = ss.getCreateTableGrants();
        if (grants != null) {
          principalPrivs.setUserPrivileges(grants.getUserGrants());
          principalPrivs.setGroupPrivileges(grants.getGroupGrants());
          principalPrivs.setRolePrivileges(grants.getRoleGrants());
          tTbl.setPrivileges(principalPrivs);
        }
      }
      // Create the table on the server through the metastore client
      getMSC().createTable(tTbl);
    } catch (AlreadyExistsException e) {
      if (!ifNotExists) {
        throw new HiveException(e);
      }
    } catch (Exception e) {
      throw new HiveException(e);
    }
  }


  public void createTable(Table tbl, EnvironmentContext envContext) throws AlreadyExistsException,
      InvalidObjectException, MetaException, NoSuchObjectException, TException {
    HiveMetaHook hook = getHook(tbl);
    if (hook != null) {
      hook.preCreateTable(tbl);
    }
    boolean success = false;
    try {
      // Call HiveMetaStore so the server side performs the creation against the database
      create_table_with_environment_context(tbl, envContext);
      if (hook != null) {
        hook.commitCreateTable(tbl);
      }
      success = true;
    } finally {
      if (!success && (hook != null)) {
        hook.rollbackCreateTable(tbl);
      }
    }
  }
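
The client-side createTable above follows a pre / commit / rollback pattern around the server call: the hook runs first, commit runs only after the server call returns, and rollback runs in finally whenever success was never set. The sketch below isolates that pattern. CreateHook, HookedCreate, and RecordingHook are hypothetical names standing in for HiveMetaHook and its callers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, reduced version of a create-table hook. */
interface CreateHook {
    void pre(String table);
    void commit(String table);
    void rollback(String table);
}

final class HookedCreate {
    /** Runs serverCall between the hook callbacks; rollback fires if anything after pre() fails. */
    static void create(String table, CreateHook hook, Runnable serverCall) {
        if (hook != null) {
            hook.pre(table);
        }
        boolean success = false;
        try {
            serverCall.run();
            if (hook != null) {
                hook.commit(table);
            }
            success = true;
        } finally {
            if (!success && hook != null) {
                hook.rollback(table);
            }
        }
    }
}

/** Test double that records which callbacks were invoked, in order. */
final class RecordingHook implements CreateHook {
    final List<String> calls = new ArrayList<>();

    @Override public void pre(String table) { calls.add("pre"); }
    @Override public void commit(String table) { calls.add("commit"); }
    @Override public void rollback(String table) { calls.add("rollback"); }
}
```

Note that a failure inside commit itself also triggers rollback, because success is set only after commit returns; a storage handler therefore has to tolerate rollback after a partially completed commit.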


public interface HiveMetaHook {
  /**
   * Called before a new table definition is added to the metastore
   * during CREATE TABLE.
   *
   * @param table new table definition
   */
  public void preCreateTable(Table table)
    throws MetaException;

  /**
   * Called after failure adding a new table definition to the metastore
   * during CREATE TABLE.
   *
   * @param table new table definition
   */
  public void rollbackCreateTable(Table table)
    throws MetaException;

  // ... (methods omitted in this excerpt)

  public void preDropTable(Table table)
    throws MetaException;

  // ... (remaining methods omitted in this excerpt)
}


    private void create_table_core(final RawStore ms, final Table tbl,
        final EnvironmentContext envContext)
        throws AlreadyExistsException, MetaException,
        InvalidObjectException, NoSuchObjectException {
      // Validate the table name against the allowed pattern (no illegal characters)
      if (!MetaStoreUtils.validateName(tbl.getTableName())) {
        throw new InvalidObjectException(tbl.getTableName()
            + " is not a valid object name");
      }
      // Validation: column names and types, partition key names, and skewed column names
      String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols());
      if (validate != null) {
        throw new InvalidObjectException("Invalid column " + validate);
      }
      if (tbl.getPartitionKeys() != null) {
        validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys());
        if (validate != null) {
          throw new InvalidObjectException("Invalid partition column " + validate);
        }
      }
      SkewedInfo skew = tbl.getSd().getSkewedInfo();
      if (skew != null) {
        validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames());
        if (validate != null) {
          throw new InvalidObjectException("Invalid skew column " + validate);
        }
        validate = MetaStoreUtils.validateSkewedColNamesSubsetCol(
            skew.getSkewedColNames(), tbl.getSd().getCols());
        if (validate != null) {
          throw new InvalidObjectException("Invalid skew column " + validate);
        }
      }

      Path tblPath = null;
      boolean success = false, madeDir = false;
      try {
        firePreEvent(new PreCreateTableEvent(tbl, this));

        // Open the transaction
        ms.openTransaction();

        // Fail if the database does not exist
        Database db = ms.getDatabase(tbl.getDbName());
        if (db == null) {
          throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist");
        }

        // Check whether the table already exists in this database
        if (is_table_exists(ms, tbl.getDbName(), tbl.getTableName())) {
          throw new AlreadyExistsException("Table " + tbl.getTableName()
              + " already exists");
        }

        // If the table is not a view, build the full table path
        if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
          if (tbl.getSd().getLocation() == null
              || tbl.getSd().getLocation().isEmpty()) {
            tblPath = wh.getTablePath(
                ms.getDatabase(tbl.getDbName()), tbl.getTableName());
          } else {
            if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
              LOG.warn("Location: " + tbl.getSd().getLocation()
                  + " specified for non-external table:" + tbl.getTableName());
            }
            tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
          }
          // Store the resolved tblPath as the SD location
          tbl.getSd().setLocation(tblPath.toString());
        }

        // Create the table directory
        if (tblPath != null) {
          if (!wh.isDir(tblPath)) {
            if (!wh.mkdirs(tblPath, true)) {
              throw new MetaException(tblPath
                  + " is not a directory or unable to create one");
            }
            madeDir = true;
          }
        }
        // Check the hive.stats.autogather setting
        if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
            !MetaStoreUtils.isView(tbl)) {
          if (tbl.getPartitionKeysSize() == 0)  { // Unpartitioned table
            MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir);
          } else { // Partitioned table with no partitions.
            MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, true);
          }
        }

        // set create time
        long time = System.currentTimeMillis() / 1000;
        tbl.setCreateTime((int) time);
        if (tbl.getParameters() == null ||
            tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
          tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
        }
        ms.createTable(tbl);
        success = ms.commitTransaction();

      } finally {
        if (!success) {
          ms.rollbackTransaction();
          if (madeDir) {
            wh.deleteDir(tblPath, true);
          }
        }
        // Notify the listeners that the create has finished (for example, to send notifications)
        for (MetaStoreEventListener listener : listeners) {
          CreateTableEvent createTableEvent =
              new CreateTableEvent(tbl, success, this);
          createTableEvent.setEnvironmentContext(envContext);
          listener.onCreateTable(createTableEvent);
        }
      }
    }
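
Two details of create_table_core are easy to miss: the directory is deleted on failure only if this call created it (the madeDir flag), and the listeners run in finally, so they are notified with success set to false when the create fails. The following sketch reproduces just that bookkeeping on the local file system. CreateWithCleanup is a hypothetical name, java.io.File stands in for the Warehouse helper, and a Runnable stands in for the transactional metadata write.

```java
import java.io.File;
import java.util.function.Consumer;

final class CreateWithCleanup {
    /**
     * Creates tableDir if needed, runs the metadata write, and always reports the outcome.
     * The directory is removed on failure only when this call created it.
     */
    static boolean create(File tableDir, Runnable writeMetadata, Consumer<Boolean> listener) {
        boolean success = false;
        boolean madeDir = false;
        try {
            if (!tableDir.isDirectory()) {
                if (!tableDir.mkdirs()) {
                    throw new IllegalStateException(
                        tableDir + " is not a directory or unable to create one");
                }
                madeDir = true;
            }
            writeMetadata.run(); // stands in for ms.createTable plus commitTransaction
            success = true;
        } finally {
            if (!success && madeDir) {
                tableDir.delete(); // undo only what this call created
            }
            listener.accept(success); // listeners see failures as well as successes
        }
        return success;
    }
}
```

Leaving a pre-existing directory untouched on failure matters for tables created over an existing location, where deleting the path would destroy data the failed call never owned.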

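  The listeners will be covered in detail later. Continuing straight down the call chain, we reach ms.createTable. Here ms is a RawStore interface object; this interface gathers the method calls for the entire lifecycle in one place. Part of it looks like this: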

  public abstract Database getDatabase(String name)
      throws NoSuchObjectException;

  public abstract boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException;

  public abstract boolean alterDatabase(String dbname, Database db) throws NoSuchObjectException, MetaException;

  public abstract List<String> getDatabases(String pattern) throws MetaException;

  public abstract List<String> getAllDatabases() throws MetaException;

  public abstract boolean createType(Type type);

  public abstract Type getType(String typeName);

  public abstract boolean dropType(String typeName);

  public abstract void createTable(Table tbl) throws InvalidObjectException,
      MetaException;

  public abstract boolean dropTable(String dbName, String tableName)
      throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException;

  public abstract Table getTable(String dbName, String tableName)
      throws MetaException;
  ..................

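  Now let's look at how this is implemented. First, the Hive metastore calls getMS() to obtain the RawStore implementation held in the current thread's thread-local, as follows: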

    public RawStore getMS() throws MetaException {
      RawStore ms = threadLocalMS.get();
      if (ms == null) {
        ms = newRawStore();
        ms.verifySchema();
        threadLocalMS.set(ms);
        ms = threadLocalMS.get();
      }
      return ms;
    }


  public static RawStore getProxy(HiveConf hiveConf, Configuration conf, String rawStoreClassName,
      int id) throws MetaException {
    // Load baseClass by name through reflection; the implementation object is created from it afterwards
    Class<? extends RawStore> baseClass = (Class<? extends RawStore>) MetaStoreUtils.getClass(
        rawStoreClassName);

    RawStoreProxy handler = new RawStoreProxy(hiveConf, conf, baseClass, id);

    // Look for interfaces on both the class and all base classes.
    return (RawStore) Proxy.newProxyInstance(RawStoreProxy.class.getClassLoader(),
        getAllInterfaces(baseClass), handler);
  }
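
The comment "Look for interfaces on both the class and all base classes" points at a detail of JDK dynamic proxies: Class.getInterfaces() returns only the interfaces a class declares directly, so interfaces implemented by a superclass have to be collected by walking up the hierarchy. The body of getAllInterfaces is not shown in the excerpt; the following is one plausible way to write such a helper, with Interfaces as a hypothetical class name.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class Interfaces {
    /** Collects the interfaces declared by type and by each of its superclasses, without duplicates. */
    static Class<?>[] allInterfaces(Class<?> type) {
        Set<Class<?>> found = new LinkedHashSet<>();
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            for (Class<?> iface : c.getInterfaces()) {
                found.add(iface); // the set keeps first-seen order and drops repeats
            }
        }
        return found.toArray(new Class<?>[0]);
    }
}
```

For java.util.ArrayList, for instance, the result includes List (declared on ArrayList itself) and Collection (declared on AbstractCollection further up). The resulting array is the kind of argument Proxy.newProxyInstance expects for its interface list.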



  @Override
  public void createTable(Table tbl) throws InvalidObjectException, MetaException {
    boolean commited = false;
    try {
      openTransaction();
      // The database and table are validated again here (code not shown); why a
      // second check is needed deserves further thought
      MTable mtbl = convertToMTable(tbl);
      // pm is the JDO PersistenceManager initialized when the ObjectStore is created.
      // This is where the table object is submitted; how the JDO model objects map
      // to the database is worth studying separately
      pm.makePersistent(mtbl);
      PrincipalPrivilegeSet principalPrivs = tbl.getPrivileges();
      List<Object> toPersistPrivObjs = new ArrayList<Object>();
      if (principalPrivs != null) {
        int now = (int)(System.currentTimeMillis()/1000);

        Map<String, List<PrivilegeGrantInfo>> userPrivs = principalPrivs.getUserPrivileges();
        putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, userPrivs, PrincipalType.USER);

        Map<String, List<PrivilegeGrantInfo>> groupPrivs = principalPrivs.getGroupPrivileges();
        putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, PrincipalType.GROUP);

        Map<String, List<PrivilegeGrantInfo>> rolePrivs = principalPrivs.getRolePrivileges();
        putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, PrincipalType.ROLE);
      }
      pm.makePersistentAll(toPersistPrivObjs);
      commited = commitTransaction();
    } finally {
      if (!commited) {
        rollbackTransaction();
      }
    }
  }



  public void dropTable(String tableName, boolean ifPurge) throws HiveException {
    // Hive splits the name into an array of {dbName, tableName}
    String[] names = Utilities.getDbTableName(tableName);
    dropTable(names[0], names[1], true, true, ifPurge);
  }

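  Why is this step needed? Because the SQL statement can be either drop table dbName.tableName or drop table tableName, the database name and table name have to be separated here. For drop table tableName, the database name is taken from the current session. The code is as follows: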

  public static String[] getDbTableName(String dbtable) throws SemanticException {
    return getDbTableName(SessionState.get().getCurrentDatabase(), dbtable);
  }

  public static String[] getDbTableName(String defaultDb, String dbtable) throws SemanticException {
    if (dbtable == null) {
      return new String[2];
    }
    String[] names =  dbtable.split("\\.");
    switch (names.length) {
      case 2:
        return names;
      case 1:
        return new String [] {defaultDb, dbtable};
      default:
        throw new SemanticException(ErrorMsg.INVALID_TABLE_NAME, dbtable);
    }
  }
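
The name-splitting rule is small enough to try out on its own. Here is a minimal standalone version with no Hive dependencies; TableNames is a hypothetical class, and IllegalArgumentException replaces SemanticException purely for the sake of the example.

```java
final class TableNames {
    /** Splits "db.table" into {db, table}; a bare "table" is paired with defaultDb. */
    static String[] split(String defaultDb, String dbTable) {
        String[] names = dbTable.split("\\.");
        switch (names.length) {
            case 2:
                return names;
            case 1:
                return new String[] {defaultDb, dbTable};
            default:
                throw new IllegalArgumentException("Invalid table name: " + dbTable);
        }
    }
}
```

So `TableNames.split("default", "orders")` gives {"default", "orders"}, `TableNames.split("default", "sales.orders")` gives {"sales", "orders"}, and a name with two dots such as "a.b.c" is rejected.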


  public void dropTable(String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
      NoSuchObjectException, UnsupportedOperationException {
    Table tbl;
    try {
      tbl = getTable(dbname, name);
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
      return;
    }
    // Check the table type: an index table cannot be dropped this way
    if (isIndexTable(tbl)) {
      throw new UnsupportedOperationException("Cannot drop index tables");
    }
    // Same getHook as in create: fetch the hook of the table's storage handler
    HiveMetaHook hook = getHook(tbl);
    if (hook != null) {
      hook.preDropTable(tbl);
    }
    boolean success = false;
    try {
      drop_table_with_environment_context(dbname, name, deleteData, envContext);
      if (hook != null) {
        hook.commitDropTable(tbl, deleteData);
      }
      success=true;
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
    } finally {
      if (!success && (hook != null)) {
        hook.rollbackDropTable(tbl);
      }
    }
  }


    private boolean drop_table_core(final RawStore ms, final String dbname, final String name,
        final boolean deleteData, final EnvironmentContext envContext,
        final String indexName) throws NoSuchObjectException,
        MetaException, IOException, InvalidObjectException, InvalidInputException {
      boolean success = false;
      boolean isExternal = false;
      Path tblPath = null;
      List<Path> partPaths = null;
      Table tbl = null;
      boolean ifPurge = false;
      try {
        ms.openTransaction();
        // Fetch the full Table object
        tbl = get_table_core(dbname, name);
        if (tbl == null) {
          throw new NoSuchObjectException(name + " doesn't exist");
        }
        if (tbl.getSd() == null) {
          throw new MetaException("Table metadata is corrupted");
        }
        ifPurge = isMustPurge(envContext, tbl);

        firePreEvent(new PreDropTableEvent(tbl, deleteData, this));
        // An index table cannot be dropped directly; it has to go through drop index
        boolean isIndexTable = isIndexTable(tbl);
        if (indexName == null && isIndexTable) {
          throw new RuntimeException(
              "The table " + name + " is an index table. Please do drop index instead.");
        }
        // For a regular table, drop the metadata of all its indexes first
        if (!isIndexTable) {
          try {
            List<Index> indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
            while (indexes != null && indexes.size() > 0) {
              for (Index idx : indexes) {
                this.drop_index_by_name(dbname, name, idx.getIndexName(), true);
              }
              indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE);
            }
          } catch (TException e) {
            throw new MetaException(e.getMessage());
          }
        }
        isExternal = isExternal(tbl);
        if (tbl.getSd().getLocation() != null) {
          tblPath = new Path(tbl.getSd().getLocation());
          if (!wh.isWritable(tblPath.getParent())) {
            String target = indexName == null ? "Table" : "Index table";
            throw new MetaException(target + " metadata not deleted since " +
                tblPath.getParent() + " is not writable by " +
                hiveConf.getUser());
          }
        }

        checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge);
        // Collect the location paths of all partitions. Oddly, the Table object is not
        // passed in; the method fetches the table again, and it also checks write
        // permission on the parent directories
        partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath,
            tbl.getPartitionKeys(), deleteData && !isExternal);
        // Delete the metadata through the ObjectStore
        if (!ms.dropTable(dbname, name)) {
          String tableName = dbname + "." + name;
          throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
              "Unable to drop index table " + tableName + " for index " + indexName);
        }
        success = ms.commitTransaction();
      } finally {
        if (!success) {
          ms.rollbackTransaction();
        } else if (deleteData && !isExternal) {
          // Delete the partition data on disk
          deletePartitionData(partPaths, ifPurge);
          // Delete the table directory
          deleteTableData(tblPath, ifPurge);
          // ok even if the data is not deleted
        }
        // Notify the listeners
        for (MetaStoreEventListener listener : listeners) {
          DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this);
          dropTableEvent.setEnvironmentContext(envContext);
          listener.onDropTable(dropTableEvent);
        }
      }
      return success;
    }

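  Digging further into dropTable in ObjectStore, we find that it once again fetches the whole table object by dbName and tableName and then deletes the related records one by one. Perhaps the code was not all written by the same person, or perhaps this is a safety consideration? In many places a Table object that could have been passed in through the interface is fetched again; might that add load on the database? The ObjectStore code is as follows: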

  public boolean dropTable(String dbName, String tableName) throws MetaException,
    NoSuchObjectException, InvalidObjectException, InvalidInputException {
    boolean success = false;
    try {
      openTransaction();
      MTable tbl = getMTable(dbName, tableName);
      pm.retrieve(tbl);
      if (tbl != null) {
        // The code below looks up and deletes all privileges
        List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName);
        if (tabGrants != null && tabGrants.size() > 0) {
          pm.deletePersistentAll(tabGrants);
        }
        List<MTableColumnPrivilege> tblColGrants = listTableAllColumnGrants(dbName,
            tableName);
        if (tblColGrants != null && tblColGrants.size() > 0) {
          pm.deletePersistentAll(tblColGrants);
        }

        List<MPartitionPrivilege> partGrants = this.listTableAllPartitionGrants(dbName, tableName);
        if (partGrants != null && partGrants.size() > 0) {
          pm.deletePersistentAll(partGrants);
        }

        List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(dbName,
            tableName);
        if (partColGrants != null && partColGrants.size() > 0) {
          pm.deletePersistentAll(partColGrants);
        }
        // delete column statistics if present
        try {
          deleteTableColumnStatistics(dbName, tableName, null);
        } catch (NoSuchObjectException e) {
          LOG.info("Found no table level column statistics associated with db " + dbName +
          " table " + tableName + " record to delete");
        }
        // Delete the column descriptor (MCD) data
        preDropStorageDescriptor(tbl.getSd());
        // Delete the data of all tables related to this Table object
        pm.deletePersistentAll(tbl);
      }
      success = commitTransaction();
    } finally {
      if (!success) {
        rollbackTransaction();
      }
    }
    return success;
  }



  public void alterTable(String tblName, Table newTbl, boolean cascade)
      throws InvalidOperationException, HiveException {
    String[] names = Utilities.getDbTableName(tblName);
    try {
      // Remove DDL_TIME from the table parameters: the table is being altered,
      // so this timestamp is about to change
      if (newTbl.getParameters() != null) {
        newTbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
      }
      newTbl.checkValidity();
      getMSC().alter_table(names[0], names[1], newTbl.getTTable(), cascade);
    } catch (MetaException e) {
      throw new HiveException("Unable to alter table. " + e.getMessage(), e);
    } catch (TException e) {
      throw new HiveException("Unable to alter table. " + e.getMessage(), e);
    }
  }


    private void alter_table_core(final String dbname, final String name, final Table newTable,
        final EnvironmentContext envContext, final boolean cascade)
        throws InvalidOperationException, MetaException {
      startFunction("alter_table", ": db=" + dbname + " tbl=" + name
          + " newtbl=" + newTable.getTableName());

      // Update DDL_TIME
      if (newTable.getParameters() == null ||
          newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
        newTable.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
            .currentTimeMillis() / 1000));
      }
      boolean success = false;
      Exception ex = null;
      try {
        Table oldt = get_table_core(dbname, name);
        firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
        alterHandler.alterTable(getMS(), wh, dbname, name, newTable, cascade);
        success = true;

        // Notify the listeners
        for (MetaStoreEventListener listener : listeners) {

          AlterTableEvent alterTableEvent =
              new AlterTableEvent(oldt, newTable, success, this);
          alterTableEvent.setEnvironmentContext(envContext);
          listener.onAlterTable(alterTableEvent);
        }
      } catch (NoSuchObjectException e) {
        // thrown when the table to be altered does not exist
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (Exception e) {
        ex = e;
        if (e instanceof MetaException) {
          throw (MetaException) e;
        } else if (e instanceof InvalidOperationException) {
          throw (InvalidOperationException) e;
        } else {
          throw newMetaException(e);
        }
      } finally {
        endFunction("alter_table", success, ex, name);
      }
    }
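
  The two listings above cooperate on DDL_TIME: the client removes the stale value, and the server stamps a fresh one (in seconds) only when none is present. A minimal sketch of that handshake on a plain Map follows. DdlTimeSketch and its method names are hypothetical, and the key string is assumed to match hive_metastoreConstants.DDL_TIME.

```java
import java.util.HashMap;
import java.util.Map;

final class DdlTimeSketch {
  // Assumed to match hive_metastoreConstants.DDL_TIME; treat as illustrative.
  static final String DDL_TIME = "transient_lastDdlTime";

  /** Client side: drop the stale timestamp before sending the altered table. */
  static void clearDdlTime(Map<String, String> params) {
    if (params != null) {
      params.remove(DDL_TIME);
    }
  }

  /** Server side: stamp the given time, converted to seconds, if none is present. */
  static void stampDdlTime(Map<String, String> params, long nowMillis) {
    if (params.get(DDL_TIME) == null) {
      params.put(DDL_TIME, Long.toString(nowMillis / 1000));
    }
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put(DDL_TIME, "100");
    clearDdlTime(params);
    stampDdlTime(params, System.currentTimeMillis());
    System.out.println(params);
  }
}
```

  A caller that deliberately sets DDL_TIME keeps its value, because the server-side step only fills in a missing entry.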

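  Now let's focus on what alterHandler actually does. First, a brief note on how it is initialized: during HiveMetaStore init, the class name is read from the hive.metastore.alter.impl parameter, which is the name of HiveAlterHandler. Let's look at its alterTable implementation. Brace yourself, this one is long :)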

  public void alterTable(RawStore msdb, Warehouse wh, String dbname,
      String name, Table newt, boolean cascade) throws InvalidOperationException, MetaException {
    if (newt == null) {
      throw new InvalidOperationException("New table is invalid: " + newt);
    }
    // Validate the new table name
    if (!MetaStoreUtils.validateName(newt.getTableName())) {
      throw new InvalidOperationException(newt.getTableName()
          + " is not a valid object name");
    }
    // Validate the new column names and types
    String validate = MetaStoreUtils.validateTblColumns(newt.getSd().getCols());
    if (validate != null) {
      throw new InvalidOperationException("Invalid column " + validate);
    }

    Path srcPath = null;
    FileSystem srcFs = null;
    Path destPath = null;
    FileSystem destFs = null;

    boolean success = false;
    boolean moveData = false;
    boolean rename = false;
    Table oldt = null;
    List<ObjectPair<Partition, String>> altps = new ArrayList<ObjectPair<Partition, String>>();

    try {
      msdb.openTransaction();
      // The names are lower-cased right here, another hint that the code
      // was not written by a single person
      name = name.toLowerCase();
      dbname = dbname.toLowerCase();

      // Check whether a table with the new name already exists
      if (!newt.getTableName().equalsIgnoreCase(name)
          || !newt.getDbName().equalsIgnoreCase(dbname)) {
        if (msdb.getTable(newt.getDbName(), newt.getTableName()) != null) {
          throw new InvalidOperationException("new table " + newt.getDbName()
              + "." + newt.getTableName() + " already exists");
        }
        rename = true;
      }

      // Fetch the old table object
      oldt = msdb.getTable(dbname, name);
      if (oldt == null) {
        throw new InvalidOperationException("table " + newt.getDbName() + "."
            + newt.getTableName() + " doesn't exist");
      }
      // When METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES is true, incompatible
      // column type changes are rejected; false is passed as the default here
      if (HiveConf.getBoolVar(hiveConf,
          HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES,
          false)) {
        // Throws InvalidOperationException if the new column types are not
        // compatible with the current column types.
        MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange(
            oldt.getSd().getCols(), newt.getSd().getCols());
      }
      // cascade is passed down from the caller of Hive.alterTable, i.e. set by the
      // engine; it decides whether the partitions need to be altered as well
      if (cascade) {
        // Compare the new columns with the old ones; a difference means
        // columns were added or removed
        if (MetaStoreUtils.isCascadeNeededInAlterTable(oldt, newt)) {
          List<Partition> parts = msdb.getPartitions(dbname, name, -1);
          for (Partition part : parts) {
            List<FieldSchema> oldCols = part.getSd().getCols();
            part.getSd().setCols(newt.getSd().getCols());
            String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
            updatePartColumnStatsForAlterColumns(msdb, part, oldPartName, part.getValues(), oldCols, part);
            msdb.alterPartition(dbname, name, part.getValues(), part);
          }
        } else {
          LOG.warn("Alter table does not cascade changes to its partitions.");
        }
      }

      // Check whether the partition keys (dt, hour and so on) have changed
      boolean partKeysPartiallyEqual = checkPartialPartKeysEqual(oldt.getPartitionKeys(),
          newt.getPartitionKeys());

      // If the existing table is not a view and the old and new partition keys
      // differ, raise an error
      if (!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())) {
        if (oldt.getPartitionKeys().size() != newt.getPartitionKeys().size()
            || !partKeysPartiallyEqual) {
          throw new InvalidOperationException(
              "partition keys can not be changed.");
        }
      }
      // If this is a rename, the table is not a view, the location is unchanged
      // or the new location is empty, and the existing table is not external,
      // the user wants the data moved to the location matching the new name:
      // this is an alter table rename
      if (rename
          && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())
          && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0
            || StringUtils.isEmpty(newt.getSd().getLocation()))
          && !MetaStoreUtils.isExternalTable(oldt)) {
        // Resolve the current (source) location
        srcPath = new Path(oldt.getSd().getLocation());
        srcFs = wh.getFs(srcPath);

        // that means user is asking metastore to move data to new location
        // corresponding to the new name
        // get new location
        Database db = msdb.getDatabase(newt.getDbName());
        Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
        destPath = new Path(databasePath, newt.getTableName());
        destFs = wh.getFs(destPath);
        // Set the new table location for the update that follows
        newt.getSd().setLocation(destPath.toString());
        moveData = true;
        // Check the physical destination: if it already exists, all of its data
        // would be overwritten, which is not allowed
        if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
          throw new InvalidOperationException("table new location " + destPath
              + " is on a different file system than the old location "
              + srcPath + ". This operation is not supported");
        }
        try {
          srcFs.exists(srcPath); // check that src exists and also checks
                                 // permissions necessary
          if (destFs.exists(destPath)) {
            throw new InvalidOperationException("New location for this table "
                + newt.getDbName() + "." + newt.getTableName()
                + " already exists : " + destPath);
          }
        } catch (IOException e) {
          throw new InvalidOperationException("Unable to access new location "
              + destPath + " for table " + newt.getDbName() + "."
              + newt.getTableName());
        }
        String oldTblLocPath = srcPath.toUri().getPath();
        String newTblLocPath = destPath.toUri().getPath();

        // Fetch all partitions of the old table
        List<Partition> parts = msdb.getPartitions(dbname, name, -1);
        for (Partition part : parts) {
          String oldPartLoc = part.getSd().getLocation();
          if (oldPartLoc.contains(oldTblLocPath)) {
            URI oldUri = new Path(oldPartLoc).toUri();
            String newPath = oldUri.getPath().replace(oldTblLocPath, newTblLocPath);
            Path newPartLocPath = new Path(oldUri.getScheme(), oldUri.getAuthority(), newPath);
            altps.add(ObjectPair.create(part, part.getSd().getLocation()));
            part.getSd().setLocation(newPartLocPath.toString());
            String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
            try {
              //existing partition column stats is no longer valid, remove them
              msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, part.getValues(), null);
            } catch (InvalidInputException iie) {
              throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
            }
            msdb.alterPartition(dbname, name, part.getValues(), part);
          }
        }
      } else if (MetaStoreUtils.requireCalStats(hiveConf, null, null, newt) &&
        (newt.getPartitionKeysSize() == 0)) {
        Database db = msdb.getDatabase(newt.getDbName());
        // Update table stats. For partitioned table, we update stats in
        // alterPartition()
        MetaStoreUtils.updateUnpartitionedTableStatsFast(db, newt, wh, false, true);
      }
      updateTableColumnStatsForAlterTable(msdb, oldt, newt);
      // now finally call alter table
      msdb.alterTable(dbname, name, newt);
      // commit the changes
      success = msdb.commitTransaction();
    } catch (InvalidObjectException e) {
      LOG.debug(e);
      throw new InvalidOperationException(
          "Unable to change partition or table."
              + " Check metastore logs for detailed stack." + e.getMessage());
    } catch (NoSuchObjectException e) {
      LOG.debug(e);
      throw new InvalidOperationException(
          "Unable to change partition or table. Database " + dbname + " does not exist"
              + " Check metastore logs for detailed stack." + e.getMessage());
    } finally {
      if (!success) {
        msdb.rollbackTransaction();
      }
      if (success && moveData) {
        // Now update the HDFS path: rename the old path to the new one
        // through the FileSystem rename operation
        try {
          if (srcFs.exists(srcPath) && !srcFs.rename(srcPath, destPath)) {
            throw new IOException("Renaming " + srcPath + " to " + destPath + " failed");
          }
        } catch (IOException e) {
          LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e);
          boolean revertMetaDataTransaction = false;
          try {
            msdb.openTransaction();
            msdb.alterTable(newt.getDbName(), newt.getTableName(), oldt);
            for (ObjectPair<Partition, String> pair : altps) {
              Partition part = pair.getFirst();
              part.getSd().setLocation(pair.getSecond());
              msdb.alterPartition(newt.getDbName(), name, part.getValues(), part);
            }
            revertMetaDataTransaction = msdb.commitTransaction();
          } catch (Exception e1) {
            // we should log this for manual rollback by administrator
            LOG.error("Reverting metadata by HDFS operation failure failed During HDFS operation failed", e1);
            LOG.error("Table " + Warehouse.getQualifiedName(newt) +
                " should be renamed to " + Warehouse.getQualifiedName(oldt));
            LOG.error("Table " + Warehouse.getQualifiedName(newt) +
                " should have path " + srcPath);
            for (ObjectPair<Partition, String> pair : altps) {
              LOG.error("Partition " + Warehouse.getQualifiedName(pair.getFirst()) +
                  " should have path " + pair.getSecond());
            }
            if (!revertMetaDataTransaction) {
              msdb.rollbackTransaction();
            }
          }
          throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name +
            " failed to move data due to: '" + getSimpleMessage(e) + "' See hive log file for details.");
        }
      }
    }
    if (!success) {
      throw new MetaException("Committing the alter table transaction was not successful.");
    }
  }
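
  The partition loop in the rename branch swaps the old table path for the new one inside each partition location while keeping the scheme and authority. Here is a minimal sketch of that rewrite using java.net.URI instead of Hadoop's Path. PartitionLocationSketch and relocate are hypothetical names, and the host and paths in the example are made up.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class PartitionLocationSketch {
  /**
   * Rewrites a partition location after its table directory was renamed,
   * keeping the scheme and authority of the old location. Locations that do
   * not contain the old table path are returned unchanged.
   */
  static String relocate(String oldPartLoc, String oldTblPath, String newTblPath) {
    if (!oldPartLoc.contains(oldTblPath)) {
      return oldPartLoc;
    }
    URI oldUri = URI.create(oldPartLoc);
    String newPath = oldUri.getPath().replace(oldTblPath, newTblPath);
    try {
      return new URI(oldUri.getScheme(), oldUri.getAuthority(), newPath, null, null).toString();
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Cannot build new location for " + oldPartLoc, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(relocate(
        "hdfs://nn:8020/warehouse/db.db/t_old/dt=2020-01-01",
        "/warehouse/db.db/t_old",
        "/warehouse/db.db/t_new"));
  }
}
```

  Partitions stored outside the table directory are left where they are, which mirrors the contains check in the listing.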


  public Partition createPartition(Table tbl, Map<String, String> partSpec) throws HiveException {
    try {
      // Create a new Partition from the Table object; the Partition constructor
      // initializes the partition information
      return new Partition(tbl, getMSC().add_partition(
          Partition.createMetaPartitionObject(tbl, partSpec, null)));
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
      throw new HiveException(e);
    }
  }


  public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartitionObject(
      Table tbl, Map<String, String> partSpec, Path location) throws HiveException {
    List<String> pvals = new ArrayList<String>();
    for (FieldSchema field : tbl.getPartCols()) {
      String val = partSpec.get(field.getName());
      if (val == null || val.isEmpty()) {
        throw new HiveException("partition spec is invalid; field "
            + field.getName() + " does not exist or is empty");
      }
      pvals.add(val);
    }
    // Set the related properties: DbName, TableName, PartValues and the sd
    org.apache.hadoop.hive.metastore.api.Partition tpart =
        new org.apache.hadoop.hive.metastore.api.Partition();
    tpart.setDbName(tbl.getDbName());
    tpart.setTableName(tbl.getTableName());
    tpart.setValues(pvals);

    if (!tbl.isView()) {
      tpart.setSd(cloneSd(tbl));
      tpart.getSd().setLocation((location != null) ? location.toString() : null);
    }
    return tpart;
  }
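
  The first loop of createMetaPartitionObject turns an unordered partition spec into a value list ordered by the table's partition columns. A minimal sketch of that step, with plain strings in place of FieldSchema; PartSpecSketch and toPartValues are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartSpecSketch {
  /** Orders spec values by the partition columns; rejects missing or empty values. */
  static List<String> toPartValues(List<String> partCols, Map<String, String> partSpec) {
    List<String> pvals = new ArrayList<>();
    for (String col : partCols) {
      String val = partSpec.get(col);
      if (val == null || val.isEmpty()) {
        throw new IllegalArgumentException(
            "partition spec is invalid; field " + col + " does not exist or is empty");
      }
      pvals.add(val);
    }
    return pvals;
  }

  public static void main(String[] args) {
    Map<String, String> spec = new LinkedHashMap<>();
    spec.put("hour", "08");
    spec.put("dt", "2020-01-01");
    // The result follows the column order (dt, hour), not the insertion order of the map
    System.out.println(toPartValues(List.of("dt", "hour"), spec));
  }
}
```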


    private Partition add_partition_core(final RawStore ms,
        final Partition part, final EnvironmentContext envContext)
        throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
      boolean success = false;
      Table tbl = null;
      try {
        ms.openTransaction();
        tbl = ms.getTable(part.getDbName(), part.getTableName());
        if (tbl == null) {
          throw new InvalidObjectException(
              "Unable to add partition because table or database do not exist");
        }
        // Fire the pre-event
        firePreEvent(new PreAddPartitionEvent(tbl, part, this));
        // Before creating the partition, check whether it already exists in the metadata
        boolean shouldAdd = startAddPartition(ms, part, false);
        assert shouldAdd; // start would throw if it already existed here
        // Create the partition directory
        boolean madeDir = createLocationForAddedPartition(tbl, part);
        try {
          initializeAddedPartition(tbl, part, madeDir);
          success = ms.addPartition(part);
        } finally {
          if (!success && madeDir) {
            wh.deleteDir(new Path(part.getSd().getLocation()), true);
          }
        }
        // we proceed only if we'd actually succeeded anyway, otherwise,
        // we'd have thrown an exception
        success = success && ms.commitTransaction();
      } finally {
        if (!success) {
          ms.rollbackTransaction();
        }
        fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success);
      }
      return part;
    }


    private boolean createLocationForAddedPartition(
        final Table tbl, final Partition part) throws MetaException {
      Path partLocation = null;
      String partLocationStr = null;
      if (part.getSd() != null) {
        partLocationStr = part.getSd().getLocation();
      }
      // If no location was given, build the partition location
      if (partLocationStr == null || partLocationStr.isEmpty()) {
        // set default location if not specified and this is
        // a physical table partition (not a view)
        if (tbl.getSd().getLocation() != null) {
          // The table location is not null: join it with the partition name
          // to form the complete partition location
          partLocation = new Path(tbl.getSd().getLocation(), Warehouse
              .makePartName(tbl.getPartitionKeys(), part.getValues()));
        }
      } else {
        if (tbl.getSd().getLocation() == null) {
          throw new MetaException("Cannot specify location for a view partition");
        }
        partLocation = wh.getDnsPath(new Path(partLocationStr));
      }

      boolean result = false;
      if (partLocation != null) {
        part.getSd().setLocation(partLocation.toString());

        // Check to see if the directory already exists before calling
        // mkdirs() because if the file system is read-only, mkdirs will
        // throw an exception even if the directory already exists.
        if (!wh.isDir(partLocation)) {
          if (!wh.mkdirs(partLocation, true)) {
            throw new MetaException(partLocation
                + " is not a directory or unable to create one");
          }
          result = true;
        }
      }
      return result;
    }
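
  When no location is supplied, the default partition location is the table location joined with a name of the form key=value/key=value. The sketch below illustrates that composition under a simplifying assumption: unlike Hive's Warehouse.makePartName it performs no escaping of special characters. PartNameSketch and its methods are hypothetical names.

```java
import java.util.List;

final class PartNameSketch {
  /** Builds "k1=v1/k2=v2". Simplified: no escaping of special characters is done. */
  static String makePartName(List<String> keys, List<String> values) {
    if (keys.size() != values.size()) {
      throw new IllegalArgumentException("partition keys and values differ in size");
    }
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < keys.size(); i++) {
      if (i > 0) {
        sb.append('/');
      }
      sb.append(keys.get(i)).append('=').append(values.get(i));
    }
    return sb.toString();
  }

  /** Joins the table location and the partition name with a single slash. */
  static String defaultLocation(String tableLocation, List<String> keys, List<String> values) {
    String base = tableLocation.endsWith("/")
        ? tableLocation.substring(0, tableLocation.length() - 1)
        : tableLocation;
    return base + "/" + makePartName(keys, values);
  }

  public static void main(String[] args) {
    System.out.println(defaultLocation(
        "hdfs://nn/wh/t", List.of("dt", "hour"), List.of("2020-01-01", "08")));
  }
}
```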




    private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name,
      List<String> part_vals, final boolean deleteData, final EnvironmentContext envContext)
      throws MetaException, NoSuchObjectException, IOException, InvalidObjectException,
      InvalidInputException {
      boolean success = false;
      Path partPath = null;
      Table tbl = null;
      Partition part = null;
      boolean isArchived = false;
      Path archiveParentDir = null;
      boolean mustPurge = false;

      try {
        ms.openTransaction();
        part = ms.getPartition(db_name, tbl_name, part_vals);
        tbl = get_table_core(db_name, tbl_name);
        firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this));
        mustPurge = isMustPurge(envContext, tbl);

        if (part == null) {
          throw new NoSuchObjectException("Partition doesn't exist. "
              + part_vals);
        }
        // I have not yet looked closely at this part (archived partitions)
        isArchived = MetaStoreUtils.isArchived(part);
        if (isArchived) {
          archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
          verifyIsWritablePath(archiveParentDir);
          checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals, mustPurge);
        }
        if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
          throw new MetaException("Unable to drop partition");
        }
        success = ms.commitTransaction();
        if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
          partPath = new Path(part.getSd().getLocation());
          verifyIsWritablePath(partPath);
          checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." + part_vals, mustPurge);
        }
      } finally {
        if (!success) {
          ms.rollbackTransaction();
        } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) {
          if (tbl != null && !isExternal(tbl)) {
            if (mustPurge) {
              LOG.info("dropPartition() will purge " + partPath + " directly, skipping trash.");
            }
            else {
              LOG.info("dropPartition() will move " + partPath + " to trash-directory.");
            }
            // Archived partitions have har:/to_har_file as their location.
            // The original directory was saved in params
            if (isArchived) {
              assert (archiveParentDir != null);
              wh.deleteDir(archiveParentDir, true, mustPurge);
            } else {
              assert (partPath != null);
              wh.deleteDir(partPath, true, mustPurge);
              deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge);
            }
            // ok even if the data is not deleted
          }
        }
        for (MetaStoreEventListener listener : listeners) {
          DropPartitionEvent dropPartitionEvent =
            new DropPartitionEvent(tbl, part, success, deleteData, this);
          dropPartitionEvent.setEnvironmentContext(envContext);
          listener.onDropPartition(dropPartitionEvent);
        }
      }
      return true;
    }
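
  After the partition directory is deleted, deleteParentRecursive is called with part_vals.size() - 1, so at most the intermediate partition directories are cleaned up and the table directory itself is never touched. The body of deleteParentRecursive is not shown in this article; the sketch below is only one plausible reading of it (delete an ancestor when it is empty, then move one level up), written against java.nio.file on a local file system. ParentCleanupSketch and deleteEmptyParents are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class ParentCleanupSketch {
  /** Deletes up to depth empty ancestor directories, starting at parent and moving up. */
  static void deleteEmptyParents(Path parent, int depth) {
    if (depth <= 0 || parent == null || !Files.isDirectory(parent)) {
      return;
    }
    try {
      try (Stream<Path> children = Files.list(parent)) {
        if (children.findAny().isPresent()) {
          return; // a sibling partition still lives here, keep the directory
        }
      }
      Files.delete(parent);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    deleteEmptyParents(parent.getParent(), depth - 1);
  }
}
```

  For a partition dt=1/hour=2 the depth is 1: once hour=2 is gone, dt=1 is removed if it is empty, and the walk stops before reaching the table directory.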



    private void rename_partition(final String db_name, final String tbl_name,
        final List<String> part_vals, final Partition new_part,
        final EnvironmentContext envContext)
        throws InvalidOperationException, MetaException,
        TException {
      startTableFunction("alter_partition", db_name, tbl_name);

      if (LOG.isInfoEnabled()) {
        LOG.info("New partition values:" + new_part.getValues());
        if (part_vals != null && part_vals.size() > 0) {
          LOG.info("Old Partition values:" + part_vals);
        }
      }

      Partition oldPart = null;
      Exception ex = null;
      try {
        firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));
        // Validate the characters of the partition name
        if (part_vals != null && !part_vals.isEmpty()) {
          MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
              partitionValidationPattern);
        }
        // Call alterHandler.alterPartition to do the physical rename of the
        // partition and to update the metadata
        oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part);

        // Only fetch the table if we actually have a listener
        Table table = null;
        for (MetaStoreEventListener listener : listeners) {
          if (table == null) {
            table = getMS().getTable(db_name, tbl_name);
          }
          AlterPartitionEvent alterPartitionEvent =
              new AlterPartitionEvent(oldPart, new_part, table, true, this);
          alterPartitionEvent.setEnvironmentContext(envContext);
          listener.onAlterPartition(alterPartitionEvent);
        }
      } catch (InvalidObjectException e) {
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (AlreadyExistsException e) {
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (Exception e) {
        ex = e;
        if (e instanceof MetaException) {
          throw (MetaException) e;
        } else if (e instanceof InvalidOperationException) {
          throw (InvalidOperationException) e;
        } else if (e instanceof TException) {
          throw (TException) e;
        } else {
          throw newMetaException(e);
        }
      } finally {
        endFunction("alter_partition", oldPart != null, ex, tbl_name);
      }
      return;
    }


  public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
      final String name, final List<String> part_vals, final Partition new_part)
      throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
      MetaException {
    boolean success = false;

    Path srcPath = null;
    Path destPath = null;
    FileSystem srcFs = null;
    FileSystem destFs = null;
    Partition oldPart = null;
    String oldPartLoc = null;
    String newPartLoc = null;

    // Set the DDL time of the new partition
    if (new_part.getParameters() == null ||
        new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
        Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
      new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
          .currentTimeMillis() / 1000));
    }
    // Fetch the whole Table object by dbName and tableName
    Table tbl = msdb.getTable(dbname, name);
    // If part_vals is null or empty, only other partition metadata is being changed,
    // not the partition key values, so msdb.alterPartition updates the metadata directly
    if (part_vals == null || part_vals.size() == 0) {
      try {
        oldPart = msdb.getPartition(dbname, name, new_part.getValues());
        if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
          MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
        }
        updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
        msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
      } catch (InvalidObjectException e) {
        throw new InvalidOperationException("alter is not possible");
      } catch (NoSuchObjectException e){
        //old partition does not exist
        throw new InvalidOperationException("alter is not possible");
      }
      return oldPart;
    }
    //rename partition
    try {
      msdb.openTransaction();
      try {
        oldPart = msdb.getPartition(dbname, name, part_vals);
      } catch (NoSuchObjectException e) {
        // this means there is no existing partition
        throw new InvalidObjectException(
            "Unable to rename partition because old partition does not exist");
      }
      Partition check_part = null;
      try {
        check_part = msdb.getPartition(dbname, name, new_part.getValues());
      } catch(NoSuchObjectException e) {
        // this means there is no existing partition
        check_part = null;
      }
      // If check_part was found, the target partition already exists: report it
      if (check_part != null) {
        throw new AlreadyExistsException("Partition already exists:" + dbname + "." + name + "." +
            new_part.getValues());
      }
      if (tbl == null) {
        throw new InvalidObjectException(
            "Unable to rename partition because table or database do not exist");
      }

      // For a partition of an external table the file system need not be touched;
      // updating the metadata is enough
      if (tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
        new_part.getSd().setLocation(oldPart.getSd().getLocation());
        String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
        try {
          //existing partition column stats is no longer valid, remove
          msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
        } catch (NoSuchObjectException nsoe) {
          //ignore
        } catch (InvalidInputException iie) {
          throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
        }
        msdb.alterPartition(dbname, name, part_vals, new_part);
      } else {
        try {
          destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name),
            Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
          destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation()));
        } catch (NoSuchObjectException e) {
          LOG.debug(e);
          throw new InvalidOperationException(
            "Unable to change partition or table. Database " + dbname + " does not exist"
              + " Check metastore logs for detailed stack." + e.getMessage());
        }
        if (destPath != null) {
          newPartLoc = destPath.toString();
          oldPartLoc = oldPart.getSd().getLocation();
          // Derive the old partition path from the location in the existing sd
          srcPath = new Path(oldPartLoc);

          LOG.info("srcPath:" + oldPartLoc);
          LOG.info("descPath:" + newPartLoc);
          srcFs = wh.getFs(srcPath);
          destFs = wh.getFs(destPath);
          // Check that srcFs and destFs are the same file system
          if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
            throw new InvalidOperationException("table new location " + destPath
              + " is on a different file system than the old location "
              + srcPath + ". This operation is not supported");
          }
          try {
            srcFs.exists(srcPath); // check that src exists and also checks
            if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
              throw new InvalidOperationException("New location for this table "
                + tbl.getDbName() + "." + tbl.getTableName()
                + " already exists : " + destPath);
            }
          } catch (IOException e) {
            throw new InvalidOperationException("Unable to access new location "
              + destPath + " for partition " + tbl.getDbName() + "."
              + tbl.getTableName() + " " + new_part.getValues());
          }
          new_part.getSd().setLocation(newPartLoc);
          if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
            MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
          }
          String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues());
          try {
            //existing partition column stats is no longer valid, remove
            msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null);
          } catch (NoSuchObjectException nsoe) {
            //ignore
          } catch (InvalidInputException iie) {
            throw new InvalidOperationException("Unable to update partition stats in table rename." + iie);
          }
          msdb.alterPartition(dbname, name, part_vals, new_part);
        }
      }

      success = msdb.commitTransaction();
    } finally {
      if (!success) {
        msdb.rollbackTransaction();
      }
      if (success && newPartLoc != null && newPartLoc.compareTo(oldPartLoc) != 0) {
        //rename the data directory
        try{
          if (srcFs.exists(srcPath)) {
            // If the parent path has not been created yet, create it. For example, when
            // the engine calls alterTable and then alterPartition, the partition's
            // parent path may not exist yet
            Path destParentPath = destPath.getParent();
            if (!wh.mkdirs(destParentPath, true)) {
                throw new IOException("Unable to create path " + destParentPath);
            }
            wh.renameDir(srcPath, destPath, true);
            LOG.info("rename done!");
          }
        } catch (IOException e) {
          boolean revertMetaDataTransaction = false;
          try {
            msdb.openTransaction();
            msdb.alterPartition(dbname, name, new_part.getValues(), oldPart);
            revertMetaDataTransaction = msdb.commitTransaction();
          } catch (Exception e1) {
            LOG.error("Reverting metadata opeation failed During HDFS operation failed", e1);
            if (!revertMetaDataTransaction) {
              msdb.rollbackTransaction();
            }
          }
          throw new InvalidOperationException("Unable to access old location "
              + srcPath + " for partition " + tbl.getDbName() + "."
              + tbl.getTableName() + " " + part_vals);
        }
      }
    }
    return oldPart;
  }
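
  Both alterTable and alterPartition follow the same ordering: commit the metadata change first, then rename the directory, and if the rename fails, apply a compensating metadata change that restores the old state. A minimal sketch of that ordering follows. RenameWithCompensation and alter are hypothetical names, and the three callbacks stand in for the metastore transaction, the file-system rename and the revert transaction.

```java
import java.util.function.BooleanSupplier;

final class RenameWithCompensation {
  /**
   * Applies the metadata change, then the file-system rename. If the rename
   * reports failure, the compensating metadata change is run and false is returned.
   */
  static boolean alter(Runnable commitMetadata, BooleanSupplier renameOnDisk,
      Runnable revertMetadata) {
    commitMetadata.run();
    if (renameOnDisk.getAsBoolean()) {
      return true;
    }
    // The metadata was already committed, so it cannot simply be rolled back;
    // a second change that restores the old values is needed
    revertMetadata.run();
    return false;
  }

  public static void main(String[] args) {
    String[] location = {"/wh/t/dt=old"};
    boolean ok = alter(
        () -> location[0] = "/wh/t/dt=new",
        () -> false, // simulate a failed rename
        () -> location[0] = "/wh/t/dt=old");
    System.out.println(ok + " " + location[0]);
  }
}
```

  The gap between the commit and the rename is why the real code logs detailed instructions for manual repair when the revert itself fails.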


