使用事件跟踪查询、对象和会话更改
SQLAlchemy 特色是一个广泛的事件监听系统,贯穿于核心和 ORM 中。在 ORM 中,有各种各样的事件监听器钩子,这些钩子在 ORM 事件的 API 级别有文档记录。这些事件的集合多年来已经增长,包括许多非常有用的新事件,以及一些曾经不那么相关的旧事件。本节将尝试介绍主要的事件钩子以及它们何时可能被使用。
执行事件
从版本 1.4 新增:现在Session
提供了一个单一全面的钩子,用于拦截 ORM 代表进行的所有 SELECT 语句,以及大量的 UPDATE 和 DELETE 语句。这个钩子取代了以前的QueryEvents.before_compile()
事件以及QueryEvents.before_compile_update()
和QueryEvents.before_compile_delete()
。
Session
提供了一个全面的系统,通过该系统,通过Session.execute()
方法调用的所有查询,包括由Query
发出的所有 SELECT 语句以及代表列和关系加载器发出的所有 SELECT 语句,都可以被拦截和修改。该系统使用SessionEvents.do_orm_execute()
事件钩子以及ORMExecuteState
对象来表示事件状态。
基本查询拦截
SessionEvents.do_orm_execute()
首先对查询的任何拦截都是有用的,包括由 Query
以 1.x 风格 发出的查询,以及当 ORM 启用的 2.0 风格 下传递给 Session.execute()
的 select()
、update()
或 delete()
构造。ORMExecuteState
构造提供了访问器,以允许修改语句、参数和选项:
Session = sessionmaker(engine) @event.listens_for(Session, "do_orm_execute") def _do_orm_execute(orm_execute_state): if orm_execute_state.is_select: # add populate_existing for all SELECT statements orm_execute_state.update_execution_options(populate_existing=True) # check if the SELECT is against a certain entity and add an # ORDER BY if so col_descriptions = orm_execute_state.statement.column_descriptions if col_descriptions[0]["entity"] is MyEntity: orm_execute_state.statement = statement.order_by(MyEntity.name)
上述示例说明了对 SELECT 语句进行的一些简单修改。在这个层次上,SessionEvents.do_orm_execute()
事件钩子旨在取代之前使用的 QueryEvents.before_compile()
事件,该事件对各种加载器的各种类型并不一致地触发;此外,QueryEvents.before_compile()
仅适用于与 Query
的 1.x 风格 一起使用,而不适用于 2.0 风格 下使用 Session.execute()
。
添加全局的 WHERE / ON 条件
最常请求的查询扩展功能之一是向所有查询中的所有实体添加 WHERE 条件的能力。这可以通过使用 with_loader_criteria()
查询选项来实现,该选项可以单独使用,也可以在 SessionEvents.do_orm_execute()
事件中使用:
from sqlalchemy.orm import with_loader_criteria Session = sessionmaker(engine) @event.listens_for(Session, "do_orm_execute") def _do_orm_execute(orm_execute_state): if ( orm_execute_state.is_select and not orm_execute_state.is_column_load and not orm_execute_state.is_relationship_load ): orm_execute_state.statement = orm_execute_state.statement.options( with_loader_criteria(MyEntity.public == True) )
在上述内容中,为所有 SELECT 语句添加了一个选项,该选项将限制针对MyEntity
的所有查询以在public == True
上进行过滤。该条件将应用于立即查询范围内该类的所有加载。with_loader_criteria()
选项默认情况下还会自动传播到关系加载程序,这将应用于后续的关系加载,包括惰性加载,selectinloads 等。
对于一系列具有某些共同列结构的类,如果使用声明性混合来组合类,那么混合类本身可以与with_loader_criteria()
选项结合使用,通过使用 Python lambda 来使用。 Python lambda 将针对与条件匹配的特定实体在查询编译时调用。假设一系列基于名为HasTimestamp
的混合物的类:
import datetime class HasTimestamp: timestamp = mapped_column(DateTime, default=datetime.datetime.now) class SomeEntity(HasTimestamp, Base): __tablename__ = "some_entity" id = mapped_column(Integer, primary_key=True) class SomeOtherEntity(HasTimestamp, Base): __tablename__ = "some_entity" id = mapped_column(Integer, primary_key=True)
上述类SomeEntity
和SomeOtherEntity
将分别具有一个timestamp
列,其默认值为当前日期和时间。可以使用事件拦截所有扩展自HasTimestamp
并过滤其timestamp
列的对象,使其日期不晚于一个月前:
@event.listens_for(Session, "do_orm_execute") def _do_orm_execute(orm_execute_state): if ( orm_execute_state.is_select and not orm_execute_state.is_column_load and not orm_execute_state.is_relationship_load ): one_month_ago = datetime.datetime.today() - datetime.timedelta(months=1) orm_execute_state.statement = orm_execute_state.statement.options( with_loader_criteria( HasTimestamp, lambda cls: cls.timestamp >= one_month_ago, include_aliases=True, ) )
警告
在调用with_loader_criteria()
时使用 lambda 仅被调用一次每个唯一类。在此 lambda 内部不应调用自定义函数。请参阅使用 Lambda 将重要的速度增益添加到语句生成以获取“lambda SQL”功能的概述,该功能仅用于高级用途。
另请参阅
ORM 查询事件 - 包括上述with_loader_criteria()
配方的工作示例。 ### 重新执行语句
深度炼金术
语句重新执行功能涉及稍微复杂的递归序列,并旨在解决将 SQL 语句的执行重新路由到各种非 SQL 上下文的相当困难的问题。下面链接的“狗窝缓存”和“水平分片”的双例应该用作指导,以确定何时适合使用此相当高级的功能。
ORMExecuteState
能够控制给定语句的执行;这包括能力要么根本不调用语句,而是返回一个从缓存中检索到的预先构建的结果集,要么调用相同的语句多次,每次使用不同的状态,例如对多个数据库连接调用它,然后在内存中合并结果。这两种高级模式都在 SQLAlchemy 的示例套件中有详细说明。
在SessionEvents.do_orm_execute()
事件钩子内部时,可以使用ORMExecuteState.invoke_statement()
方法来使用新的嵌套调用Session.execute()
来调用语句,这将抢占当前正在进行的执行的后续处理,而是返回内部执行返回的Result
。因此,在此过程中已调用的SessionEvents.do_orm_execute()
钩子的事件处理程序也将在此嵌套调用中被跳过。
ORMExecuteState.invoke_statement()
方法返回一个Result
对象;然后该对象具有冻结为可缓存格式和解冻为新的Result
对象的能力,以及将其数据与其他Result
对象的数据合并的能力。
例如,使用SessionEvents.do_orm_execute()
来实现缓存:
from sqlalchemy.orm import loading cache = {} @event.listens_for(Session, "do_orm_execute") def _do_orm_execute(orm_execute_state): if "my_cache_key" in orm_execute_state.execution_options: cache_key = orm_execute_state.execution_options["my_cache_key"] if cache_key in cache: frozen_result = cache[cache_key] else: frozen_result = orm_execute_state.invoke_statement().freeze() cache[cache_key] = frozen_result return loading.merge_frozen_result( orm_execute_state.session, orm_execute_state.statement, frozen_result, load=False, )
当上述钩子生效时,使用缓存的示例如下:
stmt = ( select(User).where(User.name == "sandy").execution_options(my_cache_key="key_sandy") ) result = session.execute(stmt)
上面的例子中,自定义的执行选项被传递给Select.execution_options()
,以建立一个“缓存键”,然后该键将被SessionEvents.do_orm_execute()
钩子拦截。然后,这个缓存键将与可能存在于缓存中的FrozenResult
对象进行匹配,并且如果存在,则重新使用该对象。该示例利用了Result.freeze()
方法来“冻结”一个包含 ORM 结果的Result
对象,以便它可以被存储在缓存中并多次使用。为了从“冻结”结果中返回一个活动结果,使用merge_frozen_result()
函数将结果对象中的“冻结”数据合并到当前会话中。
上面的例子在 Dogpile 缓存中作为一个完整的例子实现。
ORMExecuteState.invoke_statement()
方法也可以被多次调用,传递不同的信息给ORMExecuteState.invoke_statement.bind_arguments
参数,以便Session
每次都使用不同的Engine
对象。这将每次返回一个不同的Result
对象;这些结果可以使用Result.merge()
方法合并在一起。这是水平分片扩展所采用的技术;请查看源代码以熟悉它。
另请参阅
Dogpile 缓存
水平分片 ## 持久化事件
可能是最广泛使用的一系列事件是“持久化”事件,它们对应于刷新过程。刷新是所有关于待处理对象更改的决定都会被做出,并以 INSERT、UPDATE 和 DELETE 语句的形式发送到数据库的地方。
before_flush()
SessionEvents.before_flush()
钩子是当应用程序希望在提交刷新时确保额外的持久性更改被执行时最常用的事件。 使用 SessionEvents.before_flush()
来验证对象的状态并在持久化之前组合其他对象和引用。在此事件中,可以安全地操纵会话的状态,即可以附加新对象,删除对象,并且可以自由更改对象上的单个属性,这些更改将在事件钩子完成时被纳入刷新过程中。
典型的 SessionEvents.before_flush()
钩子将被指示扫描集合 Session.new
、Session.dirty
和 Session.deleted
,以查找将要发生更改的对象。
有关 SessionEvents.before_flush()
的示例,请参见具有历史表的版本控制和使用时间行进行版本控制等示例。
after_flush()
SessionEvents.after_flush()
钩子在刷新过程的 SQL 被生成之后,但在被刷新的对象状态被更改之前调用。也就是说,您仍然可以检查 Session.new
、Session.dirty
和 Session.deleted
集合,以查看刚刷新的内容,并且还可以使用像 AttributeState
这样的历史跟踪功能来查看刚刚持久化的更改。在 SessionEvents.after_flush()
事件中,可以根据观察到的更改向数据库发送额外的 SQL。
after_flush_postexec()
SessionEvents.after_flush_postexec()
在SessionEvents.after_flush()
之后不久调用,但是在对象状态已经被修改以考虑刚刚发生的刷新之后调用。Session.new
、Session.dirty
和 Session.deleted
集合通常在此时完全为空。使用 SessionEvents.after_flush_postexec()
检查最终对象的标识映射,并可能发出附加的 SQL。在这个钩子中,有能力对对象进行新的更改,这意味着 Session
再次进入“脏”状态;Session
的机制会导致如果在此钩子中检测到新的更改,那么再次刷新如果在 Session.commit()
的上下文中调用了刷新;否则,待定更改将作为下一个正常刷新的一部分进行捆绑。当钩子在 Session.commit()
中检测到新的更改时,一个计数器确保在每次调用时,如果 SessionEvents.after_flush_postexec()
钩子持续添加新状态以刷新,则此方面的无限循环在 100 次迭代后停止。
映射器级刷新事件
除了刷新级别的钩子外,还有一套更精细的钩子,这些钩子更加细致,因为它们是基于每个对象调用的,并且根据刷新过程中的 INSERT、UPDATE 或 DELETE 进行分组。这些是映射器持久性钩子,它们也非常受欢迎,但是需要更加谨慎地对待这些事件,因为它们在已经进行的刷新过程的上下文中进行;在这里进行许多操作是不安全的。
这些事件是:
MapperEvents.before_insert()
MapperEvents.after_insert()
MapperEvents.before_update()
MapperEvents.after_update()
MapperEvents.before_delete()
MapperEvents.after_delete()
注意
需要注意的是,这些事件仅适用于会话刷新操作,而不适用于在 ORM-启用的 INSERT、UPDATE 和 DELETE 语句中描述的 ORM 级别的 INSERT/UPDATE/DELETE 功能。要拦截 ORM 级别的 DML,请使用SessionEvents.do_orm_execute()
事件。
每个事件都会传递Mapper
、映射对象本身以及用于发出 INSERT、UPDATE 或 DELETE 语句的Connection
。这些事件的吸引力显而易见,因为如果应用程序想要将某些活动绑定到特定类型的对象在 INSERT 时被持久化的时间,钩子就非常具体;不像SessionEvents.before_flush()
事件,不需要搜索诸如Session.new
之类的集合以找到目标。然而,当调用这些事件时,表示完整列表的刷新计划,即将发出的每个单独的 INSERT、UPDATE、DELETE 语句已经已经决定,在这个阶段不允许进行任何更改。因此,甚至对于给定对象的其他属性也只能进行局部更改。对对象或其他对象的任何其他更改将影响Session
的状态,这将导致其无法正常运行。
在这些映射器级持久性事件中不支持的操作包括:
Session.add()
Session.delete()
- 映射集合追加、添加、移除、删除、丢弃等操作。
- 映射关系属性设置/删除事件,即
someobject.related = someotherobject
传递Connection
的原因是鼓励在这里进行简单的 SQL 操作,直接在Connection
上进行,例如在日志表中递增计数器或插入额外行。
也有许多每个对象操作根本不需要在刷新事件中处理。最常见的替代方法是在对象的__init__()
方法中简单地建立额外的状态,例如创建要与新对象关联的其他对象。使用 Simple Validators 中描述的验证器是另一种方法;这些函数可以拦截属性的更改,并在响应属性更改时在目标对象上建立额外的状态更改。使用这两种方法,对象在到达刷新步骤之前就处于正确的状态。## 对象生命周期事件
事件的另一个用例是跟踪对象的生命周期。这指的是首次介绍于 Quickie Intro to Object States 的状态。
所有上述状态都可以完全通过事件进行跟踪。每个事件代表着一个独立的状态转换,意味着起始状态和目标状态都是被跟踪的一部分。除了初始的瞬态事件之外,所有事件都是以Session
对象或类的形式出现的,这意味着它们可以与特定的Session
对象关联:
from sqlalchemy import event from sqlalchemy.orm import Session session = Session() @event.listens_for(session, "transient_to_pending") def object_is_pending(session, obj): print("new pending: %s" % obj)
或者使用Session
类本身,以及特定的sessionmaker
,这可能是最有用的形式:
from sqlalchemy import event from sqlalchemy.orm import sessionmaker maker = sessionmaker() @event.listens_for(maker, "transient_to_pending") def object_is_pending(session, obj): print("new pending: %s" % obj)
监听器当然可以堆叠在一个函数上,这很可能是常见的情况。例如,要跟踪所有进入持久状态的对象:
@event.listens_for(maker, "pending_to_persistent") @event.listens_for(maker, "deleted_to_persistent") @event.listens_for(maker, "detached_to_persistent") @event.listens_for(maker, "loaded_as_persistent") def detect_all_persistent(session, instance): print("object is now persistent: %s" % instance)
瞬态
所有映射对象在首次构建时都是瞬态的。在这种状态下,对象独立存在,不与任何Session
关联。对于这种初始状态,没有特定的“转换”事件,因为没有Session
,但是如果想要拦截任何瞬态对象被创建时,InstanceEvents.init()
方法可能是最好的事件。此事件应用于特定类或超类。例如,要拦截特定声明基类的所有新对象:
from sqlalchemy.orm import DeclarativeBase from sqlalchemy import event class Base(DeclarativeBase): pass @event.listens_for(Base, "init", propagate=True) def intercept_init(instance, args, kwargs): print("new transient: %s" % instance)
瞬态到待定
当瞬态对象首次通过Session.add()
或Session.add_all()
方法与Session
关联时,瞬态对象变为待定。对象也可能作为引用对象的“级联”结果成为Session
的一部分,该引用对象是显式添加的。使用SessionEvents.transient_to_pending()
事件检测瞬态到待定的转换过程:
@event.listens_for(sessionmaker, "transient_to_pending") def intercept_transient_to_pending(session, object_): print("transient to pending: %s" % object_)
待定到持久化
当一个刷新操作进行并且为实例执行 INSERT 语句时,待定对象变为持久化。该对象现在具有标识键。使用SessionEvents.pending_to_persistent()
事件跟踪待定到持久化的过程:
@event.listens_for(sessionmaker, "pending_to_persistent") def intercept_pending_to_persistent(session, object_): print("pending to persistent: %s" % object_)
待定到瞬态
如果在待定对象被刷新之前调用Session.rollback()
方法,或者在刷新对象之前调用Session.expunge()
方法,则待定对象可以回退到瞬态状态。使用SessionEvents.pending_to_transient()
事件跟踪待定到瞬态的过程:
@event.listens_for(sessionmaker, "pending_to_transient") def intercept_pending_to_transient(session, object_): print("transient to pending: %s" % object_)
作为持久化加载
当对象从数据库加载时,它们可以直接进入 Session
中的 persistent 状态。跟踪此状态转换等同于跟踪对象加载的方式,并且等同于使用 InstanceEvents.load()
实例级事件。但是,SessionEvents.loaded_as_persistent()
事件作为一个会话中心的钩子提供,用于拦截通过这种特定方式进入持久化状态的对象:
@event.listens_for(sessionmaker, "loaded_as_persistent") def intercept_loaded_as_persistent(session, object_): print("object loaded into persistent state: %s" % object_)
持久化到瞬时
如果针对对象首次作为待处理对象添加的事务调用了 Session.rollback()
方法,持久化对象可以恢复到瞬时状态。在 ROLLBACK 的情况下,将该对象持久化的 INSERT 语句回滚,并将对象从 Session
中驱逐,使其再次成为瞬时状态。使用 SessionEvents.persistent_to_transient()
事件钩子跟踪从持久化恢复为瞬时的对象:
@event.listens_for(sessionmaker, "persistent_to_transient") def intercept_persistent_to_transient(session, object_): print("persistent to transient: %s" % object_)
持久化到删除
当在 flush 过程中从数据库中删除了标记为删除的对象时,持久化对象进入 deleted 状态。请注意,这与调用 Session.delete()
方法删除目标对象时并不相同。Session.delete()
方法只是将对象标记为删除;直到 flush 进行之后才会发出实际的 DELETE 语句。在 flush 进行之后,目标对象的“deleted”状态才存在。
在“deleted”状态中,对象与 Session
仅有轻微关联。它既不在标识映射中,也不在指示其曾待删除的 Session.deleted
集合中。
从“deleted”状态,当事务提交时,对象可以进入分离状态,或者如果事务被回滚,则可以重新进入持久化状态。
使用 SessionEvents.persistent_to_deleted()
跟踪持久化到删除的转换:
@event.listens_for(sessionmaker, "persistent_to_deleted") def intercept_persistent_to_deleted(session, object_): print("object was DELETEd, is now in deleted state: %s" % object_)
已删除到已分离
当会话的事务提交时,已删除对象将变为分离。在调用 Session.commit()
方法后,数据库事务已完成,Session
现在完全丢弃了已删除对象并删除了所有与其相关的关联。使用 SessionEvents.deleted_to_detached()
跟踪已删除到分离的转换:
@event.listens_for(sessionmaker, "deleted_to_detached") def intercept_deleted_to_detached(session, object_): print("deleted to detached: %s" % object_)
注意
当对象处于已删除状态时,InstanceState.deleted
属性,可使用 inspect(object).deleted
访问,将返回 True。但是,当对象分离时,InstanceState.deleted
将再次返回 False。要检测对象是否已删除,无论它是否已分离,请使用 InstanceState.was_deleted
访问器。
持久到分离
当对象与 Session
解除关联时,通过 Session.expunge()
、Session.expunge_all()
或 Session.close()
方法,持久对象将变为分离状态。
注意
如果一个对象的拥有者 Session
被应用程序解除引用并由于垃圾回收而被丢弃,该对象也可能会隐式分离。在这种情况下,不会发出任何事件。
使用 SessionEvents.persistent_to_detached()
事件跟踪对象从持久状态转为分离状态:
@event.listens_for(sessionmaker, "persistent_to_detached") def intercept_persistent_to_detached(session, object_): print("object became detached: %s" % object_)
分离到持久
当分离对象使用 Session.add()
或等效方法重新关联到会话时,它将变为持久对象。跟踪对象从分离状态返回持久状态时使用 SessionEvents.detached_to_persistent()
事件:
@event.listens_for(sessionmaker, "detached_to_persistent") def intercept_detached_to_persistent(session, object_): print("object became persistent again: %s" % object_)
已删除到持久
当删除对象在其所在的事务被回滚时,可以将其恢复为持久状态,使用Session.rollback()
方法回滚。使用SessionEvents.deleted_to_persistent()
事件跟踪将删除的对象移回持久状态:
@event.listens_for(sessionmaker, "deleted_to_persistent") def intercept_deleted_to_persistent(session, object_): print("deleted to persistent: %s" % object_) ```## 事务事件 事务事件允许应用在`Session`级别上发生事务边界时收到通知,以及当`Session`更改`Connection`对象上的事务状态时。 + `SessionEvents.after_transaction_create()`,`SessionEvents.after_transaction_end()` - 这些事件跟踪`Session`的逻辑事务范围,不特定于单个数据库连接。这些事件旨在帮助集成事务跟踪系统,如`zope.sqlalchemy`。当应用程序需要将某些外部范围与`Session`的事务范围对齐时,请使用这些事件。这些挂钩反映了`Session`的“嵌套”事务行为,因为它们跟踪逻辑“子事务”以及“嵌套”(例如,SAVEPOINT)事务。 + `SessionEvents.before_commit()`、`SessionEvents.after_commit()`、`SessionEvents.after_begin()`、`SessionEvents.after_rollback()`、`SessionEvents.after_soft_rollback()` - 这些事件允许从数据库连接的角度跟踪事务事件。特别是`SessionEvents.after_begin()`是一个每个连接的事件;一个维护多个连接的`Session`将为每个连接在当前事务中使用时单独发出此事件。然后回滚和提交事件指的是 DBAPI 连接自身直接接收回滚或提交指令的时候。 ## 属性更改事件 属性更改事件允许拦截对象上特定属性被修改的时机。这些事件包括`AttributeEvents.set()`、`AttributeEvents.append()`和`AttributeEvents.remove()`。这些事件非常有用,特别是对于每个对象的验证操作;然而,使用“验证器”钩子通常更加方便,它在幕后使用这些钩子;请参阅 Simple Validators 以了解背景信息。属性事件也是反向引用机制的基础。一个说明属性事件使用的示例在 Attribute Instrumentation 中。 ## 执行事件 新版本 1.4 中新增:`Session`现在具有一个全面的钩子,旨在拦截所有代表 ORM 执行的 SELECT 语句以及批量 UPDATE 和 DELETE 语句。这个钩子取代了之前的`QueryEvents.before_compile()`事件以及`QueryEvents.before_compile_update()`和`QueryEvents.before_compile_delete()`。 `Session`具有一个全面的系统,通过该系统可以拦截和修改通过`Session.execute()`方法调用的所有查询,其中包括由`Query`发出的所有 SELECT 语句以及所有代表列和关系加载程序发出的 SELECT 语句。该系统利用了`SessionEvents.do_orm_execute()`事件钩子以及`ORMExecuteState`对象来表示事件状态。 ### 基本查询拦截 `SessionEvents.do_orm_execute()`首先对查询的任何拦截都是有用的,这包括由`Query`发出的 1.x 风格以及当 ORM 启用的 2.0 风格的`select()`,`update()`或`delete()`构造被传递给`Session.execute()`时。`ORMExecuteState`构造提供了访问器,允许修改语句、参数和选项: ```py Session = sessionmaker(engine) @event.listens_for(Session, "do_orm_execute") def _do_orm_execute(orm_execute_state): if orm_execute_state.is_select: # add populate_existing for all SELECT statements orm_execute_state.update_execution_options(populate_existing=True) # check if the SELECT is against a certain entity and add an # ORDER BY if so col_descriptions = orm_execute_state.statement.column_descriptions if col_descriptions[0]["entity"] is MyEntity: orm_execute_state.statement = statement.order_by(MyEntity.name)
上面的示例说明了对 SELECT 语句的一些简单修改。在这个级别上,SessionEvents.do_orm_execute()
事件钩子旨在替换以前对各种加载器不一致触发的 QueryEvents.before_compile()
事件的使用;此外,QueryEvents.before_compile()
仅适用于 1.x 样式 与 Query
一起使用,并不适用于 2.0 样式 与 Session.execute()
一起使用。
添加全局 WHERE / ON 条件
最常请求的查询扩展功能之一是能够向所有查询中的所有实体添加 WHERE 条件。通过使用 with_loader_criteria()
查询选项,可以实现此目的,该选项可以单独使用,或者最好在 SessionEvents.do_orm_execute()
事件中使用:
from sqlalchemy.orm import with_loader_criteria Session = sessionmaker(engine) @event.listens_for(Session, "do_orm_execute") def _do_orm_execute(orm_execute_state): if ( orm_execute_state.is_select and not orm_execute_state.is_column_load and not orm_execute_state.is_relationship_load ): orm_execute_state.statement = orm_execute_state.statement.options( with_loader_criteria(MyEntity.public == True) )
上面,所有 SELECT 语句都添加了一个选项,将限制针对 MyEntity
的所有查询,以在 public == True
上进行过滤。这些条件将应用于立即查询范围内该类的所有加载。with_loader_criteria()
选项默认情况下也会自动传播到关系加载器,这将应用于后续的关系加载,包括延迟加载、selectinloads 等。
对于一系列具有某些共同列结构的类,如果这些类使用 declarative mixin 进行组合,那么 mixin 类本身可以与 with_loader_criteria()
选项结合使用,方法是使用 Python lambda。Python lambda 将在查询编译时针对符合条件的特定实体被调用。假设有一系列基于名为 HasTimestamp
的 mixin 的类:
import datetime class HasTimestamp: timestamp = mapped_column(DateTime, default=datetime.datetime.now) class SomeEntity(HasTimestamp, Base): __tablename__ = "some_entity" id = mapped_column(Integer, primary_key=True) class SomeOtherEntity(HasTimestamp, Base): __tablename__ = "some_entity" id = mapped_column(Integer, primary_key=True)
上述类 SomeEntity
和 SomeOtherEntity
将分别具有一个默认为当前日期和时间的列 timestamp
。可以使用事件拦截从 HasTimestamp
扩展的所有对象,并在一个月前之内的日期上过滤它们的 timestamp
列:
@event.listens_for(Session, "do_orm_execute") def _do_orm_execute(orm_execute_state): if ( orm_execute_state.is_select and not orm_execute_state.is_column_load and not orm_execute_state.is_relationship_load ): one_month_ago = datetime.datetime.today() - datetime.timedelta(months=1) orm_execute_state.statement = orm_execute_state.statement.options( with_loader_criteria( HasTimestamp, lambda cls: cls.timestamp >= one_month_ago, include_aliases=True, ) )
警告
在调用with_loader_criteria()
时使用 lambda 只会每个唯一类调用一次。在此 lambda 内部不应调用自定义函数。有关“lambda SQL”功能的概述,请参阅使用 Lambda 为语句生成带来显著速度提升,这仅适用于高级用法。
另请参阅
ORM 查询事件 - 包括上述with_loader_criteria()
示例的工作示例。### 重新执行语句
深度炼金术
重新执行功能涉及稍微复杂的递归序列,并旨在解决能够将 SQL 语句的执行重新路由到各种非 SQL 上下文的相当困难的问题。下面链接的“狗窝缓存”和“水平分片”这两个示例应该作为指导,指出何时适合使用这个相当高级的功能。
ORMExecuteState
能够控制给定语句的执行;这包括不执行语句的能力,允许从缓存中检索到的预构建结果集返回,以及多次以不同状态调用相同语句的能力,例如针对多个数据库连接调用它,然后在内存中合并结果。这两种高级模式在 SQLAlchemy 的示例套件中有详细展示。
当在SessionEvents.do_orm_execute()
事件钩子内部时,可以使用ORMExecuteState.invoke_statement()
方法来使用新的嵌套调用Session.execute()
来调用语句,这将预先中断当前正在进行的执行的后续处理,而是返回内部执行返回的Result
。在此过程中为SessionEvents.do_orm_execute()
钩子调用的事件处理程序也将在此嵌套调用中被跳过。
ORMExecuteState.invoke_statement()
方法返回一个Result
对象;该对象具有将其“冻结”为可缓存格式并“解冻”为新的Result
对象的能力,以及将其数据与其他Result
对象的数据合并的能力。
例如,使用SessionEvents.do_orm_execute()
来实现缓存:
from sqlalchemy.orm import loading cache = {} @event.listens_for(Session, "do_orm_execute") def _do_orm_execute(orm_execute_state): if "my_cache_key" in orm_execute_state.execution_options: cache_key = orm_execute_state.execution_options["my_cache_key"] if cache_key in cache: frozen_result = cache[cache_key] else: frozen_result = orm_execute_state.invoke_statement().freeze() cache[cache_key] = frozen_result return loading.merge_frozen_result( orm_execute_state.session, orm_execute_state.statement, frozen_result, load=False, )
有了上述钩子,使用缓存的示例如下:
stmt = ( select(User).where(User.name == "sandy").execution_options(my_cache_key="key_sandy") ) result = session.execute(stmt)
以上,在Select.execution_options()
中传递了一个自定义执行选项,以建立一个“缓存键”,然后会被SessionEvents.do_orm_execute()
钩子拦截。这个缓存键然后会与可能存在于缓存中的FrozenResult
对象匹配,如果存在,则会重新使用该对象。该示例利用了Result.freeze()
方法来“冻结”一个Result
对象,其中将包含 ORM 结果,以便将其存储在缓存中并多次使用。为了从“冻结”结果中返回一个实时结果,使用merge_frozen_result()
函数将结果对象中的“冻结”数据合并到当前会话中。
上述示例在 Dogpile Caching 中作为一个完整示例实现。
ORMExecuteState.invoke_statement()
方法也可以被多次调用,传递不同的信息给 ORMExecuteState.invoke_statement.bind_arguments
参数,以便 Session
每次都使用不同的 Engine
对象。每次都会返回一个不同的 Result
对象;这些结果可以使用 Result.merge()
方法合并在一起。这是 水平分片 扩展所使用的技术;请查看源代码以熟悉。
另请参阅
Dogpile Caching
水平分片
SqlAlchemy 2.0 中文文档(二十五)(2)https://developer.aliyun.com/article/1560576