实例要求
使用PySimpleGUI做一个把单位考勤系统导出的pdf文件合并输出Excel的应用,故事出自:
https://hannyang.blog.csdn.net/article/details/135395946
当时时间紧,没有好好做界面且输出csv文件了事。今天趁周六休息,把代码做一下升级处理,使用库PySimpleGUI做了一个稍微漂亮一点的界面;又用pdfplumber直接遍历多个pdf文件,得到数据后输出Excel文件,比我原本先做合并pdf文件再去取数要快,原先的pdf文件合并操作纯粹有点多余。最后,又尝试对pdf文件读取函数的改造,使用了asyncio异步编程效果非常不错。
下面请听我慢慢道来:
原始pdf文件格式
输出xls文件格式
运行界面
完整代码
import xlwt, pyperclip, asyncio, pdfplumber import os, time, datetime as dt import PySimpleGUI as sg # 全局变量 table_head = '姓名,部门,应到,实到,出勤率,迟到次数,早退次数,加班(分钟)' path, font = '', ('宋体',12) date, data = [], [] DateFormat = ' . . - . . ' ErrMessage = '错误' SortedType = ["出勤率排序","加班时长排序","迟到次数排序","早退次数排序"] # 定义布局 layout = [ [sg.Text("昆山分行考勤表",font=('',16)), sg.Text(pad=(132,10)), sg.Text("请选择考勤文件:",font=font), sg.Input(key="-FOLDER-", enable_events=True, readonly=True,font=font,size=18), sg.FolderBrowse(button_text='...', enable_events=True, initial_folder='./') ], [sg.Text("考勤日期:",font=font), sg.Text(DateFormat,key='-DATE-',font=font) ], [sg.Table(values='', headings=table_head.split(','), key='-TABLE-', auto_size_columns=False, justification='left', num_rows=10)], [sg.Button("输出Excel文件",size=(12,1),pad=(15,30)), sg.Button(SortedType[0], enable_events=True,size=(10,1),pad=(15,30)), sg.Button(SortedType[1], enable_events=True,size=(10,1),pad=(15,30)), sg.Button(SortedType[2], enable_events=True,size=(10,1),pad=(15,30)), sg.Button(SortedType[3], enable_events=True,size=(10,1),pad=(15,30)), sg.Button("退出",size=(10,1),pad=(15,30))], [sg.StatusBar('',key="-BAR-",font=font,size=92)] ] # 读取pdf表格 async def read_table(file): dct = dict() with pdfplumber.open(file) as pdf: for page in pdf.pages: tables = page.extract_tables(table_settings = {}) for table in tables: for lst in table: tmp = lst[1:] if not any(tmp): continue tmp = [tmp[0]]+tmp[3:8]+[tmp[-1]] tmp[0] = tmp[0].replace('\n','') tmp[0] = tmp[0].split('/') tmp[0] = tmp[0][-1] if lst[0]=='时间': dct[lst[0]] = tmp[0] else: dct[','.join([lst[0],tmp[0]])] = ','.join(tmp[1:]) return dct # 写入xls文件 def write_sheet(): global data, date, table_head, ErrMessage if ErrMessage[:2] in ('错误','文件'): return myxl = xlwt.Workbook() style = xlwt.easyxf('align: wrap yes; align: horiz center; font: bold yes;') sheet = myxl.add_sheet('考勤表') wcol = [20,40,60,30,30,40,40,40,60] for i,w in enumerate(wcol): sheet.col(i).width = w * 80 sheet.write_merge(0,0,0,8,'出勤统计报表',style) style = xlwt.easyxf('borders:top thin; borders:bottom thin; borders:left thin; borders:right thin;') sheet.write_merge(1,1,0,2,'考勤日期:'+date[0]) for i,head in enumerate(['序号']+table_head.split(',')): sheet.write(2,i,head,style) for i,row in enumerate(data): for j,col in enumerate([str(i+1)]+row): sheet.write(3+i,j,col,style) for i,t in enumerate(SortedType): if t in ErrMessage: tmp = SortedType[i] break else: tmp = "" excel_file = f'昆山分行考勤表{date[0]}({tmp}{strDateTime()}).xls' ErrMessage = f'文件输出为:{excel_file}' try: myxl.save(excel_file) except: ErrMessage = '写入excel文件失败!' finally: pyperclip.copy('\\'.join((os.getcwd(),excel_file))) window['-BAR-'].update(ErrMessage) # 获取当前时间 def strDateTime(diff=0): now = dt.datetime.now() time = now + dt.timedelta(days=diff) return f'{time.year}{time.month:02}{time.day:02}{time.hour:02}{time.minute:02}{time.second:02}' # 选择并处理文件 async def on_text_changed(event, values): global date, data, path, ErrMessage new_path = values["-FOLDER-"] window["-FOLDER-"].update(new_path.split('/')[-1]) if path==new_path: return else: path = new_path pdfs = [f for f in os.listdir(path) if f.endswith('.pdf') and not f.startswith('PDFmerged')] nums = len(pdfs) if nums==0: ErrMessage = '错误:所选文件夹中没有PDF文件!' window['-BAR-'].update(ErrMessage) window['-DATE-'].update(DateFormat) window['-TABLE-'].update(values=[]) return date, data, sheet = [], [], dict() tasks = [] for pdf in pdfs: tasks.append(read_table('/'.join([path,pdf]))) ErrMessage = f'文件读取中(共{nums}个PDF文件)......' window['-BAR-'].update(ErrMessage) window.refresh() results = await asyncio.gather(*tasks) for r in results: dt = r.get('时间',None) if dt: date.append(dt) sheet.update(r) if date: window['-DATE-'].update(date[-1]) for k,v in sheet.items(): if k in ('时间','姓名,所属组织','普通班个人出勤统计报表,'): continue data.append(','.join([k,v]).split(',')) window['-TABLE-'].update(values=data) persons = len(data) departments = len(set([d[1] for d in data])) if 0:#len(set(date))!=1: data = [] ErrMessage = f'错误:请检查所选文件存在多个时间段:{",".join(set(date))}' else: ErrMessage = f'考勤人数:{persons} / 部门数:{departments}' window['-BAR-'].update(ErrMessage) # 表格排序 def on_table_sorted(event, data): global ErrMessage if not data: return slist = ['x[-4][:-1]', 'x[-1]', 'x[-3]', 'x[-2]'] style = slist[SortedType.index(event)] data = sorted(data, key=lambda x: float(eval(style)), reverse=True) window['-TABLE-'].update(values=data) ErrMessage = f'已按{event}更新!' window['-BAR-'].update(ErrMessage) # 创建窗口 window = sg.Window("考勤表汇总", layout, finalize=True) # 事件循环 while True: event, values = window.read() if event == sg.WINDOW_CLOSED or event == "退出": break elif event == "-FOLDER-": asyncio.run(on_text_changed(event, values)) elif event in SortedType: on_table_sorted(event, data) elif event == "输出Excel文件": write_sheet() # 关闭窗口 window.close()
代码分析
重点代码都用彩色字体加粗标注了:
遍历表格
读取代码如下:
mport pdfplumber ...... with pdfplumber.open(file) as pdf: for page in pdf.pages: tables = page.extract_tables(table_settings = {}) for table in tables: for lst in table: # 根据表格实际情况来清洗数据 return dct
布局界面
import PySimpleGUI as pg layout = [ [sg.Text("昆山分行考勤表",font=('',16)), sg.Text(pad=(132,10)), sg.Text("请选择考勤文件:",font=font), sg.Input(key="-FOLDER-", enable_events=True, readonly=True,font=font,size=18), sg.FolderBrowse(button_text='...', enable_events=True, initial_folder='./') ], [sg.Text("考勤日期:",font=font), sg.Text(DateFormat,key='-DATE-',font=font) ], [sg.Table(values='', headings=table_head.split(','), key='-TABLE-', auto_size_columns=False, justification='left', num_rows=10)], [sg.Button("输出Excel文件",size=(12,1),pad=(15,30)), sg.Button(SortedType[0], enable_events=True,size=(10,1),pad=(15,30)), sg.Button(SortedType[1], enable_events=True,size=(10,1),pad=(15,30)), sg.Button(SortedType[2], enable_events=True,size=(10,1),pad=(15,30)), sg.Button(SortedType[3], enable_events=True,size=(10,1),pad=(15,30)), sg.Button("退出",size=(10,1),pad=(15,30))], [sg.StatusBar('',key="-BAR-",font=font,size=92)]]
控件简介
除了最常用的Text, Input, Button,使用了 FolderBrowse、Table、StatsBar 三个不是最常用的控件,分别是文件夹打开框、表格和状态栏。
表格最重要的三个参数: values, headings, auto_size_columns
sg.Table(values='', headings=table_head.split(','), auto_size_columns=False)
表格数据values和表头headings都列表(分别是二维和一维的),auto_size_columns=False建议不要缺省,否则列宽不可控,各列都自动缩进紧靠在一起。
表格更新数据的方法:window['-TABLE-'].update(values=data)
写入表格
import xlwt def write_sheet(): global data, date, table_head, ErrMessage if ErrMessage[:2] in ('错误','输出'): return myxl = xlwt.Workbook() style = xlwt.easyxf('align: wrap yes; align: horiz center; font: bold yes;') sheet = myxl.add_sheet('考勤表') wcol = [20,40,60,30,30,40,40,40,60] for i,w in enumerate(wcol): sheet.col(i).width = w * 80 sheet.write_merge(0,0,0,8,'出勤统计报表',style) style = xlwt.easyxf('borders:top thin; borders:bottom thin; borders:left thin; borders:right thin;') sheet.write_merge(1,1,0,2,'考勤日期:'+date[0]) for i,head in enumerate(['序号']+table_head.split(',')): sheet.write(2,i,head,style) for i,row in enumerate(data): for j,col in enumerate([str(i+1)]+row): sheet.write(3+i,j,col,style) for i,t in enumerate(SortedType): if t in ErrMessage: tmp = SortedType[i] break else: tmp = "" excel_file = f'昆山分行考勤表{date[0]}({tmp}{strDateTime()}).xls' ErrMessage = f'输出文件为:{excel_file}' try: myxl.save(excel_file) except: ErrMessage = '写入excel文件失败!' 注意单格和多个单元格的写入区别: sheet.write() sheet.write_merge()
表格排序
SortedType = ["出勤率排序","加班时长排序","迟到次数排序","早退次数排序"] def on_table_sorted(event, data): global ErrMessage if not data: return slist = ['x[-4][:-1]', 'x[-1]', 'x[-3]', 'x[-2]'] style = slist[SortedType.index(event)] data = sorted(data, key=lambda x: float(eval(style)), reverse=True) window['-TABLE-'].update(values=data) ErrMessage = f'已按{event}更新!' window['-BAR-'].update(ErrMessage)
虽然经常有人诟病eval()函数的安全性,但这里还是用eval()简化表格排序事件,否则要多写很多代码。
事件循环
while True: event, values = window.read() if event == sg.WINDOW_CLOSED or event == "退出": break elif event == "-FOLDER-": asyncio.run(on_text_changed(event, values)) elif event in SortedType: on_table_sorted(event, data) elif event == "输出Excel文件": write_sheet()
异步编程
此时,请出本篇的主角“异步编程”,什么是异步编程呢?就是有点多任务操作的意思。
异步编程是一种编程范式,它允许某些操作在等待结果时不阻塞整个程序。在传统的同步编程中,程序会按照顺序执行,一旦遇到需要等待的操作(如文件I/O或网络请求),整个程序就会被阻塞,等待操作完成。而在异步编程中,程序并不会因为某个耗时的IO操作而停下其他所有任务,而是将这个任务交给系统处理,自身继续执行后续的操作,等到IO操作完成后,系统会通知程序进行下一步的处理。
asyncio
在上一段代码中,响应"-FOLDER-"时使用了asyncio.run()函数:
import asyncio ....... ...... while True: event, values = window.read() if event == sg.WINDOW_CLOSED or event == "退出": break elif event == "-FOLDER-": asyncio.run(on_text_changed(event, values)) asyncio.run运行的这个是异步编程的主函数,需要用async def来定义:
async def
async def on_text_changed(event, values): ......其它代码略...... tasks = [] for pdf in pdfs: tasks.append(read_table('/'.join([path,pdf]))) ErrMessage = f'文件读取中(共{nums}个PDF文件)......' window['-BAR-'].update(ErrMessage) window.refresh() results = await asyncio.gather(*tasks) for r in results: ......遍历取回的被调异步函数返回值的列表......
await
异步主函数中使用 await asyncio.gather(*tasks) 取回被函数的返回结果,返回结果是多个任务的返回值组成的列表;而主函数的任务呢就,是被调函数组成的列表:asks.append(read_table())
同样的,被调函数也需要用async def来定义,它一般都是文件I/O或网络请求等比较耗时的操作:
async def read_table(file): dct = dict() with pdfplumber.open(file) as pdf: # 读取pdf文件 I/O操作 return dct