刚进公司的一段时间,需要每天与ODPS数据表打交道。限于团队中文档梳理有限,所以通过PyODPS制作了3张元信息表:meta_tables、meta_fields、meta_instances,供自己及团队同学使用。其中,主要利用了odps.list_tables()和odps.list_instances()。
表结构
- meta_tables:分区表,每天定时对全部数据表进行更新。可用于项目下表容量盘查、旧表名单拉取、人员贡献统计。

| 字段 | 类型 | 注释 |
| --- | --- | --- |
| tbl_name | string | 表名 |
| tbl_comment | string | 表注释 |
| tbl_owner | string | 作者 |
| tbl_pt_name | string | (如果是分区表)分区名 |
| tbl_ddl_tm | datetime | 最近创建时间 |
| tbl_mod_tm | datetime | 最近更新时间 |
| etl_tm | datetime | ETL时间 |
| pt | string | 按日期分区 |
- meta_fields:分区表,每天定时对近24小时有过表结构修改的表进行更新。可用于字段类型、注释是否一致的校验。

| 字段 | 类型 | 注释 |
| --- | --- | --- |
| fld_name | string | 字段名 |
| fld_type | string | 字段类型 |
| fld_comment | string | 字段注释 |
| etl_tm | datetime | ETL时间 |
| tbl_name | string | 按表名分区 |
- meta_instances:分区表,每天定时遍历00:00~06:00实例。可用于表间血缘关系提取、实例耗时统计。

| 字段 | 类型 | 注释 |
| --- | --- | --- |
| ins_name | string | 实例名 |
| start_tm | datetime | 开始时间 |
| end_tm | datetime | 结束时间 |
| cost_tm | bigint | 总耗时(秒) |
| status | string | 实例状态 |
| ins_owner | string | 作者 |
| tsk_name | string | 子任务 |
| tbl_in | string | 输入表(以,分割) |
| tbl_out | string | 输出表(以,分割) |
| etl_tm | datetime | ETL时间 |
| pt | string | 按日期分区 |
获取代码
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# Runtime env: analytics server
# Script:      get_table_meta.py
# Schedule:    daily at 06:00
# Log:         get_table_meta.log
# ###########################################################################################
# Snapshot the metadata of every table in the ODPS project into the
# partitioned table `meta_tables` (one partition per run day: pt=YYYYMMDD).
import os
from datetime import datetime

from odps import ODPS
from odps.models import Schema, Column, Partition

start_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cur_path = os.path.split(os.path.realpath(__file__))[0]
odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api',
            tunnel_endpoint='http://dt.odps.aliyun.com')
to_table = 'meta_tables'
columns = [Column(name='tbl_name', type='string', comment='表名'),
           Column(name='tbl_comment', type='string', comment='表注释'),
           Column(name='tbl_owner', type='string', comment='作者'),
           Column(name='tbl_pt_name', type='string', comment='(如果是分区表)分区名'),
           Column(name='tbl_ddl_tm', type='datetime', comment='最近创建时间'),
           Column(name='tbl_mod_tm', type='datetime', comment='最近更新时间'),
           Column(name='etl_tm', type='datetime', comment='ETL时间')]
partitions = [Partition(name='pt', type='string', comment='按日期分区')]
schema = Schema(columns=columns, partitions=partitions)
records = []
try:
    for tbl in odps.list_tables():
        tm = datetime.now()
        records.append([tbl.name, tbl.comment,
                        # owner comes back as "...:<account>"; keep the last segment
                        tbl.owner.split(':')[-1],
                        # first-level partition column name, or None for flat tables
                        tbl.schema.partitions[0].name if tbl.schema.partitions else None,
                        # NOTE(review): tbl_ddl_tm is filled from last_meta_modified_time,
                        # not a creation timestamp -- confirm this matches the column comment
                        tbl.last_meta_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
                        tbl.last_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
                        tm.strftime('%Y-%m-%d %H:%M:%S')])
    # Drop and rewrite today's partition so re-runs stay idempotent.
    partition = '%s=%s' % (partitions[0].name, datetime.now().strftime('%Y%m%d'))
    to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
    to_tbl.delete_partition(partition, if_exists=True)
    odps.write_table(to_table, records, partition=partition, create_partition=True)
except Exception:  # was a bare except: don't swallow SystemExit/KeyboardInterrupt
    status = 'failed'
    n = 0
else:
    status = 'succeed'
    n = len(records)
end_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log = {'status': status, 'n': n, 'start': start_tm, 'end': end_tm}
# 'with' guarantees the log handle is closed even if the write itself fails.
with open(os.path.join(cur_path, 'get_table_meta.log'), 'a') as f:
    f.write("Update {status} with {n} tables from {start} to {end}\n".format(**log))
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# Runtime env: analytics server
# Script:      get_field_meta.py
# Schedule:    daily at 06:00
# Log:         get_field_meta.log
# ###########################################################################################
# Refresh the field-level metadata table `meta_fields` for every table whose
# schema changed within the last 24 hours. One partition per source table
# (tbl_name=<table>), rewritten wholesale on each run.
import os
from datetime import datetime

from odps import ODPS
from odps.models import Schema, Column, Partition

start_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cur_path = os.path.split(os.path.realpath(__file__))[0]
to_table = 'meta_fields'
odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api',
            tunnel_endpoint='http://dt.odps.aliyun.com')
columns = [Column(name='fld_name', type='string', comment='字段名'),
           Column(name='fld_type', type='string', comment='字段类型'),
           Column(name='fld_comment', type='string', comment='字段注释'),
           Column(name='etl_tm', type='datetime', comment='ETL时间')]
partitions = [Partition(name='tbl_name', type='string', comment='表名')]
schema = Schema(columns=columns, partitions=partitions)
try:
    to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
    n = 0
    for tbl in odps.list_tables():
        tm = datetime.now()
        last_mod_tm = tbl.last_meta_modified_time
        # Skip tables whose schema was last touched a full day or more ago.
        if (tm - last_mod_tm).days > 0:
            continue
        cols = tbl.schema.get_columns()
        records = [[col.name, str(col.type).lower(), col.comment,
                    tm.strftime('%Y-%m-%d %H:%M:%S')] for col in cols]
        # Rewrite this table's partition from scratch so stale fields disappear.
        partition = '%s=%s' % (partitions[0].name, tbl.name)
        to_tbl.delete_partition(partition, if_exists=True)
        odps.write_table(to_table, records, partition=partition, create_partition=True)
        n += 1
except Exception:  # was a bare except: don't swallow SystemExit/KeyboardInterrupt
    status = 'failed'
    # NOTE: partitions written before the failure remain in place; n=0 only
    # reflects that the run as a whole did not complete.
    n = 0
else:
    status = 'succeed'
end_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log = {'status': status, 'n': n, 'start': start_tm, 'end': end_tm}
# 'with' guarantees the log handle is closed even if the write itself fails.
with open(os.path.join(cur_path, 'get_field_meta.log'), 'a') as f:
    f.write("Update {status} with {n} tables from {start} to {end}\n".format(**log))
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# Runtime env: analytics server
# Script:      get_instance_meta.py
# Schedule:    daily at 06:00
# Log:         get_instance_meta.log
# ###########################################################################################
# Harvest today's terminated instances (00:00 .. run time) into the partitioned
# table `meta_instances`, extracting each task's input/output tables so that
# downstream jobs can derive table-level lineage and runtime statistics.
import os
import re
from datetime import datetime, date, time, timedelta

from odps import ODPS
from odps.models import Schema, Column, Partition

start_tm = datetime.now()
today_min = datetime.combine(date.today(), time.min)
cur_path = os.path.split(os.path.realpath(__file__))[0]
to_table = 'meta_instances'
odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api',
            tunnel_endpoint='http://dt.odps.aliyun.com')
columns = [Column(name='ins_name', type='string', comment='实例名'),
           Column(name='start_tm', type='datetime', comment='开始时间'),
           Column(name='end_tm', type='datetime', comment='结束时间'),
           Column(name='cost_tm', type='bigint', comment='总耗时(秒)'),
           Column(name='status', type='string', comment='实例状态'),
           Column(name='ins_owner', type='string', comment='作者'),
           Column(name='tsk_name', type='string', comment='子任务'),
           Column(name='tbl_in', type='string', comment='输入表(以,分割)'),
           Column(name='tbl_out', type='string', comment='输出表(以,分割)'),
           Column(name='etl_tm', type='datetime', comment='ETL时间')]
partitions = [Partition(name='pt', type='string', comment='按日期分区')]
schema = Schema(columns=columns, partitions=partitions)
# Summary keys look like "<project>.<table>..."; group(1) is the bare table name.
TBL_RE = re.compile(r'^[\w].*\.([\w]+).*$')
records = []
try:
    for ins in odps.list_instances(start_time=today_min,
                                   end_time=start_tm,
                                   only_owner=False,
                                   status='Terminated'):
        tsk_names = ins.get_task_names()
        tsk_name_filter = [re.match('console_query_task', tsk) for tsk in tsk_names]
        try:
            tsk_output_filter = [ins.get_task_summary(tsk) if not ins.get_task_summary(tsk)
                                 else ins.get_task_summary(tsk).get('Outputs')
                                 for tsk in tsk_names]
        except Exception:
            # Some task types expose no summary -- skip the whole instance.
            continue
        # Only keep successful console-query instances that produced outputs:
        # the goal is lineage extraction, not a census of every instance.
        if ins.is_successful() and any(tsk_name_filter) and any(tsk_output_filter):
            # NOTE(review): hard-coded UTC+8 shift assumes the service reports
            # naive UTC timestamps -- confirm before relying on these columns.
            start_time = ins.start_time + timedelta(hours=8)
            end_time = ins.end_time + timedelta(hours=8)
            tbl_in = set()
            tbl_out = set()
            for tsk in tsk_names:
                smy = ins.get_task_summary(tsk)
                # Ignore keys that don't match instead of crashing on None.group(1),
                # which the old bare except silently turned into a failed run.
                tbl_in.update(m.group(1) for m in map(TBL_RE.match, smy['Inputs'].keys()) if m)
                tbl_out.update(m.group(1) for m in map(TBL_RE.match, smy['Outputs'].keys()) if m)
            records.append([ins.name,
                            start_time.strftime('%Y-%m-%d %H:%M:%S'),
                            end_time.strftime('%Y-%m-%d %H:%M:%S'),
                            # total_seconds(), not .seconds: the latter drops whole days
                            int((end_time - start_time).total_seconds()),
                            ins.status.value.lower(),
                            ins.owner.split(':')[-1],
                            ','.join(tsk_names) if tsk_names else None,
                            ','.join(tbl_in) if tbl_in else None,
                            ','.join(tbl_out) if tbl_out else None,
                            datetime.now().strftime('%Y-%m-%d %H:%M:%S')])
    # Drop and rewrite today's partition so re-runs stay idempotent.
    partition = '%s=%s' % (partitions[0].name, start_tm.strftime('%Y%m%d'))
    to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
    to_tbl.delete_partition(partition, if_exists=True)
    odps.write_table(to_table, records, partition=partition, create_partition=True)
except Exception:  # was a bare except: don't swallow SystemExit/KeyboardInterrupt
    status = 'failed'
    n = 0
else:
    status = 'succeed'
    n = len(records)
end_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Format the start time like the sibling scripts (was logged as a raw datetime).
log = {'status': status, 'n': n,
       'start': start_tm.strftime('%Y-%m-%d %H:%M:%S'), 'end': end_tm}
# BUG FIX: previously appended to get_field_meta.log, mixing this script's log
# into the field-meta script's log file.
with open(os.path.join(cur_path, 'get_instance_meta.log'), 'a') as f:
    f.write("Update {status} with {n} instances from {start} to {end}\n".format(**log))