一、问题描述
用 Python 模拟 sql 语句,实现对员工信息的增删改查。
- 封装函数,传入参数:文件路径和 sql 命令。
- 模拟 sql 语句实现对员工信息的增删改查,并打印结果。
二、Python编程
导入需要的依赖库
# -*- coding: UTF-8 -*-
"""
@Author   : 叶庭云
@file     : Practical training, assignment 2
@function : Wrap the logic in functions that take a data-file path and an
            SQL-like command, and emulate insert / delete / update / select
            on the employee records stored in that file.
"""
import os
import re
函数式编程
def sql_parse(sql_, key_list):
    """Split an SQL-like command into a dict of clauses.

    The command is cut successively at each keyword of ``key_list`` (in list
    order); the text between two keywords becomes the value of the earlier
    keyword.  Matching is purely textual, so a table name or value that
    contains a keyword (for example ``settings.txt`` contains ``set``) will be
    split incorrectly.

    :param sql_: command string, e.g. ``'select*fromstaff_data.txtwhereage>27'``
    :param key_list: ordered list of clause keywords
    :return: dict mapping keyword -> clause text; ``'select'`` is a list of
        field names and ``'where'`` is the list produced by ``where_parse``
    """
    sql_list = []  # keywords found so far, in order of appearance
    sql_dic = {}
    for keyword in key_list:
        parts = [part.strip() for part in sql_.split(keyword)]
        if len(parts) > 1:
            limit_parts = sql_.split('limit')
            if len(limit_parts) > 1:
                sql_dic['limit'] = limit_parts[-1].strip()
            if keyword in ('where', 'values'):
                sql_dic[keyword] = parts[-1]
            if sql_list:
                # Text before this keyword belongs to the previous keyword.
                sql_dic[sql_list[-1]] = parts[0]
            sql_list.append(keyword)
            sql_ = parts[-1]
        else:
            sql_ = parts[0]
    # The last clause has no following keyword to close it, so store the
    # remaining text explicitly (covers "select ... from t" as well as
    # "update ... set a=b" and "delete from t" without a where clause).
    if sql_list and sql_list[-1] not in sql_dic:
        sql_dic[sql_list[-1]] = sql_
    if sql_dic.get('select'):
        sql_dic['select'] = [
            field.strip() for field in sql_dic['select'].split(',')
        ]
    if sql_dic.get('where'):
        sql_dic['where'] = where_parse(sql_dic['where'])
    return sql_dic


def where_parse(where):
    """Tokenise a where clause into conditions and logical connectives.

    The clause is split at every occurrence of ``and`` / ``or`` / ``not``
    (textual match, so values containing those letters are split too), then
    each condition is broken down by ``three_parse``.

    :param where: raw where clause, e.g. ``'staff_id>=5andstaff_id<=10'``
    :return: list such as ``[['staff_id', '>=', '5'], 'and', [...]]``
    """
    logic_key = ['and', 'or', 'not']
    # A capturing group makes re.split keep the connectives in the result,
    # which handles any number of conditions in a single pass.
    pattern = '(%s)' % '|'.join(logic_key)
    pieces = [piece.strip() for piece in re.split(pattern, where)]
    casual_l = [piece for piece in pieces if piece]
    return three_parse(casual_l, logic_key)


def three_parse(casual_l, logic_key):
    """Turn each condition string into a ``[field, operator, value]`` triple.

    ``'staff_id>5'`` becomes ``['staff_id', '>', '5']`` and
    ``'namelikeTom'`` becomes ``['name', 'like', 'Tom']``.  Logical
    connectives are copied through unchanged.

    :param casual_l: list of condition strings and connectives
    :param logic_key: the connectives to pass through untouched
    :return: list of triples and connectives, in the original order
    """
    where_list = []
    for item in casual_l:
        if item in logic_key:
            where_list.append(item)
            continue
        field, sep, value = item.partition('like')
        if sep:
            where_list.append([field.strip(), 'like', value.strip()])
            continue
        operands = []
        opt = ''
        for piece in re.split('([=<>])', item):
            if piece in ('<', '=', '>'):
                opt += piece  # adjacent symbols form '>=', '<=', '<>'
            elif piece.strip():
                operands.append(piece.strip())
        operands.insert(1, opt)
        where_list.append(operands)
    return where_list


def sql_action(sql_dic, titles):
    """Dispatch a parsed command to the matching handler.

    :param sql_dic: dict produced by ``sql_parse``
    :param titles: comma-separated header line of the data file
    :return: ``(fields, rows)`` from the handler, or ``([], [])`` when the
        command contains none of select / insert / delete / update
    """
    handlers = {
        'select': select,
        'insert': insert,
        'delete': delete,
        'update': update,
    }
    res = ([], [])
    for keyword in sql_dic:
        if keyword in handlers:
            res = handlers[keyword](sql_dic, titles)
    return res


def select(sql_dic, title_):
    """Run a select command against the file named by the global ``data_path``.

    :param sql_dic: dict produced by ``sql_parse``
    :param title_: comma-separated header line of the data file
    :return: ``(field_names, rows)``
    """
    with open(data_path, 'r', encoding='utf-8') as fh:
        filter_res = where_action(fh, sql_dic.get('where'), title_)
    limit_res = limit_action(filter_res, sql_dic.get('limit'))
    return search_action(limit_res, sql_dic.get('select') or ['*'], title_)


def insert(sql_dic, title_=None):
    """Append the records of an insert command to the global ``data_path``.

    Records are separated by ``/`` and do not include the first (id) column;
    a new id is generated for each one.  A record whose phone number already
    exists in the file is rejected.

    :param sql_dic: dict produced by ``sql_parse``; reads ``'values'``
    :param title_: comma-separated header line; when omitted the first line
        of the file is used.  Accepted so ``sql_action`` can call every
        handler with the same two arguments.
    :return: ``(['insert successfully!'], [number_of_inserted_rows])``
    """
    with open(data_path, 'r+', encoding='utf-8') as fp:
        content = fp.read()
        lines = content.splitlines()
        header = title_ if title_ is not None else (lines[0] if lines else '')
        fields = [field.strip() for field in header.strip().split(',')]
        # Locate the phone column from the header; fall back to the position
        # the original implementation assumed when the header has none.
        phone_col = fields.index('phone') if 'phone' in fields else 4
        record_col = phone_col - 1  # records lack the leading id column

        phones = set()
        ids = []
        for line in lines:
            row = [cell.strip() for cell in line.split(',')]
            if not line.strip() or row == fields:
                continue  # blank line or header
            if len(row) > phone_col:
                phones.add(row[phone_col])
            try:
                ids.append(int(row[0]))
            except ValueError:
                pass  # non-numeric id: ignore when computing the next id
        new_id = max(ids) + 1 if ids else 1

        # Make sure the first appended row starts on its own line.
        needs_newline = bool(content) and not content.endswith('\n')
        ins_count = 0
        for record in (sql_dic.get('values') or '').split('/'):
            record = record.strip()
            if not record:
                continue
            cells = record.split(',')
            if len(cells) <= record_col:
                print('\033[1;31m%s 格式错误\033[0m' % record)
                continue
            phone = cells[record_col].strip()
            if phone in phones:
                print('\033[1;31m%s 手机号已存在\033[0m' % record)
                continue
            if needs_newline:
                fp.write('\n')
                needs_newline = False
            fp.write('%s,%s\n' % (new_id, record))
            phones.add(phone)  # also reject duplicates within this batch
            new_id += 1
            ins_count += 1
        fp.flush()
    return ['insert successfully!'], [str(ins_count)]


def _backup_path(path):
    """Return the scratch file name used while rewriting ``path``."""
    root, ext = os.path.splitext(path)
    return root + '_bak' + ext


def _rewrite_rows(title_, where_list, apply):
    """Rewrite the global ``data_path``, passing matching rows through ``apply``.

    ``apply(record)`` receives the row as a dict and returns the new list of
    cell values, or ``None`` to drop the row.  Header, blank and non-matching
    lines are copied unchanged.  The data file is only replaced once the
    whole rewrite succeeded.

    :return: number of rows that matched ``where_list``
    """
    fields = title_.split(',')
    bak_path = _backup_path(data_path)
    matched = 0
    try:
        with open(data_path, 'r', encoding='utf-8') as r_file, \
                open(bak_path, 'w', encoding='utf-8') as w_file:
            for line in r_file:
                row = line.strip().split(',')
                if not line.strip() or row == fields:
                    w_file.write(line)
                    continue
                record = dict(zip(fields, row))
                if logic_action(record, where_list):
                    matched += 1
                    new_row = apply(record)
                    if new_row is None:
                        continue
                    line = ','.join(new_row) + '\n'
                w_file.write(line)
    except Exception:
        # Leave the data file untouched and do not keep a partial backup.
        if os.path.exists(bak_path):
            os.remove(bak_path)
        raise
    os.replace(bak_path, data_path)
    return matched


def delete(sql_dic, item):
    """Delete every row of the global ``data_path`` matching the where clause.

    A command without a where clause deletes all data rows (the header is
    kept), mirroring SQL semantics.

    :param sql_dic: dict produced by ``sql_parse``; reads ``'where'``
    :param item: comma-separated header line of the data file
    :return: ``(['delete successfully!'], [number_of_deleted_rows])``
    """
    del_count = _rewrite_rows(item, sql_dic.get('where'), lambda record: None)
    return ['delete successfully!'], [str(del_count)]


def update(sql_dic, title_):
    """Apply the ``set`` assignments to every row matching the where clause.

    :param sql_dic: dict produced by ``sql_parse``; reads ``'set'``/``'where'``
    :param title_: comma-separated header line of the data file
    :return: ``(['update successfully!'], [number_of_updated_rows])``
    :raises ValueError: when the command has no ``set`` assignments
    :raises KeyError: when an assignment names a column not in the header
    """
    fields = title_.split(',')
    assignments = {}
    for pair in (sql_dic.get('set') or '').strip().split(','):
        column, sep, value = pair.partition('=')
        if sep:
            assignments[column.strip()] = value.strip()
    if not assignments:
        raise ValueError('update command has no set assignments')
    for column in assignments:
        if column not in fields:
            raise KeyError(column)

    def apply(record):
        record.update(assignments)
        return [record.get(field, '') for field in fields]

    update_count = _rewrite_rows(title_, sql_dic.get('where'), apply)
    return ['update successfully!'], [str(update_count)]


def where_action(fh, where_list, title_):
    """Collect the data rows of ``fh`` that satisfy ``where_list``.

    The header row and blank lines are never returned.  An empty or missing
    ``where_list`` selects every data row.

    :param fh: iterable of text lines (an open file)
    :param where_list: list produced by ``where_parse``, or ``None``
    :param title_: comma-separated header line of the data file
    :return: list of rows, each a list of cell strings
    """
    fields = title_.split(',')
    res = []
    for line in fh:
        stripped = line.strip()
        row = stripped.split(',')
        if not stripped or row == fields:
            continue
        if not where_list or logic_action(dict(zip(fields, row)), where_list):
            res.append(row)
    return res


def _to_number(text):
    """Return ``text`` as an int or finite float, or ``None`` if not numeric."""
    text = text.strip()
    try:
        return int(text)
    except ValueError:
        pass
    try:
        value = float(text)
    except ValueError:
        return None
    if value != value or value in (float('inf'), float('-inf')):
        return None
    return value


_COMPARATORS = {
    '=': lambda left, right: left == right,
    '==': lambda left, right: left == right,
    '<>': lambda left, right: left != right,
    '<': lambda left, right: left < right,
    '<=': lambda left, right: left <= right,
    '>': lambda left, right: left > right,
    '>=': lambda left, right: left >= right,
}


def _compare(actual, opt, expected):
    """Evaluate one ``actual <opt> expected`` condition without ``eval``."""
    if opt == 'like':
        return expected in actual
    if opt not in _COMPARATORS:
        raise ValueError('unsupported operator: %r' % (opt,))
    left, right = _to_number(actual), _to_number(expected)
    if left is None or right is None:
        # Not both numeric: fall back to plain string comparison.
        left, right = actual, expected
    return _COMPARATORS[opt](left, right)


def _split_on(tokens, separator):
    """Split a token list into sub-lists at each ``separator`` token."""
    groups = [[]]
    for token in tokens:
        if isinstance(token, str) and token == separator:
            groups.append([])
        else:
            groups[-1].append(token)
    return groups


def _eval_logic(tokens):
    """Combine booleans joined by 'and' / 'or' / 'not' (Python precedence)."""
    result = False
    for or_group in _split_on(tokens, 'or'):
        group_value = True
        for term in _split_on(or_group, 'and'):
            well_formed = (
                term
                and isinstance(term[-1], bool)
                and all(token == 'not' for token in term[:-1])
            )
            if not well_formed:
                raise ValueError('malformed where expression: %r' % (tokens,))
            value = term[-1]
            if len(term) % 2 == 0:  # an odd number of leading 'not's
                value = not value
            group_value = group_value and value
        result = result or group_value
    return result


def logic_action(dic, where_list):
    """Decide whether one record satisfies the parsed where clause.

    Values are compared numerically when both sides are numbers and as
    strings otherwise.  No ``eval`` is used, so quotes or code in the data
    file or in the command cannot be executed.

    :param dic: record as a dict of column name -> cell text
    :param where_list: list produced by ``where_parse``; empty/``None``
        matches every record
    :return: ``True`` when the record matches
    :raises KeyError: when a condition names a column missing from ``dic``
    :raises ValueError: on an unsupported operator or malformed expression
    """
    if not where_list:
        return True
    tokens = []
    for exp in where_list:
        if isinstance(exp, (list, tuple)):
            exp_k, opt, exp_v = exp
            tokens.append(_compare(dic[exp_k], opt, exp_v))
        else:
            tokens.append(exp)
    return _eval_logic(tokens)


def limit_action(filter_res, limit_l):
    """Keep at most ``limit_l`` rows.

    :param filter_res: list of rows
    :param limit_l: limit as text (``'10'``) or a one-element sequence;
        empty/``None`` means no limit
    :return: the leading rows of ``filter_res``
    :raises ValueError: when the limit is not an integer
    """
    if not limit_l:
        return filter_res
    if isinstance(limit_l, (list, tuple)):
        limit_l = limit_l[0]
    count = int(str(limit_l).strip())
    return filter_res[:max(count, 0)]


def search_action(limit_res, select_list, title_):
    """Project the selected columns out of the filtered rows.

    :param limit_res: list of rows, each a list of cell strings
    :param select_list: list of column names, or ``['*']`` for all columns
    :param title_: comma-separated header line of the data file
    :return: ``(field_names, rows)``
    :raises KeyError: when a selected column is not present in a row
    """
    all_fields = title_.split(',')
    if select_list[0] == '*':
        return all_fields, limit_res
    fields_list_ = [field.strip() for field in select_list]
    res = []
    for data_ in limit_res:
        record = dict(zip(all_fields, data_))
        res.append([record[field].strip() for field in fields_list_])
    return fields_list_, res
主函数调用
if __name__ == '__main__':
    # Clause keywords; sql_parse cuts the command at them in this order.
    key_lis = ['select', 'insert', 'delete', 'update', 'from', 'into',
               'set', 'values', 'where', 'limit']
    while True:
        try:
            sql = input('请输入sql命令,退出请输入exit->>>').strip()
        except (EOFError, KeyboardInterrupt):
            # End of input or Ctrl-C: leave the loop instead of crashing.
            break
        # The parser matches keywords textually, so all spaces are removed.
        sql = sql.replace(' ', '')
        if not sql:
            continue
        if sql == 'exit':
            break
        try:
            sql_dict = sql_parse(sql, key_lis)
            # data_path is module-level on purpose: the command handlers
            # read it as a global.
            data_path = sql_dict.get('from') or sql_dict.get('into')
            if not data_path:
                raise ValueError('missing "from" / "into" clause')
            with open(data_path, 'r', encoding='utf-8') as f:
                title = f.readline().strip()
            fields_list, fields_data = sql_action(sql_dict, title)
        except Exception as err:
            # Top-level boundary of the interactive loop: report the problem
            # and keep prompting rather than terminating the session.
            print('\033[1;31m命令执行失败: %r\033[0m' % (err,))
            continue
        print('\033[0;32m结果如下:\033[0m')
        print('-'.join(fields_list))
        for data in fields_data:
            # insert/delete/update return the row count as a plain string;
            # joining it would print "12" as "1-2".
            print(data if isinstance(data, str) else '-'.join(data))
三、测试结果
# 测试sql命令如下:
# select * from staff_data.txt where dept=人事
# select name,age from staff_data.txt where age > 27
# select * from staff_data.txt where enroll_date like 2017
# insert into staff_data.txt values 叶庭云,21,13198497869,算法,2018-7-12
# update into staff_data.txt set dept=算法,phone=13566677787 where staff_id=8
# delete from staff_data.txt where staff_id>=5 and staff_id<=10
# delete from staff_data.txt where staff_id>=5 and staff_id<=10
所有测试结果如下:
推荐阅读: