开发者社区> 松伯> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

Hive metastore整体代码分析及详解

简介:   从上一篇对Hive metastore表结构的简要分析中,我再根据数据设计的实体对象,再进行整个代码结构的总结。那么我们先打开metadata的目录,其目录结构:   可以看到,整个hivemeta的目录包含metastore(客户端与服务端调用逻辑)、events(事件目录包含table生命...
+关注继续查看

  从上一篇对Hive metastore表结构的简要分析中,我再根据数据设计的实体对象,再进行整个代码结构的总结。那么我们先打开metadata的目录,其目录结构:

  可以看到,整个hivemeta的目录包含metastore(客户端与服务端调用逻辑)、events(事件目录包含table生命周期中的检查、权限认证等listener实现)、hooks(这里的hooks仅包含了jdo connection的相关接口)、parser(对于表达树的解析)、spec(partition的相关代理类)、tools(jdo execute相关方法)及txn及model,下来我们从整个metadata分逐一进行代码分析及注释:

  没有把包打开,很多类?是不是感觉害怕很想死?我也想死,咱们继续。。一开始,我们可能觉得一团乱麻烦躁,这是啥玩意儿啊这。。冷静下来,我们从Hive这个大类开始看,因为它是metastore元数据调用的入口。整个生命周期分析流程为: HiveMetaStoreClient客户端的创建及加载、HiveMetaStore服务端的创建及加载、createTable、dropTable、AlterTable、createPartition、dropPartition、alterPartition。当然,这只是完整metadata的一小部分。

  1、HiveMetaStoreClient客户端的创建及加载

  那么我们从Hive这个类一点点开始看:

 1   private HiveConf conf = null;
 2   private IMetaStoreClient metaStoreClient;
 3   private UserGroupInformation owner;
 4 
 5   // metastore calls timing information
 6   private final Map<String, Long> metaCallTimeMap = new HashMap<String, Long>();
 7 
 8   private static ThreadLocal<Hive> hiveDB = new ThreadLocal<Hive>() {
 9     @Override
10     protected synchronized Hive initialValue() {
11       return null;
12     }
13 
14     @Override
15     public synchronized void remove() {
16       if (this.get() != null) {
17         this.get().close();
18       }
19       super.remove();
20     }
21   };

  这里声明的有hiveConf对象、metaStoreClient 、操作用户组userGroupInfomation以及调用时间Map,这里存成一个map,用来记录每一个动作的运行时长。同时维护了一个本地线程hiveDB,如果db为空的情况下,会重新创建一个Hive对象,代码如下:

 1   public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
 2     Hive db = hiveDB.get();
 3     if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
 4       if (db != null) {
 5         LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
 6           ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
 7       }
 8       closeCurrent();
 9       c.set("fs.scheme.class", "dfs");
10       Hive newdb = new Hive(c);
11       hiveDB.set(newdb);
12       return newdb;
13     }
14     db.conf = c;
15     return db;
16   }

  随后我们会发现,在创建Hive对象时,便已经将function进行注册,什么是function呢,通过上次的表结构分析,可以理解为所有udf等jar包的元数据存储。代码如下:

 1   // register all permanent functions. need improvement
 2   static {
 3     try {
 4       reloadFunctions();
 5     } catch (Exception e) {
 6       LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
 7     }
 8   }
 9 
10   public static void reloadFunctions() throws HiveException {
    //获取 Hive对象,用于后续方法的调用
11 Hive db = Hive.get();
    //通过遍历每一个dbName
12 for (String dbName : db.getAllDatabases()) {
      //通过dbName查询挂在该db下的所有function的信息。
13 for (String functionName : db.getFunctions(dbName, "*")) { 14 Function function = db.getFunction(dbName, functionName); 15 try {
     //这里的register便是将查询到的function的数据注册到Registry类中的一个Map<String,FunctionInfo>中,以便计算引擎在调用时,不必再次查询数据库。
16 FunctionRegistry.registerPermanentFunction( 17 FunctionUtils.qualifyFunctionName(functionName, dbName), function.getClassName(), 18 false, FunctionTask.toFunctionResource(function.getResourceUris())); 19 } catch (Exception e) { 20 LOG.warn("Failed to register persistent function " + 21 functionName + ":" + function.getClassName() + ". Ignore and continue."); 22 } 23 } 24 } 25 }

  调用getMSC()方法,进行metadataClient客户端的创建,代码如下:

 1  1   private IMetaStoreClient createMetaStoreClient() throws MetaException {
 2  2   
 3     //这里实现接口HiveMetaHookLoader
 4  3     HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
 5  4         @Override
 6  5         public HiveMetaHook getHook(
 7  6           org.apache.hadoop.hive.metastore.api.Table tbl)
 8  7           throws MetaException {
 9  8 
10  9           try {
11 10             if (tbl == null) {
12 11               return null;
13 12             }
14          //根据tble的kv属性加载不同storage的实例,比如hbase、redis等等拓展存储,作为外部表进行存储
15 13             HiveStorageHandler storageHandler =
16 14               HiveUtils.getStorageHandler(conf,
17 15                 tbl.getParameters().get(META_TABLE_STORAGE));
18 16             if (storageHandler == null) {
19 17               return null;
20 18             }
21 19             return storageHandler.getMetaHook();
22 20           } catch (HiveException ex) {
23 21             LOG.error(StringUtils.stringifyException(ex));
24 22             throw new MetaException(
25 23               "Failed to load storage handler:  " + ex.getMessage());
26 24           }
27 25         }
28 26       };
29 27     return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap,
30 28         SessionHiveMetaStoreClient.class.getName());
31 29   }

  2、HiveMetaStore服务端的创建及加载

  在HiveMetaStoreClient初始化时,会初始化HiveMetaStore客户端,代码如下:

 1   public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
 2     throws MetaException {
 3 
 4     this.hookLoader = hookLoader;
 5     if (conf == null) {
 6       conf = new HiveConf(HiveMetaStoreClient.class);
 7     }
 8     this.conf = conf;
 9     filterHook = loadFilterHooks();
10    //根据hive-site.xml中的hive.metastore.uris配置,如果配置该参数,则认为是远程连接,否则为本地连接
11     String msUri = conf.getVar(HiveConf.ConfVars.METASTOREURIS);
12     localMetaStore = HiveConfUtil.isEmbeddedMetaStore(msUri);
13     if (localMetaStore) {
      //本地连接直接连接HiveMetaStore
16       client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true);
17       isConnected = true;
18       snapshotActiveConf();
19       return;
20     }
21 
22     //获取配置中的重试次数及timeout时间
23     retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
24     retryDelaySeconds = conf.getTimeVar(
25         ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
26 
27     //拼接metastore uri
28     if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
29       String metastoreUrisString[] = conf.getVar(
30           HiveConf.ConfVars.METASTOREURIS).split(",");
31       metastoreUris = new URI[metastoreUrisString.length];
32       try {
33         int i = 0;
34         for (String s : metastoreUrisString) {
35           URI tmpUri = new URI(s);
36           if (tmpUri.getScheme() == null) {
37             throw new IllegalArgumentException("URI: " + s
38                 + " does not have a scheme");
39           }
40           metastoreUris[i++] = tmpUri;
41 
42         }
43       } catch (IllegalArgumentException e) {
44         throw (e);
45       } catch (Exception e) {
46         MetaStoreUtils.logAndThrowMetaException(e);
47       }
48     } else {
49       LOG.error("NOT getting uris from conf");
50       throw new MetaException("MetaStoreURIs not found in conf file");
51     }
52     调用open方法创建连接
53     open();
54   }

  从上面代码中可以看出,如果我们是远程连接,需要配置hive-site.xml中的hive.metastore.uri,是不是很熟悉?加入你的client与server不在同一台机器,就需要配置进行远程连接。那么我们继续往下面看,创建连接的open方法:

  1   private void open() throws MetaException {
  2     isConnected = false;
  3     TTransportException tte = null;
     //是否使用Sasl
4 boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
     //If true, the metastore Thrift interface will use TFramedTransport. When false (default) a standard TTransport is used.
5 boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
     //If true, the metastore Thrift interface will use TCompactProtocol. When false (default) TBinaryProtocol will be used 具体他们之间的区别我们后续再讨论
6 boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
     //获取socket timeout时间
7 int clientSocketTimeout = (int) conf.getTimeVar( 8 ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); 9 10 for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { 11 for (URI store : metastoreUris) { 12 LOG.info("Trying to connect to metastore with URI " + store); 13 try { 14 transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); 15 if (useSasl) { 16 // Wrap thrift connection with SASL for secure connection. 17 try {
           //创建HadoopThriftAuthBridge client
18 HadoopThriftAuthBridge.Client authBridge = 19 ShimLoader.getHadoopThriftAuthBridge().createClient(); 20          //权限认证相关 21 // check if we should use delegation tokens to authenticate 22 // the call below gets hold of the tokens if they are set up by hadoop 23 // this should happen on the map/reduce tasks if the client added the 24 // tokens into hadoop's credential store in the front end during job 25 // submission. 26 String tokenSig = conf.get("hive.metastore.token.signature"); 27 // tokenSig could be null 28 tokenStrForm = Utils.getTokenStrForm(tokenSig); 29 if(tokenStrForm != null) { 30 // authenticate using delegation tokens via the "DIGEST" mechanism 31 transport = authBridge.createClientTransport(null, store.getHost(), 32 "DIGEST", tokenStrForm, transport, 33 MetaStoreUtils.getMetaStoreSaslProperties(conf)); 34 } else { 35 String principalConfig = 36 conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL); 37 transport = authBridge.createClientTransport( 38 principalConfig, store.getHost(), "KERBEROS", null, 39 transport, MetaStoreUtils.getMetaStoreSaslProperties(conf)); 40 } 41 } catch (IOException ioe) { 42 LOG.error("Couldn't create client transport", ioe); 43 throw new MetaException(ioe.toString()); 44 } 45 } else if (useFramedTransport) { 46 transport = new TFramedTransport(transport); 47 } 48 final TProtocol protocol;
         //后续详细说明两者的区别(因为俺还没看,哈哈)
49 if (useCompactProtocol) { 50 protocol = new TCompactProtocol(transport); 51 } else { 52 protocol = new TBinaryProtocol(transport); 53 }
         //创建ThriftHiveMetastore client
54 client = new ThriftHiveMetastore.Client(protocol); 55 try { 56 transport.open(); 57 isConnected = true; 58 } catch (TTransportException e) { 59 tte = e; 60 if (LOG.isDebugEnabled()) { 61 LOG.warn("Failed to connect to the MetaStore Server...", e); 62 } else { 63 // Don't print full exception trace if DEBUG is not on. 64 LOG.warn("Failed to connect to the MetaStore Server..."); 65 } 66 } 67       //用户组及用户的加载 68 if (isConnected && !useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){ 69 // Call set_ugi, only in unsecure mode. 70 try { 71 UserGroupInformation ugi = Utils.getUGI(); 72 client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames())); 73 } catch (LoginException e) { 74 LOG.warn("Failed to do login. set_ugi() is not successful, " + 75 "Continuing without it.", e); 76 } catch (IOException e) { 77 LOG.warn("Failed to find ugi of client set_ugi() is not successful, " + 78 "Continuing without it.", e); 79 } catch (TException e) { 80 LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. " 81 + "Continuing without it.", e); 82 } 83 } 84 } catch (MetaException e) { 85 LOG.error("Unable to connect to metastore with URI " + store 86 + " in attempt " + attempt, e); 87 } 88 if (isConnected) { 89 break; 90 } 91 } 92 // Wait before launching the next round of connection retries. 93 if (!isConnected && retryDelaySeconds > 0) { 94 try { 95 LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt."); 96 Thread.sleep(retryDelaySeconds * 1000); 97 } catch (InterruptedException ignore) {} 98 } 99 } 100 101 if (!isConnected) { 102 throw new MetaException("Could not connect to meta store using any of the URIs provided." + 103 " Most recent failure: " + StringUtils.stringifyException(tte)); 104 } 105 106 snapshotActiveConf(); 107 108 LOG.info("Connected to metastore."); 109 }

  本篇先对对protocol的原理放置一边。从代码中可以看出HiveMetaStore服务端是通过ThriftHiveMetaStore创建,它本是一个class类,但其中定义了接口Iface、AsyncIface,这样做的好处是利于继承实现。那么下来,我们看一下HMSHandler的初始化。如果是在本地调用的过程中,直接调用newRetryingHMSHandler,便会直接进行HMSHandler的初始化。代码如下:

1     public HMSHandler(String name, HiveConf conf, boolean init) throws MetaException {
2       super(name);
3       hiveConf = conf;
4       if (init) {
5         init();
6       }
7     }

  下俩我们继续看它的init方法:

 1     public void init() throws MetaException {
      //获取与数据交互的实现类className,该类为objectStore,是RawStore的实现,负责JDO与数据库的交互。
2 rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
      //加载Listeners,来自hive.metastore.init.hooks,可自行实现并加载
3 initListeners = MetaStoreUtils.getMetaStoreListeners( 4 MetaStoreInitListener.class, hiveConf, 5 hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS)); 6 for (MetaStoreInitListener singleInitListener: initListeners) { 7 MetaStoreInitContext context = new MetaStoreInitContext(); 8 singleInitListener.onInit(context); 9 } 10     //初始化alter的实现类 11 String alterHandlerName = hiveConf.get("hive.metastore.alter.impl", 12 HiveAlterHandler.class.getName()); 13 alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass( 14 alterHandlerName), hiveConf);
      //初始化warehouse
15 wh = new Warehouse(hiveConf); 16     //创建默认db以及用户,同时加载currentUrl 17 synchronized (HMSHandler.class) { 18 if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(hiveConf))) { 19 createDefaultDB(); 20 createDefaultRoles(); 21 addAdminUsers(); 22 currentUrl = MetaStoreInit.getConnectionURL(hiveConf); 23 } 24 } 25     //计数信息的初始化 26 if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) { 27 try { 28 Metrics.init(); 29 } catch (Exception e) { 30 // log exception, but ignore inability to start 31 LOG.error("error in Metrics init: " + e.getClass().getName() + " " 32 + e.getMessage(), e); 33 } 34 } 35     //Listener的PreListener的初始化 36 preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class, 37 hiveConf, 38 hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS)); 39 listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf, 40 hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS)); 41 listeners.add(new SessionPropertiesListener(hiveConf)); 42 endFunctionListeners = MetaStoreUtils.getMetaStoreListeners( 43 MetaStoreEndFunctionListener.class, hiveConf, 44 hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS)); 45     //针对partitionName的正则校验,可自行设置,根据hive.metastore.partition.name.whitelist.pattern进行设置 46 String partitionValidationRegex = 47 hiveConf.getVar(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN); 48 if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) { 49 partitionValidationPattern = Pattern.compile(partitionValidationRegex); 50 } else { 51 partitionValidationPattern = null; 52 } 53 54 long cleanFreq = hiveConf.getTimeVar(ConfVars.METASTORE_EVENT_CLEAN_FREQ, TimeUnit.MILLISECONDS); 55 if (cleanFreq > 0) { 56 // In default config, there is no timer. 57 Timer cleaner = new Timer("Metastore Events Cleaner Thread", true); 58 cleaner.schedule(new EventCleanerTask(this), cleanFreq, cleanFreq); 59 } 60 }

  它初始化了与数据库交互的rawStore的实现类、物理操作的warehouse以及Event与Listener。从而通过接口调用相关meta生命周期方法进行表的操作。

  3、createTable

 从createTable方法开始。上代码:

 1   public void createTable(String tableName, List<String> columns, List<String> partCols,
 2                           Class<? extends InputFormat> fileInputFormat,
 3                           Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols,
 4                           Map<String, String> parameters) throws HiveException {
 5     if (columns == null) {
 6       throw new HiveException("columns not specified for table " + tableName);
 7     }
 8 
 9     Table tbl = newTable(tableName);
    //SD表属性,设置该表的input及output class名,在计算引擎计算时,拉取相应的ClassName 通过反射进行input及output类的加载
10 tbl.setInputFormatClass(fileInputFormat.getName()); 11 tbl.setOutputFormatClass(fileOutputFormat.getName()); 12   
    //封装FileSchema对象,该为每个column的名称及字段类型,并加入到sd对象的的column属性中 13 for (String col : columns) { 14 FieldSchema field = new FieldSchema(col, STRING_TYPE_NAME, "default"); 15 tbl.getCols().add(field); 16 } 17
    //如果在创建表时,设置了分区信息,比如dt字段为该分区。则进行分区信息的记录,最终写入Partition表中 18 if (partCols != null) { 19 for (String partCol : partCols) { 20 FieldSchema part = new FieldSchema(); 21 part.setName(partCol); 22 part.setType(STRING_TYPE_NAME); // default partition key 23 tbl.getPartCols().add(part); 24 } 25 }
    //设置序列化的方式
26 tbl.setSerializationLib(LazySimpleSerDe.class.getName());
    //设置分桶信息
27 tbl.setNumBuckets(bucketCount); 28 tbl.setBucketCols(bucketCols);
    //设置table额外添加的kv信息
29 if (parameters != null) { 30 tbl.setParamters(parameters); 31 } 32 createTable(tbl); 33 }

  从代码中可以看到,Hive 构造了一个Table的对象,该对象可以当做是一个model,包含了几乎所有以Tbls表为主表的所有以table_id为的外键表属性(具体可参考hive metastore表结构),封装完毕后在进行createTable的调用,接下来的调用如下:

  public void createTable(Table tbl, boolean ifNotExists) throws HiveException {
    try {
    //这里再次获取SessionState中的CurrentDataBase进行setDbName(安全)
if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) { tbl.setDbName(SessionState.get().getCurrentDatabase()); }
    //这里主要对每一个column属性进行校验,比如是否有非法字符等等
if (tbl.getCols().size() == 0 || tbl.getSd().getColsSize() == 0) { tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(), tbl.getDeserializer())); }
    //该方法对table属性中的input、output以及column属性的校验 tbl.checkValidity();
if (tbl.getParameters() != null) { tbl.getParameters().remove(hive_metastoreConstants.DDL_TIME); } org.apache.hadoop.hive.metastore.api.Table tTbl = tbl.getTTable();
    //这里开始进行权限认证,牵扯到的便是我们再hive中配置的 hive.security.authorization.createtable.user.grants、hive.security.authorization.createtable.group.grants、
    hive.security.authorization.createtable.role.grants配置参数,来自于hive自己封装的 用户、角色、组的概念。
PrincipalPrivilegeSet principalPrivs
= new PrincipalPrivilegeSet(); SessionState ss = SessionState.get(); if (ss != null) { CreateTableAutomaticGrant grants = ss.getCreateTableGrants(); if (grants != null) { principalPrivs.setUserPrivileges(grants.getUserGrants()); principalPrivs.setGroupPrivileges(grants.getGroupGrants()); principalPrivs.setRolePrivileges(grants.getRoleGrants()); tTbl.setPrivileges(principalPrivs); } }
   //通过客户端链接服务端进行table的创建 getMSC().createTable(tTbl); }
catch (AlreadyExistsException e) { if (!ifNotExists) { throw new HiveException(e); } } catch (Exception e) { throw new HiveException(e); } }

  那么下来,我们来看一下受到调用的HiveMetaClient中createTable方法,代码如下:

 1   public void createTable(Table tbl, EnvironmentContext envContext) throws AlreadyExistsException,
 2       InvalidObjectException, MetaException, NoSuchObjectException, TException {
    //这里获取HiveMeetaHook对象,针对不同的存储引擎进行创建前的加载及验证
3 HiveMetaHook hook = getHook(tbl); 4 if (hook != null) { 5 hook.preCreateTable(tbl); 6 } 7 boolean success = false; 8 try {      //随即调用HiveMetaStore进行服务端与数据库的创建交互 10 create_table_with_environment_context(tbl, envContext); 11 if (hook != null) { 12 hook.commitCreateTable(tbl); 13 } 14 success = true; 15 } finally {
      如果创建失败的话,进行回滚操作
16 if (!success && (hook != null)) { 17 hook.rollbackCreateTable(tbl); 18 } 19 } 20 }

  这里简要说下Hook的作用,HiveMetaHook为接口,接口方法包括preCreate、rollbackCreateTable、preDropTable等等操作,它的实现为不同存储类型的预创建加载及验证,以及失败回滚等动作。代码如下:

 1 public interface HiveMetaHook {
 2   /**
 3    * Called before a new table definition is added to the metastore
 4    * during CREATE TABLE.
 5    *
 6    * @param table new table definition
 7    */
 8   public void preCreateTable(Table table)
 9     throws MetaException;
10 
11   /** 
12    * Called after failure adding a new table definition to the metastore
13    * during CREATE TABLE.
14    *
15    * @param table new table definition
16    */
17   public void rollbackCreateTable(Table table)
18     throws MetaException;
35   public void preDropTale(Table table)
36     throws MetaException;
...............................

  随后,我们再看一下HiveMetaStore服务端的createTable方法,如下:

  1     private void create_table_core(final RawStore ms, final Table tbl,
2
final EnvironmentContext envContext) 3 throws AlreadyExistsException, MetaException, 4 InvalidObjectException, NoSuchObjectException { 5     //名称正则校验,校验是否含有非法字符 6 if (!MetaStoreUtils.validateName(tbl.getTableName())) { 7 throw new InvalidObjectException(tbl.getTableName() 8 + " is not a valid object name"); 9 }
      //改端代码属于校验代码,对于column的名称及column type类型j及partitionKey的名称校验
10 String validate = MetaStoreUtils.validateTblColumns(tbl.getSd().getCols()); 11 if (validate != null) { 12 throw new InvalidObjectException("Invalid column " + validate); 13 }
14 if (tbl.getPartitionKeys() != null) { 15 validate = MetaStoreUtils.validateTblColumns(tbl.getPartitionKeys()); 16 if (validate != null) { 17 throw new InvalidObjectException("Invalid partition column " + validate); 18 } 19 } 20 SkewedInfo skew = tbl.getSd().getSkewedInfo(); 21 if (skew != null) { 22 validate = MetaStoreUtils.validateSkewedColNames(skew.getSkewedColNames()); 23 if (validate != null) { 24 throw new InvalidObjectException("Invalid skew column " + validate); 25 } 26 validate = MetaStoreUtils.validateSkewedColNamesSubsetCol( 27 skew.getSkewedColNames(), tbl.getSd().getCols()); 28 if (validate != null) { 29 throw new InvalidObjectException("Invalid skew column " + validate); 30 } 31 } 32 33 Path tblPath = null; 34 boolean success = false, madeDir = false; 35 try {
       //创建前的事件调用,metastore已实现的listner事件包含DummyPreListener、AuthorizationPreEventListener、AlternateFailurePreListener以及MetaDataExportListener。
       //这些Listener是干嘛的呢?详细解释由分析meta设计模式时,详细说明。
36 firePreEvent(new PreCreateTableEvent(tbl, this)); 37
       //打开事务 38 ms.openTransaction(); 39
       //如果db不存在的情况下,则抛异常 40 Database db = ms.getDatabase(tbl.getDbName()); 41 if (db == null) { 42 throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist"); 43 } 44      45  // 校验该db下,table是否存在 46 if (is_table_exists(ms, tbl.getDbName(), tbl.getTableName())) { 47 throw new AlreadyExistsException("Table " + tbl.getTableName() 48 + " already exists"); 49 } 50      // 如果该表不为视图表,则组装完整的tbleParth ->
fs.getUri().getScheme()+fs.getUri().getAuthority()+path.toUri().getPath())
 51         if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
 52           if (tbl.getSd().getLocation() == null
 53               || tbl.getSd().getLocation().isEmpty()) {
 54             tblPath = wh.getTablePath(
 55                 ms.getDatabase(tbl.getDbName()), tbl.getTableName());
 56           } else {
          //如果该表不是内部表同时tbl的kv中storage_handler为空时,则只是警告
57 if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) { 58 LOG.warn("Location: " + tbl.getSd().getLocation() 59 + " specified for non-external table:" + tbl.getTableName()); 60 } 61 tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation())); 62 }
        //将拼接完的tblPath set到sd的location中
63 tbl.getSd().setLocation(tblPath.toString()); 64 } 65      //创建table的路径 66 if (tblPath != null) { 67 if (!wh.isDir(tblPath)) { 68 if (!wh.mkdirs(tblPath, true)) { 69 throw new MetaException(tblPath 70 + " is not a directory or unable to create one"); 71 } 72 madeDir = true; 73 } 74 }
       // hive.stats.autogather 配置判断
75 if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) && 76 !MetaStoreUtils.isView(tbl)) { 77 if (tbl.getPartitionKeysSize() == 0) { // Unpartitioned table 78 MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir); 79 } else { // Partitioned table with no partitions. 80 MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, true); 81 } 82 } 83 84 // set create time 85 long time = System.currentTimeMillis() / 1000; 86 tbl.setCreateTime((int) time); 87 if (tbl.getParameters() == null || 88 tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) { 89 tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time)); 90 }
       执行createTable数据库操作
91 ms.createTable(tbl); 92 success = ms.commitTransaction(); 93 94 } finally { 95 if (!success) { 96 ms.rollbackTransaction();
         //如果由于某些原因没有创建,则进行已创建表路径的删除
97 if (madeDir) { 98 wh.deleteDir(tblPath, true); 99 } 100 }
       //进行create完成时的listener类发送 比如 noftify通知
101 for (MetaStoreEventListener listener : listeners) { 102 CreateTableEvent createTableEvent = 103 new CreateTableEvent(tbl, success, this); 104 createTableEvent.setEnvironmentContext(envContext); 105 listener.onCreateTable(createTableEvent); 106 } 107 } 108 }

  这里的listener后续会详细说明,那么我们继续垂直往下看,这里的 ms.createTable方法。ms便是RawStore接口对象,这个接口对象包含了所有生命周期的统一方法调用,部分代码如下:

 1   public abstract Database getDatabase(String name)
 2       throws NoSuchObjectException;
 3 
 4   public abstract boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException;
 5 
 6   public abstract boolean alterDatabase(String dbname, Database db) throws NoSuchObjectException, MetaException;
 7 
 8   public abstract List<String> getDatabases(String pattern) throws MetaException;
 9 
10   public abstract List<String> getAllDatabases() throws MetaException;
11 
12   public abstract boolean createType(Type type);
13 
14   public abstract Type getType(String typeName);
15 
16   public abstract boolean dropType(String typeName);
17 
18   public abstract void createTable(Table tbl) throws InvalidObjectException,
19       MetaException;
20 
21   public abstract boolean dropTable(String dbName, String tableName)
22       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException;
23 
24   public abstract Table getTable(String dbName, String tableName)
25       throws MetaException;
26   ..................

  那么下来我们来看一下具体怎么实现的,首先hive metastore会通过调用getMS()方法,获取本地线程中的RawStore的实现,代码如下:

 1     public RawStore getMS() throws MetaException {
     //获取本地线程中已存在的RawStore
2 RawStore ms = threadLocalMS.get();
     //如果不存在,则创建该对象的实现,并加入到本地线程中
3 if (ms == null) { 4 ms = newRawStore(); 5 ms.verifySchema(); 6 threadLocalMS.set(ms); 7 ms = threadLocalMS.get(); 8 } 9 return ms; 10 }

  看到这里,是不是很想看看newRawStore它干嘛啦?那么我们继续:

 1   public static RawStore getProxy(HiveConf hiveConf, Configuration conf, String rawStoreClassName,
 2       int id) throws MetaException {
 3   //通过反射,创建baseClass,随后再进行该实现对象的创建
 4     Class<? extends RawStore> baseClass = (Class<? extends RawStore>) MetaStoreUtils.getClass(
 5         rawStoreClassName);
 6 
 7     RawStoreProxy handler = new RawStoreProxy(hiveConf, conf, baseClass, id);
 8 
 9     // Look for interfaces on both the class and all base classes.
10     return (RawStore) Proxy.newProxyInstance(RawStoreProxy.class.getClassLoader(),
11         getAllInterfaces(baseClass), handler);
12   }

  那么问题来了,rawstoreClassName从哪里来呢?它是在HiveMetaStore进行初始化时加载的,来源于HiveConf中的METASTORE_RAW_STORE_IMPL,配置参数,也就是RawStore的实现类ObjectStore。好了,既然RawStore的实现类已经创建,那么我们继续深入ObjectStore,代码如下:

  

 1   @Override
 2   public void createTable(Table tbl) throws InvalidObjectException, MetaException {
 3     boolean commited = false;
 4     try {
     //创建事务
5 openTransaction();
      //这里再次进行db 、table的校验,代码不再贴出来,具体为什么又要做一次校验,还需要深入思考
6 MTable mtbl = convertToMTable(tbl);
     这里的pm为ObjectStore创建时,init的JDO PersistenceManage对象。这里便是提交Table对象的地方,具体可研究下JDO module对象与数据库的交互
7 pm.makePersistent(mtbl);
     //封装权限用户、角色、组对象并写入
8 PrincipalPrivilegeSet principalPrivs = tbl.getPrivileges(); 9 List<Object> toPersistPrivObjs = new ArrayList<Object>(); 10 if (principalPrivs != null) { 11 int now = (int)(System.currentTimeMillis()/1000); 12 13 Map<String, List<PrivilegeGrantInfo>> userPrivs = principalPrivs.getUserPrivileges(); 14 putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, userPrivs, PrincipalType.USER); 15 16 Map<String, List<PrivilegeGrantInfo>> groupPrivs = principalPrivs.getGroupPrivileges(); 17 putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, PrincipalType.GROUP); 18 19 Map<String, List<PrivilegeGrantInfo>> rolePrivs = principalPrivs.getRolePrivileges(); 20 putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, PrincipalType.ROLE); 21 } 22 pm.makePersistentAll(toPersistPrivObjs); 23 commited = commitTransaction(); 24 } finally {
      //如果失败则回滚
25 if (!commited) { 26 rollbackTransaction(); 27 } 28 } 29 }

  4、dropTable

   二话不说上从Hive类中上代码:

1   public void dropTable(String tableName, boolean ifPurge) throws HiveException {
    //这里Hive 将dbName与TableName合并成一个数组
2 String[] names = Utilities.getDbTableName(tableName); 3 dropTable(names[0], names[1], true, true, ifPurge); 4 }

  为什么要进行这样的处理呢,其实是因为 drop table的时候 我们的sql语句会是drop table dbName.tableName 或者是drop table tableName,这里进行tableName和DbName的组装,如果为drop table tableName,则获取当前session中的dbName,代码如下:

 1   public static String[] getDbTableName(String dbtable) throws SemanticException {
    //获取当前Session中的DbName
2 return getDbTableName(SessionState.get().getCurrentDatabase(), dbtable); 3 } 4 5 public static String[] getDbTableName(String defaultDb, String dbtable) throws SemanticException { 6 if (dbtable == null) { 7 return new String[2]; 8 } 9 String[] names = dbtable.split("\\."); 10 switch (names.length) { 11 case 2: 12 return names;
     //如果长度为1,则重新组装
13 case 1: 14 return new String [] {defaultDb, dbtable}; 15 default: 16 throw new SemanticException(ErrorMsg.INVALID_TABLE_NAME, dbtable); 17 } 18 }

  随后通过getMSC()调用HiveMetaStoreClient中的dropTable,代码如下:

 1   public void dropTable(String dbname, String name, boolean deleteData,
 2       boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
 3       NoSuchObjectException, UnsupportedOperationException {
 4     Table tbl;
 5     try {
     //通过dbName与tableName获取正个Table对象,也就是通过dbName与TableName获取该Table存储的所有元数据
6 tbl = getTable(dbname, name); 7 } catch (NoSuchObjectException e) { 8 if (!ignoreUnknownTab) { 9 throw e; 10 } 11 return; 12 }
    //根据table type来判断是否为IndexTable,如果为索引表则不允许删除  
13 if (isIndexTable(tbl)) { 14 throw new UnsupportedOperationException("Cannot drop index tables"); 15 }
    //这里的getHook 与create时getHook一致,获取对应table存储的hook
16 HiveMetaHook hook = getHook(tbl); 17 if (hook != null) { 18 hook.preDropTable(tbl); 19 } 20 boolean success = false; 21 try {
      调用HiveMetaStore服务端的dropTable方法
22 drop_table_with_environment_context(dbname, name, deleteData, envContext); 23 if (hook != null) { 24 hook.commitDropTable(tbl, deleteData); 25 } 26 success=true; 27 } catch (NoSuchObjectException e) { 28 if (!ignoreUnknownTab) { 29 throw e; 30 } 31 } finally { 32 if (!success && (hook != null)) { 33 hook.rollbackDropTable(tbl); 34 } 35 } 36 }

  下面我们重点看下服务端HiveMetaStore干了些什么,代码如下:

 1    private boolean drop_table_core(final RawStore ms, final String dbname, final String name,
 2         final boolean deleteData, final EnvironmentContext envContext,
 3         final String indexName) throws NoSuchObjectException,
 4         MetaException, IOException, InvalidObjectException, InvalidInputException {
 5       boolean success = false;
 6       boolean isExternal = false;
 7       Path tblPath = null;
 8       List<Path> partPaths = null;
 9       Table tbl = null;
10       boolean ifPurge = false;
11       try {
12         ms.openTransaction();
13         // 获取正个Table的对象属性
14         tbl = get_table_core(dbname, name);
15         if (tbl == null) {
16           throw new NoSuchObjectException(name + " doesn't exist");
17         }
       //如果sd数据为空,则认为该表数据损坏
18 if (tbl.getSd() == null) { 19 throw new MetaException("Table metadata is corrupted"); 20 } 21 ifPurge = isMustPurge(envContext, tbl); 22 23 firePreEvent(new PreDropTableEvent(tbl, deleteData, this));        //判断如果该表存在索引,则需要先删除该表的索引 25 boolean isIndexTable = isIndexTable(tbl); 26 if (indexName == null && isIndexTable) { 27 throw new RuntimeException( 28 "The table " + name + " is an index table. Please do drop index instead."); 29 }        //如果不是索引表,则删除索引元数据 31 if (!isIndexTable) { 32 try { 33 List<Index> indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE); 34 while (indexes != null && indexes.size() > 0) { 35 for (Index idx : indexes) { 36 this.drop_index_by_name(dbname, name, idx.getIndexName(), true); 37 } 38 indexes = ms.getIndexes(dbname, name, Short.MAX_VALUE); 39 } 40 } catch (TException e) { 41 throw new MetaException(e.getMessage()); 42 } 43 }
       //判断是否为外部表
44 isExternal = isExternal(tbl); 45 if (tbl.getSd().getLocation() != null) { 46 tblPath = new Path(tbl.getSd().getLocation()); 47 if (!wh.isWritable(tblPath.getParent())) { 48 String target = indexName == null ? "Table" : "Index table"; 49 throw new MetaException(target + " metadata not deleted since " + 50 tblPath.getParent() + " is not writable by " + 51 hiveConf.getUser()); 52 } 53 } 54 56 checkTrashPurgeCombination(tblPath, dbname + "." + name, ifPurge); 57 //获取所有partition的location path 这里有个奇怪的地方,为什么不将Table对象直接传入,而是又在该方法中重新getTable,同时校验上级目录的读写权限 58 partPaths = dropPartitionsAndGetLocations(ms, dbname, name, tblPath, 59 tbl.getPartitionKeys(), deleteData && !isExternal); 60      //调用ObjectStore进行meta数据的删除 61 if (!ms.dropTable(dbname, name)) { 62 String tableName = dbname + "." + name; 63 throw new MetaException(indexName == null ? "Unable to drop table " + tableName: 64 "Unable to drop index table " + tableName + " for index " + indexName); 65 } 66 success = ms.commitTransaction(); 67 } finally { 68 if (!success) { 69 ms.rollbackTransaction(); 70 } else if (deleteData && !isExternal) {         //删除物理partition 73 deletePartitionData(partPaths, ifPurge); 74 //删除Table路径 75 deleteTableData(tblPath, ifPurge); 76 // ok even if the data is not deleted 77
       //Listener 处理
78 for (MetaStoreEventListener listener : listeners) { 79 DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this); 80 dropTableEvent.setEnvironmentContext(envContext); 81 listener.onDropTable(dropTableEvent); 82 } 83 } 84 return success; 85 }

   我们继续深入ObjectStore中的dropTable,会发现 再一次通过dbName与tableName获取整个Table对象,随后逐一删除。也许代码并不是同一个人写的也可能是由于安全性考虑?很多可以通过接口传入的Table对象,都重新获取了,这样会不会加重数据库的负担呢?ObjectStore代码如下:

 1   public boolean dropTable(String dbName, String tableName) throws MetaException,
 2     NoSuchObjectException, InvalidObjectException, InvalidInputException {
 3     boolean success = false;
 4     try {
 5       openTransaction();
      //重新获取Table对象
6 MTable tbl = getMTable(dbName, tableName); 7 pm.retrieve(tbl); 8 if (tbl != null) { 9 //下列代码查询并删除所有的权限 10 List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName); 11 if (tabGrants != null && tabGrants.size() > 0) { 12 pm.deletePersistentAll(tabGrants); 13 }
      
14 List<MTableColumnPrivilege> tblColGrants = listTableAllColumnGrants(dbName, 15 tableName); 16 if (tblColGrants != null && tblColGrants.size() > 0) { 17 pm.deletePersistentAll(tblColGrants); 18 } 19 20 List<MPartitionPrivilege> partGrants = this.listTableAllPartitionGrants(dbName, tableName); 21 if (partGrants != null && partGrants.size() > 0) { 22 pm.deletePersistentAll(partGrants); 23 } 24 25 List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(dbName, 26 tableName); 27 if (partColGrants != null && partColGrants.size() > 0) { 28 pm.deletePersistentAll(partColGrants); 29 } 30 // delete column statistics if present 31 try {
        //删除column统计表数据
32 deleteTableColumnStatistics(dbName, tableName, null); 33 } catch (NoSuchObjectException e) { 34 LOG.info("Found no table level column statistics associated with db " + dbName + 35 " table " + tableName + " record to delete"); 36 } 37      //删除mcd表数据 38 preDropStorageDescriptor(tbl.getSd()); 39 //删除整个Table对象相关表数据 40 pm.deletePersistentAll(tbl); 41 } 42 success = commitTransaction(); 43 } finally { 44 if (!success) { 45 rollbackTransaction(); 46 } 47 } 48 return success; 49 }

  5、AlterTable

  下来我们看下AlterTable,AlterTable包含的逻辑较多,因为牵扯到物理存储上的路径修改等,那么我们来一点点查看。还是从Hive类中开始,上代码:

 1   public void alterTable(String tblName, Table newTbl, boolean cascade)
 2       throws InvalidOperationException, HiveException {
 3     String[] names = Utilities.getDbTableName(tblName);
 4     try {
 5       //删除table kv中的DDL_TIME 因为要alterTable所以,该事件会被改变
 6       if (newTbl.getParameters() != null) {
 7         newTbl.getParameters().remove(hive_metastoreConstants.DDL_TIME);
 8       }
      //进行相关校验,包含dbName、tableName、column、inputOutClass、outputClass的校验等,如果校验不通过则抛出HiveException
9 newTbl.checkValidity();
      //调用alterTable
10 getMSC().alter_table(names[0], names[1], newTbl.getTTable(), cascade); 11 } catch (MetaException e) { 12 throw new HiveException("Unable to alter table. " + e.getMessage(), e); 13 } catch (TException e) { 14 throw new HiveException("Unable to alter table. " + e.getMessage(), e); 15 } 16 }

  对于HiveMetaClient,并没有做相应处理,所以我们直接来看HiveMetaStore服务端做了些什么呢?

 1     private void alter_table_core(final String dbname, final String name, final Table newTable,
 2         final EnvironmentContext envContext, final boolean cascade)
 3         throws InvalidOperationException, MetaException {
 4       startFunction("alter_table", ": db=" + dbname + " tbl=" + name
 5           + " newtbl=" + newTable.getTableName());
 6 
 7       //更新DDL_Time
 8       if (newTable.getParameters() == null ||
 9           newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
10         newTable.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
11             .currentTimeMillis() / 1000));
12       }
13       boolean success = false;
14       Exception ex = null;
15       try {
       //获取已有Table的整个对象
16 Table oldt = get_table_core(dbname, name);
       //进行Event处理
17 firePreEvent(new PreAlterTableEvent(oldt, newTable, this));
       //进行alterTable处理,后面详细说明
18 alterHandler.alterTable(getMS(), wh, dbname, name, newTable, cascade); 19 success = true; 20     
       //进行Listener处理 21 for (MetaStoreEventListener listener : listeners) { 22 23 AlterTableEvent alterTableEvent = 24 new AlterTableEvent(oldt, newTable, success, this); 25 alterTableEvent.setEnvironmentContext(envContext); 26 listener.onAlterTable(alterTableEvent); 27 } 28 } catch (NoSuchObjectException e) { 29 // thrown when the table to be altered does not exist 30 ex = e; 31 throw new InvalidOperationException(e.getMessage()); 32 } catch (Exception e) { 33 ex = e; 34 if (e instanceof MetaException) { 35 throw (MetaException) e; 36 } else if (e instanceof InvalidOperationException) { 37 throw (InvalidOperationException) e; 38 } else { 39 throw newMetaException(e); 40 } 41 } finally { 42 endFunction("alter_table", success, ex, name); 43 } 44 }

  那么,我们重点看下alterHandler具体所做的事情,在这之前简要说下alterHandler的初始化,它是在HiveMetaStore init时获取的hive.metastore.alter.impl参数的className,也就是HiveAlterHandler的name,那么具体,我们来看下它alterTable时的实现,前方高能,小心火烛:)

  1   public void alterTable(RawStore msdb, Warehouse wh, String dbname,
  2       String name, Table newt, boolean cascade) throws InvalidOperationException, MetaException {
  3     if (newt == null) {
  4       throw new InvalidOperationException("New table is invalid: " + newt);
  5     }
  6    //校验新的tableName是否合法
  7     if (!MetaStoreUtils.validateName(newt.getTableName())) {
  8       throw new InvalidOperationException(newt.getTableName()
  9           + " is not a valid object name");
 10     }
     //校验新的column Name type是否合法
11 String validate = MetaStoreUtils.validateTblColumns(newt.getSd().getCols()); 12 if (validate != null) { 13 throw new InvalidOperationException("Invalid column " + validate); 14 } 15 16 Path srcPath = null; 17 FileSystem srcFs = null; 18 Path destPath = null; 19 FileSystem destFs = null; 20 21 boolean success = false; 22 boolean moveData = false; 23 boolean rename = false; 24 Table oldt = null; 25 List<ObjectPair<Partition, String>> altps = new ArrayList<ObjectPair<Partition, String>>(); 26 27 try { 28 msdb.openTransaction();
      //这里直接转换小写,可以看出 代码不是一个人写的
29 name = name.toLowerCase(); 30 dbname = dbname.toLowerCase(); 31 32 //校验新的tableName是否存在 33 if (!newt.getTableName().equalsIgnoreCase(name) 34 || !newt.getDbName().equalsIgnoreCase(dbname)) { 35 if (msdb.getTable(newt.getDbName(), newt.getTableName()) != null) { 36 throw new InvalidOperationException("new table " + newt.getDbName() 37 + "." + newt.getTableName() + " already exists"); 38 } 39 rename = true; 40 } 41 42 //获取老的table对象 43 oldt = msdb.getTable(dbname, name); 44 if (oldt == null) { 45 throw new InvalidOperationException("table " + newt.getDbName() + "." 46 + newt.getTableName() + " doesn't exist"); 47 } 48     //alter Table时 获取 METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES配置项,如果为true的话,将改变column的type类型,这里为false 49 if (HiveConf.getBoolVar(hiveConf, 50 HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, 51 false)) { 52 // Throws InvalidOperationException if the new column types are not 53 // compatible with the current column types. 54 MetaStoreUtils.throwExceptionIfIncompatibleColTypeChange( 55 oldt.getSd().getCols(), newt.getSd().getCols()); 56 } 57     //cascade参数由调用Hive altertable方法穿过来的,也就是引擎调用时参数的设置,这里用来查看是否需要alterPartition信息 58 if (cascade) { 59 //校验新的column是否与老的column一致,如不一致,说明进行了column的添加或删除操作 60 if(MetaStoreUtils.isCascadeNeededInAlterTable(oldt, newt)) {
        //根据dbName与tableName获取整个partition的信息
61 List<Partition> parts = msdb.getPartitions(dbname, name, -1); 62 for (Partition part : parts) { 63 List<FieldSchema> oldCols = part.getSd().getCols(); 64 part.getSd().setCols(newt.getSd().getCols()); 65 String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues());
          //如果columns不一致,则删除已有的column统计信息
66 updatePartColumnStatsForAlterColumns(msdb, part, oldPartName, part.getValues(), oldCols, part);
          //更新整个Partition的信息
67 msdb.alterPartition(dbname, name, part.getValues(), part); 68 } 69 } else { 70 LOG.warn("Alter table does not cascade changes to its partitions."); 71 } 72 } 73 74 //判断parititonkey是否改变,也就是dt 或 hour等partName是否改变 76 boolean partKeysPartiallyEqual = checkPartialPartKeysEqual(oldt.getPartitionKeys(), 77 newt.getPartitionKeys()); 78     
      //如果已有表为视图表,同时发现老的partkey与新的partKey不一致,则报错 79 if(!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())){ 80 if (oldt.getPartitionKeys().size() != newt.getPartitionKeys().size() 81 || !partKeysPartiallyEqual) { 82 throw new InvalidOperationException( 83 "partition keys can not be changed."); 84 } 85 } 86       //如果该表不为视图表,同时,该表的location信息并未发生变化,同时新的location信息并不为空,并且已有的该表不为外部表,说明用户是想要移动数据到新的location地址,那么该操作
       // 为alter table rename操作
91 if (rename 92 && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) 93 && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0 94 || StringUtils.isEmpty(newt.getSd().getLocation())) 95 && !MetaStoreUtils.isExternalTable(oldt)) { 96      //获取新的location信息 97 srcPath = new Path(oldt.getSd().getLocation()); 98 srcFs = wh.getFs(srcPath); 99 100 // that means user is asking metastore to move data to new location 101 // corresponding to the new name 102 // get new location 103 Database db = msdb.getDatabase(newt.getDbName()); 104 Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath); 105 destPath = new Path(databasePath, newt.getTableName()); 106 destFs = wh.getFs(destPath); 107      //设置新的table location信息 用于后续更新动作 108 newt.getSd().setLocation(destPath.toString()); 109 moveData = true; 110        //校验物理目标地址是否存在,如果存在则会override所有数据,是不允许的。 114 if (!FileUtils.equalsFileSystem(srcFs, destFs)) { 115 throw new InvalidOperationException("table new location " + destPath 116 + " is on a different file system than the old location " 117 + srcPath + ". This operation is not supported"); 118 } 119 try { 120 srcFs.exists(srcPath); // check that src exists and also checks 121 // permissions necessary 122 if (destFs.exists(destPath)) { 123 throw new InvalidOperationException("New location for this table " 124 + newt.getDbName() + "." + newt.getTableName() 125 + " already exists : " + destPath); 126 } 127 } catch (IOException e) { 128 throw new InvalidOperationException("Unable to access new location " 129 + destPath + " for table " + newt.getDbName() + "." 130 + newt.getTableName()); 131 } 132 String oldTblLocPath = srcPath.toUri().getPath(); 133 String newTblLocPath = destPath.toUri().getPath(); 134      135 //获取old table中的所有partition信息 136 List<Partition> parts = msdb.getPartitions(dbname, name, -1); 137 for (Partition part : parts) { 138 String oldPartLoc = part.getSd().getLocation();
        //这里,便开始新老partition地址的变换,修改partition元数据信息
139 if (oldPartLoc.contains(oldTblLocPath)) { 140 URI oldUri = new Path(oldPartLoc).toUri(); 141 String newPath = oldUri.getPath().replace(oldTblLocPath, newTblLocPath); 142 Path newPartLocPath = new Path(oldUri.getScheme(), oldUri.getAuthority(), newPath); 143 altps.add(ObjectPair.create(part, part.getSd().getLocation())); 144 part.getSd().setLocation(newPartLocPath.toString()); 145 String oldPartName = Warehouse.makePartName(oldt.getPartitionKeys(), part.getValues()); 146 try { 147 //existing partition column stats is no longer valid, remove them 148 msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, part.getValues(), null); 149 } catch (InvalidInputException iie) { 150 throw new InvalidOperationException("Unable to update partition stats in table rename." + iie); 151 } 152 msdb.alterPartition(dbname, name, part.getValues(), part); 153 } 154 }
       //更新stats相关信息
155 } else if (MetaStoreUtils.requireCalStats(hiveConf, null, null, newt) && 156 (newt.getPartitionKeysSize() == 0)) { 157 Database db = msdb.getDatabase(newt.getDbName()); 158 // Update table stats. For partitioned table, we update stats in 159 // alterPartition() 160 MetaStoreUtils.updateUnpartitionedTableStatsFast(db, newt, wh, false, true); 161 } 162 updateTableColumnStatsForAlterTable(msdb, oldt, newt); 163 // now finally call alter table 164 msdb.alterTable(dbname, name, newt); 165 // commit the changes 166 success = msdb.commitTransaction(); 167 } catch (InvalidObjectException e) { 168 LOG.debug(e); 169 throw new InvalidOperationException( 170 "Unable to change partition or table." 171 + " Check metastore logs for detailed stack." + e.getMessage()); 172 } catch (NoSuchObjectException e) { 173 LOG.debug(e); 174 throw new InvalidOperationException( 175 "Unable to change partition or table. Database " + dbname + " does not exist" 176 + " Check metastore logs for detailed stack." + e.getMessage()); 177 } finally { 178 if (!success) { 179 msdb.rollbackTransaction(); 180 } 181 if (success && moveData) {        //开始更新hdfs路径,进行老路径的rename到新路径 ,调用fileSystem的rename操作 185 try { 186 if (srcFs.exists(srcPath) && !srcFs.rename(srcPath, destPath)) { 187 throw new IOException("Renaming " + srcPath + " to " + destPath + " failed"); 188 } 189 } catch (IOException e) { 190 LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e); 191 boolean revertMetaDataTransaction = false; 192 try { 193 msdb.openTransaction();
         //这里会发现,又一次进行了alterTable元数据动作,或许跟JDO的特性有关?还是因为安全?
194 msdb.alterTable(newt.getDbName(), newt.getTableName(), oldt); 195 for (ObjectPair<Partition, String> pair : altps) { 196 Partition part = pair.getFirst(); 197 part.getSd().setLocation(pair.getSecond()); 198 msdb.alterPartition(newt.getDbName(), name, part.getValues(), part); 199 } 200 revertMetaDataTransaction = msdb.commitTransaction(); 201 } catch (Exception e1) { 202 // we should log this for manual rollback by administrator 203 LOG.error("Reverting metadata by HDFS operation failure failed During HDFS operation failed", e1); 204 LOG.error("Table " + Warehouse.getQualifiedName(newt) + 205 " should be renamed to " + Warehouse.getQualifiedName(oldt)); 206 LOG.error("Table " + Warehouse.getQualifiedName(newt) + 207 " should have path " + srcPath); 208 for (ObjectPair<Partition, String> pair : altps) { 209 LOG.error("Partition " + Warehouse.getQualifiedName(pair.getFirst()) + 210 " should have path " + pair.getSecond()); 211 } 212 if (!revertMetaDataTransaction) { 213 msdb.rollbackTransaction(); 214 } 215 } 216 throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name + 217 " failed to move data due to: '" + getSimpleMessage(e) + "' See hive log file for details."); 218 } 219 } 220 } 221 if (!success) { 222 throw new MetaException("Committing the alter table transaction was not successful."); 223 } 224 }

  6、createPartition
  在分区数据写入之前,会先进行partition的元数据注册及物理文件路径的创建(内部表),Hive类代码如下:

1   public Partition createPartition(Table tbl, Map<String, String> partSpec) throws HiveException {
2     try {
    //new出来一个Partition对象,传入Table对象,调用Partition的构造方法来initialize Partition的信息
3 return new Partition(tbl, getMSC().add_partition( 4 Partition.createMetaPartitionObject(tbl, partSpec, null))); 5 } catch (Exception e) { 6 LOG.error(StringUtils.stringifyException(e)); 7 throw new HiveException(e); 8 } 9 }

  这里的createMetaPartitionObject作用在于整个Partition传入对象的校验对对象的封装,代码如下:

 1   public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartitionObject(
 2       Table tbl, Map<String, String> partSpec, Path location) throws HiveException {
 3     List<String> pvals = new ArrayList<String>();
    //遍历整个PartCols,并且校验partMap中是否一一对应
4 for (FieldSchema field : tbl.getPartCols()) { 5 String val = partSpec.get(field.getName()); 6 if (val == null || val.isEmpty()) { 7 throw new HiveException("partition spec is invalid; field " 8 + field.getName() + " does not exist or is empty"); 9 } 10 pvals.add(val); 11 } 12   //set相关的属性信息,包括DbName、TableName、PartValues、以及sd信息 13 org.apache.hadoop.hive.metastore.api.Partition tpart = 14 new org.apache.hadoop.hive.metastore.api.Partition(); 15 tpart.setDbName(tbl.getDbName()); 16 tpart.setTableName(tbl.getTableName()); 17 tpart.setValues(pvals); 18 19 if (!tbl.isView()) { 20 tpart.setSd(cloneS d(tbl)); 21 tpart.getSd().setLocation((location != null) ? location.toString() : null); 22 } 23 return tpart; 24 }

  随之MetaDataClient对于该对象调用MetaDataService的addPartition,并进行了深拷贝,这里不再详细说明,那么我们直接看下服务端干了什么:

 1     private Partition add_partition_core(final RawStore ms,
 2         final Partition part, final EnvironmentContext envContext)
 3         throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
 4       boolean success = false;
 5       Table tbl = null;
 6       try {
 7         ms.openTransaction(); 
       //根据DbName、TableName获取整个Table对象信息
8 tbl = ms.getTable(part.getDbName(), part.getTableName()); 9 if (tbl == null) { 10 throw new InvalidObjectException( 11 "Unable to add partition because table or database do not exist"); 12 } 13      //事件处理 14 firePreEvent(new PreAddPartitionEvent(tbl, part, this)); 15      //在创建Partition之前,首先会校验元数据中该partition是否存在 16 boolean shouldAdd = startAddPartition(ms, part, false); 17 assert shouldAdd; // start would throw if it already existed here
       //创建Partition路径 18 boolean madeDir = createLocationForAddedPartition(tbl, part); 19 try {
        //加载一些kv信息
20 initializeAddedPartition(tbl, part, madeDir);
        //写入元数据
21 success = ms.addPartition(part); 22 } finally { 23 if (!success && madeDir) {
         //如果没有成功,便删除物理路径
24 wh.deleteDir(new Path(part.getSd().getLocation()), true); 25 } 26 } 27 // we proceed only if we'd actually succeeded anyway, otherwise, 28 // we'd have thrown an exception 29 success = success && ms.commitTransaction(); 30 } finally { 31 if (!success) { 32 ms.rollbackTransaction(); 33 } 34 fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success); 35 } 36 return part; 37 }

  这里提及一个设计上的点,从之前的表结构设计上,没有直接存储PartName,而是将key与value单独存在与kv表中,这里我们看下createLocationForAddedPartition:

 1     private boolean createLocationForAddedPartition(
 2         final Table tbl, final Partition part) throws MetaException {
 3       Path partLocation = null;
 4       String partLocationStr = null;
      //如果sd不为null,则将sd的location信息作为表跟目录赋给partLocationStr
5 if (part.getSd() != null) { 6 partLocationStr = part.getSd().getLocation(); 7 } 8     //如果为null,则重新拼接part Location 9 if (partLocationStr == null || partLocationStr.isEmpty()) { 10 // set default location if not specified and this is 11 // a physical table partition (not a view) 12 if (tbl.getSd().getLocation() != null) {
        //如果不为null,则继续拼接文件路径及part的路径,组成完成的Partition location
13 partLocation = new Path(tbl.getSd().getLocation(), Warehouse 14 .makePartName(tbl.getPartitionKeys(), part.getValues())); 15 } 16 } else { 17 if (tbl.getSd().getLocation() == null) { 18 throw new MetaException("Cannot specify location for a view partition"); 19 } 20 partLocation = wh.getDnsPath(new Path(partLocationStr)); 21 } 22 23 boolean result = false;
     //将location信息写入sd表
24 if (partLocation != null) { 25 part.getSd().setLocation(partLocation.toString()); 26 27 // Check to see if the directory already exists before calling 28 // mkdirs() because if the file system is read-only, mkdirs will 29 // throw an exception even if the directory already exists. 30 if (!wh.isDir(partLocation)) { 31 if (!wh.mkdirs(partLocation, true)) { 32 throw new MetaException(partLocation 33 + " is not a directory or unable to create one"); 34 } 35 result = true; 36 } 37 } 38 return result; 39 }

  总结:

  7、dropPartition

  删除partition就不再从Hive开始了,我们直接看HiveMetaStore服务端做了什么:

 1     private boolean drop_partition_common(RawStore ms, String db_name, String tbl_name,
 2       List<String> part_vals, final boolean deleteData, final EnvironmentContext envContext)
 3       throws MetaException, NoSuchObjectException, IOException, InvalidObjectException,
 4       InvalidInputException {
 5       boolean success = false;
 6       Path partPath = null;
 7       Table tbl = null;
 8       Partition part = null;
 9       boolean isArchived = false;
10       Path archiveParentDir = null;
11       boolean mustPurge = false;
12 
13       try {
14         ms.openTransaction();
       //根据dbName、tableName、part_values获取整个part信息
15 part = ms.getPartition(db_name, tbl_name, part_vals);
       //获取所有Table对象
16 tbl = get_table_core(db_name, tbl_name); 17 firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this)); 18 mustPurge = isMustPurge(envContext, tbl); 19 20 if (part == null) { 21 throw new NoSuchObjectException("Partition doesn't exist. " 22 + part_vals); 23 } 24      //这一片还没有深入看Arrchived partition 25 isArchived = MetaStoreUtils.isArchived(part); 26 if (isArchived) { 27 archiveParentDir = MetaStoreUtils.getOriginalLocation(part); 28 verifyIsWritablePath(archiveParentDir); 29 checkTrashPurgeCombination(archiveParentDir, db_name + "." + tbl_name + "." + part_vals, mustPurge); 30 } 31 if (!ms.dropPartition(db_name, tbl_name, part_vals)) { 32 throw new MetaException("Unable to drop partition"); 33 } 34 success = ms.commitTransaction(); 35 if ((part.getSd() != null) && (part.getSd().getLocation() != null)) { 36 partPath = new Path(part.getSd().getLocation()); 37 verifyIsWritablePath(partPath); 38 checkTrashPurgeCombination(partPath, db_name + "." + tbl_name + "." + part_vals, mustPurge); 39 } 40 } finally { 41 if (!success) { 42 ms.rollbackTransaction(); 43 } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) { 44 if (tbl != null && !isExternal(tbl)) { 45 if (mustPurge) { 46 LOG.info("dropPartition() will purge " + partPath + " directly, skipping trash."); 47 } 48 else { 49 LOG.info("dropPartition() will move " + partPath + " to trash-directory."); 50 }
         //删除partition
51 // Archived partitions have har:/to_har_file as their location. 52 // The original directory was saved in params 53 if (isArchived) { 54 assert (archiveParentDir != null); 55 wh.deleteDir(archiveParentDir, true, mustPurge); 56 } else { 57 assert (partPath != null); 58 wh.deleteDir(partPath, true, mustPurge); 59 deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge); 60 } 61 // ok even if the data is not deleted 62 } 63 } 64 for (MetaStoreEventListener listener : listeners) { 65 DropPartitionEvent dropPartitionEvent = 66 new DropPartitionEvent(tbl, part, success, deleteData, this); 67 dropPartitionEvent.setEnvironmentContext(envContext); 68 listener.onDropPartition(dropPartitionEvent); 69 } 70 } 71 return true; 72 }

  8、alterPartition

  alterPartition牵扯的校验及文件目录的修改,我们直接从HiveMetaStore中的rename_partition中查看:

 1     private void rename_partition(final String db_name, final String tbl_name,
 2         final List<String> part_vals, final Partition new_part,
 3         final EnvironmentContext envContext)
 4         throws InvalidOperationException, MetaException,
 5         TException {
      //日志记录
6 startTableFunction("alter_partition", db_name, tbl_name); 7 8 if (LOG.isInfoEnabled()) { 9 LOG.info("New partition values:" + new_part.getValues()); 10 if (part_vals != null && part_vals.size() > 0) { 11 LOG.info("Old Partition values:" + part_vals); 12 } 13 } 14 15 Partition oldPart = null; 16 Exception ex = null; 17 try { 18 firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this)); 19      //校验PartName的规范性 20 if (part_vals != null && !part_vals.isEmpty()) { 21 MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(), 22 partitionValidationPattern); 23 } 24      调用alterHandler的alterPartition进行partition物理上的rename,以及元数据修改 25 oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part); 26 27 // Only fetch the table if we actually have a listener 28 Table table = null; 29 for (MetaStoreEventListener listener : listeners) { 30 if (table == null) { 31 table = getMS().getTable(db_name, tbl_name); 32 } 33 AlterPartitionEvent alterPartitionEvent = 34 new AlterPartitionEvent(oldPart, new_part, table, true, this); 35 alterPartitionEvent.setEnvironmentContext(envContext); 36 listener.onAlterPartition(alterPartitionEvent); 37 } 38 } catch (InvalidObjectException e) { 39 ex = e; 40 throw new InvalidOperationException(e.getMessage()); 41 } catch (AlreadyExistsException e) { 42 ex = e; 43 throw new InvalidOperationException(e.getMessage()); 44 } catch (Exception e) { 45 ex = e; 46 if (e instanceof MetaException) { 47 throw (MetaException) e; 48 } else if (e instanceof InvalidOperationException) { 49 throw (InvalidOperationException) e; 50 } else if (e instanceof TException) { 51 throw (TException) e; 52 } else { 53 throw newMetaException(e); 54 } 55 } finally { 56 endFunction("alter_partition", oldPart != null, ex, tbl_name); 57 } 58 return; 59 }

  这里我们着重看一下,alterHandler.alterPartition方法,前方高能:

  1   public Partition alterPartition(final RawStore msdb, Warehouse wh, final String dbname,
  2       final String name, final List<String> part_vals, final Partition new_part)
  3       throws InvalidOperationException, InvalidObjectException, AlreadyExistsException,
  4       MetaException {
  5     boolean success = false;
  6 
  7     Path srcPath = null;
  8     Path destPath = null;
  9     FileSystem srcFs = null;
 10     FileSystem destFs = null;
 11     Partition oldPart = null;
 12     String oldPartLoc = null;
 13     String newPartLoc = null;
 14 
 15     //修改新的partition的DDL时间
 16     if (new_part.getParameters() == null ||
 17         new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
 18         Integer.parseInt(new_part.getParameters().get(hive_metastoreConstants.DDL_TIME)) == 0) {
 19       new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
 20           .currentTimeMillis() / 1000));
 21     }
 22    //根据dbName、tableName获取整个Table对象
 23     Table tbl = msdb.getTable(dbname, name);
 24     //如果传入的part_vals为空或为0,说明修改的只是partition的其他元数据信息而不牵扯到partKV,则直接元数据,在msdb.alterPartition会直接更新
 25     if (part_vals == null || part_vals.size() == 0) {
 26       try {
 27         oldPart = msdb.getPartition(dbname, name, new_part.getValues());
 28         if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
 29           MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
 30         }
 31         updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
 32         msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
 33       } catch (InvalidObjectException e) {
 34         throw new InvalidOperationException("alter is not possible");
 35       } catch (NoSuchObjectException e){
 36         //old partition does not exist
 37         throw new InvalidOperationException("alter is not possible");
 38       }
 39       return oldPart;
 40     }
 41     //rename partition
 42     try {
 43       msdb.openTransaction();
 44       try {
       //获取oldPart对象信息
45 oldPart = msdb.getPartition(dbname, name, part_vals); 46 } catch (NoSuchObjectException e) { 47 // this means there is no existing partition 48 throw new InvalidObjectException( 49 "Unable to rename partition because old partition does not exist"); 50 } 51 Partition check_part = null; 52 try {
       //组装newPart的partValues等Partition信息
53 check_part = msdb.getPartition(dbname, name, new_part.getValues()); 54 } catch(NoSuchObjectException e) { 55 // this means there is no existing partition 56 check_part = null; 57 }
      //如果check_part组装成功,说明该part已经存在,则报already exists
58 if (check_part != null) { 59 throw new AlreadyExistsException("Partition already exists:" + dbname + "." + name + "." + 60 new_part.getValues()); 61 }
      //table的信息校验
62 if (tbl == null) { 63 throw new InvalidObjectException( 64 "Unable to rename partition because table or database do not exist"); 65 } 66 67 //如果是外部表的分区变化了,那么不需要操作文件系统,直接更新meta信息即可 68 if (tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { 69 new_part.getSd().setLocation(oldPart.getSd().getLocation()); 70 String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues()); 71 try { 72 //existing partition column stats is no longer valid, remove 73 msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null); 74 } catch (NoSuchObjectException nsoe) { 75 //ignore 76 } catch (InvalidInputException iie) { 77 throw new InvalidOperationException("Unable to update partition stats in table rename." + iie); 78 } 79 msdb.alterPartition(dbname, name, part_vals, new_part); 80 } else { 81 try {
         //获取Table的文件路径
82 destPath = new Path(wh.getTablePath(msdb.getDatabase(dbname), name), 83 Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()));
         //拼接新的Partition的路径
84 destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation())); 85 } catch (NoSuchObjectException e) { 86 LOG.debug(e); 87 throw new InvalidOperationException( 88 "Unable to change partition or table. Database " + dbname + " does not exist" 89 + " Check metastore logs for detailed stack." + e.getMessage()); 90 }
       //如果destPath不为空,说明改变了文件路径
91 if (destPath != null) { 92 newPartLoc = destPath.toString(); 93 oldPartLoc = oldPart.getSd().getLocation(); 94       //根据原有sd的路径获取老的part路径信息 95 srcPath = new Path(oldPartLoc); 96 97 LOG.info("srcPath:" + oldPartLoc); 98 LOG.info("descPath:" + newPartLoc); 99 srcFs = wh.getFs(srcPath); 100 destFs = wh.getFs(destPath); 101 //查看srcFS与destFs是否Wie同一个fileSystem 102 if (!FileUtils.equalsFileSystem(srcFs, destFs)) { 103 throw new InvalidOperationException("table new location " + destPath 104 + " is on a different file system than the old location " 105 + srcPath + ". This operation is not supported"); 106 } 107 try {
          //校验老的partition路径与新的partition路径是否一致,同时新的partition路径是否已经存在  
108 srcFs.exists(srcPath); // check that src exists and also checks 109 if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) { 110 throw new InvalidOperationException("New location for this table " 111 + tbl.getDbName() + "." + tbl.getTableName() 112 + " already exists : " + destPath); 113 } 114 } catch (IOException e) { 115 throw new InvalidOperationException("Unable to access new location " 116 + destPath + " for partition " + tbl.getDbName() + "." 117 + tbl.getTableName() + " " + new_part.getValues()); 118 } 119 new_part.getSd().setLocation(newPartLoc); 120 if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) { 121 MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true); 122 }
         //拼接oldPartName,并且删除原有oldPart的信息,写入新的partition信息
123 String oldPartName = Warehouse.makePartName(tbl.getPartitionKeys(), oldPart.getValues()); 124 try { 125 //existing partition column stats is no longer valid, remove 126 msdb.deletePartitionColumnStatistics(dbname, name, oldPartName, oldPart.getValues(), null); 127 } catch (NoSuchObjectException nsoe) { 128 //ignore 129 } catch (InvalidInputException iie) { 130 throw new InvalidOperationException("Unable to update partition stats in table rename." + iie); 131 } 132 msdb.alterPartition(dbname, name, part_vals, new_part); 133 } 134 } 135 136 success = msdb.commitTransaction(); 137 } finally { 138 if (!success) { 139 msdb.rollbackTransaction(); 140 } 141 if (success && newPartLoc != null && newPartLoc.compareTo(oldPartLoc) != 0) { 142 //rename the data directory 143 try{ 144 if (srcFs.exists(srcPath)) { 145 //如果根路径海微创建,需要重新进行创建,就好比计算引擎先调用了alterTable,又调用了alterPartition,这时partition的根路径或许还未创建 146 Path destParentPath = destPath.getParent(); 147 if (!wh.mkdirs(destParentPath, true)) { 148 throw new IOException("Unable to create path " + destParentPath); 149 }
          //进行原路径与目标路径的rename
150 wh.renameDir(srcPath, destPath, true); 151 LOG.info("rename done!"); 152 } 153 } catch (IOException e) { 154 boolean revertMetaDataTransaction = false; 155 try { 156 msdb.openTransaction(); 157 msdb.alterPartition(dbname, name, new_part.getValues(), oldPart); 158 revertMetaDataTransaction = msdb.commitTransaction(); 159 } catch (Exception e1) { 160 LOG.error("Reverting metadata opeation failed During HDFS operation failed", e1); 161 if (!revertMetaDataTransaction) { 162 msdb.rollbackTransaction(); 163 } 164 } 165 throw new InvalidOperationException("Unable to access old location " 166 + srcPath + " for partition " + tbl.getDbName() + "." 167 + tbl.getTableName() + " " + part_vals); 168 } 169 } 170 } 171 return oldPart; 172 }

  暂时到这里吧~后续咱们慢慢玩哈~

 

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
SRE-METAQ使用介绍
总体介绍Metaq是一款分布式、队列模型的消息中间件。分为topic和queue两种模式,push和pull两种消费方式,支持严格的消息顺序,亿级别的堆积能力,支持消息回溯和多个维度的消息查询。开源版本叫rocketMQ。MetaQ运维团队向用户承诺消息不丢,但是不保证不重复,另外发送消息如出现短暂超时现象属于正常,可能是网络问题或者服务器短暂压力大,应用做重试即可。(at least once,
251 0
CDP中的Hive3系列之Hive性能调优
要将数据从 RDBMS(例如 MySQL)迁移到 Hive,您应该考虑使用带有 Teradata 连接器的 CDP 中的 Apache Sqoop。Apache Sqoop 客户端基于 CLI 的工具在关系数据库和 HDFS 或云对象存储之间批量传输数据。 需要进行提取、转换和加载 (ETL) 过程的遗留系统数据源通常驻留在文件系统或对象存储中。您还可以以分隔文本(默认)或 SequenceFile 格式导入数据,然后将数据转换为 Hive 推荐的 ORC 格式。通常,为了在 Hive 中查询数据,ORC 是首选格式,因为 ORC 提供了性能增强。
185 0
CDP中的Hive3系列之Hive性能调优
查看与配置集群、存储数据和编写查询相关的某些性能调优指南,以便您可以保护集群和相关服务、自动扩展资源以处理查询等。
195 0
CDP中的Hive3系列之保护Hive3
作为管理员,您需要了解运行 Hive 查询的 Hive 默认授权是不安全的,以及您需要做什么来保护您的数据。您需要了解您的安全选项:设置 Ranger 或基于存储的授权 (SBA),它基于模拟和 HDFS 访问控制列表 (ACL),或这些方法的组合。 将 Apache Hive 访问限制为已批准的用户。Cloudera 推荐 Ranger。授权是检查用户权限以执行选择操作的过程,例如创建、读取和写入数据,以及编辑表元数据。Apache Ranger 为所有 Cloudera 运行时服务提供集中授权。 您可以设置 Ranger 以使用 Hadoop SQL 策略保护托管的 ACID 表或外部表。
203 0
带你读《More Effective C#:改善C#代码的50个有效方法》之一:处理各种类型的数据
本书围绕一些关于C#和.NET的重要主题,包括C#语言元素、.NET资源管理、使用C#表达设计、创建二进制组件和使用框架等,讲述了最常见的50个问题的解决方案,为程序员提供了改善C#和.NET程序的方法。本书通过将每个条款构建在之前的条款之上,并合理地利用之前的条款,来让读者最大限度地学习书中的内容,为其在不同情况下使用最佳构造提供指导,适合各层次的C#程序员阅读。
759 0
带你读《More Effective C#:改善C#代码的50个有效方法》之一:处理各种类型的数据
本书围绕一些关于C#和.NET的重要主题,包括C#语言元素、.NET资源管理、使用C#表达设计、创建二进制组件和使用框架等,讲述了最常见的50个问题的解决方案,为程序员提供了改善C#和.NET程序的方法。本书通过将每个条款构建在之前的条款之上,并合理地利用之前的条款,来让读者最大限度地学习书中的内容,为其在不同情况下使用最佳构造提供指导,适合各层次的C#程序员阅读。
559 0
Hive之parse_url函数详解
Hive的parse_url函数使用 parse_url(url, partToExtract[, key]) - extracts a part from a URL 解析URL字符串 partToExtract的选项包含[HOST,PATH,QU...
2098 0
Hive metastore源码阅读(二)
  最近随着项目的深入,发现hive meta有些弊端,就是你会发现它的元数据操作与操作物理集群的代码耦合在一起,非常不利于扩展。比如:在create_table的时候同时进行路径校验及创建,如下代码: 1 if (!TableType.
1288 0
阿里云MVP
阿里最有价值专家MVP(Most Valuable Professional) 是专注于帮助他人充分了解和使用阿里云技术的意见领袖。 MVP是一些拥有丰富知识和实际经验的阿里云技术专家。他们不是阿里云的员工,但是非常乐于通过在线或离线社区的方式帮助技术人士。
3598 0
+关注
松伯
技术专家 专注于大数据领域 博客园地址:https://www.cnblogs.com/yangsy0915/
187
文章
0
问答
文章排行榜
最热
最新