SqlAlchemy 2.0 中文文档(四十五)(1)https://developer.aliyun.com/article/1563077
连接池配置
大多数情况下,create_engine()
函数返回的 Engine
已经集成了一个预先配置合理的 QueuePool
。如果你只是阅读本节来学习如何启用连接池 - 恭喜!你已经完成了。
最常见的 QueuePool
调优参数可以直接作为关键字参数传递给 create_engine()
:pool_size
、max_overflow
、pool_recycle
和 pool_timeout
。例如:
engine = create_engine( "postgresql+psycopg2://me@localhost/mydb", pool_size=20, max_overflow=0 )
所有 SQLAlchemy 连接池实现都有一个共同点,即它们都不会“预先创建”连接 - 所有实现都会在首次使用之前等待创建连接。在那时,如果没有额外的并发检出请求要求更多的连接,就不会创建额外的连接。这就是为什么 create_engine()
默认使用一个大小为五的 QueuePool
是完全可以的,而不用考虑应用程序是否真的需要排队五个连接 - 只有在应用程序实际上同时使用了五个连接时,池才会增长到这个大小,此时使用一个小池是完全适当的默认行为。
注意
QueuePool
类 不兼容 asyncio。当使用 create_async_engine
创建一个 AsyncEngine
实例时,会使用 AsyncAdaptedQueuePool
类,该类使用了一个兼容 asyncio 的队列实现。
切换连接池实现
使用不同类型的池与create_engine()
的通常方法是使用poolclass
参数。该参数接受从sqlalchemy.pool
模块导入的类,并为您处理构建池的详细信息。一个常见的用例是禁用连接池,这可以通过使用NullPool
实现来实现:
from sqlalchemy.pool import NullPool engine = create_engine( "postgresql+psycopg2://scott:tiger@localhost/test", poolclass=NullPool )
使用自定义连接函数
参见自定义 DBAPI connect()参数/连接时例程部分,了解各种连接自定义例程的情况。
构造池
要单独使用Pool
,则creator
函数是唯一需要的参数,并且首先传递,然后是任何其他选项:
import sqlalchemy.pool as pool import psycopg2 def getconn(): c = psycopg2.connect(user="ed", host="127.0.0.1", dbname="test") return c mypool = pool.QueuePool(getconn, max_overflow=10, pool_size=5)
然后可以使用Pool.connect()
函数从池中获取 DBAPI 连接。此方法的返回值是包含在透明代理中的 DBAPI 连接:
# get a connection conn = mypool.connect() # use it cursor_obj = conn.cursor() cursor_obj.execute("select foo")
透明代理的目的是拦截close()
调用,以便将 DBAPI 连接返回到池中:
# "close" the connection. Returns # it to the pool. conn.close()
当代理被垃圾回收时,它也会将其包含的 DBAPI 连接返回到池中,尽管在 Python 中并不确定这是否立即发生(尽管在 cPython 中是典型的)。然而,不建议这样使用,特别是不支持使用 asyncio DBAPI 驱动程序。
返回时重置
池包含“返回时重置”行为,当连接返回到池中时,将调用 DBAPI 连接的rollback()
方法。这样做是为了从连接中移除任何现有的事务状态,这不仅包括未提交的数据,还包括表和行锁。对于大多数 DBAPI,调用rollback()
是廉价的,如果 DBAPI 已经完成了一个事务,那么该方法应该是一个空操作。
对于非事务连接禁用返回时重置
对于非常特定的情况,其中 rollback()
不实用,例如当使用配置为 自动提交 或者使用没有 ACID 能力的数据库(如 MySQL 的 MyISAM 引擎)的连接时,可以禁用返回时重置行为,这通常是出于性能考虑。可以通过使用 Pool
的 Pool.reset_on_return
参数来影响,该参数也可以从 create_engine()
中使用 create_engine.pool_reset_on_return
进行设置,传递一个值为 None
。下面的示例中进行了说明,结合了 AUTOCOMMIT
的 create_engine.isolation_level
参数设置:
non_acid_engine = create_engine( "mysql://scott:tiger@host/db", pool_reset_on_return=None, isolation_level="AUTOCOMMIT", )
上述引擎在连接返回到池中时实际上不会执行回滚操作;由于启用了 AUTOCOMMIT,驱动程序也不会执行任何 BEGIN 操作。
自定义的返回重置方案
仅由单个 rollback()
组成的“返回重置”对于某些用例可能不够;特别是,使用临时表的应用程序可能希望这些表在连接签入时自动删除。一些(但并非全部)后端包含可以在数据库连接范围内“重置”这些表的功能,这可能是连接池重置的一种理想行为。其他服务器资源,例如准备好的语句句柄和服务器端语句缓存,可能会在签入过程之后持续存在,具体取决于具体情况是否希望如此。同样,一些(但再次并非全部)后端可能提供了重置此状态的方法。已知具有此类重置方案的两个 SQLAlchemy 包含的方言包括 Microsoft SQL Server,其中通常使用一个名为 sp_reset_connection
的未记录但广为人知的存储过程,以及 PostgreSQL,它有一系列命令包括 DISCARD
RESET
、DEALLOCATE
和 UNLISTEN
。
以下示例说明了如何使用PoolEvents.reset()
事件钩子,在返回时用 Microsoft SQL Server 的sp_reset_connection
存储过程替换重置。create_engine.pool_reset_on_return
参数设置为None
,以便完全替换默认行为。自定义钩子实现在任何情况下都调用.rollback()
,因为通常重要的是 DBAPI 自己的提交/回滚跟踪与事务状态保持一致:
from sqlalchemy import create_engine from sqlalchemy import event mssql_engine = create_engine( "mssql+pyodbc://scott:tiger⁵HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server", # disable default reset-on-return scheme pool_reset_on_return=None, ) @event.listens_for(mssql_engine, "reset") def _reset_mssql(dbapi_connection, connection_record, reset_state): if not reset_state.terminate_only: dbapi_connection.execute("{call sys.sp_reset_connection}") # so that the DBAPI itself knows that the connection has been # reset dbapi_connection.rollback()
在版本 2.0.0b3 中更改:为PoolEvents.reset()
事件添加了额外的状态参数,并另外确保事件对所有“重置”事件都被调用,因此它适用于自定义“重置”处理程序的地方。之前使用PoolEvents.checkin()
处理程序的方案仍然可用。
另请参阅
- 连接池临时表/资源重置 - 在 Microsoft SQL Server 文档中
- 连接池临时表/资源重置 - 在 PostgreSQL 文档中
记录返回时的重置事件
对包括返回时的重置在内的池事件进行记录可以设置为logging.DEBUG
日志级别,以及sqlalchemy.pool
记录器,或者在使用create_engine()
时将create_engine.echo_pool
设置为"debug"
:
>>> from sqlalchemy import create_engine >>> engine = create_engine("postgresql://scott:tiger@localhost/test", echo_pool="debug")
以上池将显示详细的日志,包括返回时的重置:
>>> c1 = engine.connect() DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <connection object ...> DEBUG sqlalchemy.pool.impl.QueuePool Connection <connection object ...> checked out from pool >>> c1.close() DEBUG sqlalchemy.pool.impl.QueuePool Connection <connection object ...> being returned to pool DEBUG sqlalchemy.pool.impl.QueuePool Connection <connection object ...> rollback-on-return
对非事务连接禁用返回时的重置
对于一些特定情况下rollback()
不适用的情况,比如在使用配置为自动提交或者在使用没有 ACID 功能的数据库,比如 MySQL 的 MyISAM 引擎时,可以禁用返回时的重置行为,通常出于性能原因。可以通过使用Pool.reset_on_return
参数来实现,该参数也可以从create_engine()
中获取,作为create_engine.pool_reset_on_return
,传递一个值为None
。下面的示例中演示了这一点,结合了AUTOCOMMIT
的create_engine.isolation_level
参数设置:
non_acid_engine = create_engine( "mysql://scott:tiger@host/db", pool_reset_on_return=None, isolation_level="AUTOCOMMIT", )
由于启用了 AUTOCOMMIT,上述引擎在连接返回到池时实际上不会执行 ROLLBACK 操作;由于驱动程序也不会执行任何 BEGIN 操作。
自定义返回时重置方案
对于一些使用临时表的应用程序,仅由一个rollback()
组成的“返回时重置”可能不足够;特别是,使用临时表的应用程序可能希望在连接检入时自动删除这些表。一些(但并非所有)后端包括可以在数据库连接范围内“重置”这些表的功能,这可能是连接池重置的一种理想行为。其他服务器资源,如准备好的语句句柄和服务器端语句缓存,可能会在检入过程之后持续存在,具体取决于具体情况是否希望这样。同样,一些(但再次不是所有)后端可能提供一种重置此状态的方法。已知具有此类重置方案的两个 SQLAlchemy 包含的方言包括 Microsoft SQL Server,其中通常使用一个名为sp_reset_connection
的未记录但广为人知的存储过程,以及 PostgreSQL,后者有一系列良好记录的命令,包括DISCARD
RESET
,DEALLOCATE
和UNLISTEN
。
以下示例说明了如何使用 PoolEvents.reset()
事件钩子将返回时的重置替换为 Microsoft SQL Server 的 sp_reset_connection
存储过程。 create_engine.pool_reset_on_return
参数设置为 None
,以便自定义方案完全替换默认行为。自定义钩子实现在任何情况下都调用 .rollback()
,因为通常重要的是 DBAPI 自己的提交/回滚跟踪与事务的状态保持一致:
from sqlalchemy import create_engine from sqlalchemy import event mssql_engine = create_engine( "mssql+pyodbc://scott:tiger⁵HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server", # disable default reset-on-return scheme pool_reset_on_return=None, ) @event.listens_for(mssql_engine, "reset") def _reset_mssql(dbapi_connection, connection_record, reset_state): if not reset_state.terminate_only: dbapi_connection.execute("{call sys.sp_reset_connection}") # so that the DBAPI itself knows that the connection has been # reset dbapi_connection.rollback()
从版本 2.0.0b3 中更改:为 PoolEvents.reset()
事件添加了额外的状态参数,并确保该事件被调用以进行所有“重置”发生,因此它适用于自定义“重置”处理程序的位置。以前使用 PoolEvents.checkin()
处理程序的方案仍然可用。
请参阅
- 临时表 / 资源重置以进行连接池 - 在 Microsoft SQL Server 文档中
- 临时表 / 资源重置以进行连接池 - 在 PostgreSQL 文档中
记录返回时的重置事件
可以将池事件的日志设置为 logging.DEBUG
日志级别,以及 sqlalchemy.pool
记录器,或者在使用 create_engine()
时将 create_engine.echo_pool
设置为 "debug"
,以记录包括返回时的重置在内的池事件:
>>> from sqlalchemy import create_engine >>> engine = create_engine("postgresql://scott:tiger@localhost/test", echo_pool="debug")
上述连接池将显示详细的日志,包括返回时的重置:
>>> c1 = engine.connect() DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <connection object ...> DEBUG sqlalchemy.pool.impl.QueuePool Connection <connection object ...> checked out from pool >>> c1.close() DEBUG sqlalchemy.pool.impl.QueuePool Connection <connection object ...> being returned to pool DEBUG sqlalchemy.pool.impl.QueuePool Connection <connection object ...> rollback-on-return
池事件
连接池支持事件接口,允许在第一次连接时、每次新连接时以及连接的签入和签出时执行钩子。有关详细信息,请参阅 PoolEvents
。
处理断开连接
连接池具有刷新单个连接以及其整个连接集的能力,将先前池化的连接设置为“无效”。一个常见的用例是当数据库服务器重新启动时,连接池能够优雅地恢复,并且所有先前建立的连接都不再可用。有两种方法可以实现这一点。
断开连接处理 - 悲观
悲观方法是指在每次连接池检出时在 SQL 连接上发出测试语句,以测试数据库连接是否仍然可用。该实现是方言特定的,并且利用了 DBAPI 特定的 ping 方法,或者使用简单的 SQL 语句如“SELECT 1”,以便测试连接的活性。
此方法在连接检出过程中增加了一点开销,但否则是完全消除由于过期的池化连接而导致的数据库错误的最简单可靠方法。调用应用程序无需担心组织操作以便能够从池中检出的过期连接中恢复。
通过使用 Pool.pre_ping
参数,可以在每次连接池检出时对连接进行悲观测试,此参数可从 create_engine()
中通过 create_engine.pool_pre_ping
参数获得:
engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)
“预连接测试”功能在每种方言上都是基于每个数据库适配器(DBAPI)特定的“ping”方法运行,如果不可用,则会发出与“SELECT 1”等效的 SQL,并捕获任何错误,并将错误检测为“断开”情况。如果 ping / 错误检查确定连接不可用,则连接将立即被回收,并且所有其他比当前时间更早的池化连接都将被作废,以便在下次检出它们时,它们也将在使用之前被回收。
如果数据库在“预连接测试”运行时仍然不可用,则初始连接将失败,并且将正常传播连接失败的错误。在数据库可用于连接但无法响应“ping”的情况下,“pre_ping”将尝试最多三次,然后放弃,并传播最后收到的数据库错误。
需要注意的是,预先 ping 的方法不适用于在事务或其他 SQL 操作中断开连接的情况。如果数据库在事务进行中变得不可用,则事务将丢失并引发数据库错误。虽然Connection
对象将检测“断开”情况并在此条件发生时重新使用连接并使其余连接池无效,但引发异常的个别操作将丢失,应用程序需要放弃该操作或重新尝试整个事务。如果引擎使用 DBAPI 级别的自动提交连接进行配置,如设置包括 DBAPI 自动提交的事务隔离级别,则连接可能会在操作中透明地重新连接使用事件。有关示例,请参阅如何自动“重试”语句执行?部分。
对于使用“SELECT 1”并捕获错误以检测断开连接的方言,可以使用DialectEvents.handle_error()
钩子来增强新的特定于后端的错误消息的断开连接测试。
自定义 / 传统悲观 Ping
在添加create_engine.pool_pre_ping
之前,历史上一直通过ConnectionEvents.engine_connect()
引擎事件手动执行“预先 ping”方法。以下是最常见的配方,供参考,以防应用程序已经使用此类配方,或者需要特殊行为:
from sqlalchemy import exc from sqlalchemy import event from sqlalchemy import select some_engine = create_engine(...) @event.listens_for(some_engine, "engine_connect") def ping_connection(connection, branch): if branch: # this parameter is always False as of SQLAlchemy 2.0, # but is still accepted by the event hook. In 1.x versions # of SQLAlchemy, "branched" connections should be skipped. return try: # run a SELECT 1\. use a core select() so that # the SELECT of a scalar value without a table is # appropriately formatted for the backend connection.scalar(select(1)) except exc.DBAPIError as err: # catch SQLAlchemy's DBAPIError, which is a wrapper # for the DBAPI's exception. It includes a .connection_invalidated # attribute which specifies if this connection is a "disconnect" # condition, which is based on inspection of the original exception # by the dialect in use. if err.connection_invalidated: # run the same SELECT again - the connection will re-validate # itself and establish a new connection. The disconnect detection # here also causes the whole connection pool to be invalidated # so that all stale connections are discarded. connection.scalar(select(1)) else: raise
上述配方的优点在于我们利用了 SQLAlchemy 用于检测那些已知指示“断开”情况的 DBAPI 异常以及Engine
对象在此条件发生时正确使当前连接池无效并允许当前Connection
重新验证到新的 DBAPI 连接的能力。
断开连接处理 - 乐观
当不使用悲观处理时,以及当数据库在事务中的连接期间关闭和/或重新启动时,处理陈旧/关闭连接的另一种方法是让 SQLAlchemy 在发生断开连接时处理,此时池中的所有连接都将被作废,意味着它们被认为是陈旧的,并将在下次检出时刷新。此行为假定Pool
与Engine
一起使用。Engine
具有可以检测断开连接事件并自动刷新池的逻辑。
当Connection
尝试使用 DBAPI 连接,并引发与“断开”事件对应的异常时,连接将被作废。然后,Connection
调用Pool.recreate()
方法,有效地作废所有当前未检出的连接,以便在下次检出时用新连接替换。下面的代码示例说明了这个流程:
from sqlalchemy import create_engine, exc e = create_engine(...) c = e.connect() try: # suppose the database has been restarted. c.execute(text("SELECT * FROM table")) c.close() except exc.DBAPIError as e: # an exception is raised, Connection is invalidated. if e.connection_invalidated: print("Connection was invalidated!") # after the invalidate event, a new connection # starts with a new Pool c = e.connect() c.execute(text("SELECT * FROM table"))
上面的示例说明了在检测到断开连接事件后,刷新池不需要任何特殊干预,池在此后会正常运行。然而,在数据库不可用事件发生时,每个正在使用的连接会引发一个数据库异常。在使用 ORM 会话的典型 Web 应用程序中,上述情况将对应于一个请求失败并显示 500 错误,然后 Web 应用程序在此之后会正常继续。因此,这种方法是“乐观”的,不预期频繁的数据库重启。
设置池回收
可以增强“乐观”方法的另一个设置是设置池回收参数。该参数防止池使用已经过一定年龄的特定连接,并适用于数据库后端,例如 MySQL,在特定时间后自动关闭陈旧连接:
from sqlalchemy import create_engine e = create_engine("mysql+mysqldb://scott:tiger@localhost/test", pool_recycle=3600)
在上述情况下,任何已经打开超过一小时的 DBAPI 连接将在下次检出时被作废并替换。请注意,作废仅发生在检出时 - 而不是在任何处于已检出状态的连接上。pool_recycle
是Pool
本身的一个函数,与是否使用Engine
无关。### 更多关于作废的信息
Pool
提供了“连接失效”服务,允许对连接进行显式失效以及在确定使连接无法使用的条件下自动失效。
“失效”意味着特定的 DBAPI 连接被从池中移除并丢弃。如果不清楚连接本身是否可能已关闭,则会调用此连接的.close()
方法,但是如果此方法失败,则会记录异常,但操作仍将继续。
当使用Engine
时,Connection.invalidate()
方法是显式失效的常规入口点。导致 DBAPI 连接失效的其他条件包括:
- 当调用诸如
connection.execute()
之类的方法时引发的 DBAPI 异常,例如OperationalError
被检测为指示所谓的“断开”条件。由于 Python DBAPI 没有用于确定异常性质的标准系统,所有 SQLAlchemy 方言都包含一个称为is_disconnect()
的系统,它将检查异常对象的内容,包括字符串消息以及其中包含的任何潜在错误代码,以确定此异常是否指示连接不再可用。如果是这种情况,则调用_ConnectionFairy.invalidate()
方法,然后丢弃 DBAPI 连接。 - 当连接被返回到池中,并且调用
connection.rollback()
或connection.commit()
方法时,根据池的“返回时重置”行为,抛出异常。最后尝试调用连接的.close()
方法,然后将其丢弃。 - 当实现
PoolEvents.checkout()
的监听器引发DisconnectionError
异常时,表示连接将无法使用,需要进行新的连接尝试。
所有发生的失效都将调用PoolEvents.invalidate()
事件。### 支持断开场景的新数据库错误代码
每个 SQLAlchemy 方言都包括一个名为 is_disconnect()
的例程,每当遇到 DBAPI 异常时就会调用它。DBAPI 异常对象被传递给此方法,方言特定的启发式将确定接收到的错误代码是否指示数据库连接已“断开”,或者处于无法使用的状态,这表明应该对其进行回收。这里应用的启发式可以通过 DialectEvents.handle_error()
事件钩子进行自定义,该钩子通常通过拥有的 Engine
对象进行建立。使用此钩子,所有发生的错误都将传递一个称为 ExceptionContext
的上下文对象。自定义事件钩子可以控制特定错误是否应被视为“断开”情况,以及此断开是否应导致整个连接池无效化。
例如,要添加支持将 Oracle 错误代码 DPY-1001
和 DPY-4011
视为已处理的断开代码,请在创建引擎后应用事件处理程序:
import re from sqlalchemy import create_engine engine = create_engine("oracle://scott:tiger@dnsname") @event.listens_for(engine, "handle_error") def handle_exception(context: ExceptionContext) -> None: if not context.is_disconnect and re.match( r"^(?:DPI-1001|DPI-4011)", str(context.original_exception) ): context.is_disconnect = True return None
上述错误处理函数将被调用以处理所有抛出的 Oracle 错误,包括在使用 pool pre ping 功能时捕获的那些依赖于断开连接错误处理的后端(2.0 中新增)。
参见
DialectEvents.handle_error()
### 断开连接处理 - 悲观
悲观的方法是指在每次连接池检出时发出一个测试语句,以测试数据库连接是否仍然可用。该实现是方言特定的,可以使用特定于 DBAPI 的 ping 方法,也可以使用简单的 SQL 语句如“SELECT 1”来测试连接的活动性。
这种方法会给连接检出过程增加一点开销,但是除此之外,它是完全消除由于过时的连接池连接而导致数据库错误的最简单和可靠的方法。调用应用程序不需要关心组织操作以便从连接池中检出过时的连接。
通过使用 create_engine()
的 create_engine.pool_pre_ping
参数,可以通过使用 Pool.pre_ping
参数来实现在检出时对连接进行悲观测试:
engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)
“预连接”功能是在每个方言基础上运行的,通过调用特定于 DBAPI 的“ping”方法,或者如果不可用,则会发出等效于“SELECT 1”的 SQL,捕获任何错误并将错误检测为“断开”情况。如果 ping / 错误检查确定连接不可用,则连接将立即被回收,并且所有比当前时间更旧的池化连接都将被作废,以便下次检出时,在使用之前也将被回收。
如果在“预 ping”运行时数据库仍然不可用,则初始连接将失败,并且连接失败的错误将正常传播。在数据库可用于连接但无法响应“ping”的情况下,将尝试最多三次“预 ping”,然后放弃,传播上次收到的数据库错误。
需要注意的是,预连接方法不适用于事务中断开的连接或其他 SQL 操作。如果数据库在事务进行中变得不可用,则事务将丢失并引发数据库错误。虽然 Connection
对象会检测到“断开”情况并在发生此情况时回收连接以及使其余连接池无效,但引发异常的个别操作将丢失,由应用程序来放弃操作或重新尝试整个事务。如果引擎使用 DBAPI 级别的自动提交连接进行配置,如 设置事务隔离级别,包括 DBAPI 自动提交,则可以使用事件在操作中透明地重新连接。有关示例,请参阅 如何“自动重试”语句执行? 节。
对于使用“SELECT 1”并捕获错误以检测断开的方言,可以使用 DialectEvents.handle_error()
钩子来增加新的后端特定错误消息的断开测试。
自定义 / 传统悲观的预连接
在添加 create_engine.pool_pre_ping
之前,历史上“预 ping”方法是通过使用 ConnectionEvents.engine_connect()
引擎事件手动执行的。以下是最常见的配方,供参考,以防应用程序已经使用了这样的配方,或者需要特殊的行为:
from sqlalchemy import exc from sqlalchemy import event from sqlalchemy import select some_engine = create_engine(...) @event.listens_for(some_engine, "engine_connect") def ping_connection(connection, branch): if branch: # this parameter is always False as of SQLAlchemy 2.0, # but is still accepted by the event hook. In 1.x versions # of SQLAlchemy, "branched" connections should be skipped. return try: # run a SELECT 1\. use a core select() so that # the SELECT of a scalar value without a table is # appropriately formatted for the backend connection.scalar(select(1)) except exc.DBAPIError as err: # catch SQLAlchemy's DBAPIError, which is a wrapper # for the DBAPI's exception. It includes a .connection_invalidated # attribute which specifies if this connection is a "disconnect" # condition, which is based on inspection of the original exception # by the dialect in use. if err.connection_invalidated: # run the same SELECT again - the connection will re-validate # itself and establish a new connection. The disconnect detection # here also causes the whole connection pool to be invalidated # so that all stale connections are discarded. connection.scalar(select(1)) else: raise
上述配方的优点是我们利用 SQLAlchemy 的设施来检测那些已知指示“断开连接”情况的 DBAPI 异常,以及 Engine
对象在此条件发生时正确地使当前连接池无效并允许当前 Connection
重新验证到新的 DBAPI 连接。 #### 自定义/遗留悲观 ping
在添加 create_engine.pool_pre_ping
之前,“预先 ping” 方法的历史记录通常是使用 ConnectionEvents.engine_connect()
引擎事件手动执行的。下面是最常见的配方,供参考,以防应用程序已经使用此类配方,或者需要特殊的行为:
from sqlalchemy import exc from sqlalchemy import event from sqlalchemy import select some_engine = create_engine(...) @event.listens_for(some_engine, "engine_connect") def ping_connection(connection, branch): if branch: # this parameter is always False as of SQLAlchemy 2.0, # but is still accepted by the event hook. In 1.x versions # of SQLAlchemy, "branched" connections should be skipped. return try: # run a SELECT 1\. use a core select() so that # the SELECT of a scalar value without a table is # appropriately formatted for the backend connection.scalar(select(1)) except exc.DBAPIError as err: # catch SQLAlchemy's DBAPIError, which is a wrapper # for the DBAPI's exception. It includes a .connection_invalidated # attribute which specifies if this connection is a "disconnect" # condition, which is based on inspection of the original exception # by the dialect in use. if err.connection_invalidated: # run the same SELECT again - the connection will re-validate # itself and establish a new connection. The disconnect detection # here also causes the whole connection pool to be invalidated # so that all stale connections are discarded. connection.scalar(select(1)) else: raise
上述配方的优点是我们利用 SQLAlchemy 的设施来检测那些已知指示“断开连接”情况的 DBAPI 异常,以及 Engine
对象在此条件发生时正确地使当前连接池无效并允许当前 Connection
重新验证到新的 DBAPI 连接。
SqlAlchemy 2.0 中文文档(四十五)(3)https://developer.aliyun.com/article/1563079