SqlAlchemy 2.0 中文文档(十二)(5)

简介: SqlAlchemy 2.0 中文文档(十二)

SqlAlchemy 2.0 中文文档(十二)(4)https://developer.aliyun.com/article/1562928


带窗口函数的行限制关系

另一个关系到 AliasedClass 对象的有趣用例是关系需要连接到任何形式的专门 SELECT 时。一种情况是当需要使用窗口函数时,例如限制返回的行数。下面的示例说明了一个非主映射器关系,该关系将为每个集合加载前十个项目:

class A(Base):
    __tablename__ = "a"
    id = mapped_column(Integer, primary_key=True)
class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))
partition = select(
    B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()
partitioned_b = aliased(B, partition)
A.partitioned_bs = relationship(
    partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index < 10)
)

我们可以使用上述 partitioned_bs 关系与大多数加载器策略,例如 selectinload():

for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
    print(a1.partitioned_bs)  # <-- will be no more than ten objects

在上面的例子中,“selectinload”查询如下所示:

SELECT
  a_1.id  AS  a_1_id,  anon_1.id  AS  anon_1_id,  anon_1.a_id  AS  anon_1_a_id,
  anon_1.data  AS  anon_1_data,  anon_1.index  AS  anon_1_index
FROM  a  AS  a_1
JOIN  (
  SELECT  b.id  AS  id,  b.a_id  AS  a_id,  b.data  AS  data,
  row_number()  OVER  (PARTITION  BY  b.a_id  ORDER  BY  b.id)  AS  index
  FROM  b)  AS  anon_1
ON  anon_1.a_id  =  a_1.id  AND  anon_1.index  <  %(index_1)s
WHERE  a_1.id  IN  (  ...  primary  key  collection  ...)
ORDER  BY  a_1.id

在上面的例子中,对于“a”中的每个匹配的主键,我们将按照“b.id”的顺序获取前十个“bs”。通过在“a_id”上分区,我们确保每个“行号”都局限于父“a_id”。

这样的映射通常还会包括从“A”到“B”的“普通”关系,用于持久性操作以及当需要“A”每个对象的完整集合时。

构建查询可用的属性

非常雄心勃勃的自定义连接条件可能无法直接持久化,有些情况下甚至可能无法正确加载。要消除持久性方程式的部分,使用标志relationship.viewonlyrelationship()上,将其建立为只读属性(写入到集合的数据将在 flush()时被忽略)。但是,在极端情况下,请考虑与Query一起使用常规的 Python 属性,如下所示:

class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)
    @property
    def addresses(self):
        return object_session(self).query(Address).with_parent(self).filter(...).all()

在其他情况下,可以构建描述符来利用现有的 Python 数据。有关更一般的 Python 属性的特殊讨论,请参阅使用描述符和混合部分。

另请参阅

使用描述符和混合

使用 viewonly 关系参数的注意事项

当应用于relationship()构造时,relationship.viewonly参数指示此relationship()不会参与任何  ORM 工作单元操作,此外,该属性也不会参与其表示的集合的 Python 变异。这意味着虽然 viewonly 关系可能引用可变的  Python 集合,如列表或集合,但对在映射实例上存在的该列表或集合进行更改对 ORM flush 过程没有影响。

要探索这种情景,请考虑以下映射:

from __future__ import annotations
import datetime
from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
class Base(DeclarativeBase):
    pass
class User(Base):
    __tablename__ = "user_account"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]
    all_tasks: Mapped[list[Task]] = relationship()
    current_week_tasks: Mapped[list[Task]] = relationship(
        primaryjoin=lambda: and_(
            User.id == Task.user_account_id,
            # this expression works on PostgreSQL but may not be supported
            # by other database engines
            Task.task_date >= func.now() - datetime.timedelta(days=7),
        ),
        viewonly=True,
    )
class Task(Base):
    __tablename__ = "task"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    user: Mapped[User] = relationship(back_populates="current_week_tasks")

以下各节将说明此配置的不同方面。

在 Python 中,包括 backrefs 在内的变异操作不适用于 viewonly=True

上述映射针对User.current_week_tasks视图关系,作为Task.user属性的 backref 目标。目前,SQLAlchemy 的 ORM 配置过程尚未标记此项,但这是配置错误。更改Task上的.user属性不会影响.current_week_tasks属性:

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]

还有另一个称为relationship.sync_backrefs的参数,可以在这里打开,以允许在这种情况下对.current_week_tasks进行突变,但是这并不被认为是viewonly关系的最佳实践,其不应该依赖于 Python 中的突变。

在此映射中,可以在User.all_tasksTask.user之间配置反向引用,因为它们都不是viewonly并且将正常同步。

除了禁用viewonly关系的反向引用突变之外,Python 中对User.all_tasks集合的普通更改也不会反映在User.current_week_tasks集合中,直到更改已刷新到数据库中。

总的来说,对于一个自定义集合应该立即响应 Python 中突变的用例,viewonly关系通常不合适。更好的方法是使用 SQLAlchemy 的 Hybrid Attributes 功能,或者仅对于实例化的情况使用 Python 的@property,在这种情况下,可以实现一个用户定义的集合,该集合是以当前 Python 实例为基础生成的。要将我们的示例更改为这种工作方式,我们修复Task.user上的relationship.back_populates参数,以引用User.all_tasks,然后说明一个简单的@property,该@property将以User.all_tasks集合的立即结果的形式提供结果:

class User(Base):
    __tablename__ = "user_account"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]
    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")
    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]
class Task(Base):
    __tablename__ = "task"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    user: Mapped[User] = relationship(back_populates="all_tasks")

每次在 Python 中计算的集合,我们都能保证始终得到正确的答案,而无需使用数据库:

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]

viewonly=True的集合/属性在过期之前不会被重新查询

对于原始的viewonly属性,如果我们确实对User.all_tasks集合进行了更改,那么在个事件发生后,viewonly集合才能显示这些更改的最终结果。第一个是将User.all_tasks的更改 flushed,以便新数据在数据库中可用,至少在本地事务范围内可用。第二个是User.current_week_tasks属性被 expired,并通过新的 SQL 查询重新加载到数据库。

为了支持这一要求,使用的最简单的流程是只在主要是只读操作中使用viewonly关系。比如下面,如果我们从数据库中检索到一个新的User,集合将是当前的:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]

当我们对u1.all_tasks进行修改时,如果我们希望在u1.current_week_tasks的 viewonly 关系中看到这些更改反映出来,这些更改需要被刷新,并且u1.current_week_tasks属性需要过期,以便在下次访问时懒加载。这样做的最简单方法是使用Session.commit(),保持Session.expire_on_commit参数设置为其默认值True

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]

上述,对Session.commit()的调用将更改刷新到数据库的u1.all_tasks,然后过期所有对象,因此当我们访问u1.current_week_tasks时,将从数据库中新鲜地获取此属性的内容,触发了一个:term:懒加载

要拦截操作而不实际提交事务,必须首先显式地过期属性。这样做的简单方法就是直接调用它。在下面的例子中,Session.flush()发送挂起的更改到数据库,然后使用Session.expire()来过期u1.current_week_tasks集合,以便在下次访问时重新获取:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

实际上,我们可以跳过对Session.flush()的调用,假设Session保持其默认值为TrueSession.autoflush,因为过期的current_week_tasks属性将在过期后在访问时触发自动刷新:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

进一步发展上述方法,我们可以在相关的User.all_tasks集合发生变化时,通过事件钩子来以编程方式应用过期。这是一种高级技术,应该首先检查像@property这样的更简单的架构或者坚持只读用例。在我们的简单示例中,配置如下:

from sqlalchemy import event, inspect
@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

使用上述钩子,突变操作被拦截,并导致User.current_week_tasks集合自动过期:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

上面使用的AttributeEvents事件钩子也会被后向引用突变触发,因此通过上面的钩子也会拦截对Task.user的更改:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]

使用 viewonly=True 的 In-Python 突变不合适

上述映射将User.current_week_tasks视图关系作为Task.user属性的 backref 目标。这目前并未被 SQLAlchemy 的 ORM 配置过程标记,但这是一个配置错误。更改Task上的.user属性不会影响.current_week_tasks属性:

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]

这里还有另一个参数叫做relationship.sync_backrefs,可以在这里打开,允许在这种情况下对.current_week_tasks进行变异,然而这并不被认为是最佳实践,对于只读关系,不应依赖于 Python 中的变异。

在这种映射中,可以在User.all_tasksTask.user之间配置反向引用,因为这两者都不是只读的,将正常同步。

除了对于只读关系禁用反向引用变异的问题外,Python 中对User.all_tasks集合的普通更改也不会反映在User.current_week_tasks集合中,直到更改已刷新到数据库。

总的来说,对于一个自定义集合应立即响应 Python 中的变异的用例,只读关系通常不合适。更好的方法是使用 SQLAlchemy 的 Hybrid Attributes 功能,或者对于仅实例情况,使用 Python 的@property,其中可以实现以当前 Python 实例为基础生成的用户定义集合。要将我们的示例更改为这种方式工作,我们修复Task.user上的relationship.back_populates参数,引用User.all_tasks,然后演示一个简单的@property,将以即时User.all_tasks集合的结果为基础提供结果。

class User(Base):
    __tablename__ = "user_account"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]
    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")
    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]
class Task(Base):
    __tablename__ = "task"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    user: Mapped[User] = relationship(back_populates="all_tasks")

使用每次都在 Python 中计算的即时集合,我们保证始终具有正确答案,而无需使用数据库:

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]

viewonly=True 集合/属性在过期之前不会重新查询

继续使用原始的只读属性,如果实际上对持久对象上的User.all_tasks集合进行更改,那么只有在发生两个事情之后,只读集合才能显示这种更改的净结果。第一是刷新对User.all_tasks的更改,以便新数据在数据库中可用,至少在本地事务范围内。第二是User.current_week_tasks属性被过期并通过对数据库的新 SQL 查询重新加载。

为了支持这个要求,使用最简单的流程是仅在主要是只读操作中使用仅视图关系。比如,如果我们从数据库中获取一个新的User,那么集合将是当前的:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]

当我们对u1.all_tasks进行修改时,如果我们希望在u1.current_week_tasks视图关系中看到这些更改的反映,这些更改需要被刷新,并且u1.current_week_tasks属性需要过期,这样它将在下一次访问时进行延迟加载。最简单的方法是使用Session.commit(),保持Session.expire_on_commit参数设置为其默认值True

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]

上面,对Session.commit()的调用将更改刷新到数据库中,然后使所有对象过期,这样当我们访问u1.current_week_tasks时,一个:term:延迟加载会发生,从数据库中重新获取该属性的内容。

要拦截操作而不实际提交事务,需要首先显式地将属性过期。一个简单的方法是直接调用它。在下面的示例中,Session.flush()将挂起的更改发送到数据库,然后使用Session.expire()使u1.current_week_tasks集合过期,以便在下一次访问时重新获取:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

我们实际上可以跳过对Session.flush()的调用,假设一个保持Session.autoflush值为默认值TrueSession,因为过期的current_week_tasks属性在过期后被访问时会触发自动刷新:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

继续上述方法到更复杂的内容,当相关的User.all_tasks集合发生变化时,我们可以在程序上应用过期,使用 event hooks。这是一种高级技术,应该首先检查简单的体系结构,比如@property或者坚持只读用例。在我们的简单示例中,这将配置为:

from sqlalchemy import event, inspect
@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

有了上述钩子,变更操作将被拦截,并导致User.current_week_tasks集合自动过期:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

上面使用的AttributeEvents事件钩子也会被反向引用的变化触发,因此通过上述钩子,对Task.user的更改也会被拦截:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]

8711b90ec0>, <main.Task object at 0x7f8711b90a10>]

上面,对`Session.commit()`的调用将更改刷新到数据库中,然后使所有对象过期,这样当我们访问`u1.current_week_tasks`时,一个:term:`延迟加载`会发生,从数据库中重新获取该属性的内容。
要拦截操作而不实际提交事务,需要首先显式地将属性过期。一个简单的方法是直接调用它。在下面的示例中,`Session.flush()`将挂起的更改发送到数据库,然后使用`Session.expire()`使`u1.current_week_tasks`集合过期,以便在下一次访问时重新获取:
```py
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

我们实际上可以跳过对Session.flush()的调用,假设一个保持Session.autoflush值为默认值TrueSession,因为过期的current_week_tasks属性在过期后被访问时会触发自动刷新:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

继续上述方法到更复杂的内容,当相关的User.all_tasks集合发生变化时,我们可以在程序上应用过期,使用 event hooks。这是一种高级技术,应该首先检查简单的体系结构,比如@property或者坚持只读用例。在我们的简单示例中,这将配置为:

from sqlalchemy import event, inspect
@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

有了上述钩子,变更操作将被拦截,并导致User.current_week_tasks集合自动过期:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

上面使用的AttributeEvents事件钩子也会被反向引用的变化触发,因此通过上述钩子,对Task.user的更改也会被拦截:

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]
相关文章
|
2月前
|
SQL 数据库 Python
SqlAlchemy 2.0 中文文档(十一)(3)
SqlAlchemy 2.0 中文文档(十一)
30 11
|
2月前
|
SQL 存储 数据库
SqlAlchemy 2.0 中文文档(十一)(4)
SqlAlchemy 2.0 中文文档(十一)
29 11
|
2月前
|
存储 SQL Python
SqlAlchemy 2.0 中文文档(十一)(5)
SqlAlchemy 2.0 中文文档(十一)
34 10
|
2月前
|
SQL API 数据库
SqlAlchemy 2.0 中文文档(十一)(1)
SqlAlchemy 2.0 中文文档(十一)
28 2
|
2月前
|
存储 SQL 数据库
SqlAlchemy 2.0 中文文档(十一)(2)
SqlAlchemy 2.0 中文文档(十一)
21 2
|
2月前
|
测试技术 Python 容器
SqlAlchemy 2.0 中文文档(十二)(2)
SqlAlchemy 2.0 中文文档(十二)
19 1
|
2月前
|
SQL 存储 关系型数据库
SqlAlchemy 2.0 中文文档(十二)(1)
SqlAlchemy 2.0 中文文档(十二)
15 1
|
2月前
|
测试技术 Python 容器
SqlAlchemy 2.0 中文文档(十二)(4)
SqlAlchemy 2.0 中文文档(十二)
19 1
|
2月前
|
SQL Python
SqlAlchemy 2.0 中文文档(十五)(5)
SqlAlchemy 2.0 中文文档(十五)
55 1
|
2月前
|
SQL Oracle 关系型数据库
SqlAlchemy 2.0 中文文档(十五)(1)
SqlAlchemy 2.0 中文文档(十五)
32 1