After upgrading our production cluster from Hive 0.10 to Hive 0.11, some ETL jobs started producing incorrect results. The symptom:
In a partitioned table, after a column's type is changed on the table, data in the old partitions may read back as NULL.
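A minimal sketch of the scenario (table and column names are made up for illustration; whether the old partition actually reads back as NULL depends on the file format and SerDe):

-- Partitioned table created before the type change (illustrative only).
CREATE TABLE demo_part (amount INT) PARTITIONED BY (dt STRING);
-- ... partition dt='2014-01-01' is loaded while the column is still INT ...

-- The column type is then changed at the table level.
ALTER TABLE demo_part CHANGE amount amount BIGINT;

-- In Hive 0.11 the old partition keeps its own schema in the metastore and is read
-- with it, then converted to the table schema; in Hive 0.10 it would simply be read
-- with the table schema. This difference is what surfaced as NULLs for us.
SELECT amount FROM demo_part WHERE dt = '2014-01-01';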
This is actually a new feature in Hive 0.11: each partition can carry its own schema.
Comparing the metastore data in MySQL for Hive 0.10 and Hive 0.11 shows that after the table schema is changed, new partitions inherit the table schema in both versions, while old partitions keep their original schema.
The description of this feature in Hive 0.11 reads:
Rows in partitions are now read using partition schema and then made to comply with table schema, instead of being read directly using table schema.
In other words, in Hive 0.11 each partition reads its own column information from the COLUMNS_V2 table, whereas Hive 0.10 reads the table's column information. This behavior is enabled by default, which is why some jobs produced wrong results after the upgrade to Hive 0.11.
Solution:
Update the SDS table in the Hive metastore (MySQL), setting each partition's CD_ID to the CD_ID of its table.
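Conceptually, the fix is equivalent to the following direct update against the metastore (a sketch only, based on the standard metastore layout where PARTITIONS and TBLS each point at a row in SDS that holds the CD_ID; as explained below, we did not run a blanket update like this in production):

-- Point every mismatched partition's storage descriptor at its table's column descriptor.
UPDATE SDS part_sd
JOIN PARTITIONS p ON p.SD_ID  = part_sd.SD_ID
JOIN TBLS t       ON t.TBL_ID = p.TBL_ID
JOIN SDS tbl_sd   ON tbl_sd.SD_ID = t.SD_ID
SET part_sd.CD_ID = tbl_sd.CD_ID
WHERE part_sd.CD_ID <> tbl_sd.CD_ID;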
In practice this requires a join across three tables, which is slow to compute directly in MySQL, so we load the data into Greenplum, run the join there, and generate the UPDATE statements with Python. (Because this touches metastore data, the updates are still applied manually for now; we also set up a monitor that sends an alert email whenever an inconsistency appears, see the check query sketched after the script.)
The Python code:
import psycopg2
import time, sys

class gpUtil:
    """Thin wrapper around a psycopg2 connection to Greenplum."""
    def __init__(self, host, user, passwd, db):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db

    def db_connect(self):
        self.conn = psycopg2.connect(host=self.host, user=self.user,
                                     password=self.passwd, database=self.db)
        self.cursor = self.conn.cursor()

    def db_fetch_all(self, sql):
        self.cursor.execute(sql)
        try:
            result = self.cursor.fetchall()
        except:
            result = "Wrong"
        return result

    def db_close(self):
        self.cursor.close()
        self.conn.close()

def get_list(sql):
    # Run a query against the metastore tables loaded into Greenplum.
    conn = gpUtil('xxxx', 'xxxxx', 'xxxxx', 'xxxxxx')
    conn.db_connect()
    result = conn.db_fetch_all(sql)
    conn.db_close()
    return result

def get_item_list():
    # Find partitions whose CD_ID differs from the CD_ID of their table.
    tble_list = []
    sql = """
        SELECT TBL_NAME, a.TBL_ID, TABLE_CD_ID, PART_NAME, PART_ID,
               PARTITION_CD_ID, PARTITION_SD_ID
        FROM (select PART_ID, PART_NAME, TBL_ID, CD_ID AS PARTITION_CD_ID,
                     b.SD_ID AS PARTITION_SD_ID
              from vipbi.partitions a, vipbi.sds b
              where a.SD_ID = b.SD_ID) a
        join (select TBL_ID, TBL_NAME, CD_ID AS TABLE_CD_ID
              from vipbi.tbls a, vipbi.sds b
              where a.SD_ID = b.SD_ID) b
        on a.TBL_ID = b.TBL_ID and a.PARTITION_CD_ID <> b.TABLE_CD_ID
    """
    for line in get_list(sql):
        (table_name, table_id, table_cd_id,
         part_name, part_id, part_cd_id, part_sd_id) = list(line)
        tble_list.append([table_name, table_id, table_cd_id,
                          part_name, part_id, part_cd_id, part_sd_id])
    return tble_list

def change_partition(tble_list):
    # Generate one UPDATE (plus a rollback UPDATE) per mismatched partition.
    if len(tble_list) == 0:
        print "no need to update"
        sys.exit(1)
    now = time.time()
    change_content = ''
    rollback_content = ''
    change_sql = open("/home/hdfs/bin/hadoop_tools/sql/meta_change" + str(int(now)) + ".sql", 'w+')
    rollback_sql = open("/home/hdfs/bin/hadoop_tools/sql/meta_rollback" + str(int(now)) + ".sql", 'w+')
    for table_item in tble_list:
        (table_name, table_id, table_cd_id,
         part_name, part_id, part_cd_id, part_sd_id) = table_item
        comment = "-- table para<name:%s,id:%s,cd_id:%s>; partition para<name:%s,id:%s,cd_id:%s,sd_id:%s>\n" % (
            table_name, table_id, table_cd_id, part_name, part_id, part_cd_id, part_sd_id)
        change_content += comment
        rollback_content += comment
        # Point the partition's storage descriptor at the table's column descriptor.
        change_content += "update sds set CD_ID='%s' where SD_ID='%s';\n" % (table_cd_id, part_sd_id)
        # The rollback statement restores the partition's original CD_ID.
        rollback_content += "update sds set CD_ID='%s' where SD_ID='%s';\n" % (part_cd_id, part_sd_id)
    change_sql.write(change_content)
    rollback_sql.write(rollback_content)
    change_sql.close()
    rollback_sql.close()

if __name__ == '__main__':
    start_time = time.time()
    tble_list = get_item_list()
    change_partition(tble_list)
    stop_time = time.time()
    elapse_time = stop_time - start_time
    print "elapse_time is %s" % (str(elapse_time))
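The monitoring mentioned above is essentially the same mismatch query run on a schedule: if it returns any rows, the alert email is sent. A minimal sketch of the check, against the same vipbi schema in Greenplum used by the script:

-- Count partitions whose column descriptor no longer matches their table's.
-- A non-zero result means the metastore has drifted again and should trigger an alert.
SELECT count(*)
FROM (SELECT p.TBL_ID, s.CD_ID AS PARTITION_CD_ID
      FROM vipbi.partitions p JOIN vipbi.sds s ON p.SD_ID = s.SD_ID) a
JOIN (SELECT t.TBL_ID, s.CD_ID AS TABLE_CD_ID
      FROM vipbi.tbls t JOIN vipbi.sds s ON t.SD_ID = s.SD_ID) b
  ON a.TBL_ID = b.TBL_ID
WHERE a.PARTITION_CD_ID <> b.TABLE_CD_ID;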
This post is from the blog "菜光光的博客"; please keep this attribution: http://caiguangguang.blog.51cto.com/1652935/1355216