hive0.13 rows loaded为空问题源码分析及fix

简介:

升级hive0.13之后发现job运行完成后Rows loaded的信息没有了。

rows loaded的信息在hive0.11中由HiveHistory类的printRowCount输出。HiveHistory类的主要用途是记录job运行的信息,包括task的counter等。默认的目录在/tmp/$user中。 

hive0.11在SessionState 的start方法中会初始化HiveHistory的对象

1
2
3
  if  (startSs. hiveHist ==  null ) {
       startSs. hiveHist =  new  HiveHistory(startSs);
     }

而在hive0.13中HiveHistory是一个抽象类,其具体的实现在HiveHistoryImpl类中,其中初始化HiveHistoryImpl对象时增加了一层判断,判断hive.session.history.enabled的设置(默认为false),导致不会实例化HiveHistoryImpl类

1
2
3
4
5
6
7
8
   if (startSs.hiveHist ==  null ){
       if  (startSs.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SESSION_HISTORY_ENABLED)) {
         startSs.hiveHist =  new  HiveHistoryImpl (startSs);
       } else  {
         //Hive history is disabled, create a no-op proxy
         startSs.hiveHist = HiveHistoryProxyHandler .getNoOpHiveHistoryProxy();
       }
     }

在fix这个配置之后,仍然没有发现rows loaded的信息,通过分析源码

printRowCount方法的实现如下:

1
2
3
4
5
6
7
8
9
   public  void  printRowCount(String queryId) {
     QueryInfo ji = queryInfoMap.get(queryId);
     if  (ji ==  null ) {    // 如果ji为空,则直接返回
       return ;
     }
     for  (String tab : ji. rowCountMap.keySet()) {
       console.printInfo(ji. rowCountMap.get(tab) +  " Rows loaded to "  + tab);  // 从hashmap中获取数据
     }
   }

在hive0.13中,这里获取的ji对象是空值。

近一步发现,是由于counter中没有TABLE_ID_(\\d+)_ROWCOUNT,导致不能匹配ROW_COUNT_PATTERN的正则。就不能正常获取的row count的值。

其中获取tasker count的rows loaded信息的getRowCountTableName方法内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private  static  final  String ROW_COUNT_PATTERN =  "TABLE_ID_(\\d+)_ROWCOUNT" ;
   private  static  final  Pattern rowCountPattern = Pattern.compile(ROW_COUNT_PATTERN);
......
   String getRowCountTableName(String name) {
     if  (idToTableMap ==  null ) {
       return  null ;
     }
     Matcher m = rowCountPattern.matcher(name);
     if  (m.find()) {   // //没有和TABLE_ID_xxxx match的counter导致,即counter没有打印出TABLE_ID_(\\d+)_ROWCOUNT导致。。
       String tuple = m.group( 1 );
       return  idToTableMap.get(tuple);
     }
     return  null ;
   }

而TABLE_ID_(\\d+)_ROWCOUNT是由FileSinkOperator类负责写入的。hive0.11中相关的代码如下:

1
2
3
4
5
6
7
8
9
   protected  void  initializeOp(Configuration hconf)  throws  HiveException {
..........
       int  id = conf.getDestTableId();
       if  ((id !=  0 ) && (id <= TableIdEnum. values().length)) {
         String enumName =  "TABLE_ID_"  + String.valueOf(id) +  "_ROWCOUNT" ;
         tabIdEnum = TableIdEnum.valueOf(enumName);
         row_count =  new  LongWritable();
         statsMap.put( tabIdEnum, row_count );
       }

而在hive0.13中这部分代码都被去掉了,找到了原因,fix也比较简单,把这个counter加回去就可了。

patch如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 1dde78e..96860f7  100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ - 68 , 13  + 68 , 16  @@
import  org.apache.hadoop.util.ReflectionUtils;
import  com.google.common.collect.Lists;
+ import  org.apache.commons.logging.Log;
+ import  org.apache.commons.logging.LogFactory;
+
/**
   * File Sink operator implementation.
   **/
public  class  FileSinkOperator  extends  TerminalOperator<FileSinkDesc>  implements
      Serializable {
-
+   public  static  Log LOG = LogFactory.getLog( "FileSinkOperator.class" );
    protected  transient  HashMap<String, FSPaths> valToPaths;
    protected  transient  int  numDynParts;
    protected  transient  List<String> dpColNames;
@@ - 214 , 6  + 217 , 7  @@  public  Stat getStat() {
    protected  transient  FileSystem fs;
    protected  transient  Serializer serializer;
    protected  transient  LongWritable row_count;
+   protected  transient  TableIdEnum tabIdEnum =  null ;
    private  transient  boolean  isNativeTable =  true ;
    /**
@@ - 241 , 6  + 245 , 23  @@  public  Stat getStat() {
    protected  transient  JobConf jc;
    Class<?  extends  Writable> outputClass;
    String taskId;
+   public  static  enum  TableIdEnum {
+       TABLE_ID_1_ROWCOUNT,
+       TABLE_ID_2_ROWCOUNT,
+       TABLE_ID_3_ROWCOUNT,
+       TABLE_ID_4_ROWCOUNT,
+       TABLE_ID_5_ROWCOUNT,
+       TABLE_ID_6_ROWCOUNT,
+       TABLE_ID_7_ROWCOUNT,
+       TABLE_ID_8_ROWCOUNT,
+       TABLE_ID_9_ROWCOUNT,
+       TABLE_ID_10_ROWCOUNT,
+       TABLE_ID_11_ROWCOUNT,
+       TABLE_ID_12_ROWCOUNT,
+       TABLE_ID_13_ROWCOUNT,
+       TABLE_ID_14_ROWCOUNT,
+       TABLE_ID_15_ROWCOUNT;
+  }
    protected  boolean  filesCreated =  false ;
@@ - 317 , 7  + 338 , 15  @@  protected  void  initializeOp(Configuration hconf)  throws  HiveException {
          prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
              jc.getPartitionerClass(),  null );
        }
-      row_count =  new  LongWritable();
+       //row_count = new LongWritable();
+          int  id = conf.getDestTableId();
+          if  ((id !=  0 ) && (id <= TableIdEnum.values().length)) {
+               String enumName =  "TABLE_ID_"  + String.valueOf(id) +  "_ROWCOUNT" ;  
+               tabIdEnum = TableIdEnum.valueOf(enumName);
+               row_count =  new  LongWritable();
+               statsMap.put(tabIdEnum, row_count);
+         }
+
        if  (dpCtx !=  null ) {
          dpSetup();
        }

打完patch后,重新打包,替换线上的hive-exec-xxx.jar包之后测试,rows loaded的数据又回来了。



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1528516,如需转载请自行联系原作者

相关文章
|
SQL Java 数据库
|
SQL 分布式计算 自然语言处理
|
6月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
191 1
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
36 0
|
4月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。
下一篇
无影云桌面