通过队列解决sqllite多线程报错的问题(实现多线程增删改查,以字典形式查询结果)

简介: 小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错ProgrammingError: Recursive use of cursors not allowed.就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。

需求:

小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错

ProgrammingError: Recursive use of cursors not allowed.

就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。

刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。

话不多说,下面上代码

代码

# -*- coding: UTF-8 -*-
import sqlite3
import time
from Queue import Queue
from threading import Thread
def sqllite_escape(key_word):
    key_word = key_word.encode("utf-8")
    key_word = key_word.replace("'", "''")
    return key_word
class SelectConnect(object):
    '''
    只能用来查询
    '''
    def __init__(self):
        # isolation_level=None为智能提交模式,不需要commit
        self.conn = sqlite3.connect('resource/data.ta', check_same_thread=False, isolation_level=None)
        self.conn.execute('PRAGMA journal_mode = WAL')
        cursor = self.conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        self.conn.text_factory = str
        # 把结果用元祖的形式取出来
        self.curosr = self.conn.cursor()
        self.conn.row_factory = self.dict_factory
        # 把结果用字典的形式取出来
        self.curosr_diction = self.conn.cursor()
    def commit(self):
        self.conn.commit()
    def dict_factory(self, cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def close_db(self):
        # self.curosr.close()
        self.conn.close()
class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.
    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).
    """
    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        self.setDaemon(True)  # python2.5-compatible
        self.running = True
        self.start()
    def dict_factory(self, cursor, row):
        # field = [i[0] for i in cursor.description]
        # value = [dict(zip(field, i)) for i in records]
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def run(self):
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        conn.row_factory = self.dict_factory
        curosr_diction = conn.cursor()
        curosr_diction.execute('PRAGMA synchronous=OFF')
        # 把结果用字典的形式取出来
        while self.running:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            elif req == '--commit--':
                conn.commit()
            else:
                # print(arg)
                curosr_diction.execute(req, arg)
                # if res:
                #     for rec in cursor:
                #         res.put(rec)
                #     res.put('--no more--')
                if res:
                    res.put(curosr_diction.fetchall())
                if self.autocommit:
                    conn.commit()
        conn.close()
    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.
        """
        self.reqs.put((req, arg or tuple(), res))
    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)
    def select_all_dict(self, req, arg=None):
        '''
        直接返回一个list
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        return rec
    def select_one_dict(self, req, arg=None):
        '''
        直接返回list里的第一个元素,并且以字典展示
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        if len(rec) != 0:
            rec = rec[0]
        else:
            rec = None
        return rec
    def commit(self):
        self.execute('--commit--')
    def close(self):
        self.execute('--close--')
class Cursor(object):
    '''
    以元祖的形式查询出数据
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosr = old_con.curosr
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosr.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        try:
            self.curosr.executescript(string)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.executescript(string)
    def fetchall(self):
        return self.curosr.fetchall()
    def fetchone(self):
        return self.curosr.fetchone()
    def rowcount(self):
        return self.curosr.rowcount
    def close(self):
        self.curosr2.running = False
        self.curosr.close()
        self.conn.close()
class Curosrdiction(object):
    '''
    以字典的形式查询出数据,建议全部用这种。
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosrdiction = old_con.curosr_diction
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosrdiction.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        result = True
        try:
            self.curosrdiction.executescript(string)
        except Exception:
            print("失败一次")
            # print(string)
            time.sleep(0.1)
            # self.executescript(string)
            result = False
        return result
    def fetchall(self):
        return self.curosrdiction.fetchall()
    def fetchone(self):
        return self.curosrdiction.fetchone()
    def rowcount(self):
        return self.curosrdiction.rowcount
    def select_all_dict(self, string, *args):
        return self.curosr2.select_all_dict(string, *args)
    def select_one_dict(self, string, *args):
        return self.curosr2.select_one_dict(string, *args)
    def close(self):
        self.curosr2.running = False
        self.curosrdiction.close()
        self.conn.close()
    def commit(self):
        self.conn.commit()
        self.curosr2.commit()
# curosr = Cursor()
curosr_diction = Curosrdiction()
def commit():
    curosr_diction.commit()
def close_db():
    # curosr.close()
    curosr_diction.close()
相关文章
Unity连接Mysql数据库 增 删 改 查
在 Unity 中连接 MySQL 数据库,需使用 MySQL Connector/NET 作为数据库连接驱动,通过提供服务器地址、端口、用户名和密码等信息建立 TCP/IP 连接。代码示例展示了如何创建连接对象并执行增删改查操作,确保数据交互的实现。测试代码中,通过 `MySqlConnection` 类连接数据库,并使用 `MySqlCommand` 执行 SQL 语句,实现数据的查询、插入、删除和更新功能。
|
14天前
|
python3多线程中使用线程睡眠
本文详细介绍了Python3多线程编程中使用线程睡眠的基本方法和应用场景。通过 `time.sleep()`函数,可以使线程暂停执行一段指定的时间,从而控制线程的执行节奏。通过实际示例演示了如何在多线程中使用线程睡眠来实现计数器和下载器功能。希望本文能帮助您更好地理解和应用Python多线程编程,提高程序的并发能力和执行效率。
41 20
Unity多线程使用(线程池)
在C#中使用线程池需引用`System.Threading`。创建单个线程时,务必在Unity程序停止前关闭线程(如使用`Thread.Abort()`),否则可能导致崩溃。示例代码展示了如何创建和管理线程,确保在线程中执行任务并在主线程中处理结果。完整代码包括线程池队列、主线程检查及线程安全的操作队列管理,确保多线程操作的稳定性和安全性。
时序数据库 TDengine 化工新签约:存储降本一半,查询提速十倍
化工行业在数字化转型过程中面临数据接入复杂、实时性要求高、系统集成难度大等诸多挑战。福州力川数码科技有限公司科技依托深厚的行业积累,精准聚焦行业痛点,并携手 TDengine 提供高效解决方案。
16 0
【潜意识Java】MyBatis中的动态SQL灵活、高效的数据库查询以及深度总结
本文详细介绍了MyBatis中的动态SQL功能,涵盖其背景、应用场景及实现方式。
112 6
HarmonyOs开发:关系型数据库封装之增删改查
每个方法都预留了多种调用方式,比如使用callback异步回调或者使用Promise异步回调,亦或者同步执行,大家在使用的过程中,可以根据自身业务需要进行选择性调用,也分别暴露了成功和失败的方法,可以针对性的判断在执行的过程中是否执行成功。
127 13
【深入了解MySQL】优化查询性能与数据库设计的深度总结
本文详细介绍了MySQL查询优化和数据库设计技巧,涵盖基础优化、高级技巧及性能监控。
362 0
|
2月前
|
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
84 1
Java更新数据库报错:Data truncation: Cannot create a JSON value from a string with CHARACTER SET 'binary'.
在Java中,使用mybatis-plus更新实体类对象到mysql,其中一个字段对应数据库中json数据类型,更新时报错:Data truncation: Cannot create a JSON value from a string with CHARACTER SET 'binary'.
400 4
Java更新数据库报错:Data truncation: Cannot create a JSON value from a string with CHARACTER SET 'binary'.
数据库执行查询请求的过程?
客户端发起TCP连接请求,服务端通过连接器验证主机信息、用户名及密码,验证通过后创建专用进程处理交互。服务端进程缓存以减少创建和销毁线程的开销。后续步骤包括缓存查询(8.0版后移除)、语法解析、查询优化及存储引擎调用,最终返回查询结果。
50 6

热门文章

最新文章