After upgrading our production cluster from Hive 0.10 to Hive 0.11, some ETL jobs started producing wrong results. The symptom:
on a partitioned table, after a column's type is changed, data in old partitions may be read back as NULL.
This is actually a new feature in Hive 0.11: each partition can carry its own schema.
Comparing the metastore data in MySQL under Hive 0.10 and Hive 0.11 shows that after a table's schema is changed, newly created partitions inherit the table schema in both versions, while old partitions keep their original schema.
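A minimal sketch of the failure mode (the table and column names here are hypothetical, for illustration only):

-- Hypothetical partitioned table, populated before the upgrade
CREATE TABLE demo_tbl (amount INT) PARTITIONED BY (dt STRING);
-- ... data loaded into partition dt='2014-01-01' under Hive 0.10 ...

-- Change the column type; only the table schema is updated, while the
-- old partition keeps its original column descriptor in the metastore
ALTER TABLE demo_tbl CHANGE COLUMN amount amount BIGINT;

-- Under Hive 0.11 this read goes through the partition's own schema,
-- and per the behavior described above the changed column can come
-- back as NULL instead of the original values
SELECT amount FROM demo_tbl WHERE dt = '2014-01-01';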
The Hive 0.11 feature is described as:
"Rows in partitions are now read using partition schema and then made to comply with table schema, instead of being read directly using table schema."
In other words, in Hive 0.11 each partition is read using its own column definitions (from the COLUMNS_V2 metastore table), whereas Hive 0.10 always read using the table's definitions. This behavior is enabled by default, which is why some jobs returned wrong results after the upgrade.
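The drifted partitions can be spotted directly in the metastore. The following MySQL query is a sketch against the stock metastore schema (TBLS, PARTITIONS, SDS), mirroring the join that the script below runs in Greenplum:

-- Partitions whose CD_ID differs from their table's CD_ID
SELECT t.TBL_NAME, p.PART_NAME,
       ps.CD_ID AS partition_cd_id, ts.CD_ID AS table_cd_id
FROM TBLS t
JOIN SDS ts ON t.SD_ID = ts.SD_ID
JOIN PARTITIONS p ON p.TBL_ID = t.TBL_ID
JOIN SDS ps ON p.SD_ID = ps.SD_ID
WHERE ps.CD_ID <> ts.CD_ID;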
The fix:
Update the SDS table in the Hive metastore (MySQL) so that each affected partition's CD_ID points to the CD_ID of its table.
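Each generated statement looks like the following (the IDs are placeholders):

-- Repoint the partition's storage descriptor (SD_ID 4321, hypothetical)
-- at the table's column descriptor (CD_ID 1234, hypothetical)
update sds set CD_ID='1234' where SD_ID='4321';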
In practice this requires a join across three tables, which is slow to compute directly in MySQL, so the metadata was loaded into Greenplum (GP) and the UPDATE statements were generated with Python. Because this modifies metastore metadata, the updates are still applied manually for now; in addition, a monitor sends an alert email whenever such an inconsistency appears.
The Python code is as follows:
import psycopg2
import time, sys

class gpUtil:
    # Thin wrapper around a Greenplum/PostgreSQL connection.
    def __init__(self, host, user, passwd, db):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db

    def db_connect(self):
        self.conn = psycopg2.connect(host=self.host, user=self.user,
                                     password=self.passwd, database=self.db)
        self.cursor = self.conn.cursor()

    def db_fetch_all(self, sql):
        self.cursor.execute(sql)
        try:
            result = self.cursor.fetchall()
        except psycopg2.Error:
            result = "Wrong"
        return result

    def db_close(self):
        self.cursor.close()
        self.conn.close()

def get_list(sql):
    conn = gpUtil('xxxx', 'xxxxx', 'xxxxx', 'xxxxxx')
    conn.db_connect()
    result = conn.db_fetch_all(sql)
    conn.db_close()  # close before returning (was unreachable after return)
    return result

def get_item_list():
    # Collect every partition whose CD_ID differs from its table's CD_ID.
    tble_list = []
    sql = """
    SELECT TBL_NAME, a.TBL_ID, TABLE_CD_ID, PART_NAME, PART_ID, PARTITION_CD_ID, PARTITION_SD_ID
    FROM (select PART_ID, PART_NAME, TBL_ID, CD_ID AS PARTITION_CD_ID, b.SD_ID AS PARTITION_SD_ID
          from vipbi.partitions a, vipbi.sds b where a.SD_ID = b.SD_ID) a
    JOIN (select TBL_ID, TBL_NAME, CD_ID AS TABLE_CD_ID
          from vipbi.tbls a, vipbi.sds b where a.SD_ID = b.SD_ID) b
    ON a.TBL_ID = b.TBL_ID AND a.PARTITION_CD_ID <> b.TABLE_CD_ID
    """
    for line in get_list(sql):
        (table_name, table_id, table_cd_id,
         part_name, part_id, part_cd_id, part_sd_id) = line
        tble_list.append([table_name, table_id, table_cd_id,
                          part_name, part_id, part_cd_id, part_sd_id])
    return tble_list

def change_partition(tble_list):
    if len(tble_list) == 0:
        print "no need to update"
        sys.exit(1)
    now = str(int(time.time()))
    change_content = ''
    rollback_content = ''
    change_sql = open("/home/hdfs/bin/hadoop_tools/sql/meta_change" + now + ".sql", 'w+')
    rollback_sql = open("/home/hdfs/bin/hadoop_tools/sql/meta_rollback" + now + ".sql", 'w+')
    for table_item in tble_list:
        (table_name, table_id, table_cd_id,
         part_name, part_id, part_cd_id, part_sd_id) = table_item
        comment = ("-- table para<name:%s,id:%s,cd_id:%s>; partition para<name:%s,id:%s,cd_id:%s,sd_id:%s>\n"
                   % (table_name, table_id, table_cd_id, part_name, part_id, part_cd_id, part_sd_id))
        change_content += comment
        rollback_content += comment
        # Forward fix: point the partition's storage descriptor at the table's CD_ID.
        change_content += " update sds set CD_ID='%s' where SD_ID='%s';\n" % (table_cd_id, part_sd_id)
        # Rollback: restore the partition's original CD_ID.
        rollback_content += " update sds set CD_ID='%s' where SD_ID='%s';\n" % (part_cd_id, part_sd_id)
    change_sql.write(change_content)
    rollback_sql.write(rollback_content)
    change_sql.close()
    rollback_sql.close()

if __name__ == '__main__':
    start_time = time.time()
    tble_list = get_item_list()
    change_partition(tble_list)
    print "elapse_time is %s" % str(time.time() - start_time)
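For every drifted partition the script writes the forward UPDATE into a timestamped meta_change<ts>.sql file and the matching reverse UPDATE into a meta_rollback<ts>.sql file, so the metadata change can be applied by hand and undone if a job still misbehaves.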
This post was republished from caiguangguang's 51CTO blog. Original: http://blog.51cto.com/caiguangguang/1355216. Please contact the original author for reprint permission.