SqlAlchemy 2.0 中文文档(五十三)(2)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: SqlAlchemy 2.0 中文文档(五十三)

SqlAlchemy 2.0 中文文档(五十三)(1)https://developer.aliyun.com/article/1563172


如何自动“重试”语句执行?

文档部分 处理断开连接 讨论了对已经断开连接的池化连接可用的策略。在这方面最现代的特性是 create_engine.pre_ping 参数,它允许在从池中检索数据库连接时发出“ping”,如果当前连接已断开,则重新连接。

需要注意的是,此“ping”仅在连接实际用于操作之前发出。一旦连接被提供给调用者,根据 Python DBAPI 规范,它现在已经受到autobegin操作的影响,这意味着当首次使用时,它将自动开始一个新事务,该事务在后续语句中仍然有效,直到调用 DBAPI 级别的 connection.commit()connection.rollback() 方法。

在现代使用 SQLAlchemy 中,一系列 SQL 语句总是在事务状态下调用,假设未启用 DBAPI 自动提交模式(下一节将详细介绍),这意味着没有单个语句会自动提交;如果操作失败,当前事务内所有语句的影响都将丢失。

对于“重试”语句的含义是,默认情况下,当连接丢失时,整个事务都将丢失。数据库无法以有用的方式“重新连接和重试”,并继续上次执行的位置,因为数据已经丢失。因此,SQLAlchemy 没有一个能在事务进行中工作时透明地进行“重新连接”的功能,以处理数据库连接在使用过程中断开的情况。处理中途断开连接的规范方法是从事务开始处重试整个操作,通常通过使用自定义 Python 装饰器多次“重试”特定函数直到成功,或者以其他方式设计应用程序,使其能够抵御事务被中断而导致操作失败的情况。

还有一个概念,即扩展程序可以跟踪事务中已经执行的所有语句,然后在新事务中重新执行它们,以近似实现“重试”操作。SQLAlchemy  的事件系统确实允许构建这样一个系统,但这种方法通常也不实用,因为没有办法保证这些 DML  语句将针对相同的状态进行操作,一旦事务结束,数据库在新事务中的状态可能会完全不同。在事务操作开始和提交的点明确将“重试”架构化到应用程序中仍然是更好的方法,因为应用程序级别的事务方法最了解如何重新运行它们的步骤。

否则,如果 SQLAlchemy 提供了一个透明且静默地在事务中重新连接连接的功能,则效果将是数据被静默丢失。通过试图隐藏问题,SQLAlchemy 将使情况变得更糟。

然而,如果我们使用事务,则会有更多的选择,如下一节所述。

使用 DBAPI 自动提交允许只读版本的透明重新连接

由于没有透明的重新连接机制的理由已经说明,上一节建立在这样一个假设之上,即应用程序实际上正在使用 DBAPI 级别的事务。由于大多数 DBAPI 现在提供了本地的“自动提交”设置,我们可以利用这些特性来为只读、自动提交的操作提供有限形式的透明重新连接。可以将透明的语句重试应用于 DBAPI 的cursor.execute()方法,但仍然不安全应用于 DBAPI 的cursor.executemany()方法,因为该语句可能已经消耗了给定参数的任何部分。

警告

下面的方法应用于写入数据的操作。用户应该仔细阅读和理解该方法的工作原理,并仔细针对具体目标的 DBAPI 驱动程序测试故障模式,然后再在生产中使用该方法。重试机制不能保证在所有情况下都防止断开连接错误。

可以通过利用DialectEvents.do_execute()DialectEvents.do_execute_no_params()钩子来应用于 DBAPI 级别的cursor.execute()方法的简单重试机制,这将能够在语句执行期间拦截断开连接。对于那些不完全缓冲结果集的 DBAPI,它不会拦截在结果集获取操作期间的连接故障。该配方要求数据库支持 DBAPI 级别的自动提交,并且对于特定后端不能保证。提供了一个名为reconnecting_engine()的单个函数,它将事件钩子应用于给定的Engine对象,返回一个始终自动提交的版本,该版本启用了 DBAPI 级别的自动提交。连接将透明地重新连接以进行单参数和无参数语句执行:

import time
from sqlalchemy import event
def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()
                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()
                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True
    e = engine.execution_options(isolation_level="AUTOCOMMIT")
    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )
    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )
    return e

给定上述配方,可以使用以下概念验证脚本演示事务中的重新连接。运行后,它将每五秒向数据库发出一个SELECT 1语句:

from sqlalchemy import create_engine
from sqlalchemy import select
if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)
    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)
    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )
    do_a_thing(e)

在脚本运行时重新启动数据库以演示透明重连接操作:

$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

上述配方已在 SQLAlchemy 1.4 中进行了测试。### 使用 DBAPI 自动提交允许透明重连接的只读版本

在未说明透明重连接机制的理由的情况下,前一节基于这样一种假设,即应用程序实际上正在使用 DBAPI 级别的事务。由于大多数 DBAPI 现在提供本地“自动提交”设置,我们可以利用这些特性为只读,仅自动提交操作提供一种有限形式的透明重连接。透明语句重试可以应用于 DBAPI 的cursor.execute()方法,但是仍然不安全应用于 DBAPI 的cursor.executemany()方法,因为该语句可能已经消耗了给定参数的任何部分。

警告

不应将以下配方用于写入数据的操作。用户应仔细阅读和理解配方的工作原理,并在生产使用此配方之前针对特定的 DBAPI 驱动程序非常仔细地测试故障模式。重试机制不能保证在所有情况下防止断开连接错误。

可以通过使用DialectEvents.do_execute()DialectEvents.do_execute_no_params()钩子向 DBAPI 级别的 cursor.execute() 方法应用简单的重试机制,这些钩子将能够在语句执行期间拦截断开连接。对于那些不完全缓冲结果集的 DBAPI,它将不会拦截结果集获取操作期间的连接故障。该方案要求数据库支持 DBAPI 级别的自动提交,并且不能保证适用于特定的后端。提供了一个名为 reconnecting_engine() 的单个函数,它将事件钩子应用于给定的 Engine 对象,返回一个始终启用 DBAPI 级别自动提交的版本。连接将自动重新连接以用于单参数和无参数语句执行:

import time
from sqlalchemy import event
def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()
                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()
                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True
    e = engine.execution_options(isolation_level="AUTOCOMMIT")
    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )
    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )
    return e

根据上述方案,可以使用以下概念证明脚本演示事务中重新连接。运行一次后,它将每五秒向数据库发出一个SELECT 1语句:

from sqlalchemy import create_engine
from sqlalchemy import select
if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)
    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)
    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )
    do_a_thing(e)

在脚本运行时重新启动数据库以演示透明的重新连接操作:

$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

上述方案已经在 SQLAlchemy 1.4 上进行了测试。

为什么 SQLAlchemy 发出了那么多个 ROLLBACK?

SQLAlchemy 目前假设 DBAPI 连接处于“非自动提交”模式 - 这是 Python 数据库 API 的默认行为,这意味着必须假定事务始终在进行中。连接池在连接返回时发出 connection.rollback()。这是为了释放连接上仍然存在的任何事务资源。在像  PostgreSQL 或 MSSQL  这样的数据库上,表资源被积极地锁定,这一点至关重要,以确保行和表不会在不再使用的连接中保持锁定状态。否则,应用程序可能会挂起。然而,这不仅仅是为了锁定,并且在具有任何类型的事务隔离的任何数据库上同样关键,包括具有  InnoDB 的 MySQL。如果在隔离内在连接上已经查询了该数据,任何仍然处于旧事务中的连接将返回陈旧的数据。有关为什么即使在 MySQL  上也可能看到陈旧数据的背景,请参阅dev.mysql.com/doc/refman/5.1/en/innodb-transaction-model.html

我使用的是 MyISAM - 如何关闭它?

连接池的连接返回行为的行为可以使用 reset_on_return 进行配置:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)

我使用的是 SQL Server - 如何将那些 ROLLBACKs 转换为 COMMITs?

reset_on_return 接受值 commitrollback,除了 TrueFalseNone。设置为 commit 将导致任何连接返回到池时进行 COMMIT:

engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)

我正在使用 MyISAM - 如何关闭它?

可以使用 reset_on_return 配置连接池的连接返回行为:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)

我正在使用 SQL Server - 如何将那些 ROLLBACKs 转换为 COMMITs?

reset_on_return 接受值 commitrollback,除了 TrueFalseNone。设置为 commit 将导致任何连接返回到池时进行 COMMIT:

engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)

我正在使用 SQLite 数据库的多个连接(通常用于测试事务操作),但我的测试程序不起作用!

如果使用 SQLite 的 :memory: 数据库,默认连接池是 SingletonThreadPool,它每个线程维护一个 SQLite 连接。因此,在同一线程中使用两个连接实际上是相同的 SQLite 连接。确保您不是使用 :memory: 数据库,以便引擎将使用 QueuePool(当前 SQLAlchemy 版本中非内存数据库的默认值)。

另请参阅

线程/池行为 - 有关 PySQLite 行为的信息。

在使用 Engine 时如何访问原始的 DBAPI 连接?

使用常规的 SA 引擎级 Connection,您可以通过 Connection.connection 属性获取到一个池代理版本的 DBAPI 连接,并且对于真正的 DBAPI 连接,您可以在此调用 PoolProxiedConnection.dbapi_connection 属性。在常规的同步驱动程序中,通常不需要访问非池代理的 DBAPI 连接,因为所有方法都是通过代理的:

engine = create_engine(...)
conn = engine.connect()
# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection
# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj
# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection
# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection

在版本 1.4.24 中更改:添加了 PoolProxiedConnection.dbapi_connection 属性,它取代了以前的 PoolProxiedConnection.connection 属性,后者仍然可用;此属性始终提供 pep-249 同步风格的连接对象。还添加了 PoolProxiedConnection.driver_connection 属性,它将始终引用真正的驱动程序级连接,无论它呈现什么 API。

访问 asyncio 驱动程序的底层连接

在使用 asyncio 驱动程序时,上述方案有两个变化。首先是在使用AsyncConnection时,必须使用可等待方法AsyncConnection.get_raw_connection()来访问PoolProxiedConnection。在这种情况下返回的PoolProxiedConnection保留了同步风格的 pep-249 使用模式,而PoolProxiedConnection.dbapi_connection属性指的是一个将 asyncio 连接适配为同步风格 pep-249 API 的 SQLAlchemy 适配连接对象,换句话说,在使用 asyncio 驱动程序时存在两层代理。实际的 asyncio 连接可以从driver_connection属性中获取。将上述示例重新阐述为 asyncio 的形式如下:

async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()
    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()
    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection
    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection
    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

自版本 1.4.24 起更改:添加了PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection属性,以允许通过一致的接口访问 pep-249 连接、pep-249 适配层和底层驱动程序连接。

在使用 asyncio 驱动程序时,上述“DBAPI”连接实际上是 SQLAlchemy 适配的连接形式,它呈现了同步风格的 pep-249 风格 API。要访问实际的 asyncio 驱动程序连接,可以通过PoolProxiedConnection.driver_connection属性来访问PoolProxiedConnection。对于标准的 pep-249 驱动程序,PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection是同义词。

在将连接返回到池之前,必须确保将连接上的任何隔离级别设置或其他操作特定设置恢复为正常状态。

作为恢复设置的替代方案,您可以在 Connection 或代理连接上调用 Connection.detach() 方法,这将使连接与池解除关联,从而在调用 Connection.close() 时关闭和丢弃它:

conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

使用 asyncio 驱动程序访问底层连接

当使用 asyncio 驱动程序时,对上述方案有两个变化。首先是当使用 AsyncConnection 时,必须使用可等待方法 AsyncConnection.get_raw_connection() 访问 PoolProxiedConnection。在这种情况下返回的 PoolProxiedConnection 保留了同步样式 pep-249 使用模式,并且 PoolProxiedConnection.dbapi_connection 属性指向一个 SQLAlchemy 适配的连接对象,将 asyncio 连接适配为同步样式 pep-249 API,换句话说,当使用 asyncio 驱动程序时会有两层代理。实际的 asyncio 连接可以从 driver_connection 属性获得。在 asyncio 方面重新表述上一个示例如下:

async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()
    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()
    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection
    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection
    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

从版本 1.4.24 开始更改:添加了 PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection 属性,以允许使用一致的接口访问 pep-249 连接、pep-249 适配层和底层驱动程序连接。

在使用 asyncio 驱动程序时,上述“DBAPI”连接实际上是一个经过 SQLAlchemy 适配的连接形式,它呈现了一个同步风格的  pep-249 风格 API。要访问实际的 asyncio 驱动程序连接,它将呈现所使用驱动程序的原始 asyncio API,可以通过PoolProxiedConnectionPoolProxiedConnection.driver_connection属性进行访问。对于标准的 pep-249 驱动程序,PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection 是同义词。

在将连接返回到池之前,您必须确保将任何隔离级别设置或其他特定操作设置恢复为正常状态。

作为恢复设置的替代方案,您可以在Connection或代理连接上调用Connection.detach()方法,这将使连接与池解除关联,从而在调用Connection.close()时关闭并丢弃连接:

conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

我如何在 Python 多进程或 os.fork() 中使用引擎/连接/会话?

这在使用连接池与多进程或 os.fork()一节中有详细介绍。


SqlAlchemy 2.0 中文文档(五十三)(3)https://developer.aliyun.com/article/1563174

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(五十三)(3)
SqlAlchemy 2.0 中文文档(五十三)
23 0
|
3月前
|
SQL 安全 关系型数据库
SqlAlchemy 2.0 中文文档(五十三)(4)
SqlAlchemy 2.0 中文文档(五十三)
27 0
|
3月前
|
SQL 安全 关系型数据库
SqlAlchemy 2.0 中文文档(五十三)(5)
SqlAlchemy 2.0 中文文档(五十三)
30 0
|
3月前
|
关系型数据库 MySQL API
SqlAlchemy 2.0 中文文档(五十三)(1)
SqlAlchemy 2.0 中文文档(五十三)
33 0
|
3月前
|
SQL JSON 关系型数据库
SqlAlchemy 2.0 中文文档(五十二)(3)
SqlAlchemy 2.0 中文文档(五十二)
26 0
|
3月前
|
SQL NoSQL 数据库
SqlAlchemy 2.0 中文文档(五十二)(5)
SqlAlchemy 2.0 中文文档(五十二)
21 0
|
3月前
|
SQL 测试技术 数据库
SqlAlchemy 2.0 中文文档(五十二)(1)
SqlAlchemy 2.0 中文文档(五十二)
22 0
|
3月前
|
SQL 数据库连接 Linux
SqlAlchemy 2.0 中文文档(五十二)(4)
SqlAlchemy 2.0 中文文档(五十二)
40 0
|
3月前
|
SQL NoSQL 数据库
SqlAlchemy 2.0 中文文档(五十二)(2)
SqlAlchemy 2.0 中文文档(五十二)
30 0
|
3月前
|
SQL JSON 数据库
SqlAlchemy 2.0 中文文档(五十二)(6)
SqlAlchemy 2.0 中文文档(五十二)
17 0