Hive metastore源码阅读(二)

简介:   最近随着项目的深入,发现hive meta有些弊端,就是你会发现它的元数据操作与操作物理集群的代码耦合在一起,非常不利于扩展。比如:在create_table的时候同时进行路径校验及创建,如下代码: 1 if (!TableType.

  最近随着项目的深入,发现hive meta有些弊端,就是你会发现它的元数据操作与操作物理集群的代码耦合在一起,非常不利于扩展。比如:在create_table的时候同时进行路径校验及创建,如下代码:

 1   if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
 2           if (tbl.getSd().getLocation() == null
 3               || tbl.getSd().getLocation().isEmpty()) {
 4             tblPath = wh.getTablePath(
 5                 ms.getDatabase(tbl.getDbName()), tbl.getTableName());
 6           } else {
 7             if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
 8               LOG.warn("Location: " + tbl.getSd().getLocation()
 9                   + " specified for non-external table:" + tbl.getTableName());
10             }
11             tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
12           }
13           tbl.getSd().setLocation(tblPath.toString());
14         }
15 
16         if (tblPath != null) {
17           if (!wh.isDir(tblPath)) {
18             if (!wh.mkdirs(tblPath, true)) {
19               throw new MetaException(tblPath
20                   + " is not a directory or unable to create one");
21             }
22             madeDir = true;
23           }

   所以这是meta无法统一所有元数据的原因么。。其实hive metastore的代码从大的来看,就好比元数据的增删改查,从上次梳理中我们看到,在创建HiveMetaStore的init方法中,同时创建了三种Listener---MetaStorePreEventListener,MetaStoreEventListener,MetaStoreEndFunctionListener用于对每一步事件的监听与记录。同时呢,它还new出了WareHouse,用以进行物理操作。

  

 1     public void init() throws MetaException {
 2       rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
 3       initListeners = MetaStoreUtils.getMetaStoreListeners(
 4           MetaStoreInitListener.class, hiveConf,
 5           hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
 6       for (MetaStoreInitListener singleInitListener: initListeners) {
 7           MetaStoreInitContext context = new MetaStoreInitContext();
 8           singleInitListener.onInit(context);
 9       }
10 
11       String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
12           HiveAlterHandler.class.getName());
13       alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
14           alterHandlerName), hiveConf);
15       wh = new Warehouse(hiveConf);
16         。。。。
17     }

 接下来,我们从元数据的生命周期开始,学习下Partiiton的生命周期。在HiveMetaStoreClient中,查找add_partition作为入口,这种操作在我们insert overwrite 以表中某个字段为分区时,比如dt=20170830,作用到的操作。或者是add_partitions,创建分区表后进行数据的导入,那么会创建多个分区路径,下面以add_partiitons为例:

 1   public int add_partitions(List<Partition> new_parts)
 2       throws InvalidObjectException, AlreadyExistsException, MetaException,
 3       TException {
 4     return client.add_partitions(new_parts);
 5   }
 6 
 7   @Override
 8   public List<Partition> add_partitions(
 9       List<Partition> parts, boolean ifNotExists, boolean needResults)
10       throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
11     if (parts.isEmpty()) {
12       return needResults ? new ArrayList<Partition>() : null;
13     }
14     Partition part = parts.get(0);
15     AddPartitionsRequest req = new AddPartitionsRequest(
16         part.getDbName(), part.getTableName(), parts, ifNotExists);
17     req.setNeedResult(needResults);
18     AddPartitionsResult result = client.add_partitions_req(req);
19     return needResults ? filterHook.filterPartitions(result.getPartitions()) : null;
20   }

  这里的client来自于ThriftHiveMetastore.Iface接口对象,其实现子类HiveMetaStore并调用init方法进行创建。随后将封装了AddPartitionsRequest类,其实这个类还是partition的属性,但是这样封装的好处是,今后再调用的时候不用再去获取partition的DbName,,TableName等信息,一次性封装以便后续直接使用该对象。随后,我们查看client调用add_partitions_req,下面代码高能预警,非常多,我们一点点分析。

  

 1    private List<Partition> add_partitions_core(
 2         RawStore ms, String dbName, String tblName, List<Partition> parts, boolean ifNotExists)
 3             throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
 4       logInfo("add_partitions");
 5       boolean success = false;
 6       // Ensures that the list doesn't have dups, and keeps track of directories we have created.
 7       Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper, Boolean>();
 8       List<Partition> result = new ArrayList<Partition>();
 9       List<Partition> existingParts = null;
10       Table tbl = null;
11       try {
12         ms.openTransaction();
13         tbl = ms.getTable(dbName, tblName);
14         if (tbl == null) {
15           throw new InvalidObjectException("Unable to add partitions because "
16               + "database or table " + dbName + "." + tblName + " does not exist");
17         }
18 
19         if (!parts.isEmpty()) {
20           firePreEvent(new PreAddPartitionEvent(tbl, parts, this));
21         }
22 
23         for (Partition part : parts) {
24           if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
25             throw new MetaException("Partition does not belong to target table "
26                 + dbName + "." + tblName + ": " + part);
27           }
28           boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
29           if (!shouldAdd) {
30             if (existingParts == null) {
31               existingParts = new ArrayList<Partition>();
32             }
33             existingParts.add(part);
34             LOG.info("Not adding partition " + part + " as it already exists");
35             continue;
36           }
37           boolean madeDir = createLocationForAddedPartition(tbl, part);
38           if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
39             // Technically, for ifNotExists case, we could insert one and discard the other
40             // because the first one now "exists", but it seems better to report the problem
41             // upstream as such a command doesn't make sense.
42             throw new MetaException("Duplicate partitions in the list: " + part);
43           }
44           initializeAddedPartition(tbl, part, madeDir);
45           result.add(part);
46         }
47         if (!result.isEmpty()) {
48           success = ms.addPartitions(dbName, tblName, result);
49         } else {
50           success = true;
51         }
52         success = success && ms.commitTransaction();
53       } finally {
54         if (!success) {
55           ms.rollbackTransaction();
56           for (Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet()) {
57             if (e.getValue()) {
58               wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
59               // we just created this directory - it's not a case of pre-creation, so we nuke
60             }
61           }
62           fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
63         } else {
64           fireMetaStoreAddPartitionEvent(tbl, result, null, true);
65           if (existingParts != null) {
66             // The request has succeeded but we failed to add these partitions.
67             fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
68           }
69         }
70       }
71       return result;
72     }
View Code

  首先呢

  1、ms.openTransaction(),这个上次已经提到过,是为了保证操作的原子性。随后 tbl = ms.getTable(dbName, tblName);

  2、通过dbName以及tableName获取正个Table对象。

  3、通过firePreEvent记录事件。

  4、开始循环遍历partiiton,通过startAddPartition方法校验该partition是否在元数据中存在

  5、调用createLocationForAddedPartition方法进行文件路径创建,随后调用initializeAddedPartition,主要是将table的param信息赋给partition,与hive的表结构有关,最终会将param扩展信息写入类似meta_partition_param的扩展信息表。

  6、待物理操作完毕之后,进行ms.addPartitions(dbName, tblName, result)元数据信息的meta录入。

  7、如果说partition的路径已经存在,则抛出异常,并且在最后删除已经创建的路径。这个有一次,请看上面,首先创建了一个Map,

Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper, Boolean>();将partition对象作为key,mkdir成功失败的布尔值作为value,最终通过判断value的值,来删除创建成功的partition.

  删除,和查询就不说了,因为太过简单,那么alter_partition来了,client.alter_partition(dbName, tblName, newPart);从client端调用我也不说了~,传入dbName,tbleName以及新的partition,随之在hivemetaStore中调用了rename_partition方法:

  

    @Override
    public void rename_partition(final String db_name, final String tbl_name,
        final List<String> part_vals, final Partition new_part)
        throws InvalidOperationException, MetaException, TException {
      // Call rename_partition without an environment context.
      rename_partition(db_name, tbl_name, part_vals, new_part, null);
    }

    private void rename_partition(final String db_name, final String tbl_name,
        final List<String> part_vals, final Partition new_part,
        final EnvironmentContext envContext)
        throws InvalidOperationException, MetaException,
        TException {
      startTableFunction("alter_partition", db_name, tbl_name);

      if (LOG.isInfoEnabled()) {
        LOG.info("New partition values:" + new_part.getValues());
        if (part_vals != null && part_vals.size() > 0) {
          LOG.info("Old Partition values:" + part_vals);
        }
      }

      Partition oldPart = null;
      Exception ex = null;
      try {
        firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));

        if (part_vals != null && !part_vals.isEmpty()) {
          MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
              partitionValidationPattern);
        }

        oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part);

        // Only fetch the table if we actually have a listener
        Table table = null;
        for (MetaStoreEventListener listener : listeners) {
          if (table == null) {
            table = getMS().getTable(db_name, tbl_name);
          }
          AlterPartitionEvent alterPartitionEvent =
              new AlterPartitionEvent(oldPart, new_part, table, true, this);
          alterPartitionEvent.setEnvironmentContext(envContext);
          listener.onAlterPartition(alterPartitionEvent);
        }
      } catch (InvalidObjectException e) {
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (AlreadyExistsException e) {
        ex = e;
        throw new InvalidOperationException(e.getMessage());
      } catch (Exception e) {
        ex = e;
        if (e instanceof MetaException) {
          throw (MetaException) e;
        } else if (e instanceof InvalidOperationException) {
          throw (InvalidOperationException) e;
        } else if (e instanceof TException) {
          throw (TException) e;
        } else {
          throw newMetaException(e);
        }
      } finally {
        endFunction("alter_partition", oldPart != null, ex, tbl_name);
      }
      return;
    }

  我们继续来看:  

  1、startTableFunction方法主要用来计数

  2、new_part.getValues()其实获取的是partition的具体列值信息,比如dt=20170830,那么获取的就是这个20170830

  3、随之通过validatePartitionNameCharacters校验partitionName是否合法。

  4、随后通过alterHandler.alterPartition进行partition的更改,但是为什么要用oldPart命名?已经修改了呀?(疑问)我们跟进去会发现,其调用了updatePartColumnStats方法:

  private void updatePartColumnStats(RawStore msdb, String dbName, String tableName,
      List<String> partVals, Partition newPart) throws MetaException, InvalidObjectException {
    dbName = HiveStringUtils.normalizeIdentifier(dbName);
    tableName = HiveStringUtils.normalizeIdentifier(tableName);
    String newDbName = HiveStringUtils.normalizeIdentifier(newPart.getDbName());
    String newTableName = HiveStringUtils.normalizeIdentifier(newPart.getTableName());

    Table oldTable = msdb.getTable(dbName, tableName);
    if (oldTable == null) {
      return;
    }

    try {
      String oldPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), partVals);
      String newPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), newPart.getValues());
      if (!dbName.equals(newDbName) || !tableName.equals(newTableName)
          || !oldPartName.equals(newPartName)) {
        msdb.deletePartitionColumnStatistics(dbName, tableName, oldPartName, partVals, null);
      } else {
        Partition oldPartition = msdb.getPartition(dbName, tableName, partVals);
        if (oldPartition == null) {
          return;
        }
        if (oldPartition.getSd() != null && newPart.getSd() != null) {
        List<FieldSchema> oldCols = oldPartition.getSd().getCols();
          if (!MetaStoreUtils.areSameColumns(oldCols, newPart.getSd().getCols())) {
            updatePartColumnStatsForAlterColumns(msdb, oldPartition, oldPartName, partVals, oldCols, newPart);
          }
        }
      }
    } catch (NoSuchObjectException nsoe) {
      LOG.debug("Could not find db entry." + nsoe);
      //ignore
    } catch (InvalidInputException iie) {
      throw new InvalidObjectException("Invalid input to update partition column stats." + iie);
    }
  }

  5、通过Warehouse.makePartName组装partition的原有和新的表达,比如:dt=20180830,新的为dataPart=20180830

  6、这里会有层判断,如果新的表达与旧的表达不同则删除原有meta信息,否则将会调用updatePartColumnStatsForAlterColumns进行meta元数据的更新。

  随后就木有了。。太晚了,碎觉啦,明天还要作死上班呢哈哈哈哈~

目录
相关文章
|
2月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
147 0
|
2月前
|
SQL Java Maven
hive-3.0.0源码编译详解
hive-3.0.0源码编译详解
20 0
|
9月前
|
SQL 分布式计算 Java
浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)
浅析 hive udaf 的正确编写方式- 论姿势的重要性-系列四-如何直接访问metastore service(附源码)
|
9月前
|
SQL 运维 大数据
如何获取大数据平台 CDH 中 hive metastore db 的用户名和密码?
如何获取大数据平台 CDH 中 hive metastore db 的用户名和密码?
如何获取大数据平台 CDH 中 hive metastore db 的用户名和密码?
|
11月前
|
SQL 存储 大数据
关于数据仓库的Hive的Hive架构的MetaStore元数据服务
随着大数据技术的不断发展,数据仓库成为了企业中不可或缺的一部分。而Hive作为一种开源的数据仓库系统,因其易于使用和高效处理等特点,成为了许多企业的首选。然而,对于普通用户来说,直接使用Hive的命令行工具进行操作并不方便。因此,开发者社区中涌现出了大量的Hive GUI工具,其中最为流行的就是Web GUI工具。
243 2
|
SQL 存储 分布式计算
Hive 2.1.1 MetaException(在metastore中找不到消息:版本信息)
Hive 2.1.1 MetaException(在metastore中找不到消息:版本信息)
256 0
|
SQL HIVE
解决启动Hive报错Hive Schema version 2.3.0 does not match metastore‘s schema version 1.2.0 Metastore is not
解决启动Hive报错Hive Schema version 2.3.0 does not match metastore‘s schema version 1.2.0 Metastore is not
170 0
|
SQL 分布式计算 Java
spark 对于hive metastore的兼容性随笔--通过classloader实现
spark 对于hive metastore的兼容性随笔--通过classloader实现
432 0
|
SQL 存储 分布式计算
Hive简介及源码编译
Hive是一个基于Hadoop的数据仓库,可以将结构化数据映射成一张表,并提供类SQL的功能,最初由Facebook提供,使用HQL作为查询接口、HDFS作为存储底层、MapReduce作为执行层,设计目的是让SQL技能良好,但Java技能较弱的分析师可以查询海量数据,2008年facebook把Hive项目贡献给Apache。Hive提供了比较完整的SQL功能(本质是将SQL转换为MapReduce),自身最大的缺点就是执行速度慢。Hive有自身的元数据结构描述,可以使用MySql\ProstgreSql\oracle 等关系型数据库来进行存储,但请注意Hive中的所有数据都存储在HDFS中
388 0
Hive简介及源码编译
|
SQL 分布式计算 数据管理
spark SQL配置连接Hive Metastore 3.1.2
Hive Metastore作为元数据管理中心,支持多种计算引擎的读取操作,例如Flink、Presto、Spark等。本文讲述通过spark SQL配置连接Hive Metastore,并以3.1.2版本为例。
spark SQL配置连接Hive Metastore 3.1.2