Using PyODPS to Retrieve Project Metadata

During my first weeks at the company I had to work with ODPS data tables every day. Since the team's documentation was limited, I used PyODPS to build three metadata tables for myself and my teammates: meta_tables, meta_fields, and meta_instances. They rely mainly on odps.list_tables() and odps.list_instances().
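
Both entry points are easy to try interactively. A minimal sketch (the credentials, project name, and endpoint below are placeholders, not real values) looks like this:

from datetime import datetime, date, time
from odps import ODPS

# Placeholder credentials and project -- replace with your own.
odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api')

# Walk every table in the project.
for tbl in odps.list_tables():
    print(tbl.name, tbl.owner, tbl.last_modified_time)

# Walk today's terminated instances.
for ins in odps.list_instances(start_time=datetime.combine(date.today(), time.min),
                               end_time=datetime.now(),
                               status='Terminated'):
    print(ins.name, ins.status)

The three scripts further below build on exactly these two calls.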

Table structures

  • meta_tables: a partitioned table, refreshed on a daily schedule for all tables in the project. Useful for auditing table sizes, pulling lists of stale tables, and per-owner contribution statistics (an example query follows this list).

    Field         Type       Comment
    tbl_name      string     table name
    tbl_comment   string     table comment
    tbl_owner     string     owner
    tbl_pt_name   string     partition column name (if partitioned)
    tbl_ddl_tm    datetime   last DDL time
    tbl_mod_tm    datetime   last modified time
    etl_tm        datetime   ETL timestamp
    pt            string     partitioned by date
  • meta_fields: a partitioned table, refreshed on a daily schedule for tables whose schema changed within the last 24 hours. Useful for checking that field types and comments stay consistent across tables (see the second example after this list).

    Field         Type       Comment
    fld_name      string     field name
    fld_type      string     field type
    fld_comment   string     field comment
    etl_tm        datetime   ETL timestamp
    tbl_name      string     partitioned by table name
  • meta_instances: a partitioned table, refreshed on a daily schedule by walking the instances run between 00:00 and 06:00. Useful for extracting table-to-table lineage and for instance runtime statistics (see the lineage sketch after this list).

    Field         Type       Comment
    ins_name      string     instance name
    start_tm      datetime   start time
    end_tm        datetime   end time
    cost_tm       bigint     total elapsed time (seconds)
    status        string     instance status
    ins_owner     string     owner
    tsk_name      string     sub-tasks
    tbl_in        string     input tables (comma-separated)
    tbl_out       string     output tables (comma-separated)
    etl_tm        datetime   ETL timestamp
    pt            string     partitioned by date
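
As a usage example for meta_tables, a query along the lines below pulls tables untouched for roughly three months, grouped by owner. This is only a sketch: the partition value and the 90-day threshold are placeholders I picked for illustration, and it assumes an odps connection object like the one used in the scripts below.

sql = """
SELECT  tbl_owner, tbl_name, tbl_mod_tm
FROM    meta_tables
WHERE   pt = '20190101'                              -- placeholder partition
  AND   tbl_mod_tm < DATEADD(GETDATE(), -90, 'dd')   -- not modified for ~90 days
ORDER BY tbl_owner, tbl_mod_tm
LIMIT   1000
"""
with odps.execute_sql(sql).open_reader() as reader:
    for record in reader:
        print(record['tbl_owner'], record['tbl_name'], record['tbl_mod_tm'])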
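
For the consistency check on meta_fields, one possible query (again only a sketch; depending on project settings, scanning all table-name partitions may need to be explicitly allowed) flags field names that appear with more than one type or comment:

sql = """
SELECT  fld_name,
        COUNT(DISTINCT fld_type)    AS type_cnt,
        COUNT(DISTINCT fld_comment) AS comment_cnt
FROM    meta_fields
GROUP BY fld_name
HAVING  COUNT(DISTINCT fld_type) > 1 OR COUNT(DISTINCT fld_comment) > 1
LIMIT   1000
"""
with odps.execute_sql(sql).open_reader() as reader:
    for record in reader:
        print(record['fld_name'], record['type_cnt'], record['comment_cnt'])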
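
For lineage extraction from meta_instances, the sketch below (the partition value is again a placeholder) splits tbl_in/tbl_out and collects (input, output) edges that can then be fed into a graph tool:

edges = set()
sql = "SELECT tbl_in, tbl_out FROM meta_instances WHERE pt = '20190101'"
with odps.execute_sql(sql).open_reader() as reader:
    for record in reader:
        # Skip rows where either side is missing.
        if not record['tbl_in'] or not record['tbl_out']:
            continue
        for src in record['tbl_in'].split(','):
            for dst in record['tbl_out'].split(','):
                edges.add((src, dst))
print('%d lineage edges' % len(edges))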

Collection scripts

  • get_table_meta.py
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# Execution environment: analytics server
# Script:   get_table_meta.py
# Schedule: daily at 06:00
# Log:      get_table_meta.log
# ###########################################################################################

import os
from datetime import datetime
from odps import ODPS
from odps.models import Schema, Column, Partition

start_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cur_path = os.path.split(os.path.realpath(__file__))[0]

odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api',
            tunnel_endpoint='http://dt.odps.aliyun.com')

to_table = 'meta_tables'
columns = [Column(name='tbl_name', type='string', comment='table name'),
           Column(name='tbl_comment', type='string', comment='table comment'),
           Column(name='tbl_owner', type='string', comment='owner'),
           Column(name='tbl_pt_name', type='string', comment='partition column name (if partitioned)'),
           Column(name='tbl_ddl_tm', type='datetime', comment='last DDL time'),
           Column(name='tbl_mod_tm', type='datetime', comment='last modified time'),
           Column(name='etl_tm', type='datetime', comment='ETL timestamp')]
partitions = [Partition(name='pt', type='string', comment='partitioned by date')]
schema = Schema(columns=columns, partitions=partitions)

records = []
try:
    for tbl in odps.list_tables():
        tm = datetime.now()
        records.append([tbl.name, tbl.comment, tbl.owner.split(':')[-1],
                        tbl.schema.partitions[0].name if tbl.schema.partitions else None,
                        tbl.last_meta_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
                        tbl.last_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
                        tm.strftime('%Y-%m-%d %H:%M:%S')])
    partition = '%s=%s' % (partitions[0].name, datetime.now().strftime('%Y%m%d'))
    to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
    to_tbl.delete_partition(partition, if_exists=True)
    odps.write_table(to_table, records, partition=partition, create_partition=True)

except Exception:
    status = 'failed'
    n = 0
else:
    status = 'succeed'
    n = len(records)

end_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log = {'status': status, 'n': n, 'start': start_tm, 'end': end_tm}
with open(os.path.join(cur_path, 'get_table_meta.log'), 'a') as f:
    f.write("Update {status} with {n} tables from {start} to {end}\n".format(**log))
  • get_field_meta.py
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# Execution environment: analytics server
# Script:   get_field_meta.py
# Schedule: daily at 06:00
# Log:      get_field_meta.log
# ###########################################################################################

import os
from datetime import datetime
from odps import ODPS
from odps.models import Schema, Column, Partition

start_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cur_path = os.path.split(os.path.realpath(__file__))[0]

to_table = 'meta_fields'
odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api',
            tunnel_endpoint='http://dt.odps.aliyun.com')


columns = [Column(name='fld_name', type='string', comment='field name'),
           Column(name='fld_type', type='string', comment='field type'),
           Column(name='fld_comment', type='string', comment='field comment'),
           Column(name='etl_tm', type='datetime', comment='ETL timestamp')]
partitions = [Partition(name='tbl_name', type='string', comment='table name')]
schema = Schema(columns=columns, partitions=partitions)
try:
    to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
    n = 0
    for tbl in odps.list_tables():
        tm = datetime.now()
        last_mod_tm = tbl.last_meta_modified_time
        if (tm - last_mod_tm).days > 0:
            continue
        else:
            cols = tbl.schema.get_columns()
            records = [[col.name, str(col.type).lower(), col.comment, tm.strftime('%Y-%m-%d %H:%M:%S')] for col in cols]
            partition = '%s=%s' %(partitions[0].name, tbl.name)
            to_tbl.delete_partition(partition, if_exists=True)
            odps.write_table(to_table, records, partition=partition, create_partition=True)
            n += 1
except Exception:
    status = 'failed'
    n = 0
else:
    status = 'succeed'

end_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log = {'status': status, 'n': n, 'start': start_tm, 'end': end_tm}
with open(os.path.join(cur_path, 'get_field_meta.log'), 'a') as f:
    f.write("Update {status} with {n} tables from {start} to {end}\n".format(**log))
  • get_instance_meta.py
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# Execution environment: analytics server
# Script:   get_instance_meta.py
# Schedule: daily at 06:00
# Log:      get_instance_meta.log
# ###########################################################################################

import os
import re
from datetime import datetime, date, time, timedelta
from odps import ODPS
from odps.models import Schema, Column, Partition

start_tm = datetime.now()
today_min = datetime.combine(date.today(), time.min)
cur_path = os.path.split(os.path.realpath(__file__))[0]

to_table = 'meta_instances'
odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api',
            tunnel_endpoint='http://dt.odps.aliyun.com')

columns = [Column(name='ins_name', type='string', comment='instance name'),
           Column(name='start_tm', type='datetime', comment='start time'),
           Column(name='end_tm', type='datetime', comment='end time'),
           Column(name='cost_tm', type='bigint', comment='total elapsed time (seconds)'),
           Column(name='status', type='string', comment='instance status'),
           Column(name='ins_owner', type='string', comment='owner'),
           Column(name='tsk_name', type='string', comment='sub-tasks'),
           Column(name='tbl_in', type='string', comment='input tables (comma-separated)'),
           Column(name='tbl_out', type='string', comment='output tables (comma-separated)'),
           Column(name='etl_tm', type='datetime', comment='ETL timestamp')]
partitions = [Partition(name='pt', type='string', comment='partitioned by date')]
schema = Schema(columns=columns, partitions=partitions)

records = []
try:
    for ins in odps.list_instances(start_time=today_min,
                                   end_time=start_tm,
                                   only_owner=False,
                                   status='Terminated'):
        tsk_name_filter = [re.match('console_query_task', tsk) for tsk in ins.get_task_names()]
        try:
            tsk_output_filter = [ins.get_task_summary(tsk) if not ins.get_task_summary(tsk)
                                 else ins.get_task_summary(tsk).get('Outputs')
                                 for tsk in ins.get_task_names()]
        except Exception:
            continue
        # Instances without input or output tables are filtered out here. The point of this
        # script is to extract dependencies between tables, so not every instance is considered.
        if ins.is_successful() and any(tsk_name_filter) and any(tsk_output_filter):
            start_time = ins.start_time + timedelta(hours=8)
            end_time = ins.end_time + timedelta(hours=8)
            tbl_in = set()
            tbl_out = set()
            for tsk in ins.get_task_names():
                smy = ins.get_task_summary(tsk)
                tbl_in.update([re.match(r'^[\w].*\.([\w]+).*$', key).group(1) for key in smy['Inputs'].keys()])
                tbl_out.update([re.match(r'^[\w].*\.([\w]+).*$', key).group(1) for key in smy['Outputs'].keys()])
            records.append([ins.name,
                            start_time.strftime('%Y-%m-%d %H:%M:%S'),
                            end_time.strftime('%Y-%m-%d %H:%M:%S'),
                            (end_time - start_time).seconds,
                            ins.status.value.lower(),
                            ins.owner.split(':')[-1],
                            ','.join(ins.get_task_names()) if ins.get_task_names() else None,
                            ','.join(tbl_in) if tbl_in else None,
                            ','.join(tbl_out) if tbl_out else None,
                            datetime.now().strftime('%Y-%m-%d %H:%M:%S')])
    partition = '%s=%s' % (partitions[0].name, start_tm.strftime('%Y%m%d'))
    to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
    to_tbl.delete_partition(partition, if_exists=True)
    odps.write_table(to_table, records, partition=partition, create_partition=True)

except Exception:
    status = 'failed'
    n = 0
else:
    status = 'succeed'
    n = len(records)

end_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log = {'status': status, 'n': n, 'start': start_tm.strftime('%Y-%m-%d %H:%M:%S'), 'end': end_tm}
with open(os.path.join(cur_path, 'get_instance_meta.log'), 'a') as f:
    f.write("Update {status} with {n} instances from {start} to {end}\n".format(**log))