需求:
小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错
ProgrammingError: Recursive use of cursors not allowed.
就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。
刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。
话不多说,下面上代码
代码
# -*- coding: UTF-8 -*- import sqlite3 import time from Queue import Queue from threading import Thread def sqllite_escape(key_word): key_word = key_word.encode("utf-8") key_word = key_word.replace("'", "''") return key_word class SelectConnect(object): ''' 只能用来查询 ''' def __init__(self): # isolation_level=None为智能提交模式,不需要commit self.conn = sqlite3.connect('resource/data.ta', check_same_thread=False, isolation_level=None) self.conn.execute('PRAGMA journal_mode = WAL') cursor = self.conn.cursor() cursor.execute('PRAGMA synchronous=OFF') self.conn.text_factory = str # 把结果用元祖的形式取出来 self.curosr = self.conn.cursor() self.conn.row_factory = self.dict_factory # 把结果用字典的形式取出来 self.curosr_diction = self.conn.cursor() def commit(self): self.conn.commit() def dict_factory(self, cursor, row): d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d def close_db(self): # self.curosr.close() self.conn.close() class SqliteMultithread(Thread): """ Wrap sqlite connection in a way that allows concurrent requests from multiple threads. This is done by internally queueing the requests and processing them sequentially in a separate thread (in the same order they arrived). """ def __init__(self, filename, autocommit, journal_mode): super(SqliteMultithread, self).__init__() self.filename = filename self.autocommit = autocommit self.journal_mode = journal_mode self.reqs = Queue() # use request queue of unlimited size self.setDaemon(True) # python2.5-compatible self.running = True self.start() def dict_factory(self, cursor, row): # field = [i[0] for i in cursor.description] # value = [dict(zip(field, i)) for i in records] d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d def run(self): if self.autocommit: conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False) else: conn = sqlite3.connect(self.filename, check_same_thread=False) conn.execute('PRAGMA journal_mode = %s' % self.journal_mode) conn.text_factory = str cursor = conn.cursor() cursor.execute('PRAGMA synchronous=OFF') conn.row_factory = self.dict_factory curosr_diction = conn.cursor() curosr_diction.execute('PRAGMA synchronous=OFF') # 把结果用字典的形式取出来 while self.running: req, arg, res = self.reqs.get() if req == '--close--': break elif req == '--commit--': conn.commit() else: # print(arg) curosr_diction.execute(req, arg) # if res: # for rec in cursor: # res.put(rec) # res.put('--no more--') if res: res.put(curosr_diction.fetchall()) if self.autocommit: conn.commit() conn.close() def execute(self, req, arg=None, res=None): """ `execute` calls are non-blocking: just queue up the request and return immediately. """ self.reqs.put((req, arg or tuple(), res)) def executemany(self, req, items): for item in items: self.execute(req, item) def select_all_dict(self, req, arg=None): ''' 直接返回一个list :param req: :param arg: :return: ''' res = Queue() # results of the select will appear as items in this queue self.execute(req, arg, res) rec = res.get() return rec def select_one_dict(self, req, arg=None): ''' 直接返回list里的第一个元素,并且以字典展示 :param req: :param arg: :return: ''' res = Queue() # results of the select will appear as items in this queue self.execute(req, arg, res) rec = res.get() if len(rec) != 0: rec = rec[0] else: rec = None return rec def commit(self): self.execute('--commit--') def close(self): self.execute('--close--') class Cursor(object): ''' 以元祖的形式查询出数据 ''' def __init__(self): old_con = SelectConnect() self.conn = old_con.conn self.curosr = old_con.curosr self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL") def execute(self, string, *args): try: if string.startswith('select'): return self.curosr.execute(string, *args) else: return self.curosr2.execute(string, *args) except Exception: print("失败一次") print(string) time.sleep(0.1) self.execute(string, *args) def executescript(self, string): try: self.curosr.executescript(string) except Exception: print("失败一次") print(string) time.sleep(0.1) self.executescript(string) def fetchall(self): return self.curosr.fetchall() def fetchone(self): return self.curosr.fetchone() def rowcount(self): return self.curosr.rowcount def close(self): self.curosr2.running = False self.curosr.close() self.conn.close() class Curosrdiction(object): ''' 以字典的形式查询出数据,建议全部用这种。 ''' def __init__(self): old_con = SelectConnect() self.conn = old_con.conn self.curosrdiction = old_con.curosr_diction self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL") def execute(self, string, *args): try: if string.startswith('select'): return self.curosrdiction.execute(string, *args) else: return self.curosr2.execute(string, *args) except Exception: print("失败一次") print(string) time.sleep(0.1) self.execute(string, *args) def executescript(self, string): result = True try: self.curosrdiction.executescript(string) except Exception: print("失败一次") # print(string) time.sleep(0.1) # self.executescript(string) result = False return result def fetchall(self): return self.curosrdiction.fetchall() def fetchone(self): return self.curosrdiction.fetchone() def rowcount(self): return self.curosrdiction.rowcount def select_all_dict(self, string, *args): return self.curosr2.select_all_dict(string, *args) def select_one_dict(self, string, *args): return self.curosr2.select_one_dict(string, *args) def close(self): self.curosr2.running = False self.curosrdiction.close() self.conn.close() def commit(self): self.conn.commit() self.curosr2.commit() # curosr = Cursor() curosr_diction = Curosrdiction() def commit(): curosr_diction.commit() def close_db(): # curosr.close() curosr_diction.close()