hive升级至0.11 partition schemal问题

简介:
线上由hive0.10升级到hive0.11之后,部分etl job运行出现问题,具体表现为:
在分区表中,当改变table的表的字段类型之后,旧的partition数据可能会出现NULL的情况。

这其实是hive0.11的一个新的特性,每个partition可以有自己的schemal信息。
分析hive0.10和hive0.11 mysql中的元数据信息,发现在table的schemal更改后,两种版本下新的partition都会继承table的schemal,而老的partition不会改变。

在hive0.11这个特性的描述中:
Rows in partitions are now read using partition schema and than made to comply with table schema, instead of being read directly using table schema.
也就是说,hive0.11的partition都是直接读自己的column_v2表中的信息,而hive0.10是读table的信息,这个特性是默认开启的,这就造成在升级到hive0.11之后部分job运行结果有问题。

解决方法:
通过更新mysql中hive元数据的sds表,把partition的cd_id信息都改成对应的table的cd_id的值即可。

实际操作中需要做3个表的join相关操作,在mysql中计算会比较慢,因此把数据load到gp中进行计算,通过python生成update语句做更新(因为涉及到元数据的更新,暂时还是用了手动的方式来更新,另外做了一个监控,当有数据不一致时会有报警邮件产生):

python代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import  psycopg2
import  time,sys
class  gpUtil:
         def  __init__( self ,host,user,passwd,db):
                 self .host  =  host
                 self .user  =  user
                 self .passwd  =  passwd
                 self .db  =  db
         def  db_connect( self ):
                 self .conn  =  psycopg2.connect(host = self .host,user = self .user,password = self .passwd,database = self .db)
                 self .cursor  =  self .conn.cursor()
         def  db_fetch_all( self ,sql):
                 self .cursor.execute(sql)
                 try :
                         result  =  self .cursor.fetchall()
                 except :
                         result  =  "Wrong"
                 return  result
         def  db_close( self ):
                 self .cursor.close()
                 self .conn.close()
def  get_list(sql):
         conn  =  gpUtil( 'xxxx' , 'xxxxx' , 'xxxxx' , 'xxxxxx' )
         conn.db_connect()
         host_list  =  conn.db_fetch_all(sql)
         return  host_list
         conn.db_close()
def  get_item_list():
                 tble_list  =  []
                 =  0
                 sql  =  """
                 SELECT TBL_NAME,a.TBL_ID,TABLE_CD_ID,PART_NAME,PART_ID,PARTITION_CD_ID,PARITION_SD_ID FROM                                       
(select PART_ID,PART_NAME,TBL_ID,CD_ID AS PARTITION_CD_ID,b.SD_ID  AS PARITION_SD_ID from vipbi.partitions a,vipbi.sds b where a.SD_ID=b.SD_ID)            
a join (select TBL_ID,TBL_NAME,CD_ID AS TABLE_CD_ID from vipbi.tbls a,vipbi.sds b where a.SD_ID=b.SD_ID) b on a.TBL_ID=b.TBL_ID and a.PARTITION_CD_ID<>b.TABLE_CD_ID
                 """
                 re  =  get_list(sql)
                 for  line  in  re:
                         line  =  list (line)
                         table_name  =  line[ 0 ]
                         table_id  =  line[ 1 ]
                         table_cd_id  =  line[ 2 ]
                         part_name  =  line[ 3 ]
                         part_id  =  line[ 4 ]
                         part_cd_id  =  line[ 5 ]
                         part_sd_id  =   line[ 6 ]
                         tble_list.append([table_name,table_id,table_cd_id,part_name,part_id,part_cd_id,part_sd_id])
                 return  tble_list
def  change_partition(tble_list):
                 if  len (tble_list)  = =  0 :
                         print  "no need to update"
                         sys.exit( 1 )
                 now  =  time.time()
                 change_content  =  ''
                 rollback_content  =   ''
                 change_sql  =  open ( "/home/hdfs/bin/hadoop_tools/sql/meta_change"  +  str ( int (now))  +  ".sql" , 'w+' )
                 rollback_sql  =  open ( "/home/hdfs/bin/hadoop_tools/sql/meta_rollback"   +  str ( int (now))  +  ".sql" , 'w+' )
                 for  table_item  in  tble_list:
                         table_name  =  table_item[ 0 ]
                         table_id  =  table_item[ 1 ]
                         table_cd_id  =  table_item[ 2 ]
                         part_name  =  table_item[ 3 ]
                         part_id  =  table_item[ 4 ]
                         part_cd_id  =  table_item[ 5 ]
                         part_sd_id  =  table_item[ 6 ]
                         change_content  + =  """-- table para<name:%s,id:%s,cd_id:%s>; partition para<name:%s,id:%s,cd_id:%s,sd_id:%s>\n"""  %  (table_name,table_id,table_cd_id,part_name,part_id,part_cd_id,part_sd_id)
                         rollback_content  + =  """-- table para<name:%s,id:%s,cd_id:%s>; partition para<name:%s,id:%s,cd_id:%s,sd_id:%s>\n"""  %  (table_name,table_id,table_cd_id,part_name,part_id,part_cd_id,part_sd_id)
                         change_content  + =  """ update sds set CD_ID='%s' where SD_ID='%s';\n"""   %  (table_cd_id,part_sd_id)
                         rollback_content  + =  """ update sds set CD_ID='%s' where SD_ID='%s';\n"""   %  (part_cd_id,part_sd_id)
                 change_sql.write(change_content)
                 rollback_sql.write(rollback_content)
                 change_sql.close()
                 rollback_sql.close()
if  __name__  = =  '__main__' :
                 #change_content = ''
                 start_time  =  time.time()
                 tble_list  =  []
                 tble_list  =  get_item_list()
                 change_partition(tble_list)
                 stop_time  =  time.time()
                 elapse_time  =  stop_time  -  start_time
                 print  "elapse_time is  %s"  %  ( str (elapse_time))


本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1355216,如需转载请自行联系原作者

相关文章
|
SQL HIVE 存储
|
SQL HIVE
hive 动态分区(Dynamic Partition)异常处理
Changing Hive Dynamic Partition Limits Symptoms: Hive enforces limits on the number of dynamic partitions that it creates.
1908 0
|
5月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
104 1
|
5月前
|
SQL 存储 大数据
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
80 0
|
7月前
|
SQL 分布式计算 大数据
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 入门
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 入门
74 0
|
5月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
103 0