QQ交流群:127591054 作者:JackChiang 作者QQ:595696297
Python版本3.5
这个小项目,是解决数据库查看的时候一些错误的,查看。每天都会跑数据,但是第一天的数据和第二天的名字可以是一样的,我第二天还可以在跑一遍。
涉及到的知识:
1、日期获取,例如获取2012-02-21~2013-08-21之间所有的日期列表,需要一天一天查询遍历。
2、正则匹配名字,由于一个调度会生成4条数据,需要找到这4条数据的顺序必须固定才为正确否则就是不对的。
3、然后把查询出来有问题的数据写入到文件。
代码有详细的注解,有问题可以练习我QQ学习哦
# -*- coding: utf-8 -*-
import time
import sys,os #要重新载入sys。因为 Python 初始化后会删除 sys.setdefaultencoding 这个方 法
import ibm_db
import re
import datetime
from datetime import *
#1、首先获取db2数据库连接,进行连接
#2、读取用户要操作的日期,可以输入日期范围和固定日期
#2.1、如果输入指定范围日期格式为2018-09-12~2019-09-21
#如果用户输入的是范围,这时候需要一天一天的进行处理,方法和2.2一样
#2.2、如果输入指定日期为:2019-09-02
#3、可以查询当前日志所有时间范围数据检查
#获取当前脚本所在路径
def cur_file_dir():
path = sys.path[0] #获取脚本路径
if os.path.isdir(path):#判断脚本是文件还是编译后的文件,如果是脚本返回脚本目录,如果是编译文件,返回编译文件路径
return path
elif os.path.isfile(path):
return os.path.dirname(path)
#添加配置文件
def OneFile():
dic = {}
str = cur_file_dir()
file_input = open(str+'\\DB2_Protory.txt','r')
list_file_line = file_input.readlines()
for list1 in list_file_line:
print(list1.strip('\n'))
list1 = list1.strip('\n')
key = (list1.split('='))[0].upper()
value = (list1.split('='))[1]
dic[key]=value
file_input.flush()
file_input.close()
return dic
#首先获取db2数据库连接,进行连接
def ConDB(database,hostname,port,protocol,uid,pwd):
dburl = "DATABASE=%s;HOSTNAME=%s;PORT=%s;PROTOCOL=%s;UID=%s;PWD=%s;"%(database,hostname,port,protocol,uid,pwd)
print(dburl)
conn = ibm_db.connect(dburl, "", "")
if conn:
sql = "SELECT service_level, fixpack_num FROM TABLE(sysproc.env_get_inst_info())as INSTANCEINFO"
stmt = ibm_db.exec_immediate(conn,sql)
result = ibm_db.fetch_both(stmt)
print('----------------------连接成功---------------------')
print('---地址:%s 数据库:%s 用户:%s'%(hostname,database,uid))
print('DB2 version: ',result[0])
return conn
else:
return
#用来关闭数据库连接
def CloseClo(conn):
ibm_db.close(conn)
#日期处理
def CurDate(str):
#首先判断当前日期是连续的还是单一天数的。
str_Date = re.split(r'~',str)
#判断用户输入日期是不是有效日期
flog = 1
try:
for i in range(len(str_Date)):
datetime.strptime(str_Date[i],"%Y-%m-%d")
except:
flog = 0
# 代表有两个日期,或者就是只有一个日期
if len(str_Date) > 1 and flog == 1:
#判断后面输入的日期是否大于前面的日期,验证用户输入数据是否正确
if datetime.strptime(str_Date[0],"%Y-%m-%d").date() > datetime.strptime(str_Date[1],"%Y-%m-%d").date():
#插入0代表数据错误
str_Date.insert(0,0)
return str_Date
else:
#否则插入2代表有两个日期
str_Date.insert(0,2)
return str_Date
elif len(str_Date) == 1 and flog == 1:
str_Date.insert(0,1)
else:
str_Date.insert(0,0)
return str_Date
#获取两个日期中间的日期列表
def gen_dates(b_date, days):
day = timedelta(days=1)
for i in range(days):
yield b_date + day*i
def get_date_list(user_say_date):
"""
获取日期列表
:param start: 开始日期
:param end: 结束日期
:return:
"""
data = []
if user_say_date[0] == 1:
date.append(user_say_date[1])
return data
elif user_say_date[0] == 2:
start = datetime.strptime(user_say_date[1],"%Y-%m-%d").date()
end = datetime.strptime(user_say_date[2], "%Y-%m-%d").date()
for d in gen_dates(start, (end-start).days):
data.append(d)
#最后把日期加入
date.append(user_say_date[2])
return data
else:
data = []
return data
#查询根据时间所判断有问题的数据
def SelSQl(success_date,conn):
#开始处理
#拼接处理SQL
SQL= "这里写的是操作的代码"%success_date
stmt = ibm_db.exec_immediate(conn, SQL)
#一行一行的读取
result = ibm_db.fetch_both(stmt)
# 设置默认值
flog = 0
a = ['', '', '', '']
seq_id = [0, 0, 0, 0]
read_error = 0 #记录出错次数,4个为一次
sum_date = 0 #记录总数
#打开文件准备记录错误信息
str = cur_file_dir()
f1 = open(str+"\\ErrorSQL.SQL",'a')
while (result):
# 四条数据做一次判断
sum_date = sum_date + 1
if flog == 4:
#列表数据达到四条,开始判断顺序
a0 = r"^LD_.*?_INIT$" #第一行匹配格式
a1 = r"^AP_.*?_INIT$" # 第二行匹配格式
a2 = r"^LD_.*?[^I]?[^N]?[^I]?[^T]{1}$" # 第三行匹配格式
a3 = r"^AP_.*?[^I]?[^N]?[^I]?[^T]{1}$" # 第四行匹配格式
print(a)
if re.match(a0, a[0]) != None and re.match(a1, a[1]) != None and re.match(a2, a[2]) != None and re.match(a3,a[3]) != None:
pass
else:
# 如果其中有一个匹配不正确,代表数据顺序有问题。
# 所以要把错误信息放入到列表
read_error = read_error + 1
if read_error == 1:
print('---------------------------日期:%s----------------------------'%success_date)
f1.write('---------------------------日期:%s----------------------------'%success_date)
f1.write('\n')
print('发现数据出现问题!')
# 把出错信息写入到文件。
ErrorSQL = "SELECT * FROM ETL.JOB_LOG A WHERE JOB_SEQ_ID IN(%d,%d,%d,%d) UNION ALL" % (
seq_id[0], seq_id[1], seq_id[2], seq_id[3])
f1.write(ErrorSQL)
f1.write('\n')
# 把每条时间放入指定列表位置,四条数据刷新一次
flog = 0
a = ['', '', '', '']
seq_id = [0, 0, 0, 0]
a[flog] = result[2]
seq_id[flog] = result[0]
flog = flog + 1
result = ibm_db.fetch_both(stmt)
#写完错误关掉文件
if read_error > 0 :
f1.write('总行数为:%s ,出错的信息有: %s 次'%(sum_date/4,read_error))
f1.write('\n')
f1.close()
print('总行数为:%s ,出错的信息有: %s 次'%(sum_date/4,read_error))
#Main
dice = OneFile()
conn = None
try:
#conn = ConDB(dice['DATABASE'],dice['HOSTNAME'],dice['PORT'],dice['PROTOCOL'],dice['UID'],dice['PWD'])
user_say = input("请输入指定范围日期格式例如为2018-09-12~2019-09-21,或者输入指定一天日期例如:2019-09-21")
user_say_date = CurDate(user_say)
print(user_say_date)
data = get_date_list(user_say_date)
print(data)
#获取到一个日期列表然后循环进行查询
#for current_data in data:
# SelSQl(current_data,conn)
CloseClo(conn)
except Exception as ex:
CloseClo(conn)
print(ex)
finally:
CloseClo(conn)