线上 Hive 由 0.10 升级到 0.11 之后，部分 ETL job 运行出现问题，具体表现为：
分析 Hive 0.10 和 Hive 0.11 存放在 MySQL 中的元数据信息，发现在 table 的 schema 更改后，两种版本下新的 partition 都会继承 table 的 schema，而老的 partition 的 schema 不会改变。
Rows in partitions are now read using the partition schema and then made to comply with the table schema, instead of being read directly using the table schema.
# Find Hive partitions whose storage descriptor (sds row) references a
# different column descriptor (CD_ID) than their table's storage descriptor,
# and write two SQL scripts: one that re-points each such partition's
# sds.CD_ID at the table's CD_ID, and one that restores the original values.
# NOTE(review): assumes the vipbi.* tables mirror the Hive metastore tables
# (tbls / partitions / sds) and that the generated scripts are meant to be run
# against the metastore database itself -- confirm before use.
import os
import sys
import time

try:
    import psycopg2
except ImportError:
    # The driver is only needed by gpUtil.db_connect(); keep the module
    # importable without it so the script-generation helpers work offline.
    psycopg2 = None


class gpUtil:
    """Minimal wrapper around a psycopg2 connection and a single cursor."""

    def __init__(self, host, user, passwd, db):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db

    def db_connect(self):
        """Open the connection and create a cursor on it."""
        if psycopg2 is None:
            raise ImportError("psycopg2 is required to connect to the database")
        self.conn = psycopg2.connect(host=self.host, user=self.user,
                                     password=self.passwd, database=self.db)
        self.cursor = self.conn.cursor()

    def db_fetch_all(self, sql):
        """Execute *sql* and return all rows from ``cursor.fetchall()``.

        Returns the string ``"Wrong"`` when ``fetchall()`` raises
        ``psycopg2.ProgrammingError`` (e.g. the statement produced no result
        set); this sentinel is kept for backward compatibility.
        """
        self.cursor.execute(sql)
        try:
            result = self.cursor.fetchall()
        except psycopg2.ProgrammingError:
            # Narrowed from a bare ``except:`` so real errors (and Ctrl-C)
            # are no longer silently turned into "Wrong".
            result = "Wrong"
        return result

    def db_close(self):
        """Close the cursor, then the connection."""
        self.cursor.close()
        self.conn.close()


def get_list(sql):
    """Run *sql* on a fresh connection and return ``db_fetch_all``'s result.

    The connection is always closed, even if the query raises.
    """
    conn = gpUtil('xxxx', 'xxxxx', 'xxxxx', 'xxxxxx')
    conn.db_connect()
    try:
        return conn.db_fetch_all(sql)
    finally:
        # Previously placed after ``return`` and therefore never executed.
        conn.db_close()


def get_item_list():
    """Return partitions whose CD_ID differs from their table's CD_ID.

    Each item is a list:
    ``[table_name, table_id, table_cd_id, part_name, part_id,
    part_cd_id, part_sd_id]``.

    Raises:
        RuntimeError: if the metadata query returned no result set.
    """
    sql = """
        SELECT TBL_NAME, a.TBL_ID, TABLE_CD_ID, PART_NAME, PART_ID,
               PARTITION_CD_ID, PARTITION_SD_ID
        FROM (SELECT PART_ID, PART_NAME, TBL_ID,
                     CD_ID AS PARTITION_CD_ID, b.SD_ID AS PARTITION_SD_ID
              FROM vipbi.partitions a, vipbi.sds b
              WHERE a.SD_ID = b.SD_ID) a
        JOIN (SELECT TBL_ID, TBL_NAME, CD_ID AS TABLE_CD_ID
              FROM vipbi.tbls a, vipbi.sds b
              WHERE a.SD_ID = b.SD_ID) b
          ON a.TBL_ID = b.TBL_ID AND a.PARTITION_CD_ID <> b.TABLE_CD_ID
    """
    rows = get_list(sql)
    if isinstance(rows, str):
        # db_fetch_all's "Wrong" sentinel; iterating it would walk the
        # characters of the string and fail with an opaque IndexError.
        raise RuntimeError("metadata query did not return a result set")
    return [list(row[:7]) for row in rows]


def build_change_scripts(tble_list):
    """Build the change and rollback SQL script texts for *tble_list*.

    Each item of *tble_list* is
    ``[table_name, table_id, table_cd_id, part_name, part_id,
    part_cd_id, part_sd_id]``. For every item, both scripts get a one-line
    ``--`` comment describing the item, followed by an UPDATE of ``sds``:
    the change script sets the partition's CD_ID to the table's CD_ID, and
    the rollback script sets it back to the partition's original CD_ID.

    Returns:
        A ``(change_content, rollback_content)`` tuple of strings.
    """
    change_parts = []
    rollback_parts = []
    for (table_name, table_id, table_cd_id,
         part_name, part_id, part_cd_id, part_sd_id) in tble_list:
        # Kept on a single line: a newline inside the header would leave
        # the ``para<...>`` text uncommented and break the generated SQL.
        header = ("-- table para<name:%s,id:%s,cd_id:%s>; "
                  "partition para<name:%s,id:%s,cd_id:%s,sd_id:%s>\n"
                  % (table_name, table_id, table_cd_id,
                     part_name, part_id, part_cd_id, part_sd_id))
        change_parts.append(header)
        change_parts.append(" update sds set CD_ID='%s' where SD_ID='%s';\n"
                            % (table_cd_id, part_sd_id))
        rollback_parts.append(header)
        rollback_parts.append(" update sds set CD_ID='%s' where SD_ID='%s';\n"
                              % (part_cd_id, part_sd_id))
    return "".join(change_parts), "".join(rollback_parts)


def change_partition(tble_list, out_dir="/home/hdfs/bin/hadoop_tools/sql"):
    """Write change/rollback SQL scripts for *tble_list* into *out_dir*.

    Files are named ``meta_change<unix_ts>.sql`` and
    ``meta_rollback<unix_ts>.sql``.

    Args:
        tble_list: items as returned by ``get_item_list()``.
        out_dir: directory for the generated scripts (default is the
            original hard-coded location).

    Returns:
        ``(change_path, rollback_path)``.

    Exits the process with status 1 (after printing a message) when
    *tble_list* is empty.
    """
    if not tble_list:
        print("no need to update")
        # NOTE(review): exit status 1 is kept for compatibility with any
        # caller that checks it, although "nothing to do" is not an error.
        sys.exit(1)
    stamp = str(int(time.time()))
    change_content, rollback_content = build_change_scripts(tble_list)
    change_path = os.path.join(out_dir, "meta_change" + stamp + ".sql")
    rollback_path = os.path.join(out_dir, "meta_rollback" + stamp + ".sql")
    # Write the rollback first so a change script never exists without it.
    with open(rollback_path, "w") as rollback_sql:
        rollback_sql.write(rollback_content)
    with open(change_path, "w") as change_sql:
        change_sql.write(change_content)
    return change_path, rollback_path


if __name__ == '__main__':
    start_time = time.time()
    tble_list = get_item_list()
    change_partition(tble_list)
    elapse_time = time.time() - start_time
    print("elapse_time is %s" % str(elapse_time))
本文出自 “菜光光的博客” 博客,请务必保留此出处http://caiguangguang.blog.51cto.com/1652935/1355216