SqlAlchemy 2.0 中文文档(四十六)(5)https://developer.aliyun.com/article/1563039
参数:
conn
–Connection
对象name
– 用于保存点的指定名称。context
– 未使用
method rollback(conn: Connection) → None
拦截 rollback() 事件,由 Transaction
发起。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'rollback') def receive_rollback(conn): "listen for the 'rollback' event" # ... (event handling logic) ...
注意,如果 reset_on_return
标志设置为其默认值 'rollback'
,则 Pool
也会在归还时“自动回滚” DBAPI 连接。要拦截此回滚,请使用 PoolEvents.reset()
钩子。
参数:
conn – Connection
对象
另请参阅
PoolEvents.reset()
method rollback_savepoint(conn: Connection, name: str, context: None) → None
拦截 rollback_savepoint() 事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'rollback_savepoint') def receive_rollback_savepoint(conn, name, context): "listen for the 'rollback_savepoint' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象name
– 用于保存点的指定名称。context
– 未使用
method rollback_twophase(conn: Connection, xid: Any, is_prepared: bool) → None
拦截 rollback_twophase() 事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'rollback_twophase') def receive_rollback_twophase(conn, xid, is_prepared): "listen for the 'rollback_twophase' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象xid
– 两阶段 XID 标识符is_prepared
– 布尔值,指示是否调用了TwoPhaseTransaction.prepare()
。
method savepoint(conn: Connection, name: str) → None
拦截 savepoint() 事件。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'savepoint') def receive_savepoint(conn, name): "listen for the 'savepoint' event" # ... (event handling logic) ...
参数:
conn
–Connection
对象name
– 用于保存点的指定名称。
method set_connection_execution_options(conn: Connection, opts: Dict[str, Any]) → None
拦截 Connection.execution_options()
方法调用时。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'set_connection_execution_options') def receive_set_connection_execution_options(conn, opts): "listen for the 'set_connection_execution_options' event" # ... (event handling logic) ...
此方法在新的 Connection
生成后被调用,带有新更新的执行选项集合,但在 Dialect
对这些新选项采取任何操作之前。
请注意,当从其父Engine
继承执行选项的新Connection
生成时,不会调用此方法;要拦截此条件,请使用ConnectionEvents.engine_connect()
事件。
参数:
conn
– 新复制的Connection
对象opts
–
传递给Connection.execution_options()
方法的选项字典。此字典可以就地修改以影响最终生效的选项。
2.0 版中的新功能:opts
字典可以就地修改。
另请参阅
ConnectionEvents.set_engine_execution_options()
- 当调用Engine.execution_options()
时触发的事件。
method set_engine_execution_options(engine: Engine, opts: Dict[str, Any]) → None
当调用Engine.execution_options()
方法时拦截。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'set_engine_execution_options') def receive_set_engine_execution_options(engine, opts): "listen for the 'set_engine_execution_options' event" # ... (event handling logic) ...
Engine.execution_options()
方法会生成一个存储新选项的Engine
的浅拷贝。这个新的Engine
会传递到这里。这个方法的一个特定应用是向给定的Engine
添加一个ConnectionEvents.engine_connect()
事件处理程序,该处理程序将执行一些特定于这些执行选项的每个Connection
任务。
参数:
conn
– 新复制的Engine
对象opts
–
传递给Connection.execution_options()
方法的选项字典。此字典可以就地修改以影响最终生效的选项。
2.0 版中的新功能:opts
字典可以就地修改。
另请参阅
ConnectionEvents.set_connection_execution_options()
- 当调用 Connection.execution_options()
时调用的事件。
class sqlalchemy.events.DialectEvents
执行替换函数的事件接口。
这些事件允许直接对与 DBAPI 交互的关键方言函数进行检测和替换。
注意
DialectEvents
钩子应被视为半公开和实验性质。这些钩子不适用于一般用途,仅适用于需要将复杂的 DBAPI 机制重新注入现有方言的情况。对于一般用途的语句拦截事件,请使用 ConnectionEvents
接口。
另请参阅
ConnectionEvents.before_cursor_execute()
ConnectionEvents.before_execute()
ConnectionEvents.after_cursor_execute()
ConnectionEvents.after_execute()
成员
dispatch, do_connect(), do_execute(), do_execute_no_params(), do_executemany(), do_setinputsizes(), handle_error()
类签名
类 sqlalchemy.events.DialectEvents
(sqlalchemy.event.Events
)
attribute dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.DialectEventsDispatch object>
参考回到 _Dispatch 类。
双向对抗 _Dispatch._events
method do_connect(dialect: Dialect, conn_rec: ConnectionPoolEntry, cargs: Tuple[Any, ...], cparams: Dict[str, Any]) → DBAPIConnection | None
在建立连接之前接收连接参数。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_connect') def receive_do_connect(dialect, conn_rec, cargs, cparams): "listen for the 'do_connect' event" # ... (event handling logic) ...
这个事件非常有用,因为它允许处理程序操作控制如何调用 DBAPI connect()
函数的 cargs
和/或 cparams
集合。cargs
将始终是一个可以原地变异的 Python 列表,cparams
是一个也可以被变异的 Python 字典:
e = create_engine("postgresql+psycopg2://user@host/dbname") @event.listens_for(e, 'do_connect') def receive_do_connect(dialect, conn_rec, cargs, cparams): cparams["password"] = "some_password"
事件钩子也可以完全覆盖对 connect()
的调用,通过返回一个非 None
的 DBAPI 连接对象:
e = create_engine("postgresql+psycopg2://user@host/dbname") @event.listens_for(e, 'do_connect') def receive_do_connect(dialect, conn_rec, cargs, cparams): return psycopg2.connect(*cargs, **cparams)
另请参阅
自定义 DBAPI connect() 参数 / on-connect routines
method do_execute(cursor: DBAPICursor, statement: str, parameters: _DBAPISingleExecuteParams, context: ExecutionContext) → Literal[True] | None
接收一个游标以调用 execute()
。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_execute') def receive_do_execute(cursor, statement, parameters, context): "listen for the 'do_execute' event" # ... (event handling logic) ...
返回 True 以阻止进一步调用事件,并指示光标执行已经在事件处理程序中发生。
method do_execute_no_params(cursor: DBAPICursor, statement: str, context: ExecutionContext) → Literal[True] | None
接收一个游标以调用没有参数的 execute()
。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_execute_no_params') def receive_do_execute_no_params(cursor, statement, context): "listen for the 'do_execute_no_params' event" # ... (event handling logic) ...
返回 True 以阻止进一步调用事件,并指示光标执行已经在事件处理程序中发生。
method do_executemany(cursor: DBAPICursor, statement: str, parameters: _DBAPIMultiExecuteParams, context: ExecutionContext) → Literal[True] | None
接收一个游标以调用 executemany()
。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_executemany') def receive_do_executemany(cursor, statement, parameters, context): "listen for the 'do_executemany' event" # ... (event handling logic) ...
返回 True 以阻止进一步调用事件,并指示光标执行已经在事件处理程序中发生。
method do_setinputsizes(inputsizes: Dict[BindParameter[Any], Any], cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext) → None
接收 setinputsizes 字典以进行可能的修改。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'do_setinputsizes') def receive_do_setinputsizes(inputsizes, cursor, statement, parameters, context): "listen for the 'do_setinputsizes' event" # ... (event handling logic) ...
在方言使用 DBAPI cursor.setinputsizes()
方法传递有关特定语句的参数绑定的情况下,将发出此事件。给定的 inputsizes
字典将包含 BindParameter
对象作为键,链接到 DBAPI 特定类型对象作为值;对于未绑定的参数,将使用 None
作为值将其添加到字典中,这意味着该参数不会包含在最终的 setinputsizes 调用中。可以使用此事件来检查和/或记录正在绑定的数据类型,以及直接修改字典。可以向此字典添加、修改或删除参数。调用者通常会想要检查给定绑定对象的 BindParameter.type
属性,以便对 DBAPI 对象做出决策。
在事件之后,inputsizes
字典将转换为适当的数据结构,以传递给 cursor.setinputsizes
;对于位置绑定参数执行样式,将转换为列表,对于命名绑定参数执行样式,将转换为字符串参数键到 DBAPI 类型对象的字典。
setinputsizes
钩子通常仅在包括标志 use_setinputsizes=True
的方言中使用。使用此功能的方言包括 cx_Oracle、pg8000、asyncpg 和 pyodbc 方言。
注意
对于使用 pyodbc,必须将 use_setinputsizes
标志传递给方言,例如:
create_engine("mssql+pyodbc://...", use_setinputsizes=True)
另请参阅
setinputsizes 支持
新版本 1.2.9 中的新功能。
另请参阅
通过 setinputsizes 对 cx_Oracle 数据绑定性能进行细粒度控制
method handle_error(exception_context: ExceptionContext) → BaseException | None
拦截由 Dialect
处理的所有异常,通常但不限于在 Connection
范围内发出的异常。
示例参数形式:
from sqlalchemy import event @event.listens_for(SomeEngine, 'handle_error') def receive_handle_error(exception_context): "listen for the 'handle_error' event" # ... (event handling logic) ...
从版本 2.0 起更改:DialectEvents.handle_error()
事件已移至 DialectEvents
类,从 ConnectionEvents
类移动,以便它还可以参与由 create_engine.pool_pre_ping
参数配置的“pre ping”操作。该事件仍通过使用 Engine
作为事件目标进行注册,但请注意,不再支持将 Connection
用作 DialectEvents.handle_error()
的事件目标。
这包括由 DBAPI 发出的所有异常以及 SQLAlchemy 的语句调用过程中,包括编码错误和其他语句验证错误。调用事件的其他区域包括事务开始和结束,结果行获取,游标创建。
请注意,handle_error()
可能在 任何时候 支持新类型的异常和新的调用场景。使用此事件的代码必须预期在次要版本中存在新的调用模式。
为了支持对应于异常的各种成员,并且允许事件的可扩展性而不会导致向后不兼容,唯一接收的参数是ExceptionContext
的实例。此对象包含表示异常详细信息的数据成员。
此钩子支持的用例包括:
- 用于记录和调试目的的只读低级异常处理
- 确定 DBAPI 连接错误消息是否表明需要重新连接数据库,包括一些方言中使用的“pre_ping”处理程序
- 确定或禁用特定异常响应中连接或拥有的连接池是否无效或过期
- 异常重写
在失败操作的游标(如果有)仍然打开且可访问时调用该钩子。可以在此游标上调用特殊的清理操作;SQLAlchemy 将在调用此钩子后尝试关闭此游标。
自 SQLAlchemy 2.0 起,使用 create_engine.pool_pre_ping
参数启用的“pre_ping”处理程序也将参与 handle_error()
过程,对于依赖于断开连接代码来检测数据库存活性的那些方言。请注意,某些方言,如 psycopg、psycopg2 和大多数 MySQL 方言,使用由 DBAPI 提供的本机 ping()
方法,该方法不使用断开连接代码。
在 2.0.0 版本中进行了更改:DialectEvents.handle_error()
事件钩子参与连接池“预检”操作。在此用法中,ExceptionContext.engine
属性将为 None
,但是正在使用的 Dialect
始终可通过 ExceptionContext.dialect
属性获得。
在 2.0.5 版本中进行了更改:添加了 ExceptionContext.is_pre_ping
属性,当在连接池预检操作中触发 DialectEvents.handle_error()
事件钩子时,该属性将设置为 True
。
在 2.0.5 版本中进行了更改:修复了一个问题,允许 PostgreSQL psycopg
和 psycopg2
驱动程序,以及所有 MySQL 驱动程序,在连接池“预检”操作期间正确参与 DialectEvents.handle_error()
事件钩子;先前,这些驱动程序的实现是不工作的。
处理程序函数有两个选项,可以将 SQLAlchemy 构造的异常替换为用户定义的异常。它可以直接引发这个新异常,这样所有后续的事件监听器都会被绕过,并且在适当的清理后引发异常:
@event.listens_for(Engine, "handle_error") def handle_exception(context): if isinstance(context.original_exception, psycopg2.OperationalError) and \ "failed" in str(context.original_exception): raise MySpecialException("failed operation")
警告
因为 DialectEvents.handle_error()
事件特别提供了将异常重新抛出为失败语句引发的最终异常,如果用户定义的事件处理程序本身失败并抛出意外异常,则堆栈跟踪将是误导性的;堆栈跟踪可能不会显示实际失败的代码行!建议在此处小心编码,并在发生意外异常时使用日志记录和/或内联调试。
或者,可以使用“链接”风格的事件处理,通过使用retval=True
修饰符配置处理程序,并从函数返回新的异常实例。在这种情况下,事件处理将继续到下一个处理程序。可使用ExceptionContext.chained_exception
获取“链接”异常:
@event.listens_for(Engine, "handle_error", retval=True) def handle_exception(context): if context.chained_exception is not None and \ "special" in context.chained_exception.message: return MySpecialException("failed", cause=context.chained_exception)
返回None
的处理程序可以在链中使用;当处理程序返回None
时,如果有的话,前一个异常实例将保持为传递给下一个处理程序的当前异常。
当引发或返回自定义异常时,SQLAlchemy 将原样引发此新异常,不会被任何 SQLAlchemy 对象包装。如果异常不是sqlalchemy.exc.StatementError
的子类,则某些功能可能不可用;目前包括 ORM 在自动刷新过程中引发异常时添加有关“自动刷新”的详细提示的功能。
参数:
context – 一个ExceptionContext
对象。有关所有可用成员的详细信息,请参阅此类。
另请参见
支持断开连接场景的新数据库错误代码
SqlAlchemy 2.0 中文文档(四十六)(7)https://developer.aliyun.com/article/1563041