SqlAlchemy 2.0 中文文档(二十八)(3)

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB PostgreSQL 版,企业版 4核16GB
推荐场景:
HTAP混合负载
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: SqlAlchemy 2.0 中文文档(二十八)

SqlAlchemy 2.0 中文文档(二十八)(2)https://developer.aliyun.com/article/1560384


Asyncio 平台安装说明(包括 Apple M1)

asyncio 扩展仅支持 Python 3。它还依赖于greenlet库。这个依赖默认安装在常见的机器平���上,包括:

x86_64 aarch64 ppc64le amd64 win32

对于上述平台,已知greenlet提供预构建的 wheel 文件。对于其他平台,默认情况下不安装 greenlet;可以在Greenlet - Download Files查看当前的 greenlet 文件列表。请注意有许多架构被省略,包括 Apple M1

要安装 SQLAlchemy 并确保 greenlet 依赖存在,无论使用何种平台,可以按照以下方式安装 [asyncio] setuptools extra,这也会指示 pip 安装 greenlet:

pip install sqlalchemy[asyncio]

请注意,在没有预构建 wheel 文件的平台上安装 greenlet 意味着 greenlet 将从源代码构建,这要求 Python 的开发库也存在。

概要 - 核心

对于核心用途,create_async_engine() 函数创建一个AsyncEngine实例,然后提供传统Engine API 的异步版本。AsyncEngine通过其AsyncEngine.connect()AsyncEngine.begin()方法提供一个AsyncConnection,两者都提供异步上下文管理器。AsyncConnection然后可以使用AsyncConnection.execute()方法来执行语句,以提供一个缓冲的Result,或者使用AsyncConnection.stream()方法来提供一个流式的服务器端AsyncResult:

>>> import asyncio
>>> from sqlalchemy import Column
>>> from sqlalchemy import MetaData
>>> from sqlalchemy import select
>>> from sqlalchemy import String
>>> from sqlalchemy import Table
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> meta = MetaData()
>>> t1 = Table("t1", meta, Column("name", String(50), primary_key=True))
>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(meta.drop_all)
...         await conn.run_sync(meta.create_all)
...
...         await conn.execute(
...             t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
...         )
...
...     async with engine.connect() as conn:
...         # select a Result, which will be delivered with buffered
...         # results
...         result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
...
...         print(result.fetchall())
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()
>>> asyncio.run(async_main())
BEGIN  (implicit)
...
CREATE  TABLE  t1  (
  name  VARCHAR(50)  NOT  NULL,
  PRIMARY  KEY  (name)
)
...
INSERT  INTO  t1  (name)  VALUES  (?)
[...]  [('some name 1',),  ('some name 2',)]
COMMIT
BEGIN  (implicit)
SELECT  t1.name
FROM  t1
WHERE  t1.name  =  ?
[...]  ('some name 1',)
[('some name 1',)]
ROLLBACK 

上文提到的AsyncConnection.run_sync()方法可用于调用特殊的 DDL 函数,例如MetaData.create_all(),这些函数不包括可等待的挂钩。

提示

在使用AsyncEngine对象的范围超出上下文并被垃圾收集时,建议使用await调用AsyncEngine.dispose()方法,如上例中的async_main函数所示。这确保了连接池保持的任何连接都将在可等待的上下文中正确处理。与使用阻塞 IO 不同,SQLAlchemy 无法在像__del__或 weakref finalizer 之类的方法中正确处理这些连接,因为没有机会调用await。当引擎超出范围时未显式处理时,可能会导致发出到标准输出的警告,形式类似于垃圾收集中的RuntimeError: Event loop is closed

AsyncConnection还通过AsyncConnection.stream()方法提供了一个“流式”API,返回一个AsyncResult对象。此结果对象使用服务器端游标并提供异步/等待 API,例如异步迭代器:

async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))
    async for row in async_result:
        print("row: %s" % (row,))

概述 - ORM

使用 2.0 风格查询,AsyncSession类提供完整的 ORM 功能。

在默认使用模式下,必须特别小心避免涉及 ORM 关系和列属性的延迟加载或其他已过期属性访问;下一节在使用 AsyncSession 时防止隐式 IO 详细说明了这一点。

警告

单个AsyncSession实例不适合在多个并发任务中使用。有关背景信息,请参阅使用 AsyncSession 处理并发任务和会话线程安全吗?AsyncSession 在并发任务中是否安全共享?部分。

下面的示例说明了包括映射器和会话配置在内的完整示例:

>>> from __future__ import annotations
>>> import asyncio
>>> import datetime
>>> from typing import List
>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy import func
>>> from sqlalchemy import select
>>> from sqlalchemy.ext.asyncio import AsyncAttrs
>>> from sqlalchemy.ext.asyncio import async_sessionmaker
>>> from sqlalchemy.ext.asyncio import AsyncSession
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from sqlalchemy.orm import DeclarativeBase
>>> from sqlalchemy.orm import Mapped
>>> from sqlalchemy.orm import mapped_column
>>> from sqlalchemy.orm import relationship
>>> from sqlalchemy.orm import selectinload
>>> class Base(AsyncAttrs, DeclarativeBase):
...     pass
>>> class B(Base):
...     __tablename__ = "b"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
...     data: Mapped[str]
>>> class A(Base):
...     __tablename__ = "a"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     data: Mapped[str]
...     create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
...     bs: Mapped[List[B]] = relationship()
>>> async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
...     async with async_session() as session:
...         async with session.begin():
...             session.add_all(
...                 [
...                     A(bs=[B(data="b1"), B(data="b2")], data="a1"),
...                     A(bs=[], data="a2"),
...                     A(bs=[B(data="b3"), B(data="b4")], data="a3"),
...                 ]
...             )
>>> async def select_and_update_objects(
...     async_session: async_sessionmaker[AsyncSession],
... ) -> None:
...     async with async_session() as session:
...         stmt = select(A).order_by(A.id).options(selectinload(A.bs))
...
...         result = await session.execute(stmt)
...
...         for a in result.scalars():
...             print(a, a.data)
...             print(f"created at: {a.create_date}")
...             for b in a.bs:
...                 print(b, b.data)
...
...         result = await session.execute(select(A).order_by(A.id).limit(1))
...
...         a1 = result.scalars().one()
...
...         a1.data = "new data"
...
...         await session.commit()
...
...         # access attribute subsequent to commit; this is what
...         # expire_on_commit=False allows
...         print(a1.data)
...
...         # alternatively, AsyncAttrs may be used to access any attribute
...         # as an awaitable (new in 2.0.13)
...         for b1 in await a1.awaitable_attrs.bs:
...             print(b1, b1.data)
>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     # async_sessionmaker: a factory for new AsyncSession objects.
...     # expire_on_commit - don't expire objects after transaction commit
...     async_session = async_sessionmaker(engine, expire_on_commit=False)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(Base.metadata.create_all)
...
...     await insert_objects(async_session)
...     await select_and_update_objects(async_session)
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()
>>> asyncio.run(async_main())
BEGIN  (implicit)
...
CREATE  TABLE  a  (
  id  INTEGER  NOT  NULL,
  data  VARCHAR  NOT  NULL,
  create_date  DATETIME  DEFAULT  (CURRENT_TIMESTAMP)  NOT  NULL,
  PRIMARY  KEY  (id)
)
...
CREATE  TABLE  b  (
  id  INTEGER  NOT  NULL,
  a_id  INTEGER  NOT  NULL,
  data  VARCHAR  NOT  NULL,
  PRIMARY  KEY  (id),
  FOREIGN  KEY(a_id)  REFERENCES  a  (id)
)
...
COMMIT
BEGIN  (implicit)
INSERT  INTO  a  (data)  VALUES  (?)  RETURNING  id,  create_date
[...]  ('a1',)
...
INSERT  INTO  b  (a_id,  data)  VALUES  (?,  ?)  RETURNING  id
[...]  (1,  'b2')
...
COMMIT
BEGIN  (implicit)
SELECT  a.id,  a.data,  a.create_date
FROM  a  ORDER  BY  a.id
[...]  ()
SELECT  b.a_id  AS  b_a_id,  b.id  AS  b_id,  b.data  AS  b_data
FROM  b
WHERE  b.a_id  IN  (?,  ?,  ?)
[...]  (1,  2,  3)
<A  object  at  ...>  a1
created  at:  ...
<B  object  at  ...>  b1
<B  object  at  ...>  b2
<A  object  at  ...>  a2
created  at:  ...
<A  object  at  ...>  a3
created  at:  ...
<B  object  at  ...>  b3
<B  object  at  ...>  b4
SELECT  a.id,  a.data,  a.create_date
FROM  a  ORDER  BY  a.id
LIMIT  ?  OFFSET  ?
[...]  (1,  0)
UPDATE  a  SET  data=?  WHERE  a.id  =  ?
[...]  ('new data',  1)
COMMIT
new  data
<B  object  at  ...>  b1
<B  object  at  ...>  b2 

在上面的示例中,使用可选的 async_sessionmaker 助手来实例化 AsyncSession,该助手提供了一个新 AsyncSession 对象的工厂,并带有一组固定的参数,其中包括将其与特定数据库 URL 关联起来的 AsyncEngine。然后将其传递给其他方法,在这些方法中,它可能会在 Python 异步上下文管理器中使用(即 async with: 语句),以便在块结束时自动关闭;这相当于调用 AsyncSession.close() 方法。

使用 AsyncSession 处理并发任务

AsyncSession 对象是一个可变的、有状态的对象,表示正在进行的单个、有状态的数据库事务。使用 asyncio 的并发任务,例如使用 asyncio.gather() 等 API,应该为每个单独的任务使用单独的 AsyncSession

有关在并发工作负载中如何使用 SessionAsyncSession 的一般描述,请参阅 Session 线程安全吗?AsyncSession 在并发任务中共享是否安全? 部分。### 使用 AsyncSession 时防止隐式 IO

使用传统的 asyncio,应用程序需要避免出现任何可能发生 IO-on-attribute 访问的点。可以用以下技术来帮助解决这个问题,其中许多技术在前面的示例中有所体现。

  • 惰性加载关系、延迟列或表达式的属性,或者在过期情况下访问的属性,可以利用 AsyncAttrs 混合类。当将此混合类添加到特定类或更一般地添加到声明性的 Base 超类时,它提供了一个访问器 AsyncAttrs.awaitable_attrs,它将任何属性作为可等待对象传递:
from __future__ import annotations
from typing import List
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import relationship
class Base(AsyncAttrs, DeclarativeBase):
    pass
class A(Base):
    __tablename__ = "a"
    # ... rest of mapping ...
    bs: Mapped[List[B]] = relationship()
class B(Base):
    __tablename__ = "b"
    # ... rest of mapping ...
  • 在不使用急切加载时,访问新加载的A实例上的A.bs集合通常会使用延迟加载,为了成功,通常会向数据库发出 IO,这在 asyncio 下会失败,因为不允许隐式 IO。在没有任何先前加载操作的情况下,在 asyncio 下直接访问此属性,该属性可以作为可等待对象访问,通过指定AsyncAttrs.awaitable_attrs前缀:
a1 = (await session.scalars(select(A))).one()
for b1 in await a1.awaitable_attrs.bs:
    print(b1)
  • AsyncAttrs mixin 提供了一个简洁的外观,覆盖了内部方法,该方法也被AsyncSession.run_sync()方法使用。
    版本 2.0.13 中的新功能。
    另请参见
    AsyncAttrs
  • 集合可以被替换为只写集合,这些集合永远不会隐式发出  IO,通过在 SQLAlchemy 2.0 中使用 Write Only Relationships  功能。使用此功能,集合永远不会被读取,只能通过显式 SQL 调用进行查询。查看 Asyncio Integration 部分中的示例async_orm_writeonly.py,展示了如何在 asyncio 中使用只写集合。
    当使用只写集合时,程序的行为在处理集合方面是简单且易于预测的。然而,缺点是没有任何内置系统可以一次性加载这些集合中的许多,而需要手动执行。因此,下面的许多要点涉及在使用传统的延迟加载关系与 asyncio 时需要更加小心的具体技术。
  • 如果不使用AsyncAttrs,关系可以声明为lazy="raise",这样默认情况下它们不会尝试发出 SQL。为了加载集合,将使用 eager loading。
  • 最有用的急切加载策略是selectinload()急切加载器,在前面的示例中被用来在await session.execute()调用的范围内急切加载A.bs集合:
stmt = select(A).options(selectinload(A.bs))
  • 当构建新对象时,集合总是被分配一个默认的空集合,比如上面的示例中的列表:
A(bs=[], data="a2")
  • 这使得在刷新A对象时,上述A对象上的.bs集合可以存在且可读;否则,当刷新A时,.bs将会被卸载,并在访问时引发错误。
  • AsyncSession配置为使用Session.expire_on_commit设置为 False,以便我们可以在调用AsyncSession.commit()后访问对象上的属性,就像在最后一行访问属性的地方一样:
# create AsyncSession with expire_on_commit=False
async_session = AsyncSession(engine, expire_on_commit=False)
# sessionmaker version
async_session = async_sessionmaker(engine, expire_on_commit=False)
async with async_session() as session:
    result = await session.execute(select(A).order_by(A.id))
    a1 = result.scalars().first()
    # commit would normally expire all attributes
    await session.commit()
    # access attribute subsequent to commit; this is what
    # expire_on_commit=False allows
    print(a1.data)

其他指南包括:

  • 应该避免使用类似AsyncSession.expire()的方法,而应该使用AsyncSession.refresh()如果绝对需要过期。当使用 asyncio 时,通常不需要过期,因为Session.expire_on_commit应该通常设置为False
  • 可以在 asyncio 下显式加载延迟加载的关系,使用AsyncSession.refresh()如果所需的属性名称明确传递给Session.refresh.attribute_names,例如:
# assume a_obj is an A that has lazy loaded A.bs collection
a_obj = await async_session.get(A, [1])
# force the collection to load by naming it in attribute_names
await async_session.refresh(a_obj, ["bs"])
# collection is present
print(f"bs collection: {a_obj.bs}")
  • 当然,最好在一开始就使用急加载,以便在不需要延迟加载的情况下已经设置好集合。
    版本 2.0.4 中的新功能:增加了对AsyncSession.refresh()和底层Session.refresh()方法的支持,以强制加载延迟加载的关系,如果它们在Session.refresh.attribute_names参数中明确命名。在先前的版本中,即使在参数中命名,关系也会被静默跳过。
  • 避免使用 Cascades 中记录的all级联选项,而是明确列出所需的级联特性。all级联选项暗示了 refresh-expire 设置,这意味着AsyncSession.refresh()方法将使相关对象的属性过期,但不一定刷新这些相关对象,假设未在relationship()中配置急加载,将它们保持在过期状态。
  • 如果在deferred()列中使用了适当的加载选项,应该使用适当的加载选项,此外还应该注意relationship()结构。请参见限制使用延迟列加载的列以获取关于延迟列加载的背景信息。
  • 在动态关系加载器一节描述的“动态”关系加载器策略在默认情况下与 asyncio 方法不兼容。它只能在AsyncSession.run_sync()方法中直接调用,或者通过使用其.statement属性获取普通 select:
user = await session.get(User, 42)
addresses = (await session.scalars(user.addresses.statement)).all()
stmt = user.addresses.statement.where(Address.email_address.startswith("patrick"))
addresses_filter = (await session.scalars(stmt)).all()
  • 只写技术,在 SQLAlchemy 的 2.0 版本中引入,完全兼容 asyncio,并应该优先使用。
    另请参阅
    “动态”关系加载器被“只写”取代 - 迁移到 2.0 风格的注意事项
  • 如果在与不支持 RETURNING 的数据库(例如 MySQL 8)一起使用 asyncio,那么在刷新的新对象上将不会有服务器默认值,例如生成的时间戳,除非使用了Mapper.eager_defaults选项。在 SQLAlchemy 2.0 中,这种行为自动应用于像 PostgreSQL、SQLite 和 MariaDB 这样使用 RETURNING 来在插入行时获取新值的后端。### 在 asyncio 下运行同步方法和函数

深度炼金术

这种方法本质上是公开了 SQLAlchemy 能够首先提供 asyncio 接口的机制。虽然在技术上没有任何问题,但总的来说,这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些核心理念,即任何可能导致 IO 被调用的编程语句必须有一个await调用,以防程序不明确地指明每一行可能发生 IO 的位置。这种方法并没有改变这个一般的想法,只是允许一系列同步 IO 指令在函数调用的范围内豁免这个规则,基本上被捆绑成一个可等待的。

作为在 asyncio 事件循环中集成传统 SQLAlchemy “延迟加载” 的替代方法,提供了一个名为AsyncSession.run_sync()可选方法,它将运行任何 Python 函数在一个 greenlet 中,传统的同步编程概念将被转换为在到达数据库驱动程序时使用await。这里的一个假设方法是一个面向 asyncio 的应用程序可以将与数据库相关的方法打包成函数,这些函数使用AsyncSession.run_sync()调用。

修改上面的示例,如果我们不使用selectinload()来加载A.bs集合,我们可以在一个单独的函数中完成对这些属性访问的处理:

import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
def fetch_and_update_objects(session):
  """run traditional sync-style ORM code in a function that will be
 invoked within an awaitable.
 """
    # the session object here is a traditional ORM Session.
    # all features are available here including legacy Query use.
    stmt = select(A)
    result = session.execute(stmt)
    for a1 in result.scalars():
        print(a1)
        # lazy loads
        for b1 in a1.bs:
            print(b1)
    # legacy Query use
    a1 = session.query(A).order_by(A.id).first()
    a1.data = "new data"
async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)
    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )
        await session.run_sync(fetch_and_update_objects)
        await session.commit()
    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()
asyncio.run(async_main())

在一个应用程序中运行某些函数在“同步”运行器中的上述方法与在一个基于事件的编程库(如gevent)上运行 SQLAlchemy 应用程序有一些相似之处。区别如下:

  1. 与使用gevent不同,我们可以继续使用标准的 Python asyncio 事件循环,或任何自定义事件循环,而无需集成到gevent事件循环中。
  2. 完全没有“monkeypatching”。上面的示例使用了一个真正的 asyncio 驱动程序,底层的 SQLAlchemy 连接池也使用 Python 内置的asyncio.Queue来池化连接。
  3. 程序可以自由地在 async/await 代码和使用同步代码的包含函数之间切换,几乎没有性能损失。没有“线程执行器”或任何额外的等待器或同步在使用。
  4. 底层网络驱动程序也使用纯 Python asyncio 概念,不使用geventeventlet提供的第三方网络库。### 与并发任务一起使用 AsyncSession

AsyncSession对象是一个可变的、有状态的对象,代表着正在进行的单个、有状态的数据库事务。使用 asyncio 进行并发任务,例如使用asyncio.gather()等 API,应该为每个单独的任务使用一个独立的AsyncSession

请参阅会话是否线程安全?AsyncSession 是否可以在并发任务中共享?部分,了解关于SessionAsyncSession在处理并发工作负载时应如何使用的一般描述。

使用 AsyncSession 时防止隐式 IO

使用传统的 asyncio,应用程序需要避免发生可能导致 IO-on-attribute 访问的任何点。下面列出的技术可以帮助实现这一点,其中许多在前面的示例中有所说明。

  • 延迟加载关系、延迟列或表达式,或在过期情况下访问的属性可以利用AsyncAttrs mixin。当将此 mixin 添加到特定类或更一般地添加到 Declarative Base超类时,会提供一个访问器AsyncAttrs.awaitable_attrs,将任何属性作为可等待对象传递:
from __future__ import annotations
from typing import List
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import relationship
class Base(AsyncAttrs, DeclarativeBase):
    pass
class A(Base):
    __tablename__ = "a"
    # ... rest of mapping ...
    bs: Mapped[List[B]] = relationship()
class B(Base):
    __tablename__ = "b"
    # ... rest of mapping ...
  • 在不使用急加载时,访问新加载实例A上的A.bs集合通常会使用延迟加载,为了成功,通常会向数据库发出 IO 请求,而在 asyncio 下会失败,因为不允许隐式 IO。要在 asyncio 下直接访问此属性而不需要任何先前的加载操作,可以通过指定AsyncAttrs.awaitable_attrs前缀将属性访问为可等待对象:
a1 = (await session.scalars(select(A))).one()
for b1 in await a1.awaitable_attrs.bs:
    print(b1)
  • AsyncAttrs mixin 提供了一个简洁的外观,也是AsyncSession.run_sync()方法使用的内部方法。
    版本 2.0.13 中的新功能。
    另请参见
    AsyncAttrs
  • 集合可以被只写集合替换,永远不会隐式发出 IO,通过在 SQLAlchemy 2.0 中使用只写关系功能。使用此功能,集合永远不会被读取,只能使用显式 SQL 调用进行查询。请参见 Asyncio Integration 部分中的示例async_orm_writeonly.py,演示了在 asyncio 中使用只写集合的示例。
    当使用只写集合时,程序在处理集合方面的行为简单且易于预测。然而,缺点是没有任何内置系统可以一次性加载许多这些集合,而是需要手动执行。因此,下面的许多要点涉及在使用传统的延迟加载关系与 asyncio 时需要更加小心的具体技术。
  • 如果不使用AsyncAttrs,可以使用lazy="raise"声明关系,这样默认情况下它们不会尝试发出 SQL。为了加载集合,将使用急加载。
  • 最有用的急加载策略是selectinload()急加载器,在前面的示例中用于在await session.execute()调用范围内急加载A.bs集合:
stmt = select(A).options(selectinload(A.bs))
  • 在构建新对象时,集合总是分配一个默认的空集合,例如上面示例中的列表:
A(bs=[], data="a2")
  • A对象被刷新时,允许上述A对象上的.bs集合存在并可读;否则,当A被刷新时,.bs将被卸载并在访问时引发错误。
  • AsyncSession配置为使用Session.expire_on_commit设置为 False,这样我们可以在调用AsyncSession.commit()后访问对象的属性,就像在最后一行访问属性时一样:
# create AsyncSession with expire_on_commit=False
async_session = AsyncSession(engine, expire_on_commit=False)
# sessionmaker version
async_session = async_sessionmaker(engine, expire_on_commit=False)
async with async_session() as session:
    result = await session.execute(select(A).order_by(A.id))
    a1 = result.scalars().first()
    # commit would normally expire all attributes
    await session.commit()
    # access attribute subsequent to commit; this is what
    # expire_on_commit=False allows
    print(a1.data)

其他指南包括:

  • AsyncSession.expire()这样的方法应该避免使用,而应该使用AsyncSession.refresh()如果绝对需要过期。通常情况下不应该需要过期,因为在使用 asyncio 时通常应将Session.expire_on_commit设置为False
  • 使用AsyncSession.refresh()可以显式加载懒加载关系在 asyncio 下如果需要显式传递所需的属性名称给Session.refresh.attribute_names,例如:
# assume a_obj is an A that has lazy loaded A.bs collection
a_obj = await async_session.get(A, [1])
# force the collection to load by naming it in attribute_names
await async_session.refresh(a_obj, ["bs"])
# collection is present
print(f"bs collection: {a_obj.bs}")
  • 当然最好一开始就使用急加载,以便在不需要懒加载的情况下已经设置好集合。
    2.0.4 版本中新增:增加了对AsyncSession.refresh()和底层的Session.refresh()方法的支持,以强制延迟加载的关系加载,如果它们在Session.refresh.attribute_names参数中明确命名。在先前的版本中,即使在参数中命名,关系也会被静默跳过。
  • 避免使用 Cascades 文档中记录的all级联选项,而是明确列出所需的级联特性。all级联选项暗示了 refresh-expire 设置,这意味着AsyncSession.refresh()方法将使相关对象的属性过期,但不一定会刷新那些相关对象,假设未在relationship()中配置急加载,它们将保持在过期状态。
  • 如果使用deferred()列,应该使用适当的加载器选项,除了如上所述的relationship()构造。有关延迟列加载的背景,请参阅限制使用列延迟加载。
  • 在默认情况下,“动态”关系加载策略在动态关系加载器中描述,与 asyncio 方法不兼容。只有在AsyncSession.run_sync()方法中调用,或者通过使用其.statement属性获取正常的 select 语句,才能直接使用它:
user = await session.get(User, 42)
addresses = (await session.scalars(user.addresses.statement)).all()
stmt = user.addresses.statement.where(Address.email_address.startswith("patrick"))
addresses_filter = (await session.scalars(stmt)).all()
  • 仅写入技术,在 SQLAlchemy 的 2.0 版本中引入,与 asyncio 完全兼容,应该优先考虑使用。
    另请参见
    “动态”关系加载器被“仅写入”取代 - 迁移到 2.0 风格的注意事项
  • 如果在使用 asyncio 与不支持 RETURNING 的数据库(例如 MySQL 8)时,服务器默认值(例如生成的时间戳)将不会在新刷新的对象上可用,除非使用了Mapper.eager_defaults 选项。在 SQLAlchemy 2.0 中,这种行为会自动应用于像 PostgreSQL、SQLite 和 MariaDB 这样使用 RETURNING 在插入行时获取新值的后端。

在 asyncio 下运行同步方法和函数

深度合成

这种方法实质上是公开了 SQLAlchemy 能够在第一时间提供 asyncio 接口的机制。虽然这样做没有技术问题,但总的来说,这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些核心理念,即任何可能导致 IO 调用的编程语句必须具有一个 await 调用,否则程序在 IO 可能发生的每一行都不会明确地表明。这种方法并没有改变这个一般性的想法,除了它允许一系列同步 IO 指令在函数调用的范围内被豁免这个规则,本质上被捆绑成一个可等待的。

作为在 asyncio 事件循环中集成传统 SQLAlchemy “懒加载”的另一种替代方法,提供了一种称为AsyncSession.run_sync()可选方法,该方法将在一个 greenlet 内运行任何 Python 函数,其中当它们到达数据库驱动程序时,传统的同步编程概念将被转换为使用 await。这里的一个假设性方法是,一个以 asyncio 为导向的应用程序可以将数据库相关方法打包成函数,这些函数将使用AsyncSession.run_sync() 被调用。

修改上面的示例,如果我们不使用selectinload() 来处理 A.bs 集合,我们可以在一个单独的函数内完成对这些属性访问的处理:

import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
def fetch_and_update_objects(session):
  """run traditional sync-style ORM code in a function that will be
 invoked within an awaitable.
 """
    # the session object here is a traditional ORM Session.
    # all features are available here including legacy Query use.
    stmt = select(A)
    result = session.execute(stmt)
    for a1 in result.scalars():
        print(a1)
        # lazy loads
        for b1 in a1.bs:
            print(b1)
    # legacy Query use
    a1 = session.query(A).order_by(A.id).first()
    a1.data = "new data"
async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)
    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )
        await session.run_sync(fetch_and_update_objects)
        await session.commit()
    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()
asyncio.run(async_main())

在“同步”运行器中运行某些函数的上述方法与在基于事件的编程库(例如 gevent)上运行 SQLAlchemy 应用程序有一些相似之处。区别如下:

  1. 与使用 gevent 不同,我们可以继续使用标准的 Python asyncio 事件循环,或者任何自定义的事件循环,而无需将其集成到 gevent 事件循环中。
  2. 完全没有“猴子补丁”。上面的示例利用了一个真正的 asyncio 驱动程序,底层的 SQLAlchemy 连接池也使用了 Python 内置的 asyncio.Queue 来池化连接。
  3. 程序可以自由在 async/await 代码和使用同步代码的包含函数之间切换,几乎没有性能损失。不使用“线程执行器”或任何额外的等待器或同步。
  4. 底层网络驱动程序也使用纯 Python asyncio 概念,不使用geventeventlet等第三方网络库。

使用 asyncio 扩展与事件

SQLAlchemy 事件系统不会直接暴露给 asyncio 扩展,这意味着尚未有 SQLAlchemy 事件处理程序的“异步”版本。

然而,由于 asyncio 扩展包围了通常的同步 SQLAlchemy API,因此常规的“同步”风格事件处理程序可以自由使用,就像没有使用 asyncio 一样。

如下所述,目前有两种注册事件的策略,针对面向 asyncio 的 API:

  • 事件可以在实例级别注册(例如特定的AsyncEngine实例),通过将事件与引用代理对象的sync属性关联。例如,要针对AsyncEngine实例注册PoolEvents.connect()事件,请使用其AsyncEngine.sync_engine属性作为目标。目标包括:

AsyncEngine.sync_engine

AsyncConnection.sync_connection

AsyncConnection.sync_engine

AsyncSession.sync_session

  • 要在类级别注册事件,针对同一类型的所有实例(例如所有AsyncSession实例),请使用相应的同步风格类。例如,要针对AsyncSession类注册SessionEvents.before_commit()事件,请将Session类作为目标。
  • 要在sessionmaker级别注册,结合一个明确的sessionmaker和一个async_sessionmaker,使用async_sessionmaker.sync_session_class,并将事件与sessionmaker关联。

在异步上下文中工作的事件处理程序中,像Connection这样的对象继续以通常的“同步”方式工作,而不需要awaitasync的使用;当消息最终被异步数据库适配器接收时,调用风格会透明地转换回异步调用风格。对于传递了 DBAPI 级别连接的事件,例如PoolEvents.connect(),该对象是一个符合 pep-249 的“连接”对象,它将同步样式调用转换为异步驱动程序。

具有异步引擎/会话/会话制造器的事件监听器示例

一些与面向异步 API 构造相关的同步样式事件处理程序示例如下:

  • 在 AsyncEngine 上的核心事件
    在这个例子中,我们访问AsyncEngine.sync_engine属性,作为ConnectionEventsPoolEvents的目标:
import asyncio
from sqlalchemy import event
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
# connect event on instance of Engine
@event.listens_for(engine.sync_engine, "connect")
def my_on_connect(dbapi_con, connection_record):
    print("New DBAPI connection:", dbapi_con)
    cursor = dbapi_con.cursor()
    # sync style API use for adapted DBAPI connection / cursor
    cursor.execute("select 'execute from event'")
    print(cursor.fetchone()[0])
# before_execute event on all Engine instances
@event.listens_for(Engine, "before_execute")
def my_before_execute(
    conn,
    clauseelement,
    multiparams,
    params,
    execution_options,
):
    print("before execute!")
async def go():
    async with engine.connect() as conn:
        await conn.execute(text("select 1"))
    await engine.dispose()
asyncio.run(go())
  • 输出:
New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>>
execute from event
before execute!
  • 在 AsyncSession 上的 ORM 事件
    在这个例子中,我们访问AsyncSession.sync_session作为SessionEvents的目标:
import asyncio
from sqlalchemy import event
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session
engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
session = AsyncSession(engine)
# before_commit event on instance of Session
@event.listens_for(session.sync_session, "before_commit")
def my_before_commit(session):
    print("before commit!")
    # sync style API use on Session
    connection = session.connection()
    # sync style API use on Connection
    result = connection.execute(text("select 'execute from event'"))
    print(result.first())
# after_commit event on all Session instances
@event.listens_for(Session, "after_commit")
def my_after_commit(session):
    print("after commit!")
async def go():
    await session.execute(text("select 1"))
    await session.commit()
    await session.close()
    await engine.dispose()
asyncio.run(go())
  • 输出:
before commit!
execute from event
after commit!
  • 在 async_sessionmaker 上的 ORM 事件
    对于这种用例,我们将sessionmaker作为事件目标,然后将其分配给async_sessionmaker,使用async_sessionmaker.sync_session_class参数:
import asyncio
from sqlalchemy import event
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import sessionmaker
sync_maker = sessionmaker()
maker = async_sessionmaker(sync_session_class=sync_maker)
@event.listens_for(sync_maker, "before_commit")
def before_commit(session):
    print("before commit")
async def main():
    async_session = maker()
    await async_session.commit()
asyncio.run(main())
  • 输出:
before commit

在连接池和其他事件中使用仅可等待的驱动程序方法

如上节所述,事件处理程序(例如那些围绕 PoolEvents  的事件处理程序)接收到一个同步风格的“DBAPI”连接,这是由 SQLAlchemy asyncio 方言提供的包装对象,用于将底层  asyncio “驱动程序”连接适配成可以被 SQLAlchemy  内部使用的对象。当用户定义的实现需要直接使用最终的“驱动程序”连接,并在该驱动程序连接上使用仅可等待方法时,就会出现特殊的用例。其中一个例子是  asyncpg 驱动程序提供的 .set_type_codec() 方法。

为了适应这种用例,SQLAlchemy 的 AdaptedConnection 类提供了一个方法 AdaptedConnection.run_async(),允许在事件处理程序或其他 SQLAlchemy 内部的“同步”上下文中调用可等待函数。这个方法直接对应于 AsyncConnection.run_sync() 方法,后者允许在异步环境中运行同步风格的方法。

AdaptedConnection.run_async() 应该传递一个函数,该函数将接受内部的“驱动程序”连接作为单个参数,并返回一个可等待对象,该对象将由 AdaptedConnection.run_async() 方法调用。给定的函数本身不需要声明为 async;它可以是一个 Python 的 lambda:,因为返回的可等待值将在返回后被调用:

from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(...)
@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, *args):
    dbapi_connection.run_async(
        lambda connection: connection.set_type_codec(
            "MyCustomType",
            encoder,
            decoder,  # ...
        )
    )

在上面的例子中,传递给 register_custom_types 事件处理程序的对象是 AdaptedConnection 的一个实例,它提供了对底层仅支持异步驱动程序级连接对象的类似 DBAPI 的接口。然后,AdaptedConnection.run_async() 方法提供了对一个可等待环境的访问,在该环境中可以对底层驱动程序级连接进行操作。

版本 1.4.30 中的新功能。

异步引擎 / 会话 / 会话工厂的事件监听器示例

下面给出了一些与异步 API 构造相关的同步风格事件处理程序的示例:

  • 异步引擎的核心事件
    在这个例子中,我们将AsyncEngine.sync_engine属性作为ConnectionEventsPoolEvents的目标:
import asyncio
from sqlalchemy import event
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
# connect event on instance of Engine
@event.listens_for(engine.sync_engine, "connect")
def my_on_connect(dbapi_con, connection_record):
    print("New DBAPI connection:", dbapi_con)
    cursor = dbapi_con.cursor()
    # sync style API use for adapted DBAPI connection / cursor
    cursor.execute("select 'execute from event'")
    print(cursor.fetchone()[0])
# before_execute event on all Engine instances
@event.listens_for(Engine, "before_execute")
def my_before_execute(
    conn,
    clauseelement,
    multiparams,
    params,
    execution_options,
):
    print("before execute!")
async def go():
    async with engine.connect() as conn:
        await conn.execute(text("select 1"))
    await engine.dispose()
asyncio.run(go())
  • 输出:
New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>>
execute from event
before execute!
  • AsyncSession 上的 ORM 事件
    在这个例子中,我们将AsyncSession.sync_session作为SessionEvents的目标:
import asyncio
from sqlalchemy import event
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session
engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
session = AsyncSession(engine)
# before_commit event on instance of Session
@event.listens_for(session.sync_session, "before_commit")
def my_before_commit(session):
    print("before commit!")
    # sync style API use on Session
    connection = session.connection()
    # sync style API use on Connection
    result = connection.execute(text("select 'execute from event'"))
    print(result.first())
# after_commit event on all Session instances
@event.listens_for(Session, "after_commit")
def my_after_commit(session):
    print("after commit!")
async def go():
    await session.execute(text("select 1"))
    await session.commit()
    await session.close()
    await engine.dispose()
asyncio.run(go())
  • 输出:
before commit!
execute from event
after commit!
  • 异步会话工厂上的 ORM 事件
    对于这种用例,我们将sessionmaker作为事件目标,然后使用async_sessionmaker并使用async_sessionmaker.sync_session_class参数进行赋值:
import asyncio
from sqlalchemy import event
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import sessionmaker
sync_maker = sessionmaker()
maker = async_sessionmaker(sync_session_class=sync_maker)
@event.listens_for(sync_maker, "before_commit")
def before_commit(session):
    print("before commit")
async def main():
    async_session = maker()
    await async_session.commit()
asyncio.run(main())
  • 输出:
before commit

在连接池和其他事件中使用仅可等待的驱动程序方法

如上节所述,事件处理程序(例如围绕PoolEvents事件处理程序定位的事件处理程序)接收到一个同步风格的“DBAPI”连接,这是  SQLAlchemy asyncio 方言提供的包装对象,用于将底层的 asyncio“driver”连接适配为 SQLAlchemy  内部可以使用的连接。当用户定义的实现需要直接使用最终的“driver”连接时,使用该驱动连接上的仅可等待方法时会出现特殊的用例。一个这样的例子是  asyncpg 驱动程序提供的.set_type_codec()方法。

为了适应这种用例,SQLAlchemy 的AdaptedConnection类提供了一个方法AdaptedConnection.run_async(),允许在事件处理程序或其他 SQLAlchemy 内部的“同步”上下文中调用可等待函数。这个方法直接类似于AsyncConnection.run_sync()方法,允许同步风格的方法在异步下运行。

AdaptedConnection.run_async()应该传递一个接受最内层的“driver”连接作为单个参数的函数,并返回一个由AdaptedConnection.run_async()方法调用的可等待对象。给定函数本身不需要声明为async;它完全可以是一个 Python 的lambda:,因为返回的可等待值将在返回后被调用:

from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(...)
@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, *args):
    dbapi_connection.run_async(
        lambda connection: connection.set_type_codec(
            "MyCustomType",
            encoder,
            decoder,  # ...
        )
    )

上面,传递给register_custom_types事件处理程序的对象是AdaptedConnection的一个实例,它提供了对底层仅异步驱动程序级连接对象的类似 DBAPI 的接口。然后,AdaptedConnection.run_async()方法提供了对可等待环境的访问,其中底层驱动程序级连接可以被操作。

1.4.30 版本中新增。

使用多个 asyncio 事件循环

使用多个事件循环的应用程序,例如在将 asyncio 与多线程结合的不常见情况下,在使用默认池实现时不应该将同一个AsyncEngine与不同的事件循环共享。

如果从一个事件循环传递AsyncEngine到另一个事件循环,应该在它被重新使用于新的事件循环之前调用AsyncEngine.dispose()方法。未这样做可能会导致类似于Task  got Future attached to a different loopRuntimeError

如果同一个引擎必须在不同的循环之间共享,应该配置为禁用连接池,使用NullPool,防止引擎使用任何连接超过一次:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/dbname",
    poolclass=NullPool,
)


SqlAlchemy 2.0 中文文档(二十八)(4)https://developer.aliyun.com/article/1560393

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
2天前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(二十九)(2)
SqlAlchemy 2.0 中文文档(二十九)
20 7
|
2天前
|
SQL 存储 关系型数据库
SqlAlchemy 2.0 中文文档(二十九)(1)
SqlAlchemy 2.0 中文文档(二十九)
16 4
|
2天前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(二十九)(3)
SqlAlchemy 2.0 中文文档(二十九)
16 4
|
2天前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(二十九)(4)
SqlAlchemy 2.0 中文文档(二十九)
15 4
|
2天前
|
SQL 缓存 前端开发
SqlAlchemy 2.0 中文文档(二十七)(5)
SqlAlchemy 2.0 中文文档(二十七)
10 2
|
2天前
|
SQL 前端开发 关系型数据库
SqlAlchemy 2.0 中文文档(二十七)(2)
SqlAlchemy 2.0 中文文档(二十七)
14 2
|
2天前
|
SQL 缓存 API
SqlAlchemy 2.0 中文文档(二十八)(4)
SqlAlchemy 2.0 中文文档(二十八)
11 1
|
2天前
|
自然语言处理 数据库 Python
SqlAlchemy 2.0 中文文档(二十六)(2)
SqlAlchemy 2.0 中文文档(二十六)
12 2
|
2天前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(二十六)(1)
SqlAlchemy 2.0 中文文档(二十六)
12 2
|
2天前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(二十六)(3)
SqlAlchemy 2.0 中文文档(二十六)
11 2