Python连接DB2操作的一个小例子

QQ交流群:127591054 作者:JackChiang 作者QQ:595696297

Python版本3.5

这个小项目,是解决数据库查看的时候一些错误的,查看。每天都会跑数据,但是第一天的数据和第二天的名字可以是一样的,我第二天还可以在跑一遍。

涉及到的知识:
1、日期获取,例如获取2012-02-21~2013-08-21之间所有的日期列表,需要一天一天查询遍历。
2、正则匹配名字,由于一个调度会生成4条数据,需要找到这4条数据的顺序必须固定才为正确否则就是不对的。
3、然后把查询出来有问题的数据写入到文件。

代码有详细的注解,有问题可以练习我QQ学习哦

# -*- coding: utf-8 -*-
import time
import sys,os #要重新载入sys。因为 Python 初始化后会删除 sys.setdefaultencoding 这个方 法
import ibm_db
import re
import datetime
from datetime import *

#1、首先获取db2数据库连接,进行连接
#2、读取用户要操作的日期,可以输入日期范围和固定日期
    #2.1、如果输入指定范围日期格式为2018-09-12~2019-09-21
        #如果用户输入的是范围,这时候需要一天一天的进行处理,方法和2.2一样
    #2.2、如果输入指定日期为:2019-09-02
#3、可以查询当前日志所有时间范围数据检查


#获取当前脚本所在路径
def cur_file_dir():
    path = sys.path[0] #获取脚本路径
    if os.path.isdir(path):#判断脚本是文件还是编译后的文件,如果是脚本返回脚本目录,如果是编译文件,返回编译文件路径
        return path
    elif os.path.isfile(path):
        return os.path.dirname(path)

#添加配置文件
def OneFile():
    dic = {}
    str = cur_file_dir()
    file_input = open(str+'\\DB2_Protory.txt','r')
    list_file_line = file_input.readlines()
    for list1 in list_file_line:
        print(list1.strip('\n'))
        list1 = list1.strip('\n')
        key = (list1.split('='))[0].upper()
        value = (list1.split('='))[1]
        dic[key]=value
        file_input.flush()
    file_input.close()
    return dic

#首先获取db2数据库连接,进行连接
def ConDB(database,hostname,port,protocol,uid,pwd):
    dburl = "DATABASE=%s;HOSTNAME=%s;PORT=%s;PROTOCOL=%s;UID=%s;PWD=%s;"%(database,hostname,port,protocol,uid,pwd)
    print(dburl)
    conn = ibm_db.connect(dburl, "", "")
    if conn:
        sql = "SELECT service_level, fixpack_num FROM TABLE(sysproc.env_get_inst_info())as INSTANCEINFO"
        stmt = ibm_db.exec_immediate(conn,sql)
        result = ibm_db.fetch_both(stmt)

        print('----------------------连接成功---------------------')
        print('---地址:%s 数据库:%s 用户:%s'%(hostname,database,uid))
        print('DB2  version: ',result[0])

        return conn
    else:
        return

#用来关闭数据库连接
def CloseClo(conn):
    ibm_db.close(conn)
#日期处理
def CurDate(str):
    #首先判断当前日期是连续的还是单一天数的。
    str_Date = re.split(r'~',str)
    #判断用户输入日期是不是有效日期
    flog = 1
    try:
        for i in range(len(str_Date)):
            datetime.strptime(str_Date[i],"%Y-%m-%d")
    except:
        flog = 0
    # 代表有两个日期,或者就是只有一个日期
    if len(str_Date) > 1 and flog == 1:
        #判断后面输入的日期是否大于前面的日期,验证用户输入数据是否正确
        if datetime.strptime(str_Date[0],"%Y-%m-%d").date() > datetime.strptime(str_Date[1],"%Y-%m-%d").date():
            #插入0代表数据错误
            str_Date.insert(0,0)
            return str_Date
        else:
            #否则插入2代表有两个日期
            str_Date.insert(0,2)
        return str_Date
    elif len(str_Date) == 1 and flog == 1:
        str_Date.insert(0,1)
    else:
        str_Date.insert(0,0)
    return str_Date

#获取两个日期中间的日期列表
def gen_dates(b_date, days):
    day = timedelta(days=1)
    for i in range(days):
        yield b_date + day*i


def get_date_list(user_say_date):
    """
    获取日期列表
    :param start: 开始日期
    :param end: 结束日期
    :return:
    """
    data = []
    if user_say_date[0] == 1:
        date.append(user_say_date[1])
        return data
    elif user_say_date[0] == 2:
        start = datetime.strptime(user_say_date[1],"%Y-%m-%d").date()
        end = datetime.strptime(user_say_date[2], "%Y-%m-%d").date()
        for d in gen_dates(start, (end-start).days):
            data.append(d)
        #最后把日期加入
        date.append(user_say_date[2])
        return data
    else:
        data = []
        return data





#查询根据时间所判断有问题的数据
def SelSQl(success_date,conn):

    #开始处理
    #拼接处理SQL
    SQL= "这里写的是操作的代码"%success_date


    stmt = ibm_db.exec_immediate(conn, SQL)
    #一行一行的读取
    result = ibm_db.fetch_both(stmt)
    # 设置默认值
    flog = 0

    a = ['', '', '', '']
    seq_id = [0, 0, 0, 0]
    read_error = 0 #记录出错次数,4个为一次
    sum_date = 0 #记录总数

    #打开文件准备记录错误信息
    str = cur_file_dir()
    f1 = open(str+"\\ErrorSQL.SQL",'a')

    while (result):
        # 四条数据做一次判断
        sum_date = sum_date + 1
        if flog == 4:
            #列表数据达到四条,开始判断顺序
            a0 = r"^LD_.*?_INIT$" #第一行匹配格式
            a1 = r"^AP_.*?_INIT$"  # 第二行匹配格式
            a2 = r"^LD_.*?[^I]?[^N]?[^I]?[^T]{1}$"  # 第三行匹配格式
            a3 = r"^AP_.*?[^I]?[^N]?[^I]?[^T]{1}$"  # 第四行匹配格式
            print(a)
            if re.match(a0, a[0]) != None and re.match(a1, a[1]) != None and re.match(a2, a[2]) != None and re.match(a3,a[3]) != None:
                pass
            else:
                # 如果其中有一个匹配不正确,代表数据顺序有问题。
                # 所以要把错误信息放入到列表
                read_error = read_error + 1
                if read_error == 1:
                    print('---------------------------日期:%s----------------------------'%success_date)
                    f1.write('---------------------------日期:%s----------------------------'%success_date)
                    f1.write('\n')
                print('发现数据出现问题!')
                # 把出错信息写入到文件。
                ErrorSQL = "SELECT * FROM ETL.JOB_LOG A WHERE JOB_SEQ_ID IN(%d,%d,%d,%d) UNION ALL" % (
                seq_id[0], seq_id[1], seq_id[2], seq_id[3])
                f1.write(ErrorSQL)
                f1.write('\n')

            # 把每条时间放入指定列表位置,四条数据刷新一次
            flog = 0
            a = ['', '', '', '']
            seq_id = [0, 0, 0, 0]
        a[flog] = result[2]
        seq_id[flog] = result[0]
        flog = flog + 1
        result = ibm_db.fetch_both(stmt)
    #写完错误关掉文件
    if read_error > 0 :
        f1.write('总行数为:%s ,出错的信息有: %s 次'%(sum_date/4,read_error))
        f1.write('\n')
    f1.close()
    print('总行数为:%s ,出错的信息有: %s 次'%(sum_date/4,read_error))

#Main
dice = OneFile()
conn = None
try:
    #conn = ConDB(dice['DATABASE'],dice['HOSTNAME'],dice['PORT'],dice['PROTOCOL'],dice['UID'],dice['PWD'])
    user_say = input("请输入指定范围日期格式例如为2018-09-12~2019-09-21,或者输入指定一天日期例如:2019-09-21")
    user_say_date = CurDate(user_say)
    print(user_say_date)
    data = get_date_list(user_say_date)

    print(data)
    #获取到一个日期列表然后循环进行查询
    #for current_data in data:
    #   SelSQl(current_data,conn)
    CloseClo(conn)
except Exception as ex:
    CloseClo(conn)
    print(ex)
finally:
    CloseClo(conn)






上一篇:【玩转DB2】二、DB2联邦详细操作和踩坑


下一篇:Python搞定爬取表映射文档翻译成执行语句(DB2)