Python连接DB2操作的一个小例子

简介: QQ交流群:127591054 作者:JackChiang 作者QQ:595696297Python版本3.5这个小项目,是解决数据库查看的时候一些错误的,查看。

QQ交流群:127591054 作者:JackChiang 作者QQ:595696297

Python版本3.5

这个小项目,是解决数据库查看的时候一些错误的,查看。每天都会跑数据,但是第一天的数据和第二天的名字可以是一样的,我第二天还可以在跑一遍。

涉及到的知识:
1、日期获取,例如获取2012-02-21~2013-08-21之间所有的日期列表,需要一天一天查询遍历。
2、正则匹配名字,由于一个调度会生成4条数据,需要找到这4条数据的顺序必须固定才为正确否则就是不对的。
3、然后把查询出来有问题的数据写入到文件。

代码有详细的注解,有问题可以练习我QQ学习哦

# -*- coding: utf-8 -*-
import time
import sys,os #要重新载入sys。因为 Python 初始化后会删除 sys.setdefaultencoding 这个方 法
import ibm_db
import re
import datetime
from datetime import *

#1、首先获取db2数据库连接,进行连接
#2、读取用户要操作的日期,可以输入日期范围和固定日期
    #2.1、如果输入指定范围日期格式为2018-09-12~2019-09-21
        #如果用户输入的是范围,这时候需要一天一天的进行处理,方法和2.2一样
    #2.2、如果输入指定日期为:2019-09-02
#3、可以查询当前日志所有时间范围数据检查


#获取当前脚本所在路径
def cur_file_dir():
    path = sys.path[0] #获取脚本路径
    if os.path.isdir(path):#判断脚本是文件还是编译后的文件,如果是脚本返回脚本目录,如果是编译文件,返回编译文件路径
        return path
    elif os.path.isfile(path):
        return os.path.dirname(path)

#添加配置文件
def OneFile():
    dic = {}
    str = cur_file_dir()
    file_input = open(str+'\\DB2_Protory.txt','r')
    list_file_line = file_input.readlines()
    for list1 in list_file_line:
        print(list1.strip('\n'))
        list1 = list1.strip('\n')
        key = (list1.split('='))[0].upper()
        value = (list1.split('='))[1]
        dic[key]=value
        file_input.flush()
    file_input.close()
    return dic

#首先获取db2数据库连接,进行连接
def ConDB(database,hostname,port,protocol,uid,pwd):
    dburl = "DATABASE=%s;HOSTNAME=%s;PORT=%s;PROTOCOL=%s;UID=%s;PWD=%s;"%(database,hostname,port,protocol,uid,pwd)
    print(dburl)
    conn = ibm_db.connect(dburl, "", "")
    if conn:
        sql = "SELECT service_level, fixpack_num FROM TABLE(sysproc.env_get_inst_info())as INSTANCEINFO"
        stmt = ibm_db.exec_immediate(conn,sql)
        result = ibm_db.fetch_both(stmt)

        print('----------------------连接成功---------------------')
        print('---地址:%s 数据库:%s 用户:%s'%(hostname,database,uid))
        print('DB2  version: ',result[0])

        return conn
    else:
        return

#用来关闭数据库连接
def CloseClo(conn):
    ibm_db.close(conn)
#日期处理
def CurDate(str):
    #首先判断当前日期是连续的还是单一天数的。
    str_Date = re.split(r'~',str)
    #判断用户输入日期是不是有效日期
    flog = 1
    try:
        for i in range(len(str_Date)):
            datetime.strptime(str_Date[i],"%Y-%m-%d")
    except:
        flog = 0
    # 代表有两个日期,或者就是只有一个日期
    if len(str_Date) > 1 and flog == 1:
        #判断后面输入的日期是否大于前面的日期,验证用户输入数据是否正确
        if datetime.strptime(str_Date[0],"%Y-%m-%d").date() > datetime.strptime(str_Date[1],"%Y-%m-%d").date():
            #插入0代表数据错误
            str_Date.insert(0,0)
            return str_Date
        else:
            #否则插入2代表有两个日期
            str_Date.insert(0,2)
        return str_Date
    elif len(str_Date) == 1 and flog == 1:
        str_Date.insert(0,1)
    else:
        str_Date.insert(0,0)
    return str_Date

#获取两个日期中间的日期列表
def gen_dates(b_date, days):
    day = timedelta(days=1)
    for i in range(days):
        yield b_date + day*i


def get_date_list(user_say_date):
    """
    获取日期列表
    :param start: 开始日期
    :param end: 结束日期
    :return:
    """
    data = []
    if user_say_date[0] == 1:
        date.append(user_say_date[1])
        return data
    elif user_say_date[0] == 2:
        start = datetime.strptime(user_say_date[1],"%Y-%m-%d").date()
        end = datetime.strptime(user_say_date[2], "%Y-%m-%d").date()
        for d in gen_dates(start, (end-start).days):
            data.append(d)
        #最后把日期加入
        date.append(user_say_date[2])
        return data
    else:
        data = []
        return data





#查询根据时间所判断有问题的数据
def SelSQl(success_date,conn):

    #开始处理
    #拼接处理SQL
    SQL= "这里写的是操作的代码"%success_date


    stmt = ibm_db.exec_immediate(conn, SQL)
    #一行一行的读取
    result = ibm_db.fetch_both(stmt)
    # 设置默认值
    flog = 0

    a = ['', '', '', '']
    seq_id = [0, 0, 0, 0]
    read_error = 0 #记录出错次数,4个为一次
    sum_date = 0 #记录总数

    #打开文件准备记录错误信息
    str = cur_file_dir()
    f1 = open(str+"\\ErrorSQL.SQL",'a')

    while (result):
        # 四条数据做一次判断
        sum_date = sum_date + 1
        if flog == 4:
            #列表数据达到四条,开始判断顺序
            a0 = r"^LD_.*?_INIT$" #第一行匹配格式
            a1 = r"^AP_.*?_INIT$"  # 第二行匹配格式
            a2 = r"^LD_.*?[^I]?[^N]?[^I]?[^T]{1}$"  # 第三行匹配格式
            a3 = r"^AP_.*?[^I]?[^N]?[^I]?[^T]{1}$"  # 第四行匹配格式
            print(a)
            if re.match(a0, a[0]) != None and re.match(a1, a[1]) != None and re.match(a2, a[2]) != None and re.match(a3,a[3]) != None:
                pass
            else:
                # 如果其中有一个匹配不正确,代表数据顺序有问题。
                # 所以要把错误信息放入到列表
                read_error = read_error + 1
                if read_error == 1:
                    print('---------------------------日期:%s----------------------------'%success_date)
                    f1.write('---------------------------日期:%s----------------------------'%success_date)
                    f1.write('\n')
                print('发现数据出现问题!')
                # 把出错信息写入到文件。
                ErrorSQL = "SELECT * FROM ETL.JOB_LOG A WHERE JOB_SEQ_ID IN(%d,%d,%d,%d) UNION ALL" % (
                seq_id[0], seq_id[1], seq_id[2], seq_id[3])
                f1.write(ErrorSQL)
                f1.write('\n')

            # 把每条时间放入指定列表位置,四条数据刷新一次
            flog = 0
            a = ['', '', '', '']
            seq_id = [0, 0, 0, 0]
        a[flog] = result[2]
        seq_id[flog] = result[0]
        flog = flog + 1
        result = ibm_db.fetch_both(stmt)
    #写完错误关掉文件
    if read_error > 0 :
        f1.write('总行数为:%s ,出错的信息有: %s 次'%(sum_date/4,read_error))
        f1.write('\n')
    f1.close()
    print('总行数为:%s ,出错的信息有: %s 次'%(sum_date/4,read_error))

#Main
dice = OneFile()
conn = None
try:
    #conn = ConDB(dice['DATABASE'],dice['HOSTNAME'],dice['PORT'],dice['PROTOCOL'],dice['UID'],dice['PWD'])
    user_say = input("请输入指定范围日期格式例如为2018-09-12~2019-09-21,或者输入指定一天日期例如:2019-09-21")
    user_say_date = CurDate(user_say)
    print(user_say_date)
    data = get_date_list(user_say_date)

    print(data)
    #获取到一个日期列表然后循环进行查询
    #for current_data in data:
    #   SelSQl(current_data,conn)
    CloseClo(conn)
except Exception as ex:
    CloseClo(conn)
    print(ex)
finally:
    CloseClo(conn)






相关文章
|
1月前
|
数据格式 Python
如何使用Python的Pandas库进行数据透视图(melt/cast)操作?
Pandas的`melt()`和`pivot()`函数用于数据透视。基本步骤:导入pandas,创建DataFrame,然后使用这两个函数转换数据格式。示例代码展示了如何通过`melt()`转为长格式,再用`pivot()`恢复为宽格式。输入数据是包含'Name'和'Age'列的DataFrame,最终结果经过转换后呈现出不同的布局。
39 6
|
1月前
|
Unix Shell Linux
赞!优雅的Python多环境管理神器!易上手易操作!
赞!优雅的Python多环境管理神器!易上手易操作!
|
1月前
|
SQL 关系型数据库 MySQL
Python怎么操作Mysql数据库
Python怎么操作Mysql数据库
51 0
|
17天前
|
人工智能 机器人 C++
【C++/Python】Windows用Swig实现C++调用Python(史上最简单详细,80岁看了都会操作)
【C++/Python】Windows用Swig实现C++调用Python(史上最简单详细,80岁看了都会操作)
|
1月前
|
开发者 Python
Python库中关于时间的常见操作
Python库中关于时间的常见操作
32 0
|
1天前
|
JSON 数据格式 索引
python 又一个点运算符操作的字典库:Munch
python 又一个点运算符操作的字典库:Munch
9 0
|
5天前
|
索引 Python
如何使用Python的Pandas库进行数据透视表(pivot table)操作?
使用Pandas在Python中创建数据透视表的步骤包括:安装Pandas库,导入它,创建或读取数据(如DataFrame),使用`pd.pivot_table()`指定数据框、行索引、列索引和值,计算聚合函数(如平均分),并可打印或保存结果到文件。这允许对数据进行高效汇总和分析。
10 2
|
12天前
|
数据采集 JSON 网络协议
「Python系列」Python urllib库(操作网页URL对网页的内容进行抓取处理)
`urllib` 是 Python 的一个标准库,用于打开和读取 URLs。它提供了一组模块,允许你以编程方式从网络获取数据,如网页内容、文件等。
35 0
|
19天前
|
网络协议 安全 Python
python监听连接请求
【4月更文挑战第5天】本教程介绍了网络编程中服务器监听连接请求的关键步骤。首先,理解监听是服务器在特定端口等待客户端连接的基本概念。接着,设置监听涉及创建套接字、绑定地址和端口,以及开始监听。提供了一个Python示例,展示如何使用socket库实现监听。注意点包括异常处理、并发处理和安全性考虑。学习后,读者能掌握基础的监听连接请求代码编写。
|
23天前
|
Python
python使用tkinter库,封装操作excel为GUI程序
python使用tkinter库,封装操作excel为GUI程序

热门文章

最新文章