Running the hive CLI command effectively invokes `hadoop jar hive-cli-0.13.1.jar org.apache.hadoop.hive.cli.CliDriver ...`. The org.apache.hadoop.util.RunJar entry point wraps a reflective call, so ultimately the main method of org.apache.hadoop.hive.cli.CliDriver is what gets executed.
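As a rough illustration (not RunJar's actual code, which also unpacks the jar and builds a dedicated classloader), the reflective hand-off amounts to something like:

```java
import java.lang.reflect.Method;

// Simplified sketch of RunJar's reflective dispatch; resolving CliDriver
// requires the hive-cli jar on the classpath.
public class RunJarSketch {
  public static void main(String[] args) throws Exception {
    Class<?> mainClass = Class.forName("org.apache.hadoop.hive.cli.CliDriver");
    Method main = mainClass.getMethod("main", String[].class);
    main.invoke(null, new Object[] { args }); // args = arguments after the class name
  }
}
```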
CliDriver is the entry class of the hive CLI. It first parses the command-line input through the OptionsProcessor class, resolving options such as -e, -S, and -h, and stores the parsed values in the corresponding fields of CliSessionState, which CliDriver then consults.
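Hive's OptionsProcessor builds on Apache Commons CLI; the sketch below is a standalone approximation of that parsing stage (the option set is abridged and the class is hypothetical, though -e/-f/-i/-S are real hive flags):

```java
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

// Standalone approximation of OptionsProcessor's parsing stage.
public class OptionsSketch {
  public static void main(String[] args) throws ParseException {
    Options opts = new Options();
    opts.addOption("e", true, "SQL string to execute");
    opts.addOption("f", true, "SQL file to execute");
    opts.addOption("i", true, "initialization SQL file");
    opts.addOption("S", "silent", false, "silent mode");

    CommandLine cli = new DefaultParser().parse(opts, args);
    // In Hive these values are copied into CliSessionState fields
    // (execString, fileName, ...) that executeDriver later inspects.
    if (cli.hasOption("e")) System.out.println("execString = " + cli.getOptionValue("e"));
    if (cli.hasOption("f")) System.out.println("fileName   = " + cli.getOptionValue("f"));
  }
}
```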
For example, in the executeDriver method, the command is handled according to those CliSessionState fields:
```java
CliDriver cli = new CliDriver();
cli.setHiveVariables(oproc.getHiveVariables()); // apply any variable-related settings

// use the specified database if specified
cli.processSelectDatabase(ss);

// Execute -i init files (always in silent mode); this also loads .hiverc
cli.processInitFiles(ss);

if (ss.execString != null) { // -e was given: run the inline command and exit
  int cmdProcessStatus = cli.processLine(ss.execString);
  return cmdProcessStatus;
}

try {
  if (ss.fileName != null) { // -f was given: run the script file and exit
    return cli.processFile(ss.fileName);
  }
} catch (FileNotFoundException e) {
  System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
  return 3;
}
```
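In other words, `hive -e "show tables"` takes the execString branch and exits with processLine's return code, `hive -f query.sql` takes the processFile branch, and `hive -i setup.sql` runs the init file before entering the interactive loop.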
The call sequences through CliDriver fall into three main cases:

1) add xxx / set / compile / reset and similar commands:

main --> run --> executeDriver --> processLine --> processCmd --> processLocalCmd --> run method of the matching CommandProcessor

2) SQL statements:

main --> run --> executeDriver --> processLine --> processCmd --> processLocalCmd --> Driver.run

3) shell commands:

main --> run --> executeDriver --> processLine --> processCmd
The most important method in CliDriver is processCmd, which defines the different execution paths for the different kinds of commands. Its implementation:
```java
public int processCmd(String cmd) {
  CliSessionState ss = (CliSessionState) SessionState.get();
  ss.setLastCommand(cmd);
  // Flush the print stream, so it doesn't include output from the last command
  ss.err.flush();
  String cmd_trimmed = cmd.trim();
  String[] tokens = tokenizeCmd(cmd_trimmed);
  int ret = 0;

  if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
    // quit/exit: close the session and exit the JVM directly
    ss.close();
    System.exit(0);

  } else if (tokens[0].equalsIgnoreCase("source")) {
    // "source xxx": treat the argument as a script file (handled by processFile)
    String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
    File sourceFile = new File(cmd_1);
    if (!sourceFile.isFile()) {
      console.printError("File: " + cmd_1 + " is not a file.");
      ret = 1;
    } else {
      try {
        this.processFile(cmd_1);
      } catch (IOException e) {
        console.printError("Failed processing file " + cmd_1 + " " + e.getLocalizedMessage(),
            stringifyException(e));
        ret = 1;
      }
    }

  } else if (cmd_trimmed.startsWith("!")) {
    // a leading "!" marks a shell command, ultimately run via Runtime.getRuntime().exec(shell_cmd)
    String shell_cmd = cmd_trimmed.substring(1);
    shell_cmd = new VariableSubstitution().substitute(ss.getConf(), shell_cmd); // variable substitution happens here too
    // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
    try {
      Process executor = Runtime.getRuntime().exec(shell_cmd);
      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out);
      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err);
      outPrinter.start();
      errPrinter.start();
      ret = executor.waitFor();
      if (ret != 0) {
        console.printError("Command failed with exit code = " + ret);
      }
    } catch (Exception e) {
      console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
          stringifyException(e));
      ret = 1;
    }

  } else if (tokens[0].toLowerCase().equals("list")) {
    // "list": delegate to SessionState's list_resource method
    SessionState.ResourceType t;
    if (tokens.length < 2 || (t = SessionState.find_resource_type(tokens[1])) == null) {
      console.printError("Usage: list ["
          + StringUtils.join(SessionState.ResourceType.values(), "|") + "] [<value> [<value>]*]");
      ret = 1;
    } else {
      List<String> filter = null;
      if (tokens.length >= 3) {
        System.arraycopy(tokens, 2, tokens, 0, tokens.length - 2);
        filter = Arrays.asList(tokens);
      }
      Set<String> s = ss.list_resource(t, filter);
      if (s != null && !s.isEmpty()) {
        ss.out.println(StringUtils.join(s, "\n"));
      }
    }

  } else if (ss.isRemoteMode()) {
    // remote mode -- connected to a remote hive server: run the command via HiveClient.execute
    HiveClient client = ss.getClient();
    PrintStream out = ss.out;
    PrintStream err = ss.err;
    try {
      client.execute(cmd_trimmed);
      List<String> results;
      do {
        results = client.fetchN(LINES_TO_FETCH);
        for (String line : results) {
          out.println(line);
        }
      } while (results.size() == LINES_TO_FETCH);
    } catch (HiveServerException e) {
      ret = e.getErrorCode();
      if (ret != 0) { // OK if ret == 0 -- reached the EOF
        String errMsg = e.getMessage();
        if (errMsg == null) {
          errMsg = e.toString();
        }
        ret = e.getErrorCode();
        err.println("[Hive Error]: " + errMsg);
      }
    } catch (TException e) {
      String errMsg = e.getMessage();
      if (errMsg == null) {
        errMsg = e.toString();
      }
      ret = -10002;
      err.println("[Thrift Error]: " + errMsg);
    } finally {
      try {
        client.clean();
      } catch (TException e) {
        String errMsg = e.getMessage();
        if (errMsg == null) {
          errMsg = e.toString();
        }
        err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: " + errMsg);
      }
    }

  } else {
    // local mode: everything else, e.g. add xxx, set xxx, select/insert,
    // show tables, create table/database, use xxx, ...
    try {
      // first look up the CommandProcessor implementation for this command
      CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
      // then hand the command to processLocalCmd
      ret = processLocalCmd(cmd, proc, ss);
    } catch (SQLException e) {
      console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
      ret = 1;
    }
  }
  return ret;
}
```
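To make the branches concrete: `quit`/`exit` terminates the JVM, `source /tmp/init.sql` goes through processFile, `!ls /tmp` is executed as a shell command, `list jar` prints the session's added resources, and anything else (e.g. `set hive.cli.print.header=true;` or a `select`) falls through to the local-mode branch and CommandProcessorFactory.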
processLocalCmd then receives the CommandProcessor instance as a parameter and dispatches to the run method of whichever implementation class it was handed:
```java
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss)
```
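As an illustration of that dispatch, here is a minimal, self-contained sketch of the idea behind CommandProcessorFactory.get(): the first token of the command selects a processor, and anything unrecognized is treated as SQL. The Processor interface and the lambda bodies are illustrative stand-ins for Hive's real SetProcessor/AddResourceProcessor/DfsProcessor/Driver classes, not Hive's actual API:

```java
import java.util.Locale;

// Minimal sketch of first-token dispatch, in the spirit of
// CommandProcessorFactory.get(); all types here are illustrative stand-ins.
public class DispatchSketch {

  interface Processor {
    int run(String cmd);
  }

  static Processor pick(String[] tokens) {
    switch (tokens[0].toLowerCase(Locale.ROOT)) {
      case "set": return cmd -> { System.out.println("SetProcessor handles: " + cmd); return 0; };
      case "add": return cmd -> { System.out.println("AddResourceProcessor handles: " + cmd); return 0; };
      case "dfs": return cmd -> { System.out.println("DfsProcessor handles: " + cmd); return 0; };
      default:    return cmd -> { System.out.println("Driver runs as SQL: " + cmd); return 0; };
    }
  }

  public static void main(String[] args) {
    String cmd = "set hive.exec.parallel=true";
    pick(cmd.trim().split("\\s+")).run(cmd);
  }
}
```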
This article is reposted from caiguangguang's 51CTO blog; original link: http://blog.51cto.com/caiguangguang/1542275. Please contact the original author before reprinting.