开发者社区> 科技小先锋> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

hive执行流程(3)-Driver类分析1Driver类整体流程

简介:
+关注继续查看

Driver类是对

1
org.apache.hadoop.hive.ql.processors.CommandProcessor.java

接口的实现,重写了run方法,定义了常见sql的执行方式.

1
public class Driver implements CommandProcessor

具体的方法调用顺序:

1
2
run--->runInternal--->(createTxnManager+recordValidTxns)----->compileInternal--->
compile--analyzer(BaseSemanticAnalyzer)--->execute

其中compile和execute是两个比较重要的方法:

compile用来完成语法和语义的分析,生成执行计划

execute执行物理计划,即提交相应的mapredjob

通过打印perflog可以看到Driver类的简单地时序图:

wKiom1RY9Fbh-9DsAAC0DODwl28784.jpg

下面来看下Driver类的几个常用的方法实现:

1)createTxnManager 用来获取目前设置的用于实现lock的类,比如:

1
org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager

2)checkConcurrency 用来判断当前hive设置是否支持并发控制:

1
boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);

主要是通过判断hive.support.concurrency参数,默认是false

3)getClusterStatus 调用JobClient类的getClusterStatus方法来获取集群的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
public ClusterStatus getClusterStatus() throws Exception {
    ClusterStatus cs;
    try {
      JobConf job = new JobConf(conf , ExecDriver.class);
      JobClient jc = new JobClient(job);
      cs = jc.getClusterStatus();
    catch (Exception e) {
      e.printStackTrace();
      throw e;
    }
    LOG.info( "Returning cluster status: " + cs.toString());
    return cs;
  }

4)getSchema   //返回表的schema信息

5)

1
doAuthorization/doAuthorizationV2/getHivePrivObjects

用来在开启权限验证情况下对sql的权限检测操作

6)

1
getLockObjects/acquireReadWriteLocks/releaseLocks

都是和锁相关的方法 ,其中getLockObjects用来获取锁的对象(锁的路径,锁的模式等),最终返回一个包含所有锁的list,acquireReadWriteLocks用来控制获取锁,releaseLocks用来释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
getLockObjects:
  private List<HiveLockObj> getLockObjects(Database d, Table t, Partition p, HiveLockMode mode)
      throws SemanticException {
    List<HiveLockObj> locks = new LinkedList<HiveLockObj>();
    HiveLockObjectData lockData =
      new HiveLockObjectData( plan.getQueryId(),
                             String. valueOf(System.currentTimeMillis ()),
                             "IMPLICIT",
                             plan.getQueryStr());
    if (d != null) {
      locks.add( new HiveLockObj(new HiveLockObject(d.getName(), lockData), mode));  //数据库层面的锁
      return locks;
    }
    if (t != null) {  // 表层面的锁
      locks.add( new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));
      locks.add( new HiveLockObj(new HiveLockObject(t, lockData), mode));
      mode = HiveLockMode.SHARED;
      locks.add( new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));
      return locks;
    }
    if (p != null) { //分区层面的锁
      locks.add( new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));
      if (!(p instanceof DummyPartition)) {
        locks.add( new HiveLockObj(new HiveLockObject(p, lockData), mode));
      }
      // All the parents are locked in shared mode
      mode = HiveLockMode.SHARED;
      // For dummy partitions, only partition name is needed
      String name = p.getName();
      if (p instanceof DummyPartition) {
        name = p.getName().split( "@")[2];
      }
      String partialName = "";
      String[] partns = name.split( "/");
      int len = p instanceof DummyPartition ? partns.length : partns.length - 1;
      Map<String, String> partialSpec = new LinkedHashMap<String, String>();
      for int idx = 0; idx < len; idx++) {
        String partn = partns[idx];
        partialName += partn;
        String[] nameValue = partn.split( "=");
        assert(nameValue.length == 2);
        partialSpec.put(nameValue[0], nameValue[1]);
        try {
          locks.add( new HiveLockObj(
                      new HiveLockObject(new DummyPartition(p.getTable(), p.getTable().getDbName()
                                                            "/" + p.getTable().getTableName()
                                                            "/" + partialName,
                                                              partialSpec), lockData), mode));
          partialName += "/";
        catch (HiveException e) {
          throw new SemanticException(e.getMessage());
        }
      }
      locks.add( new HiveLockObj(new HiveLockObject(p.getTable(), lockData), mode));
      locks.add( new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));
    }
    return locks;
  }

acquireReadWriteLocks调用了锁具体实现类的acquireLocks方法

releaseLocks调用了锁具体实现类的releaseLocks方法

7)

run方法是Driver类的入口方法,调用了runInternal方法,我们主要来看runInternal的方法,大体步骤:

1
2
3
4
5
运行hive.exec.driver.run.hooks中设置的hook,
运行HiveDriverRunHook相关类的的preDriverRun方法---->检测是否支持并发,并获取并发实现的类
--->compileInternal---->运行锁相关的操作(判断是否只对mapred job进行锁,获取锁等)
---->调用execute---->释放锁--->运行HiveDriverRunHook相关类的的postDriverRun方法
---->返回CommandProcessorResponse对象

相关代码: 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
      throws CommandNeedRetryException {
    errorMessage = null;
    SQLState = null;
    downstreamError = null;
    if (!validateConfVariables()) {
      return new CommandProcessorResponse(12, errorMessage , SQLState );
    }
    HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf , command);
    // Get all the driver run hooks and pre-execute them.
    List<HiveDriverRunHook> driverRunHooks;
    try {               //运行hive.exec.driver.run.hooks中设置的hook
      driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, 
          HiveDriverRunHook. class);         
      for (HiveDriverRunHook driverRunHook : driverRunHooks) {
          driverRunHook.preDriverRun(hookContext); //运行HiveDriverRunHook相关类的的preDriverRun方法
      }
    catch (Exception e) {
      errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
      SQLState = ErrorMsg. findSQLState(e.getMessage());
      downstreamError = e;
      console.printError( errorMessage + "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      return new CommandProcessorResponse(12, errorMessage , SQLState );
    }
    // Reset the perf logger
    PerfLogger perfLogger = PerfLogger.getPerfLogger( true);
    perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.DRIVER_RUN);
    perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
    int ret;
    boolean requireLock = false;
    boolean ckLock = false;
    try {
      ckLock = checkConcurrency();  //检测是否支持并发,并获取并发实现的类,比如常用的 org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager
      createTxnManager();
    catch (SemanticException e) {
      errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
      SQLState = ErrorMsg. findSQLState(e.getMessage());
      downstreamError = e;
      console.printError( errorMessage, "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      ret = 10;
      return new CommandProcessorResponse(ret, errorMessage , SQLState );
    }
    ret = recordValidTxns();
    if (ret != 0return new CommandProcessorResponse(ret, errorMessage, SQLState);
    if (!alreadyCompiled) {
      ret = compileInternal(command);  //调用compileInternal方法
      if (ret != 0) {
        return new CommandProcessorResponse(ret, errorMessage, SQLState);
      }
    }
    // the reason that we set the txn manager for the cxt here is because each
    // query has its own ctx object. The txn mgr is shared across the
    // same instance of Driver, which can run multiple queries.
    ctx.setHiveTxnManager( txnMgr);
    if (ckLock) {  //断是否只对mapred job进行锁,参数hive.lock.mapred.only.operation,默认为false
      boolean lockOnlyMapred = HiveConf.getBoolVar( conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);
      if(lockOnlyMapred) {
        Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();
        taskQueue.addAll( plan.getRootTasks());
        while (taskQueue.peek() != null) {
          Task<? extends Serializable> tsk = taskQueue.remove();
          requireLock = requireLock || tsk.requireLock();
          if(requireLock) {
            break;
          }
          if (tsk instanceof ConditionalTask) {
            taskQueue.addAll(((ConditionalTask)tsk).getListTasks());
          }
          if(tsk.getChildTasks()!= null) {
            taskQueue.addAll(tsk.getChildTasks());
          }
          // does not add back up task here, because back up task should be the same
          // type of the original task.
        }
      else {
        requireLock = true;
      }
    }
    if (requireLock) { //获取锁
      ret = acquireReadWriteLocks();
      if (ret != 0) {
        try {
          releaseLocks( ctx.getHiveLocks());
        catch (LockException e) {
          // Not much to do here
        }
        return new CommandProcessorResponse(ret, errorMessage, SQLState);
      }
    }
    ret = execute(); //job运行
    if (ret != 0) {
      //if needRequireLock is false, the release here will do nothing because there is no lock
      try {
        releaseLocks( ctx.getHiveLocks());
      catch (LockException e) {
        // Nothing to do here
      }
      return new CommandProcessorResponse(ret, errorMessage , SQLState );
    }
    //if needRequireLock is false, the release here will do nothing because there is no lock
    try {
      releaseLocks( ctx.getHiveLocks());
    catch (LockException e) {
      errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
      SQLState = ErrorMsg. findSQLState(e.getMessage());
      downstreamError = e;
      console.printError( errorMessage + "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      return new CommandProcessorResponse(12, errorMessage , SQLState );
    }
    perfLogger.PerfLogEnd( CLASS_NAME, PerfLogger.DRIVER_RUN);
    perfLogger.close(LOG, plan);
    // Take all the driver run hooks and post-execute them.
    try {
      for (HiveDriverRunHook driverRunHook : driverRunHooks) {  //运行HiveDriverRunHook相关类的的postDriverRun方法
          driverRunHook.postDriverRun(hookContext);
      }
    catch (Exception e) {
      errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
      SQLState = ErrorMsg. findSQLState(e.getMessage());
      downstreamError = e;
      console.printError( errorMessage + "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      return new CommandProcessorResponse(12, errorMessage , SQLState );
    }
    return new CommandProcessorResponse(ret);
  }

8)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
再来看下compileInternal方法
  private static final Object compileMonitor = new Object();
  private int compileInternal(String command) {
    int ret;
    synchronized ( compileMonitor) {
      ret = compile(command);  //调用compile方法
    }
    if (ret != 0) {
      try {
        releaseLocks( ctx.getHiveLocks());
      catch (LockException e) {
        LOG.warn("Exception in releasing locks. "
            + org.apache.hadoop.util.StringUtils.stringifyException(e));
      }
    }
    return ret;
  }

 调用了compile方法,compile方法分析命令,生成Task,关于compile的具体实现后面详细讲解

9.execute方法,提交task并等待task运行完毕,并打印task运行的信息,比如消耗的时间等

(这里信息也比较多,后面单独讲解



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1571890,如需转载请自行联系原作者

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Hive底层原理:explain执行计划详解(二)
不懂hive中的explain,说明hive还没入门,学会explain,能够给我们工作中使用hive带来极大的便利!
85 0
CVE-2021-22205——Gitlab 远程命令执行漏洞复现
CVE-2021-22205——Gitlab 远程命令执行漏洞复现
497 0
Hive安装使用
文档及下周网址 官网http://hive.apache.org 文档https://cwiki.apache.org/confluence/display/Hive/GettingStarted https://cwiki.
2512 0
使用SAP HANA Express Edition开始用自己的设备构建大数据应用
作为SAP HANA Platform的简化版本,开发者可以免费下载使用SAP HANA Express Edition,用最简单的硬件设备(包括个人电脑、服务器与云服务器)运行SAP HANA Express Edition。快捷简单的构建大数据与企业服务解决方案,并进行快速部署。
6379 0
《Hive编程指南》一导读
本书是一本Hive的编程指南。Hive是Hadoop生态系统中必不可少的一个工具,它提供了一种SQL(结构化查询语言)方言,可以查询存储在Hadoop分布式文件系统(HDFS)中的数据或其他和Hadoop集成的文件系统,如MapR-FS、Amazon的S3和像HBase(Hadoop数据库)和Cassandra这样的数据库中的数据。
1374 0
【RAC安装故障】multiple user has uid 0
【RAC安装故障处理】multiple user has uid 0 一.1  BLOG文档结构图       一.2  前言部分   一.2.1  导读和注意事项 各位技术爱好者,看完本文后,你可以掌握如下的技能,也可以学到一些其它你所不知道的知识,~O...
1203 0
6963
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载