通过队列解决sqllite多线程报错的问题(实现多线程增删改查,以字典形式查询结果)

简介: 小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错ProgrammingError: Recursive use of cursors not allowed.就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。

需求:

小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错

ProgrammingError: Recursive use of cursors not allowed.

就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。

刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。

话不多说,下面上代码

代码

# -*- coding: UTF-8 -*-
import sqlite3
import time
from Queue import Queue
from threading import Thread
def sqllite_escape(key_word):
    key_word = key_word.encode("utf-8")
    key_word = key_word.replace("'", "''")
    return key_word
class SelectConnect(object):
    '''
    只能用来查询
    '''
    def __init__(self):
        # isolation_level=None为智能提交模式,不需要commit
        self.conn = sqlite3.connect('resource/data.ta', check_same_thread=False, isolation_level=None)
        self.conn.execute('PRAGMA journal_mode = WAL')
        cursor = self.conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        self.conn.text_factory = str
        # 把结果用元祖的形式取出来
        self.curosr = self.conn.cursor()
        self.conn.row_factory = self.dict_factory
        # 把结果用字典的形式取出来
        self.curosr_diction = self.conn.cursor()
    def commit(self):
        self.conn.commit()
    def dict_factory(self, cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def close_db(self):
        # self.curosr.close()
        self.conn.close()
class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.
    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).
    """
    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        self.setDaemon(True)  # python2.5-compatible
        self.running = True
        self.start()
    def dict_factory(self, cursor, row):
        # field = [i[0] for i in cursor.description]
        # value = [dict(zip(field, i)) for i in records]
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def run(self):
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        conn.row_factory = self.dict_factory
        curosr_diction = conn.cursor()
        curosr_diction.execute('PRAGMA synchronous=OFF')
        # 把结果用字典的形式取出来
        while self.running:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            elif req == '--commit--':
                conn.commit()
            else:
                # print(arg)
                curosr_diction.execute(req, arg)
                # if res:
                #     for rec in cursor:
                #         res.put(rec)
                #     res.put('--no more--')
                if res:
                    res.put(curosr_diction.fetchall())
                if self.autocommit:
                    conn.commit()
        conn.close()
    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.
        """
        self.reqs.put((req, arg or tuple(), res))
    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)
    def select_all_dict(self, req, arg=None):
        '''
        直接返回一个list
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        return rec
    def select_one_dict(self, req, arg=None):
        '''
        直接返回list里的第一个元素,并且以字典展示
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        if len(rec) != 0:
            rec = rec[0]
        else:
            rec = None
        return rec
    def commit(self):
        self.execute('--commit--')
    def close(self):
        self.execute('--close--')
class Cursor(object):
    '''
    以元祖的形式查询出数据
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosr = old_con.curosr
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosr.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        try:
            self.curosr.executescript(string)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.executescript(string)
    def fetchall(self):
        return self.curosr.fetchall()
    def fetchone(self):
        return self.curosr.fetchone()
    def rowcount(self):
        return self.curosr.rowcount
    def close(self):
        self.curosr2.running = False
        self.curosr.close()
        self.conn.close()
class Curosrdiction(object):
    '''
    以字典的形式查询出数据,建议全部用这种。
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosrdiction = old_con.curosr_diction
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosrdiction.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        result = True
        try:
            self.curosrdiction.executescript(string)
        except Exception:
            print("失败一次")
            # print(string)
            time.sleep(0.1)
            # self.executescript(string)
            result = False
        return result
    def fetchall(self):
        return self.curosrdiction.fetchall()
    def fetchone(self):
        return self.curosrdiction.fetchone()
    def rowcount(self):
        return self.curosrdiction.rowcount
    def select_all_dict(self, string, *args):
        return self.curosr2.select_all_dict(string, *args)
    def select_one_dict(self, string, *args):
        return self.curosr2.select_one_dict(string, *args)
    def close(self):
        self.curosr2.running = False
        self.curosrdiction.close()
        self.conn.close()
    def commit(self):
        self.conn.commit()
        self.curosr2.commit()
# curosr = Cursor()
curosr_diction = Curosrdiction()
def commit():
    curosr_diction.commit()
def close_db():
    # curosr.close()
    curosr_diction.close()
目录
相关文章
|
5月前
|
人工智能 安全 机器人
无代码革命:10分钟打造企业专属数据库查询AI机器人
随着数字化转型加速,企业对高效智能交互解决方案的需求日益增长。阿里云AppFlow推出的AI助手产品,借助创新网页集成技术,助力企业打造专业数据库查询助手。本文详细介绍通过三步流程将AI助手转化为数据库交互工具的核心优势与操作指南,包括全场景适配、智能渲染引擎及零代码配置等三大技术突破。同时提供Web集成与企业微信集成方案,帮助企业实现便捷部署与安全管理,提升内外部用户体验。
599 12
无代码革命:10分钟打造企业专属数据库查询AI机器人
|
4月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
193 0
|
7月前
|
Cloud Native 关系型数据库 分布式数据库
|
7月前
|
并行计算 关系型数据库 MySQL
如何用 esProc 将数据库表转储提速查询
当数据库查询因数据量大或繁忙变慢时,可借助 esProc 将数据导出为文件进行计算,大幅提升性能。以 MySQL 的 3000 万行订单数据为例,两个典型查询分别耗时 17.69s 和 63.22s。使用 esProc 转储为二进制行存文件 (btx) 或列存文件 (ctx),结合游标过滤与并行计算,性能显著提升。例如,ctx 并行计算将原查询时间缩短至 0.566s,TopN 运算提速达 30 倍。esProc 的简洁语法和高效文件格式,特别适合历史数据的复杂分析场景。
|
6月前
|
SQL 数据库 开发者
Python中使用Flask-SQLAlchemy对数据库的增删改查简明示例
这样我们就对Flask-SQLAlchemy进行了一次简明扼要的旅程,阐述了如何定义模型,如何创建表,以及如何进行基本的数据库操作。希望你在阅读后能对Flask-SQLAlchemy有更深入的理解,这将为你在Python世界中从事数据库相关工作提供极大的便利。
639 77
|
4月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
5月前
|
Java 数据挖掘 调度
Java 多线程创建零基础入门新手指南:从零开始全面学习多线程创建方法
本文从零基础角度出发,深入浅出地讲解Java多线程的创建方式。内容涵盖继承`Thread`类、实现`Runnable`接口、使用`Callable`和`Future`接口以及线程池的创建与管理等核心知识点。通过代码示例与应用场景分析,帮助读者理解每种方式的特点及适用场景,理论结合实践,轻松掌握Java多线程编程 essentials。
354 5
|
6月前
|
XML 数据库 Android开发
Android数据库的使用(增删改查)
本文介绍了一个简单的数据库操作Demo,包含创建数据库、增删改查功能。通过5个按钮分别实现创建数据库、插入数据、删除数据、更新数据和查询数据的操作。代码结构清晰,适合初学者学习Android SQLite数据库基础操作。
195 5
|
6月前
|
数据库 Android开发 开发者
Android常用的room增删改查语句(外部数据库)
本文分享了将一个原生数据库驱动的单词APP重构为使用Room库的过程及遇到的问题,重点解决了Room中增删改查的常用语句实现。文章通过具体示例(以“forget”表为例),详细展示了如何定义实体类、Dao接口、Database类以及Repository和ViewModel的设计与实现。同时,提供了插入、删除、更新和查询数据的代码示例,包括模糊查询、分页加载等功能。此外,针对外部数据库导入问题,作者建议可通过公众号“计蒙不吃鱼”获取更多支持。此内容适合有一定Room基础的开发者深入学习。
223 0
Android常用的room增删改查语句(外部数据库)

热门文章

最新文章