通过队列解决sqllite多线程报错的问题(实现多线程增删改查,以字典形式查询结果)

简介: 小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错ProgrammingError: Recursive use of cursors not allowed.就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。

需求:

小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错

ProgrammingError: Recursive use of cursors not allowed.

就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。

刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。

话不多说,下面上代码

代码

# -*- coding: UTF-8 -*-
import sqlite3
import time
from Queue import Queue
from threading import Thread
def sqllite_escape(key_word):
    key_word = key_word.encode("utf-8")
    key_word = key_word.replace("'", "''")
    return key_word
class SelectConnect(object):
    '''
    只能用来查询
    '''
    def __init__(self):
        # isolation_level=None为智能提交模式,不需要commit
        self.conn = sqlite3.connect('resource/data.ta', check_same_thread=False, isolation_level=None)
        self.conn.execute('PRAGMA journal_mode = WAL')
        cursor = self.conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        self.conn.text_factory = str
        # 把结果用元祖的形式取出来
        self.curosr = self.conn.cursor()
        self.conn.row_factory = self.dict_factory
        # 把结果用字典的形式取出来
        self.curosr_diction = self.conn.cursor()
    def commit(self):
        self.conn.commit()
    def dict_factory(self, cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def close_db(self):
        # self.curosr.close()
        self.conn.close()
class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.
    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).
    """
    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        self.setDaemon(True)  # python2.5-compatible
        self.running = True
        self.start()
    def dict_factory(self, cursor, row):
        # field = [i[0] for i in cursor.description]
        # value = [dict(zip(field, i)) for i in records]
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def run(self):
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        conn.row_factory = self.dict_factory
        curosr_diction = conn.cursor()
        curosr_diction.execute('PRAGMA synchronous=OFF')
        # 把结果用字典的形式取出来
        while self.running:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            elif req == '--commit--':
                conn.commit()
            else:
                # print(arg)
                curosr_diction.execute(req, arg)
                # if res:
                #     for rec in cursor:
                #         res.put(rec)
                #     res.put('--no more--')
                if res:
                    res.put(curosr_diction.fetchall())
                if self.autocommit:
                    conn.commit()
        conn.close()
    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.
        """
        self.reqs.put((req, arg or tuple(), res))
    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)
    def select_all_dict(self, req, arg=None):
        '''
        直接返回一个list
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        return rec
    def select_one_dict(self, req, arg=None):
        '''
        直接返回list里的第一个元素,并且以字典展示
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        if len(rec) != 0:
            rec = rec[0]
        else:
            rec = None
        return rec
    def commit(self):
        self.execute('--commit--')
    def close(self):
        self.execute('--close--')
class Cursor(object):
    '''
    以元祖的形式查询出数据
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosr = old_con.curosr
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosr.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        try:
            self.curosr.executescript(string)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.executescript(string)
    def fetchall(self):
        return self.curosr.fetchall()
    def fetchone(self):
        return self.curosr.fetchone()
    def rowcount(self):
        return self.curosr.rowcount
    def close(self):
        self.curosr2.running = False
        self.curosr.close()
        self.conn.close()
class Curosrdiction(object):
    '''
    以字典的形式查询出数据,建议全部用这种。
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosrdiction = old_con.curosr_diction
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosrdiction.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        result = True
        try:
            self.curosrdiction.executescript(string)
        except Exception:
            print("失败一次")
            # print(string)
            time.sleep(0.1)
            # self.executescript(string)
            result = False
        return result
    def fetchall(self):
        return self.curosrdiction.fetchall()
    def fetchone(self):
        return self.curosrdiction.fetchone()
    def rowcount(self):
        return self.curosrdiction.rowcount
    def select_all_dict(self, string, *args):
        return self.curosr2.select_all_dict(string, *args)
    def select_one_dict(self, string, *args):
        return self.curosr2.select_one_dict(string, *args)
    def close(self):
        self.curosr2.running = False
        self.curosrdiction.close()
        self.conn.close()
    def commit(self):
        self.conn.commit()
        self.curosr2.commit()
# curosr = Cursor()
curosr_diction = Curosrdiction()
def commit():
    curosr_diction.commit()
def close_db():
    # curosr.close()
    curosr_diction.close()
目录
相关文章
|
1天前
|
SQL 安全 Java
MyBatis-Plus条件构造器:构建安全、高效的数据库查询
MyBatis-Plus 提供了一套强大的条件构造器(Wrapper),用于构建复杂的数据库查询条件。Wrapper 类允许开发者以链式调用的方式构造查询条件,无需编写繁琐的 SQL 语句,从而提高开发效率并减少 SQL 注入的风险。
7 1
MyBatis-Plus条件构造器:构建安全、高效的数据库查询
|
1天前
|
存储 关系型数据库 MySQL
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
13 1
|
1天前
|
SQL 关系型数据库 数据库
PostgreSQL性能飙升的秘密:这几个调优技巧让你的数据库查询速度翻倍!
【10月更文挑战第25天】本文介绍了几种有效提升 PostgreSQL 数据库查询效率的方法,包括索引优化、查询优化、配置优化和硬件优化。通过合理设计索引、编写高效 SQL 查询、调整配置参数和选择合适硬件,可以显著提高数据库性能。
8 1
|
6天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
11 3
|
6天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
9 2
|
6天前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
16 2
|
6天前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
16 1
|
6天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
17 1
|
6天前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
14 1
|
16天前
|
前端开发 Java 数据库连接
javamvc配置,增删改查,文件上传下载。
【10月更文挑战第4天】javamvc配置,增删改查,文件上传下载。
33 1