SqlAlchemy 2.0 中文文档(二十)(2)https://developer.aliyun.com/article/1560348
类签名
类 sqlalchemy.orm.AliasedClass
(sqlalchemy.inspection.Inspectable
, sqlalchemy.orm.ORMColumnsClauseRole
)
class sqlalchemy.orm.util.AliasedInsp
为AliasedClass
对象提供检查接口。
给定一个AliasedClass
,使用inspect()
函数将返回一个AliasedInsp
对象:
from sqlalchemy import inspect from sqlalchemy.orm import aliased my_alias = aliased(MyMappedClass) insp = inspect(my_alias)
AliasedInsp
上的属性包括:
entity
- 表示的AliasedClass
。mapper
- 映射底层类的Mapper
。selectable
- 最终表示别名Table
或Select
构造的Alias
构造。name
- 别名的名称。 当从Query
中的结果元组返回时,也用作属性名称。with_polymorphic_mappers
- 指示在AliasedClass
的 select 构造中表达的所有这些映射器的集合。polymorphic_on
- 一个备用的列或 SQL 表达式,将用作多态加载的“辨别器”。
另请参阅
运行时检查 API
类签名
类sqlalchemy.orm.AliasedInsp
(sqlalchemy.orm.ORMEntityColumnsClauseRole
, sqlalchemy.orm.ORMFromClauseRole
, sqlalchemy.sql.cache_key.HasCacheKey
, sqlalchemy.orm.base.InspectionAttr
, sqlalchemy.util.langhelpers.MemoizedSlots
, sqlalchemy.inspection.Inspectable
, typing.Generic
)
class sqlalchemy.orm.Bundle
由Query
在一个命名空间下返回的 SQL 表达式的分组。
Bundle
基本上允许对列导向的Query
对象返回的基于元组的结果进行嵌套。 它还可以通过简单的子类化进行扩展,其中要覆盖的主要能力是如何返回表达式集,允许后处理以及自定义返回类型,而不涉及 ORM 身份映射的类。
另请参阅
使用 Bundles 分组选择的属性
成员
init(), c, columns, create_row_processor(), is_aliased_class, is_bundle, is_clause_element, is_mapper, label(), single_entity
类签名
类sqlalchemy.orm.Bundle
(sqlalchemy.orm.ORMColumnsClauseRole
, sqlalchemy.sql.annotation.SupportsCloneAnnotations
, sqlalchemy.sql.cache_key.MemoizedHasCacheKey
, sqlalchemy.inspection.Inspectable
, sqlalchemy.orm.base.InspectionAttr
)。
method __init__(name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any)
构造一个新的Bundle
。
例如:
bn = Bundle("mybundle", MyClass.x, MyClass.y) for row in session.query(bn).filter( bn.c.x == 5).filter(bn.c.y == 4): print(row.mybundle.x, row.mybundle.y)
参数:
name
– bundle 的名称。*exprs
– 组成 bundle 的列或 SQL 表达式。single_entity=False
– 如果为 True,则此Bundle
的行可以作为“单个实体”返回,而不是在与映射实体相同的元组中。
attribute c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
Bundle.columns
的别名。
attribute columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
被此Bundle
引用的 SQL 表达式的命名空间。
例如:
bn = Bundle("mybundle", MyClass.x, MyClass.y) q = sess.query(bn).filter(bn.c.x == 5)
也支持 bundle 的嵌套:
b1 = Bundle("b1", Bundle('b2', MyClass.a, MyClass.b), Bundle('b3', MyClass.x, MyClass.y) ) q = sess.query(b1).filter( b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)
另请参阅
Bundle.c
method create_row_processor(query: Select[Any], procs: Sequence[Callable[[Row[Any]], Any]], labels: Sequence[str]) → Callable[[Row[Any]], Any]
为此Bundle
生成“行处理”函数。
可以被子类覆盖以在获取结果时提供自定义行为。该方法在查询执行时传递给语句对象和一组“行处理”函数;这些处理函数在给定结果行时将返回单个属性值,然后可以将其调整为任何返回数据结构。
下面的示例说明了将通常的Row
返回结构替换为直接的 Python 字典:
from sqlalchemy.orm import Bundle class DictBundle(Bundle): def create_row_processor(self, query, procs, labels): 'Override create_row_processor to return values as dictionaries' def proc(row): return dict( zip(labels, (proc(row) for proc in procs)) ) return proc
上述Bundle
的结果将返回字典值:
bn = DictBundle('mybundle', MyClass.data1, MyClass.data2) for row in session.execute(select(bn)).where(bn.c.data1 == 'd1'): print(row.mybundle['data1'], row.mybundle['data2'])
attribute is_aliased_class = False
如果此对象是AliasedClass
的实例,则为 True。
attribute is_bundle = True
如果此对象是Bundle
的实例,则为 True。
attribute is_clause_element = False
如果此对象是ClauseElement
的实例,则为 True。
attribute is_mapper = False
如果此对象是Mapper
的实例,则为 True。
method label(name)
提供一个传递了新标签的 Bundle
的副本。
attribute single_entity = Fals
如果为 True,则对于单个 Bundle 的查询将返回为单个实体,而不是在一个键元组中的元素。
function sqlalchemy.orm.with_loader_criteria(entity_or_base: _EntityType[Any], where_criteria: _ColumnExpressionArgument[bool] | Callable[[Any], _ColumnExpressionArgument[bool]], loader_only: bool = False, include_aliases: bool = False, propagate_to_loaders: bool = True, track_closure_variables: bool = True) → LoaderCriteriaOption
为所有特定实体的加载添加额外的 WHERE 条件。
新版本 1.4 中的新功能。
with_loader_criteria()
选项旨在向查询中的特定类型的实体添加限制条件,全局地,这意味着它将应用于实体在 SELECT 查询中的出现方式以及任何子查询、连接条件和关系加载中,包括急切加载和延迟加载器,而无需在查询的任何特定部分中指定它。呈现逻辑使用与单表继承相同的系统来确保某个鉴别器应用于表。
例如,使用 2.0 样式 查询,我们可以限制 User.addresses
集合的加载方式,而不管所使用的加载类型是什么:
from sqlalchemy.orm import with_loader_criteria stmt = select(User).options( selectinload(User.addresses), with_loader_criteria(Address, Address.email_address != 'foo')) )
上面,“selectinload” 对于 User.addresses
将把给定的过滤条件应用于 WHERE 子句。
另一个示例,其中过滤将应用于连接的 ON 子句,在此示例中使用 1.x 样式 查询:
q = session.query(User).outerjoin(User.addresses).options( with_loader_criteria(Address, Address.email_address != 'foo')) )
with_loader_criteria()
的主要目的是在 SessionEvents.do_orm_execute()
事件处理程序中使用它,以确保对特定实体的所有出现方式都以某种方式进行过滤,例如针对访问控制角色的过滤。它还可以用于将条件应用于关系加载。在下面的示例中,我们可以将一定的规则应用于特定 Session
发出的所有查询:
session = Session(bind=engine) @event.listens_for("do_orm_execute", session) def _add_filtering_criteria(execute_state): if ( execute_state.is_select and not execute_state.is_column_load and not execute_state.is_relationship_load ): execute_state.statement = execute_state.statement.options( with_loader_criteria( SecurityRole, lambda cls: cls.role.in_(['some_role']), include_aliases=True ) )
在上面的示例中,SessionEvents.do_orm_execute()
事件将拦截使用 Session
发出的所有查询。对于那些是 SELECT 语句且不是属性或关系加载的查询,会向查询中添加自定义 with_loader_criteria()
选项。with_loader_criteria()
选项将在给定的语句中使用,并将自动传播到所有从该查询继承的关系加载。
给定的 criteria 参数是一个接受 cls
参数的 lambda
。给定的类将扩展为包括所有映射的子类,它本身不需要是一个映射的类。
提示
在与contains_eager()
加载选项一起使用with_loader_criteria()
选项时,重要的是要注意with_loader_criteria()
仅影响决定渲染的 SQL 的部分查询,即 WHERE 和 FROM 子句。contains_eager()
选项不影响 SELECT 语句在列子句之外的渲染,因此与with_loader_criteria()
选项没有任何交互。然而,事情的“运作方式”是,contains_eager()
旨在与某种方式已经从附加实体中进行选择的查询一起使用,其中with_loader_criteria()
可以应用其附加条件。
在下面的示例中,假设有一个映射关系为A -> A.bs -> B
,给定的with_loader_criteria()
选项将影响 JOIN 的呈现方式:
stmt = select(A).join(A.bs).options( contains_eager(A.bs), with_loader_criteria(B, B.flag == 1) )
给定的with_loader_criteria()
选项将影响由.join(A.bs)
指定的 JOIN 的 ON 子句,因此被如预期般应用。contains_eager()
选项的作用是将 B 的列添加到列子句中:
SELECT b.id, b.a_id, b.data, b.flag, a.id AS id_1, a.data AS data_1 FROM a JOIN b ON a.id = b.a_id AND b.flag = :flag_1
在上述语句中使用contains_eager()
选项对with_loader_criteria()
选项的行为没有影响。如果省略了contains_eager()
选项,那么 SQL 在 FROM 和 WHERE 子句方面的情况将与with_loader_criteria()
继续将其条件添加到 JOIN 的 ON 子句中一样。contains_eager()
的添加仅影响列子句,即添加了针对 b 的附加列,然后 ORM 将其消耗以产生 B 实例。
警告
在调用with_loader_criteria()
时使用的 lambda 仅会被每个唯一类调用一次。自定义函数不应在此 lambda 内部调用。有关“lambda SQL”功能的概述,请参阅使用 Lambda 将语句生成速度提升到显著水平,该功能仅适用于高级用途。
参数:
entity_or_base
- 一个映射类,或者是一组特定映射类的超类,适用于该规则。where_criteria
-
一个应用限制条件的核心 SQL 表达式。当给定类是具有许多不同映射子类的基类时,这也可以是“lambda:”或 Python 函数,它接受目标类作为参数。
注意
为了支持 pickle,应使用模块级别的 Python 函数来生成 SQL 表达式,而不是 lambda 或固定的 SQL 表达式,后者倾向于不可 pickle。include_aliases
- 如果为 True,则也将规则应用于aliased()
构造。propagate_to_loaders
-
默认为 True,适用于诸如延迟加载器之类的关系加载器。这表示选项对象本身包括 SQL 表达式与每个加载的实例一起传递。将其设置为False
可防止将对象分配给单个实例。
另请参阅
ORM 查询事件 - 包括使用with_loader_criteria()
的示例。
添加全局 WHERE / ON 条件 - 将with_loader_criteria()
与SessionEvents.do_orm_execute()
事件相结合的基本示例。track_closure_variables
-
当 False 时,lambda 表达式内部的闭包变量将不会用作任何缓存键的一部分。这允许在 lambda 表达式内部使用更复杂的表达式,但需要 lambda 确保每次给定特定类时都返回相同的 SQL。
从版本 1.4.0b2 开始新增。
function sqlalchemy.orm.join(left: _FromClauseArgument, right: _FromClauseArgument, onclause: _OnClauseArgument | None = None, isouter: bool = False, full: bool = False) → _ORMJoin
生成左右子句之间的内部连接。
join()
是对由 join()
提供的核心连接接口的扩展,其中左右可选择的对象不仅可以是核心可选择对象(如 Table
),还可以是映射类或 AliasedClass
实例。“on” 子句可以是 SQL 表达式,也可以是引用已配置的 relationship()
的 ORM 映射属性。
join()
在现代用法中不常用,因为其功能已封装在 Select.join()
和 Query.join()
方法中。这些方法的自动化程度远远超出了 join()
本身。在启用 ORM 的 SELECT 语句中明确使用 join()
,需要使用 Select.select_from()
方法,示例如下:
from sqlalchemy.orm import join stmt = select(User).\ select_from(join(User, Address, User.addresses)).\ filter(Address.email_address=='foo@bar.com')
在现代 SQLAlchemy 中,以上连接可以更简洁地写为:
stmt = select(User).\ join(User.addresses).\ filter(Address.email_address=='foo@bar.com')
警告
直接使用 join()
可能无法与现代 ORM 选项(如 with_loader_criteria()
)正常工作。强烈建议在创建 ORM 连接时使用诸如 Select.join()
和 Select.join_from()
等方法提供的惯用连接模式。
另请参阅
连接 - 在 ORM 查询指南中了解惯用 ORM 连接模式的背景知识
function sqlalchemy.orm.outerjoin(left: _FromClauseArgument, right: _FromClauseArgument, onclause: _OnClauseArgument | None = None, full: bool = False) → _ORMJoin
在左右子句之间产生一个左外连接。
这是 join()
函数的“外连接”版本,功能与其相同,只是生成了一个 OUTER JOIN。请参阅该函数的文档以获取其他用法细节。
function sqlalchemy.orm.with_parent(instance: object, prop: attributes.QueryableAttribute[Any], from_entity: _EntityType[Any] | None = None) → ColumnElement[bool]
创建将此查询的主实体与给定的相关实例相关联的过滤条件,使用已建立的 relationship()
配置。
例如:
stmt = select(Address).where(with_parent(some_user, User.addresses))
渲染的 SQL 与在给定父对象上的属性上触发惰性加载器时渲染的 SQL 相同,这意味着适当的状态从 Python 中的父对象中获取,而不需要在渲染的语句中渲染到父表的连接。
给定的属性也可以使用 PropComparator.of_type()
来指示条件的左侧:
a1 = aliased(Address) a2 = aliased(Address) stmt = select(a1, a2).where( with_parent(u1, User.addresses.of_type(a2)) )
上述用法等同于使用 from_entity()
参数:
a1 = aliased(Address) a2 = aliased(Address) stmt = select(a1, a2).where( with_parent(u1, User.addresses, from_entity=a2) )
参数:
instance
- 具有一些relationship()
的实例。property
- 类绑定属性,指示应从实例使用哪个关系来协调父/子关系。from_entity
-
要考虑为左侧的实体。这默认为Query
本身的“零”实体。
版本 1.2 中的新功能。### Populate Existing
populate_existing
执行选项确保,对于加载的所有行,Session
中对应的实例将被完全刷新 - 擦除对象中的任何现有数据(包括未决更改),并用从结果加载的数据替换。
示例用法如下:
>>> stmt = select(User).execution_options(populate_existing=True) >>> result = session.execute(stmt) SELECT user_account.id, user_account.name, user_account.fullname FROM user_account ...
通常,ORM 对象只加载一次,如果它们与后续结果行中的主键匹配,那么该行不会应用于对象。这既是为了保留对象上未决的未刷新更改,也是为了避免刷新已经存在的数据的开销和复杂性。Session
假定高度隔离的事务的默认工作模型,并且在事务中预计数据会在本地更改之外发生变化的程度上,这些用例将使用显式步骤来处理,例如这种方法。
使用 populate_existing
,任何与查询匹配的对象集合都可以刷新,并且还允许控制关系加载器选项。例如,刷新一个实例同时刷新一组相关对象:
stmt = ( select(User) .where(User.name.in_(names)) .execution_options(populate_existing=True) .options(selectinload(User.addresses)) ) # will refresh all matching User objects as well as the related # Address objects users = session.execute(stmt).scalars().all()
populate_existing
的另一个用例是支持各种属性加载功能,可以根据每个查询的情况更改如何加载属性。适用于此选项的选项包括:
with_expression()
选项PropComparator.and_()
方法可以修改加载策略加载的内容contains_eager()
选项with_loader_criteria()
选项load_only()
选项用于选择要刷新的属性
populate_existing
执行选项等同于 1.x 风格 ORM 查询中的Query.populate_existing()
方法。
另请参阅
我正在使用我的 Session 重新加载数据,但它没有看到我在其他地方提交的更改 - 在常见问题解答中
刷新/过期 - 在 ORM Session
文档中
自动刷新
当传递为False
时,此选项将导致Session
不调用“autoflush”步骤。这相当于使用Session.no_autoflush
上下文管理器来禁用自动刷新:
>>> stmt = select(User).execution_options(autoflush=False) >>> session.execute(stmt) SELECT user_account.id, user_account.name, user_account.fullname FROM user_account ...
此选项也适用于启用 ORM 的Update
和Delete
查询。
autoflush
执行选项等同于 1.x 风格 ORM 查询中的Query.autoflush()
方法。
另请参阅
刷新
使用 Yield Per 获取大型结果集
yield_per
执行选项是一个整数值,它将导致Result
一次仅缓冲有限数量的行和/或 ORM 对象,然后再将数据提供给客户端。
通常,ORM 会立即获取所有行,为每一行构建 ORM 对象,并将这些对象组装到一个单一缓冲区中,然后将此缓冲区传递给Result
对象作为要返回的行的来源。这种行为的理由是为了允许诸如连接的急切加载、结果的唯一化以及依赖于标识映射在获取时为结果集中的每个对象保持一致状态的结果处理逻辑等功能的正确行为。
yield_per
选项的目的是改变这种行为,使得 ORM 结果集对于迭代非常大的结果集(例如 > 10K 行)进行了优化,其中用户已确定上述模式不适用。当使用 yield_per
时,ORM 将把 ORM 结果分批到子集合中,并在迭代 Result
对象时,从每个子集合中分别产生行,这样 Python 解释器就不需要声明非常大的内存区域,这既耗时又导致内存使用过多。该选项影响数据库游标的使用方式,以及 ORM 构造行和对象以传递给 Result
的方式。
提示
由上可见,Result
必须以可迭代的方式被消耗,即使用迭代(如 for row in result
)或使用部分行方法(如 Result.fetchmany()
或 Result.partitions()
)。调用 Result.all()
将使使用 yield_per
的目的失败。
使用 yield_per
相当于同时利用了 Connection.execution_options.stream_results
执行选项,如果支持的话,选择使用后端使用服务器端游标,并且在返回的 Result
对象上使用 Result.yield_per()
方法,它建立了要提取的行的固定大小,以及一次构造多少个 ORM 对象的相应限制。
提示
yield_per
现在也作为一个 Core 执行选项可用,详细描述请参阅 使用服务器端游标(又名流式结果)。本节详细介绍了将 yield_per
作为 ORM Session
的执行选项使用的方法。该选项在两种情境下尽可能地行为相似。
当与 ORM 一起使用时,yield_per
必须通过给定语句上的 Executable.execution_options()
方法或通过将其传递给 Session.execute()
或其他类似的 Session
方法的 Session.execute.execution_options
参数来建立。如下是用于获取 ORM 对象的典型用法:
>>> stmt = select(User).execution_options(yield_per=10) >>> for user_obj in session.scalars(stmt): ... print(user_obj) SELECT user_account.id, user_account.name, user_account.fullname FROM user_account [...] () User(id=1, name='spongebob', fullname='Spongebob Squarepants') User(id=2, name='sandy', fullname='Sandy Cheeks') ... >>> # ... rows continue ...
上述代码等同于下面的示例,该示例使用了 Connection.execution_options.stream_results
和 Connection.execution_options.max_row_buffer
核心级别的执行选项,结合 Result.yield_per()
方法:
# equivalent code >>> stmt = select(User).execution_options(stream_results=True, max_row_buffer=10) >>> for user_obj in session.scalars(stmt).yield_per(10): ... print(user_obj) SELECT user_account.id, user_account.name, user_account.fullname FROM user_account [...] () User(id=1, name='spongebob', fullname='Spongebob Squarepants') User(id=2, name='sandy', fullname='Sandy Cheeks') ... >>> # ... rows continue ...
yield_per
也常与 Result.partitions()
方法结合使用,该方法将在分组分区中迭代行。每个分区的大小默认为传递给 yield_per
的整数值,如下例所示:
>>> stmt = select(User).execution_options(yield_per=10) >>> for partition in session.scalars(stmt).partitions(): ... for user_obj in partition: ... print(user_obj) SELECT user_account.id, user_account.name, user_account.fullname FROM user_account [...] () User(id=1, name='spongebob', fullname='Spongebob Squarepants') User(id=2, name='sandy', fullname='Sandy Cheeks') ... >>> # ... rows continue ...
yield_per
执行选项不兼容于使用集合时的 “子查询”急切加载 或 “连接”急切加载。如果数据库驱动程序支持多个独立的游标,则它可能与 “select in”急切加载 兼容。
此外,yield_per
执行选项不兼容于 Result.unique()
方法;因为该方法依赖于为所有行存储完整的标识集,这必然会破坏使用 yield_per
的目的,即处理任意大量的行。
在版本 1.4.6 中更改:当使用Result
对象获取 ORM 行时,如果同时使用Result.unique()
过滤器以及yield_per
执行选项,则会引发异常。
当使用传统的Query
对象进行 1.x style ORM 使用时,Query.yield_per()
方法将与yield_per
执行选项具有相同的结果。
另请参阅
使用服务器端游标(又名流式结果)
SqlAlchemy 2.0 中文文档(二十)(4)https://developer.aliyun.com/article/1560352