SqlAlchemy 2.0 中文文档(四十六)(1)https://developer.aliyun.com/article/1563035
SQL 执行和连接事件
对象名称 | 描述 |
ConnectionEvents | Connection 和Engine 的可用事件。 |
方言事件 | 用于执行替换函数的事件接口。 |
class sqlalchemy.events.ConnectionEvents
Connection
和Engine
的可用事件。
这里的方法定义了事件的名称以及传递给监听器函数的成员的名称。
事件监听器可以与任何Connection
或Engine
类或实例相关联,例如一个Engine
,例如:
from sqlalchemy import event, create_engine def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): log.info("Received statement: %s", statement) engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/test') event.listen(engine, "before_cursor_execute", before_cursor_execute)
或者使用特定的Connection
:
with engine.begin() as conn: @event.listens_for(conn, 'before_cursor_execute') def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): log.info("Received statement: %s", statement)
当方法使用语句参数调用时,例如在after_cursor_execute()
或before_cursor_execute()
中,语句是准备发送到连接的 DBAPI cursor
的确切 SQL 字符串Dialect
。
before_execute()
和before_cursor_execute()
事件也可以使用retval=True
标志来建立,这允许修改发送到数据库的语句和参数。before_cursor_execute()
事件在这里特别有用,可以添加临时字符串转换,例如注释,以适用于所有执行:
from sqlalchemy.engine import Engine from sqlalchemy import event @event.listens_for(Engine, "before_cursor_execute", retval=True) def comment_sql_calls(conn, cursor, statement, parameters, context, executemany): statement = statement + " -- some comment" return statement, parameters
注意
ConnectionEvents
可以建立在任何组合的 Engine
、Connection
,以及这些类的实例上。对于给定的 Connection
实例,所有四个范围的事件都会触发。但是,出于性能原因,Connection
对象在实例化时确定其父 Engine
是否已经建立了事件侦听器。在依赖的 Connection
实例实例化后,向 Engine
类或 Engine
实例添加的事件侦听器通常不会对该 Connection
实例可用。而是,新添加的侦听器将对在父 Engine
类或实例上建立这些事件侦听器之后创建的 Connection
实例产生影响。
参数:
retval=False – 仅适用于 before_execute()
和 before_cursor_execute()
事件。当为 True 时,用户定义的事件函数必须有一个返回值,即替换给定语句和参数的参数元组。有关特定返回参数的描述,请参见这些方法。
成员
after_cursor_execute(), after_execute(), before_cursor_execute(), before_execute(), begin(), begin_twophase(), commit(), commit_twophase(), dispatch, engine_connect(), engine_disposed(), prepare_twophase(), release_savepoint(), rollback(), rollback_savepoint(), rollback_twophase(), savepoint(), set_connection_execution_options(), set_engine_execution_options()
类签名
类sqlalchemy.events.ConnectionEvents
(sqlalchemy.event.Events
)
method after_cursor_execute(conn: Connection, cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext | None, executemany: bool) → None
在执行后拦截低级游标 execute() 事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'after_cursor_execute') def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany): "listen for the 'after_cursor_execute' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象cursor
– DBAPI 游标对象。如果语句是一个 SELECT,将会有待处理的结果,但不应该消耗这些结果,因为它们会被CursorResult
需要。statement
– 字符串 SQL 语句,就像传递给 DBAPI 的一样parameters
– 字典、元组或传递给 DBAPI 游标的execute()
或executemany()
方法的参数列表。在某些情况下可能为None
。context
– 使用的ExecutionContext
对象。可能为None
。executemany
– 布尔值,如果为True
,则这是一个executemany()
调用,如果为False
,则这是一个execute()
调用。
method after_execute(conn: Connection, clauseelement: Executable, multiparams: _CoreMultiExecuteParams, params: _CoreSingleExecuteParams, execution_options: _ExecuteOptions, result: Result[Any]) → None
在执行后拦截高级 execute() 事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'after_execute') def receive_after_execute(conn, clauseelement, multiparams, params, execution_options, result): "listen for the 'after_execute' event" # ... (event handling logic) ... # DEPRECATED calling style (pre-1.4, will be removed in a future release) @event.listens_for(SomeEngine, 'after_execute') def receive_after_execute(conn, clauseelement, multiparams, params, result): "listen for the 'after_execute' event" # ... (event handling logic) ...
1.4 版本变更:ConnectionEvents.after_execute()
事件现在接受参数 ConnectionEvents.after_execute.conn
, ConnectionEvents.after_execute.clauseelement
, ConnectionEvents.after_execute.multiparams
, ConnectionEvents.after_execute.params
, ConnectionEvents.after_execute.execution_options
, ConnectionEvents.after_execute.result
。将来版本将删除对接受前述“已弃用”参数签名的监听器函数的支持。
参数:
conn
–Connection
对象clauseelement
– SQL 表达式构造,Compiled
实例或传递给Connection.execute()
的字符串语句。multiparams
– 多个参数集,一个字典列表。params
– 单个参数集,一个字典。execution_options
–
传递给语句的执行选项字典,如果有的话。这是将要使用的所有选项的合并,包括语句的选项、连接的选项以及传递给方法本身的用于执行 2.0 风格的选项。result
– 执行生成的CursorResult
。
method before_cursor_execute(conn: Connection, cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext | None, executemany: bool) → Tuple[str, _DBAPIAnyExecuteParams] | None
在执行之前拦截低级别游标 execute() 事件,接收要针对游标调用的字符串 SQL 语句和特定于 DBAPI 的参数列表。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'before_cursor_execute') def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany): "listen for the 'before_cursor_execute' event" # ... (event handling logic) ...
此事件既可用于记录,也可用于对 SQL 字符串进行后期修改。对于除了特定于目标后端的参数修改之外的参数修改,它不太理想。
可以选择使用 retval=True
标志建立此事件。在这种情况下,应返回 statement
和 parameters
参数作为两个元组:
@event.listens_for(Engine, "before_cursor_execute", retval=True) def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): # do something with statement, parameters return statement, parameters
参见 ConnectionEvents
中的示例。
参数:
conn
–Connection
对象cursor
– DBAPI 游标对象statement
– 字符串 SQL 语句,如传递给 DBAPIparameters
– 字典、元组或传递给 DBAPIcursor
的execute()
或executemany()
方法的参数列表。在某些情况下可能为None
。context
– 正在使用的ExecutionContext
对象。可能为None
。executemany
– 布尔值,如果为True
,则为executemany()
调用,如果为False
,则为execute()
调用。
另请参阅
before_execute()
after_cursor_execute()
method before_execute(conn: Connection, clauseelement: Executable, multiparams: _CoreMultiExecuteParams, params: _CoreSingleExecuteParams, execution_options: _ExecuteOptions) → Tuple[Executable, _CoreMultiExecuteParams, _CoreSingleExecuteParams] | None
拦截高级别的 execute()
事件,在渲染为 SQL 之前接收未编译的 SQL 构造和其他对象。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'before_execute') def receive_before_execute(conn, clauseelement, multiparams, params, execution_options): "listen for the 'before_execute' event" # ... (event handling logic) ... # DEPRECATED calling style (pre-1.4, will be removed in a future release) @event.listens_for(SomeEngine, 'before_execute') def receive_before_execute(conn, clauseelement, multiparams, params): "listen for the 'before_execute' event" # ... (event handling logic) ...
在 1.4 版本中更改:ConnectionEvents.before_execute()
事件现在接受参数 ConnectionEvents.before_execute.conn
、ConnectionEvents.before_execute.clauseelement
、ConnectionEvents.before_execute.multiparams
、ConnectionEvents.before_execute.params
、ConnectionEvents.before_execute.execution_options
的支持。在未来版本中,将删除接受前述“已弃用”参数签名的侦听器函数的支持。
此事件对于调试 SQL 编译问题以及数据库发送的参数的早期操作非常有用,因为此处的参数列表将以一致的格式呈现。
此事件可以选择使用 retval=True
标志来建立。在这种情况下,应将 clauseelement
、multiparams
和 params
参数作为三元组返回:
@event.listens_for(Engine, "before_execute", retval=True) def before_execute(conn, clauseelement, multiparams, params): # do something with clauseelement, multiparams, params return clauseelement, multiparams, params
参数:
conn
–Connection
对象clauseelement
– SQL 表达式构造,Compiled
实例,或传递给Connection.execute()
的字符串语句。multiparams
– 多个参数集,字典列表。params
– 单个参数集,一个字典。execution_options
–
执行选项字典,与语句一起传递,如果有的话。这是将要使用的所有选项的合并,包括语句的选项、连接的选项以及传递给方法本身的选项,用于 2.0 风格的执行。
也请参阅
before_cursor_execute()
method begin(conn: Connection) → None
拦截 begin()
事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'begin') def receive_begin(conn): "listen for the 'begin' event" # ... (event handling logic) ...
参数:
conn – Connection
对象
method begin_twophase(conn: Connection, xid: Any) → None
拦截 begin_twophase()
事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'begin_twophase') def receive_begin_twophase(conn, xid): "listen for the 'begin_twophase' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象xid
– 两阶段 XID 标识符
method commit(conn: Connection) → None
拦截由 Transaction
发起的提交事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'commit') def receive_commit(conn): "listen for the 'commit' event" # ... (event handling logic) ...
注意,如果 reset_on_return
标志设置为 'commit'
,则 Pool
也可能在归还时 “自动提交” DBAPI 连接。要拦截此提交,请使用 PoolEvents.reset()
钩子。
参数:
conn – Connection
对象
method commit_twophase(conn: Connection, xid: Any, is_prepared: bool) → None
拦截 commit_twophase()
事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'commit_twophase') def receive_commit_twophase(conn, xid, is_prepared): "listen for the 'commit_twophase' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象xid
– 两阶段 XID 标识符is_prepared
– 布尔值,指示是否调用了TwoPhaseTransaction.prepare()
。
attribute dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.ConnectionEventsDispatch object>
回溯到 _Dispatch 类。
双向 _Dispatch._events
method engine_connect(conn: Connection) → None
拦截新建 Connection
。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'engine_connect') def receive_engine_connect(conn): "listen for the 'engine_connect' event" # ... (event handling logic) ... # DEPRECATED calling style (pre-2.0, will be removed in a future release) @event.listens_for(SomeEngine, 'engine_connect') def receive_engine_connect(conn, branch): "listen for the 'engine_connect' event" # ... (event handling logic) ...
从版本 2.0 开始更改:ConnectionEvents.engine_connect()
事件现在接受参数 ConnectionEvents.engine_connect.conn
。将来的版本中将删除对接受上述“弃用”之前参数签名的侦听器函数的支持。
通常,此事件是直接调用 Engine.connect()
方法的直接结果。
它与 PoolEvents.connect()
方法不同,后者是指在 DBAPI 级别对数据库的实际连接;DBAPI 连接可能会被池化并重复使用多次。相比之下,此事件仅与在此类 DBAPI 连接周围生成更高级别的 Connection
包装器有关。
它还与 PoolEvents.checkout()
事件不同,后者特定于 Connection
对象,而不是 PoolEvents.checkout()
处理的 DBAPI 连接,尽管该 DBAPI 连接可以通过 Connection.connection
属性在此处获得。但请注意,如果 Connection
无效并重新建立,则单个 Connection
对象的生命周期中实际上可以有多个 PoolEvents.checkout()
事件。
参数:
conn – Connection
对象。
另请参阅
PoolEvents.checkout()
单个 DBAPI 连接的低级别池检出事件
method engine_disposed(engine: Engine) → None
拦截 Engine.dispose()
方法被调用的情况。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'engine_disposed') def receive_engine_disposed(engine): "listen for the 'engine_disposed' event" # ... (event handling logic) ...
Engine.dispose()
方法指示引擎“处理”它的连接池(例如 Pool
),并用新的连接池替换它。处理旧连接池的效果是关闭现有的已检入连接。新连接池在首次使用之前不会建立任何新连接。
这个事件可用于指示应该清理与 Engine
相关的资源,但要注意,Engine
仍然可以用于新请求,此时它将重新获取连接资源。
method prepare_twophase(conn: Connection, xid: Any) → None
拦截 prepare_twophase()
事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'prepare_twophase') def receive_prepare_twophase(conn, xid): "listen for the 'prepare_twophase' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象xid
– 两阶段 XID 标识符
method release_savepoint(conn: Connection, name: str, context: None) → None
拦截 release_savepoint()
事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'release_savepoint') def receive_release_savepoint(conn, name, context): "listen for the 'release_savepoint' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象name
– 用于保存点的指定名称。context
– 未使用
method rollback(conn: Connection) → None
拦截由 Transaction
启动的 rollback()
事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'rollback') def receive_rollback(conn): "listen for the 'rollback' event" # ... (event handling logic) ...
请注意,如果 reset_on_return
标志设置为其默认值 'rollback'
,Pool
在归还时也会“自动回滚” DBAPI 连接。要拦截此回滚,请使用 PoolEvents.reset()
钩子。
参数:
conn – Connection
对象
另请参阅
PoolEvents.reset()
method rollback_savepoint(conn: Connection, name: str, context: None) → None
拦截 rollback_savepoint()
事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'rollback_savepoint') def receive_rollback_savepoint(conn, name, context): "listen for the 'rollback_savepoint' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象name
– 用于保存点的指定名称。context
– 未使用
method rollback_twophase(conn: Connection, xid: Any, is_prepared: bool) → None
拦截 rollback_twophase()
事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'rollback_twophase') def receive_rollback_twophase(conn, xid, is_prepared): "listen for the 'rollback_twophase' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象xid
– 两阶段 XID 标识符is_prepared
– 布尔值,指示是否调用了TwoPhaseTransaction.prepare()
。
method savepoint(conn: Connection, name: str) → None
拦截 savepoint()
事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'savepoint') def receive_savepoint(conn, name): "listen for the 'savepoint' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象name
– 用于保存点的指定名称。
method set_connection_execution_options(conn: Connection, opts: Dict[str, Any]) → None
拦截 Connection.execution_options()
方法调用时。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'set_connection_execution_options') def receive_set_connection_execution_options(conn, opts): "listen for the 'set_connection_execution_options' event" # ... (event handling logic) ...
此方法在新的 Connection
生成后调用,具有新更新的执行选项集,但在 Dialect
对这些新选项之前。
请注意,当从其父Engine
继承执行选项的新Connection
被生成时,不会调用此方法;要拦截此条件,请使用ConnectionEvents.engine_connect()
事件。
参数:
conn
– 新复制的Connection
对象opts
–
传递给Connection.execution_options()
方法的选项字典。此字典可以就地修改,以影响最终生效的选项。
版本 2.0 中的新内容:opts
字典可以就地修改。
另请参阅
ConnectionEvents.set_engine_execution_options()
- 当调用Engine.execution_options()
时调用的事件。
method set_engine_execution_options(engine: Engine, opts: Dict[str, Any]) → None
当调用Engine.execution_options()
方法时拦截。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'set_engine_execution_options') def receive_set_engine_execution_options(engine, opts): "listen for the 'set_engine_execution_options' event" # ... (event handling logic) ...
Engine.execution_options()
方法生成Engine
的浅拷贝,其中存储了新的选项。这个新的Engine
被传递到这里。此方法的一个特定应用是将ConnectionEvents.engine_connect()
事件处理程序添加到给定的Engine
上,该处理程序将执行一些针对这些执行选项的特定于每个Connection
的任务。
参数:
conn
– 新复制的Engine
对象opts
–
传递给Connection.execution_options()
方法的选项字典。此字典可以就地修改,以影响最终生效的选项。
版本 2.0 中的新内容:opts
字典可以就地修改。
另请参阅
ConnectionEvents.set_connection_execution_options()
- 当调用Connection.execution_options()
时调用的事件。
class sqlalchemy.events.DialectEvents
用于执行替换函数的事件接口。
这些事件允许直接检测和替换与 DBAPI 交互的关键方言函数。
注意
DialectEvents
钩子应被视为半公开和实验性质的。这些钩子不适用于一般情况,并且仅适用于那些需要将复杂的 DBAPI 机制重新注入到现有方言中的情况。对于一般用途的语句拦截事件,请使用ConnectionEvents
接口。
另请参阅
ConnectionEvents.before_cursor_execute()
ConnectionEvents.before_execute()
ConnectionEvents.after_cursor_execute()
ConnectionEvents.after_execute()
成员
dispatch, do_connect(), do_execute(), do_execute_no_params(), do_executemany(), do_setinputsizes(), handle_error()
类签名
类sqlalchemy.events.DialectEvents
(sqlalchemy.event.Events
)
attribute dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.DialectEventsDispatch object>
参考 _Dispatch 类。
双向对 _Dispatch._events
method do_connect(dialect: Dialect, conn_rec: ConnectionPoolEntry, cargs: Tuple[Any, ...], cparams: Dict[str, Any]) → DBAPIConnection | None
在建立连接之前接收连接参数。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_connect') def receive_do_connect(dialect, conn_rec, cargs, cparams): "listen for the 'do_connect' event" # ... (event handling logic) ...
这个事件很有用,因为它允许处理程序操纵控制 DBAPI connect()
函数将如何调用的 cargs
和/或 cparams
集合。cargs
将始终是一个可以原位变异的 Python 列表,而cparams
是一个也可以变异的 Python 字典:
e = create_engine("postgresql+psycopg2://user@host/dbname") @event.listens_for(e, 'do_connect') def receive_do_connect(dialect, conn_rec, cargs, cparams): cparams["password"] = "some_password"
事件钩子也可用于完全覆盖connect()
的调用,方法是返回一个非None
的 DBAPI 连接对象:
e = create_engine("postgresql+psycopg2://user@host/dbname") @event.listens_for(e, 'do_connect') def receive_do_connect(dialect, conn_rec, cargs, cparams): return psycopg2.connect(*cargs, **cparams)
另请参阅
自定义 DBAPI connect() 参数 / 在连接时运行的程序
method do_execute(cursor: DBAPICursor, statement: str, parameters: _DBAPISingleExecuteParams, context: ExecutionContext) → Literal[True] | None
接收一个游标以调用 execute()。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_execute') def receive_do_execute(cursor, statement, parameters, context): "listen for the 'do_execute' event" # ... (event handling logic) ...
返回 True 值以阻止进一步调用事件,并指示游标执行已在事件处理程序中发生。
method do_execute_no_params(cursor: DBAPICursor, statement: str, context: ExecutionContext) → Literal[True] | None
接收一个游标以调用没有参数的 execute()。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_execute_no_params') def receive_do_execute_no_params(cursor, statement, context): "listen for the 'do_execute_no_params' event" # ... (event handling logic) ...
返回 True 值以阻止进一步调用事件,并指示游标执行已在事件处理程序中发生。
method do_executemany(cursor: DBAPICursor, statement: str, parameters: _DBAPIMultiExecuteParams, context: ExecutionContext) → Literal[True] | None
接收一个游标以调用 executemany()。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_executemany') def receive_do_executemany(cursor, statement, parameters, context): "listen for the 'do_executemany' event" # ... (event handling logic) ...
返回 True 值以阻止进一步调用事件,并指示游标执行已在事件处理程序中发生。
method do_setinputsizes(inputsizes: Dict[BindParameter[Any], Any], cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext) → None
接收可供修改的 setinputsizes 字典。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_setinputsizes') def receive_do_setinputsizes(inputsizes, cursor, statement, parameters, context): "listen for the 'do_setinputsizes' event" # ... (event handling logic) ...
当方言使用 DBAPI cursor.setinputsizes()
方法传递关于特定语句参数绑定的信息时,会触发此事件。给定的 inputsizes
字典将包含BindParameter
对象作为键,链接到特定于 DBAPI 的类型对象作为值;对于未绑定的参数,它们将以 None
作为值添加到字典中,这意味着该参数将不会包含在最终的 setinputsizes 调用中。该事件可用于检查和/或记录被绑定的数据类型,并直接修改字典。可以向该字典中添加、修改或删除参数。调用者通常希望检查给定绑定对象的 BindParameter.type
属性,以便对 DBAPI 对象做出决策。
事件之后,inputsizes
字典将转换为适当的数据结构以传递给 cursor.setinputsizes
;对于位置绑定参数执行样式,转换为列表;对于命名绑定参数执行样式,转换为字符串参数键到 DBAPI 类型对象的字典。
setinputsizes 钩子整体上仅用于包含标志 use_setinputsizes=True
的方言。使用此标志的方言包括 cx_Oracle、pg8000、asyncpg 和 pyodbc 方言。
注意
与 pyodbc 一起使用时,必须向方言传递 use_setinputsizes
标志,例如:
create_engine("mssql+pyodbc://...", use_setinputsizes=True)
另请参阅
Setinputsizes 支持
新版本 1.2.9 中新增。
另请参阅
使用 setinputsizes 对 cx_Oracle 数据绑定性能进行细粒度控制
method handle_error(exception_context: ExceptionContext) → BaseException | None
拦截Dialect
典型但不限于在 Connection
范围内发出的异常。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'handle_error') def receive_handle_error(exception_context): "listen for the 'handle_error' event" # ... (event handling logic) ...
从版本 2.0 开始更改:DialectEvents.handle_error()
事件已移至DialectEvents
类中,从ConnectionEvents
类中移除,以便它还可以参与使用create_engine.pool_pre_ping
参数配置的“预连接”操作。该事件仍通过使用Engine
作为事件目标来注册,但请注意,不再支持将Connection
用作DialectEvents.handle_error()
的事件目标。
这包括由 DBAPI 发出的所有异常,以及 SQLAlchemy 语句调用过程中的其他区域,包括编码错误和其他语句验证错误。调用事件的其他区域包括事务开始和结束、结果行获取、游标创建。
请注意,handle_error()
可能随时支持新类型的异常和新的调用场景。使用此事件的代码必须预期在次要版本中存在新的调用模式。
为了支持对应于异常的广泛成员的各种情况,并允许在不向后兼容的情况下扩展事件,所接收的唯一参数是一个ExceptionContext
的实例。此对象包含表示异常详细信息的数据成员。
此钩子支持的用例包括:
- 仅用于日志记录和调试目的的只读低级别异常处理
- 建立 DBAPI 连接错误消息是否指示需要重新连接数据库连接,包括某些方言使用的“pre_ping”处理程序
- 在响应特定异常时建立或禁用连接或拥有连接池是否无效或过期
- 异常重写
在失败的操作的游标(如果有)仍处于打开和可访问状态时调用该钩子。可以在此游标上调用特殊的清理操作;SQLAlchemy 将尝试在调用此钩子后关闭此游标。
从 SQLAlchemy 2.0 开始,使用 create_engine.pool_pre_ping
参数启用的“pre_ping”处理程序也将参与 handle_error()
过程,对于那些依赖断开连接代码来检测数据库活动性的方言。请注意,一些方言,如 psycopg、psycopg2 和大多数 MySQL 方言,使用由 DBAPI 提供的本地 ping()
方法,该方法不使用断开连接代码。
在版本 2.0.0 中进行了更改:DialectEvents.handle_error()
事件钩子参与连接池“预先 ping”操作。在此使用中,ExceptionContext.engine
属性将为 None
,但是正在使用的 Dialect
可通过 ExceptionContext.dialect
属性始终可用。
在版本 2.0.5 中进行了更改:添加了 ExceptionContext.is_pre_ping
属性,当在连接池的“预先 ping”操作中触发 DialectEvents.handle_error()
事件钩子时,该属性将设置为 True
。
在版本 2.0.5 中进行了更改:修复了一个问题,允许 PostgreSQL psycopg
和 psycopg2
驱动程序以及所有 MySQL 驱动程序在连接池“预先 ping”操作期间正确参与 DialectEvents.handle_error()
事件钩子;此前,这些驱动程序的实现对这些驱动程序而言是无效的。
处理程序函数有两个选项来将 SQLAlchemy 构造的异常替换为用户定义的异常。它可以直接引发此新异常,此时所有后续事件监听器都将被绕过,并且异常将在适当的清理完成后被引发:
@event.listens_for(Engine, "handle_error") def handle_exception(context): if isinstance(context.original_exception, psycopg2.OperationalError) and \ "failed" in str(context.original_exception): raise MySpecialException("failed operation")
警告
因为 DialectEvents.handle_error()
事件专门提供了将异常重新抛出为失败语句引发的最终异常的方法,如果用户定义的事件处理程序本身失败并引发意外异常,则堆栈跟踪将会误导!建议在这里小心编码,并在发生意外异常时使用日志记录和/或内联调试。
或者,可以使用“链接”样式的事件处理,通过使用retval=True
修饰符配置处理程序,并从函数返回新的异常实例来使用。在这种情况下,事件处理将继续到下一个处理程序。可以使用ExceptionContext.chained_exception
获取“链接”异常:
@event.listens_for(Engine, "handle_error", retval=True) def handle_exception(context): if context.chained_exception is not None and \ "special" in context.chained_exception.message: return MySpecialException("failed", cause=context.chained_exception)
返回None
的处理程序可以在链中使用;当处理程序返回None
时,如果有的话,前一个异常实例将保持为传递给下一个处理程序的当前异常。
当引发或返回自定义异常时,SQLAlchemy 将直接引发此新异常,不会被任何 SQLAlchemy 对象包装。如果异常不是sqlalchemy.exc.StatementError
的子类,某些功能可能不可用;目前包括 ORM 在自动刷新过程中引发异常时添加有关“自动刷新”的详细提示的功能。
参数:
context – 一个ExceptionContext
对象。有关所有可用成员的详细信息,请参阅此类。
另请参阅
支持断开场景下的新数据库错误代码
SqlAlchemy 2.0 中文文档(四十六)(3)https://developer.aliyun.com/article/1563036