hive执行流程(1)-hive入口CliDriver类分析

简介:

在运行hive cli命令时,调用hadoop jar hive-cli-0.13.1.jar org.apache.hadoop.hive.cli.CliDriver xxxx 命令,而org.apache.hadoop.util.RunJar方法其实是封装了反射调用,最终是调用org.apache.hadoop.hive.cli.CliDriver类的main方法.


CliDriver类是hive的入口类。

  首先CliDriver类会通过OptionsProcessor类来parse输入的命令。比如解析-e,-s,-h等参数,然后把对应的值存放到对应的CliSessionState类的属性中,最后应用于CliDriver类中。

比如在executeDriver方法中,根据CliSessionState的属性对命令进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CliDriver cli =  new  CliDriver();
     cli.setHiveVariables(oproc.getHiveVariables());   // 有变量相关的设置时
  
     // use the specified database if specified
     cli.processSelectDatabase(ss); 
     // Execute -i init files (always in silent mode)
     cli.processInitFiles(ss);  // 指定了-i和加载.hiverc文件
     if  (ss. execString !=  null  ) {   // 指定了 -e时
       int  cmdProcessStatus = cli.processLine(ss. execString);  
       return  cmdProcessStatus;
     }
     try  {    // 指定了-f时
       if  (ss. fileName !=  null ) {
         return  cli.processFile(ss.fileName );
       }
     catch  (FileNotFoundException e) {
       System. err.println( "Could not open input file for reading. ("  + e.getMessage() +  ")"  );
       return  3 ;
     }

在CliDriver类方法的调用顺序主要有下面几种

1)add xxx/set/compile/reset等命令

1
main-->run--->executeDriver---->processLine--->processCmd--->processLocalCmd--对应processor类的run方法

2)sql命令

1
main-->run--->executeDriver---->processLine--->processCmd--->processLocalCmd---->Driver类run方法

3)shell命令

1
main-->run--->executeDriver---->processLine--->processCmd

其中CliDriver类中最重要的方法是processCmd,其定义了不同的命令不同的执行方式:

具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public  int  processCmd(String cmd) {
     CliSessionState ss = (CliSessionState) SessionState.get();
     ss.setLastCommand(cmd);
     // Flush the print stream, so it doesn't include output from the last command
     ss.err.flush();
     String cmd_trimmed = cmd.trim();
     String[] tokens = tokenizeCmd(cmd_trimmed);
     int  ret =  0 ;
     if  (cmd_trimmed.toLowerCase().equals(  "quit" ) || cmd_trimmed.toLowerCase().equals( "exit"  )) {  //如果是quit或者是exit,则直接退出jvm
       ss.close();
       System.exit( 0 );
     else  if  (tokens[ 0 ].equalsIgnoreCase( "source"  )) {   // 如果是source xxx的情况,则按文件处理(调用processFile方法)
       String cmd_1 = getFirstCmd(cmd_trimmed, tokens[ 0 ].length());
       File sourceFile =  new  File(cmd_1);
       if  (! sourceFile.isFile()){
         console.printError(  "File: " + cmd_1 +  " is not a file."  );
         ret =  1 ;
       else  {
         try  {
           this .processFile(cmd_1);
         catch  (IOException e) {
           console.printError(  "Failed processing file " + cmd_1 + " "  + e.getLocalizedMessage(),
             stringifyException(e));
           ret =  1 ;
         }
       }
     else  if  (cmd_trimmed.startsWith( "!"  )) {   // 以!开头的,做为shell命令执行,最终调用Runtime.getRuntime().exec(shell_cmd)
       String shell_cmd = cmd_trimmed.substring( 1 );
       shell_cmd =  new  VariableSubstitution().substitute(ss.getConf(), shell_cmd);   //这里也会进行变量替换
       // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
       try  {
         Process executor = Runtime. getRuntime().exec(shell_cmd);
         StreamPrinter outPrinter =  new  StreamPrinter(executor.getInputStream(),  null , ss.out);
         StreamPrinter errPrinter =  new  StreamPrinter(executor.getErrorStream(),  null , ss.err);
         outPrinter.start();
         errPrinter.start();
         ret = executor.waitFor();
         if  (ret !=  0 ) {
           console.printError(  "Command failed with exit code = "  + ret);
         }
       catch  (Exception e) {
         console.printError(  "Exception raised from Shell command "  + e.getLocalizedMessage(),
             stringifyException(e));
         ret =  1 ;
       }
     else  if  (tokens[ 0 ].toLowerCase().equals( "list"  )) {  // list命令时,调用SessionState的list_resource方法
       SessionState.ResourceType t;
       if  (tokens. length <  2  || (t = SessionState.find_resource_type(tokens[ 1 ])) ==  null ) {
         console.printError(  "Usage: list ["
             + StringUtils.join(SessionState.ResourceType.values(),  "|" ) +  "] [<value> [<value>]*]" );
         ret =  1 ;
       else  {
         List<String> filter =  null ;
         if  (tokens.length >=  3 ) {
           System. arraycopy(tokens,  2 , tokens,  0 , tokens.length -  2 );
           filter = Arrays. asList(tokens);
         }
         Set<String> s = ss.list_resource(t, filter);
         if  (s !=  null  && !s.isEmpty()) {
           ss.out.println(StringUtils.join(s,  "\n" ));
         }
       }
     else  if  (ss.isRemoteMode()) {  // remote mode -- connecting to remote hive server   //如果是远程模式,即hiveserver,调用HiveClient类的execute方法
       HiveClient client = ss.getClient();
       PrintStream out = ss.out;
       PrintStream err = ss.err;
       try  {
         client.execute(cmd_trimmed);
         List<String> results;
         do  {
           results = client.fetchN( LINES_TO_FETCH);
           for  (String line : results) {
             out.println(line);
           }
         while  (results.size() == LINES_TO_FETCH);
       catch  (HiveServerException e) {
         ret = e.getErrorCode();
         if  (ret !=  0 ) {  // OK if ret == 0 -- reached the EOF
           String errMsg = e.getMessage();
           if  (errMsg ==  null ) {
             errMsg = e.toString();
           }
           ret = e.getErrorCode();
           err.println(  "[Hive Error]: "  + errMsg);
         }
       catch  (TException e) {
         String errMsg = e.getMessage();
         if  (errMsg ==  null ) {
           errMsg = e.toString();
         }
         ret = - 10002 ;
         err.println(  "[Thrift Error]: "  + errMsg);
       finally  {
         try  {
           client.clean();
         catch  (TException e) {
           String errMsg = e.getMessage();
           if  (errMsg ==  null ) {
             errMsg = e.toString();
           }
           err.println(  "[Thrift Error]: Hive server is not cleaned due to thrift exception: "
               + errMsg);
         }
       }
     else  // local mode   // 剩下的情况都作为local模式,比如add xxx,set xxxx,select/insert xxx/show tables/create table,databse/use xxx等命令。
       try  {
         CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);   //会先根据命令获取对应的CommandProcessor 实现类
         ret = processLocalCmd(cmd, proc, ss);   //并调用processLocalCmd方法
       catch  (SQLException e) {
         console.printError(  "Failed processing command "  + tokens[ 0 ] +  " "  + e.getLocalizedMessage(),
           org.apache.hadoop.util.StringUtils.stringifyException(e));
         ret =  1 ;
       }
     }
     return  ret;
   }

而processLocalCmd方法会将CommandProcessor的实例作为参数传入,并根据不同的CommandProcessor实现类,来调用不同的类的run方法。

1
   int  processLocalCmd (String cmd, CommandProcessor proc, CliSessionState ss)


本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1542275,如需转载请自行联系原作者
相关文章
|
7月前
|
SQL HIVE
Hive LAG函数分析
Hive LAG函数分析
86 0
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
48 2
|
6月前
|
SQL 数据采集 数据可视化
基于Hive的招聘网站的大数据分析系统
基于Hive的招聘网站的大数据分析系统
137 2
|
6月前
|
SQL 关系型数据库 MySQL
基于Hive的天气情况大数据分析系统(通过hive进行大数据分析将分析的数据通过sqoop导入到mysql,通过Django基于mysql的数据做可视化)
基于Hive的天气情况大数据分析系统(通过hive进行大数据分析将分析的数据通过sqoop导入到mysql,通过Django基于mysql的数据做可视化)
205 0
|
7月前
|
SQL 数据采集 存储
Hive实战 —— 电商数据分析(全流程详解 真实数据)
关于基于小型数据的Hive数仓构建实战,目的是通过分析某零售企业的门店数据来进行业务洞察。内容涵盖了数据清洗、数据分析和Hive表的创建。项目需求包括客户画像、消费统计、资源利用率、特征人群定位和数据可视化。数据源包括Customer、Transaction、Store和Review四张表,涉及多个维度的聚合和分析,如按性别、国家统计客户、按时间段计算总收入等。项目执行需先下载数据和配置Zeppelin环境,然后通过Hive进行数据清洗、建表和分析。在建表过程中,涉及ODS、DWD、DWT、DWS和DM五层,每层都有其特定的任务和粒度。最后,通过Hive SQL进行各种业务指标的计算和分析。
1014 1
Hive实战 —— 电商数据分析(全流程详解 真实数据)
|
7月前
|
SQL HIVE UED
【Hive SQL 每日一题】分析电商平台的用户行为和订单数据
作为一名数据分析师,你需要分析电商平台的用户行为和订单数据。你有三张表:`users`(用户信息),`orders`(订单信息)和`order_items`(订单商品信息)。任务包括计算用户总订单金额和数量,按月统计订单,找出最常购买的商品,找到平均每月最高订单金额和数量的用户,以及分析高消费用户群体的年龄和性别分布。通过SQL查询,你可以实现这些分析,例如使用`GROUP BY`、`JOIN`和窗口函数来排序和排名。
351 2
|
7月前
|
SQL 数据可视化 关系型数据库
【大数据实训】基于Hive的北京市天气系统分析报告(二)
【大数据实训】基于Hive的北京市天气系统分析报告(二)
202 1
|
7月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
196 1
|
7月前
|
SQL 存储 大数据
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
257 0
|
7月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
272 0