Driver类是对
1
|
org.apache.hadoop.hive.ql.processors.CommandProcessor.java
|
接口的实现,重写了run方法,定义了常见sql的执行方式.
1
|
public
class
Driver
implements
CommandProcessor
|
具体的方法调用顺序:
1
2
|
run--->runInternal--->(createTxnManager+recordValidTxns)----->compileInternal--->
compile--analyzer(BaseSemanticAnalyzer)--->execute
|
其中compile和execute是两个比较重要的方法:
compile用来完成语法和语义的分析,生成执行计划
execute执行物理计划,即提交相应的mapredjob
通过打印perflog可以看到Driver类的简单地时序图:
下面来看下Driver类的几个常用的方法实现:
1)createTxnManager 用来获取目前设置的用于实现lock的类,比如:
1
|
org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager
|
2)checkConcurrency 用来判断当前hive设置是否支持并发控制:
1
|
boolean
supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
|
主要是通过判断hive.support.concurrency参数,默认是false
3)getClusterStatus 调用JobClient类的getClusterStatus方法来获取集群的状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public
ClusterStatus getClusterStatus()
throws
Exception {
ClusterStatus cs;
try
{
JobConf job =
new
JobConf(conf , ExecDriver.
class
);
JobClient jc =
new
JobClient(job);
cs = jc.getClusterStatus();
}
catch
(Exception e) {
e.printStackTrace();
throw
e;
}
LOG.info(
"Returning cluster status: "
+ cs.toString());
return
cs;
}
|
4)getSchema //返回表的schema信息
1
|
doAuthorization/doAuthorizationV2/getHivePrivObjects
|
用来在开启权限验证情况下对sql的权限检测操作
1
|
getLockObjects/acquireReadWriteLocks/releaseLocks
|
都是和锁相关的方法 ,其中getLockObjects用来获取锁的对象(锁的路径,锁的模式等),最终返回一个包含所有锁的list,acquireReadWriteLocks用来控制获取锁,releaseLocks用来释放锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
getLockObjects:
private
List<HiveLockObj> getLockObjects(Database d, Table t, Partition p, HiveLockMode mode)
throws
SemanticException {
List<HiveLockObj> locks =
new
LinkedList<HiveLockObj>();
HiveLockObjectData lockData =
new
HiveLockObjectData( plan.getQueryId(),
String. valueOf(System.currentTimeMillis ()),
"IMPLICIT"
,
plan.getQueryStr());
if
(d !=
null
) {
locks.add(
new
HiveLockObj(
new
HiveLockObject(d.getName(), lockData), mode));
//数据库层面的锁
return
locks;
}
if
(t !=
null
) {
// 表层面的锁
locks.add(
new
HiveLockObj(
new
HiveLockObject(t.getDbName(), lockData), mode));
locks.add(
new
HiveLockObj(
new
HiveLockObject(t, lockData), mode));
mode = HiveLockMode.SHARED;
locks.add(
new
HiveLockObj(
new
HiveLockObject(t.getDbName(), lockData), mode));
return
locks;
}
if
(p !=
null
) {
//分区层面的锁
locks.add(
new
HiveLockObj(
new
HiveLockObject(p.getTable().getDbName(), lockData), mode));
if
(!(p
instanceof
DummyPartition)) {
locks.add(
new
HiveLockObj(
new
HiveLockObject(p, lockData), mode));
}
// All the parents are locked in shared mode
mode = HiveLockMode.SHARED;
// For dummy partitions, only partition name is needed
String name = p.getName();
if
(p
instanceof
DummyPartition) {
name = p.getName().split(
"@"
)[
2
];
}
String partialName =
""
;
String[] partns = name.split(
"/"
);
int
len = p
instanceof
DummyPartition ? partns.length : partns.length -
1
;
Map<String, String> partialSpec =
new
LinkedHashMap<String, String>();
for
(
int
idx =
0
; idx < len; idx++) {
String partn = partns[idx];
partialName += partn;
String[] nameValue = partn.split(
"="
);
assert
(nameValue.length ==
2
);
partialSpec.put(nameValue[
0
], nameValue[
1
]);
try
{
locks.add(
new
HiveLockObj(
new
HiveLockObject(
new
DummyPartition(p.getTable(), p.getTable().getDbName()
+
"/"
+ p.getTable().getTableName()
+
"/"
+ partialName,
partialSpec), lockData), mode));
partialName +=
"/"
;
}
catch
(HiveException e) {
throw
new
SemanticException(e.getMessage());
}
}
locks.add(
new
HiveLockObj(
new
HiveLockObject(p.getTable(), lockData), mode));
locks.add(
new
HiveLockObj(
new
HiveLockObject(p.getTable().getDbName(), lockData), mode));
}
return
locks;
}
|
acquireReadWriteLocks调用了锁具体实现类的acquireLocks方法
releaseLocks调用了锁具体实现类的releaseLocks方法
run方法是Driver类的入口方法,调用了runInternal方法,我们主要来看runInternal的方法,大体步骤:
1
2
3
4
5
|
运行hive.exec.driver.run.hooks中设置的hook,
运行HiveDriverRunHook相关类的的preDriverRun方法---->检测是否支持并发,并获取并发实现的类
--->compileInternal---->运行锁相关的操作(判断是否只对mapred job进行锁,获取锁等)
---->调用execute---->释放锁--->运行HiveDriverRunHook相关类的的postDriverRun方法
---->返回CommandProcessorResponse对象
|
相关代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
private
CommandProcessorResponse runInternal(String command,
boolean
alreadyCompiled)
throws
CommandNeedRetryException {
errorMessage =
null
;
SQLState =
null
;
downstreamError =
null
;
if
(!validateConfVariables()) {
return
new
CommandProcessorResponse(
12
, errorMessage , SQLState );
}
HiveDriverRunHookContext hookContext =
new
HiveDriverRunHookContextImpl(conf , command);
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try
{
//运行hive.exec.driver.run.hooks中设置的hook
driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
HiveDriverRunHook.
class
);
for
(HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.preDriverRun(hookContext);
//运行HiveDriverRunHook相关类的的preDriverRun方法
}
}
catch
(Exception e) {
errorMessage =
"FAILED: Hive Internal Error: "
+ Utilities.getNameMessage(e);
SQLState = ErrorMsg. findSQLState(e.getMessage());
downstreamError = e;
console.printError( errorMessage +
"\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return
new
CommandProcessorResponse(
12
, errorMessage , SQLState );
}
// Reset the perf logger
PerfLogger perfLogger = PerfLogger.getPerfLogger(
true
);
perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
int
ret;
boolean
requireLock =
false
;
boolean
ckLock =
false
;
try
{
ckLock = checkConcurrency();
//检测是否支持并发,并获取并发实现的类,比如常用的 org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager
createTxnManager();
}
catch
(SemanticException e) {
errorMessage =
"FAILED: Error in semantic analysis: "
+ e.getMessage();
SQLState = ErrorMsg. findSQLState(e.getMessage());
downstreamError = e;
console.printError( errorMessage,
"\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
ret =
10
;
return
new
CommandProcessorResponse(ret, errorMessage , SQLState );
}
ret = recordValidTxns();
if
(ret !=
0
)
return
new
CommandProcessorResponse(ret, errorMessage, SQLState);
if
(!alreadyCompiled) {
ret = compileInternal(command);
//调用compileInternal方法
if
(ret !=
0
) {
return
new
CommandProcessorResponse(ret, errorMessage, SQLState);
}
}
// the reason that we set the txn manager for the cxt here is because each
// query has its own ctx object. The txn mgr is shared across the
// same instance of Driver, which can run multiple queries.
ctx.setHiveTxnManager( txnMgr);
if
(ckLock) {
//断是否只对mapred job进行锁,参数hive.lock.mapred.only.operation,默认为false
boolean
lockOnlyMapred = HiveConf.getBoolVar( conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);
if
(lockOnlyMapred) {
Queue<Task<?
extends
Serializable>> taskQueue =
new
LinkedList<Task<?
extends
Serializable>>();
taskQueue.addAll( plan.getRootTasks());
while
(taskQueue.peek() !=
null
) {
Task<?
extends
Serializable> tsk = taskQueue.remove();
requireLock = requireLock || tsk.requireLock();
if
(requireLock) {
break
;
}
if
(tsk
instanceof
ConditionalTask) {
taskQueue.addAll(((ConditionalTask)tsk).getListTasks());
}
if
(tsk.getChildTasks()!=
null
) {
taskQueue.addAll(tsk.getChildTasks());
}
// does not add back up task here, because back up task should be the same
// type of the original task.
}
}
else
{
requireLock =
true
;
}
}
if
(requireLock) {
//获取锁
ret = acquireReadWriteLocks();
if
(ret !=
0
) {
try
{
releaseLocks( ctx.getHiveLocks());
}
catch
(LockException e) {
// Not much to do here
}
return
new
CommandProcessorResponse(ret, errorMessage, SQLState);
}
}
ret = execute();
//job运行
if
(ret !=
0
) {
//if needRequireLock is false, the release here will do nothing because there is no lock
try
{
releaseLocks( ctx.getHiveLocks());
}
catch
(LockException e) {
// Nothing to do here
}
return
new
CommandProcessorResponse(ret, errorMessage , SQLState );
}
//if needRequireLock is false, the release here will do nothing because there is no lock
try
{
releaseLocks( ctx.getHiveLocks());
}
catch
(LockException e) {
errorMessage =
"FAILED: Hive Internal Error: "
+ Utilities.getNameMessage(e);
SQLState = ErrorMsg. findSQLState(e.getMessage());
downstreamError = e;
console.printError( errorMessage +
"\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return
new
CommandProcessorResponse(
12
, errorMessage , SQLState );
}
perfLogger.PerfLogEnd( CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.close(LOG, plan);
// Take all the driver run hooks and post-execute them.
try
{
for
(HiveDriverRunHook driverRunHook : driverRunHooks) {
//运行HiveDriverRunHook相关类的的postDriverRun方法
driverRunHook.postDriverRun(hookContext);
}
}
catch
(Exception e) {
errorMessage =
"FAILED: Hive Internal Error: "
+ Utilities.getNameMessage(e);
SQLState = ErrorMsg. findSQLState(e.getMessage());
downstreamError = e;
console.printError( errorMessage +
"\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
return
new
CommandProcessorResponse(
12
, errorMessage , SQLState );
}
return
new
CommandProcessorResponse(ret);
}
|
8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
再来看下compileInternal方法
private
static
final
Object compileMonitor =
new
Object();
private
int
compileInternal(String command) {
int
ret;
synchronized
( compileMonitor) {
ret = compile(command);
//调用compile方法
}
if
(ret !=
0
) {
try
{
releaseLocks( ctx.getHiveLocks());
}
catch
(LockException e) {
LOG.warn(
"Exception in releasing locks. "
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
}
}
return
ret;
}
|
调用了compile方法,compile方法分析命令,生成Task,关于compile的具体实现后面详细讲解
9.execute方法,提交task并等待task运行完毕,并打印task运行的信息,比如消耗的时间等
(这里信息也比较多,后面单独讲解
本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1571890,如需转载请自行联系原作者