SqlAlchemy 2.0 中文文档(二十八)(2)https://developer.aliyun.com/article/1560384
Asyncio 平台安装说明(包括 Apple M1)
asyncio 扩展仅支持 Python 3。它还依赖于greenlet库。这个依赖默认安装在常见的机器平���上,包括:
x86_64 aarch64 ppc64le amd64 win32
对于上述平台,已知greenlet
提供预构建的 wheel 文件。对于其他平台,默认情况下不安装 greenlet;可以在Greenlet - Download Files查看当前的 greenlet 文件列表。请注意有许多架构被省略,包括 Apple M1。
要安装 SQLAlchemy 并确保 greenlet
依赖存在,无论使用何种平台,可以按照以下方式安装 [asyncio]
setuptools extra,这也会指示 pip
安装 greenlet
:
pip install sqlalchemy[asyncio]
请注意,在没有预构建 wheel 文件的平台上安装 greenlet
意味着 greenlet
将从源代码构建,这要求 Python 的开发库也存在。
概要 - 核心
对于核心用途,create_async_engine()
函数创建一个AsyncEngine
实例,然后提供传统Engine
API 的异步版本。AsyncEngine
通过其AsyncEngine.connect()
和AsyncEngine.begin()
方法提供一个AsyncConnection
,两者都提供异步上下文管理器。AsyncConnection
然后可以使用AsyncConnection.execute()
方法来执行语句,以提供一个缓冲的Result
,或者使用AsyncConnection.stream()
方法来提供一个流式的服务器端AsyncResult
:
>>> import asyncio >>> from sqlalchemy import Column >>> from sqlalchemy import MetaData >>> from sqlalchemy import select >>> from sqlalchemy import String >>> from sqlalchemy import Table >>> from sqlalchemy.ext.asyncio import create_async_engine >>> meta = MetaData() >>> t1 = Table("t1", meta, Column("name", String(50), primary_key=True)) >>> async def async_main() -> None: ... engine = create_async_engine("sqlite+aiosqlite://", echo=True) ... ... async with engine.begin() as conn: ... await conn.run_sync(meta.drop_all) ... await conn.run_sync(meta.create_all) ... ... await conn.execute( ... t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}] ... ) ... ... async with engine.connect() as conn: ... # select a Result, which will be delivered with buffered ... # results ... result = await conn.execute(select(t1).where(t1.c.name == "some name 1")) ... ... print(result.fetchall()) ... ... # for AsyncEngine created in function scope, close and ... # clean-up pooled connections ... await engine.dispose() >>> asyncio.run(async_main()) BEGIN (implicit) ... CREATE TABLE t1 ( name VARCHAR(50) NOT NULL, PRIMARY KEY (name) ) ... INSERT INTO t1 (name) VALUES (?) [...] [('some name 1',), ('some name 2',)] COMMIT BEGIN (implicit) SELECT t1.name FROM t1 WHERE t1.name = ? [...] ('some name 1',) [('some name 1',)] ROLLBACK
上文提到的AsyncConnection.run_sync()
方法可用于调用特殊的 DDL 函数,例如MetaData.create_all()
,这些函数不包括可等待的挂钩。
提示
在使用AsyncEngine
对象的范围超出上下文并被垃圾收集时,建议使用await
调用AsyncEngine.dispose()
方法,如上例中的async_main
函数所示。这确保了连接池保持的任何连接都将在可等待的上下文中正确处理。与使用阻塞 IO 不同,SQLAlchemy 无法在像__del__
或 weakref finalizer 之类的方法中正确处理这些连接,因为没有机会调用await
。当引擎超出范围时未显式处理时,可能会导致发出到标准输出的警告,形式类似于垃圾收集中的RuntimeError: Event loop is closed
。
AsyncConnection
还通过AsyncConnection.stream()
方法提供了一个“流式”API,返回一个AsyncResult
对象。此结果对象使用服务器端游标并提供异步/等待 API,例如异步迭代器:
async with engine.connect() as conn: async_result = await conn.stream(select(t1)) async for row in async_result: print("row: %s" % (row,))
概述 - ORM
使用 2.0 风格查询,AsyncSession
类提供完整的 ORM 功能。
在默认使用模式下,必须特别小心避免涉及 ORM 关系和列属性的延迟加载或其他已过期属性访问;下一节在使用 AsyncSession 时防止隐式 IO 详细说明了这一点。
警告
单个AsyncSession
实例不适合在多个并发任务中使用。有关背景信息,请参阅使用 AsyncSession 处理并发任务和会话线程安全吗?AsyncSession 在并发任务中是否安全共享?部分。
下面的示例说明了包括映射器和会话配置在内的完整示例:
>>> from __future__ import annotations >>> import asyncio >>> import datetime >>> from typing import List >>> from sqlalchemy import ForeignKey >>> from sqlalchemy import func >>> from sqlalchemy import select >>> from sqlalchemy.ext.asyncio import AsyncAttrs >>> from sqlalchemy.ext.asyncio import async_sessionmaker >>> from sqlalchemy.ext.asyncio import AsyncSession >>> from sqlalchemy.ext.asyncio import create_async_engine >>> from sqlalchemy.orm import DeclarativeBase >>> from sqlalchemy.orm import Mapped >>> from sqlalchemy.orm import mapped_column >>> from sqlalchemy.orm import relationship >>> from sqlalchemy.orm import selectinload >>> class Base(AsyncAttrs, DeclarativeBase): ... pass >>> class B(Base): ... __tablename__ = "b" ... ... id: Mapped[int] = mapped_column(primary_key=True) ... a_id: Mapped[int] = mapped_column(ForeignKey("a.id")) ... data: Mapped[str] >>> class A(Base): ... __tablename__ = "a" ... ... id: Mapped[int] = mapped_column(primary_key=True) ... data: Mapped[str] ... create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now()) ... bs: Mapped[List[B]] = relationship() >>> async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None: ... async with async_session() as session: ... async with session.begin(): ... session.add_all( ... [ ... A(bs=[B(data="b1"), B(data="b2")], data="a1"), ... A(bs=[], data="a2"), ... A(bs=[B(data="b3"), B(data="b4")], data="a3"), ... ] ... ) >>> async def select_and_update_objects( ... async_session: async_sessionmaker[AsyncSession], ... ) -> None: ... async with async_session() as session: ... stmt = select(A).order_by(A.id).options(selectinload(A.bs)) ... ... result = await session.execute(stmt) ... ... for a in result.scalars(): ... print(a, a.data) ... print(f"created at: {a.create_date}") ... for b in a.bs: ... print(b, b.data) ... ... result = await session.execute(select(A).order_by(A.id).limit(1)) ... ... a1 = result.scalars().one() ... ... a1.data = "new data" ... ... await session.commit() ... ... # access attribute subsequent to commit; this is what ... # expire_on_commit=False allows ... print(a1.data) ... ... # alternatively, AsyncAttrs may be used to access any attribute ... # as an awaitable (new in 2.0.13) ... for b1 in await a1.awaitable_attrs.bs: ... print(b1, b1.data) >>> async def async_main() -> None: ... engine = create_async_engine("sqlite+aiosqlite://", echo=True) ... ... # async_sessionmaker: a factory for new AsyncSession objects. ... # expire_on_commit - don't expire objects after transaction commit ... async_session = async_sessionmaker(engine, expire_on_commit=False) ... ... async with engine.begin() as conn: ... await conn.run_sync(Base.metadata.create_all) ... ... await insert_objects(async_session) ... await select_and_update_objects(async_session) ... ... # for AsyncEngine created in function scope, close and ... # clean-up pooled connections ... await engine.dispose() >>> asyncio.run(async_main()) BEGIN (implicit) ... CREATE TABLE a ( id INTEGER NOT NULL, data VARCHAR NOT NULL, create_date DATETIME DEFAULT (CURRENT_TIMESTAMP) NOT NULL, PRIMARY KEY (id) ) ... CREATE TABLE b ( id INTEGER NOT NULL, a_id INTEGER NOT NULL, data VARCHAR NOT NULL, PRIMARY KEY (id), FOREIGN KEY(a_id) REFERENCES a (id) ) ... COMMIT BEGIN (implicit) INSERT INTO a (data) VALUES (?) RETURNING id, create_date [...] ('a1',) ... INSERT INTO b (a_id, data) VALUES (?, ?) RETURNING id [...] (1, 'b2') ... COMMIT BEGIN (implicit) SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id [...] () SELECT b.a_id AS b_a_id, b.id AS b_id, b.data AS b_data FROM b WHERE b.a_id IN (?, ?, ?) [...] (1, 2, 3) <A object at ...> a1 created at: ... <B object at ...> b1 <B object at ...> b2 <A object at ...> a2 created at: ... <A object at ...> a3 created at: ... <B object at ...> b3 <B object at ...> b4 SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id LIMIT ? OFFSET ? [...] (1, 0) UPDATE a SET data=? WHERE a.id = ? [...] ('new data', 1) COMMIT new data <B object at ...> b1 <B object at ...> b2
在上面的示例中,使用可选的 async_sessionmaker
助手来实例化 AsyncSession
,该助手提供了一个新 AsyncSession
对象的工厂,并带有一组固定的参数,其中包括将其与特定数据库 URL 关联起来的 AsyncEngine
。然后将其传递给其他方法,在这些方法中,它可能会在 Python 异步上下文管理器中使用(即 async with:
语句),以便在块结束时自动关闭;这相当于调用 AsyncSession.close()
方法。
使用 AsyncSession 处理并发任务
AsyncSession
对象是一个可变的、有状态的对象,表示正在进行的单个、有状态的数据库事务。使用 asyncio 的并发任务,例如使用 asyncio.gather()
等 API,应该为每个单独的任务使用单独的 AsyncSession
。
有关在并发工作负载中如何使用 Session
和 AsyncSession
的一般描述,请参阅 Session 线程安全吗?AsyncSession 在并发任务中共享是否安全? 部分。### 使用 AsyncSession 时防止隐式 IO
使用传统的 asyncio,应用程序需要避免出现任何可能发生 IO-on-attribute 访问的点。可以用以下技术来帮助解决这个问题,其中许多技术在前面的示例中有所体现。
- 惰性加载关系、延迟列或表达式的属性,或者在过期情况下访问的属性,可以利用
AsyncAttrs
混合类。当将此混合类添加到特定类或更一般地添加到声明性的Base
超类时,它提供了一个访问器AsyncAttrs.awaitable_attrs
,它将任何属性作为可等待对象传递:
from __future__ import annotations from typing import List from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import relationship class Base(AsyncAttrs, DeclarativeBase): pass class A(Base): __tablename__ = "a" # ... rest of mapping ... bs: Mapped[List[B]] = relationship() class B(Base): __tablename__ = "b" # ... rest of mapping ...
- 在不使用急切加载时,访问新加载的
A
实例上的A.bs
集合通常会使用延迟加载,为了成功,通常会向数据库发出 IO,这在 asyncio 下会失败,因为不允许隐式 IO。在没有任何先前加载操作的情况下,在 asyncio 下直接访问此属性,该属性可以作为可等待对象访问,通过指定AsyncAttrs.awaitable_attrs
前缀:
a1 = (await session.scalars(select(A))).one() for b1 in await a1.awaitable_attrs.bs: print(b1)
AsyncAttrs
mixin 提供了一个简洁的外观,覆盖了内部方法,该方法也被AsyncSession.run_sync()
方法使用。
版本 2.0.13 中的新功能。
另请参见AsyncAttrs
- 集合可以被替换为只写集合,这些集合永远不会隐式发出 IO,通过在 SQLAlchemy 2.0 中使用 Write Only Relationships 功能。使用此功能,集合永远不会被读取,只能通过显式 SQL 调用进行查询。查看 Asyncio Integration 部分中的示例
async_orm_writeonly.py
,展示了如何在 asyncio 中使用只写集合。
当使用只写集合时,程序的行为在处理集合方面是简单且易于预测的。然而,缺点是没有任何内置系统可以一次性加载这些集合中的许多,而需要手动执行。因此,下面的许多要点涉及在使用传统的延迟加载关系与 asyncio 时需要更加小心的具体技术。 - 如果不使用
AsyncAttrs
,关系可以声明为lazy="raise"
,这样默认情况下它们不会尝试发出 SQL。为了加载集合,将使用 eager loading。 - 最有用的急切加载策略是
selectinload()
急切加载器,在前面的示例中被用来在await session.execute()
调用的范围内急切加载A.bs
集合:
stmt = select(A).options(selectinload(A.bs))
- 当构建新对象时,集合总是被分配一个默认的空集合,比如上面的示例中的列表:
A(bs=[], data="a2")
- 这使得在刷新
A
对象时,上述A
对象上的.bs
集合可以存在且可读;否则,当刷新A
时,.bs
将会被卸载,并在访问时引发错误。 AsyncSession
配置为使用Session.expire_on_commit
设置为 False,以便我们可以在调用AsyncSession.commit()
后访问对象上的属性,就像在最后一行访问属性的地方一样:
# create AsyncSession with expire_on_commit=False async_session = AsyncSession(engine, expire_on_commit=False) # sessionmaker version async_session = async_sessionmaker(engine, expire_on_commit=False) async with async_session() as session: result = await session.execute(select(A).order_by(A.id)) a1 = result.scalars().first() # commit would normally expire all attributes await session.commit() # access attribute subsequent to commit; this is what # expire_on_commit=False allows print(a1.data)
其他指南包括:
- 应该避免使用类似
AsyncSession.expire()
的方法,而应该使用AsyncSession.refresh()
;如果绝对需要过期。当使用 asyncio 时,通常不需要过期,因为Session.expire_on_commit
应该通常设置为False
。 - 可以在 asyncio 下显式加载延迟加载的关系,使用
AsyncSession.refresh()
,如果所需的属性名称明确传递给Session.refresh.attribute_names
,例如:
# assume a_obj is an A that has lazy loaded A.bs collection a_obj = await async_session.get(A, [1]) # force the collection to load by naming it in attribute_names await async_session.refresh(a_obj, ["bs"]) # collection is present print(f"bs collection: {a_obj.bs}")
- 当然,最好在一开始就使用急加载,以便在不需要延迟加载的情况下已经设置好集合。
版本 2.0.4 中的新功能:增加了对AsyncSession.refresh()
和底层Session.refresh()
方法的支持,以强制加载延迟加载的关系,如果它们在Session.refresh.attribute_names
参数中明确命名。在先前的版本中,即使在参数中命名,关系也会被静默跳过。 - 避免使用 Cascades 中记录的
all
级联选项,而是明确列出所需的级联特性。all
级联选项暗示了 refresh-expire 设置,这意味着AsyncSession.refresh()
方法将使相关对象的属性过期,但不一定刷新这些相关对象,假设未在relationship()
中配置急加载,将它们保持在过期状态。 - 如果在
deferred()
列中使用了适当的加载选项,应该使用适当的加载选项,此外还应该注意relationship()
结构。请参见限制使用延迟列加载的列以获取关于延迟列加载的背景信息。 - 在动态关系加载器一节描述的“动态”关系加载器策略在默认情况下与 asyncio 方法不兼容。它只能在
AsyncSession.run_sync()
方法中直接调用,或者通过使用其.statement
属性获取普通 select:
user = await session.get(User, 42) addresses = (await session.scalars(user.addresses.statement)).all() stmt = user.addresses.statement.where(Address.email_address.startswith("patrick")) addresses_filter = (await session.scalars(stmt)).all()
- 只写技术,在 SQLAlchemy 的 2.0 版本中引入,完全兼容 asyncio,并应该优先使用。
另请参阅
“动态”关系加载器被“只写”取代 - 迁移到 2.0 风格的注意事项 - 如果在与不支持 RETURNING 的数据库(例如 MySQL 8)一起使用 asyncio,那么在刷新的新对象上将不会有服务器默认值,例如生成的时间戳,除非使用了
Mapper.eager_defaults
选项。在 SQLAlchemy 2.0 中,这种行为自动应用于像 PostgreSQL、SQLite 和 MariaDB 这样使用 RETURNING 来在插入行时获取新值的后端。### 在 asyncio 下运行同步方法和函数
深度炼金术
这种方法本质上是公开了 SQLAlchemy 能够首先提供 asyncio 接口的机制。虽然在技术上没有任何问题,但总的来说,这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些核心理念,即任何可能导致 IO 被调用的编程语句必须有一个await
调用,以防程序不明确地指明每一行可能发生 IO 的位置。这种方法并没有改变这个一般的想法,只是允许一系列同步 IO 指令在函数调用的范围内豁免这个规则,基本上被捆绑成一个可等待的。
作为在 asyncio 事件循环中集成传统 SQLAlchemy “延迟加载” 的替代方法,提供了一个名为AsyncSession.run_sync()
的可选方法,它将运行任何 Python 函数在一个 greenlet 中,传统的同步编程概念将被转换为在到达数据库驱动程序时使用await
。这里的一个假设方法是一个面向 asyncio 的应用程序可以将与数据库相关的方法打包成函数,这些函数使用AsyncSession.run_sync()
调用。
修改上面的示例,如果我们不使用selectinload()
来加载A.bs
集合,我们可以在一个单独的函数中完成对这些属性访问的处理:
import asyncio from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine def fetch_and_update_objects(session): """run traditional sync-style ORM code in a function that will be invoked within an awaitable. """ # the session object here is a traditional ORM Session. # all features are available here including legacy Query use. stmt = select(A) result = session.execute(stmt) for a1 in result.scalars(): print(a1) # lazy loads for b1 in a1.bs: print(b1) # legacy Query use a1 = session.query(A).order_by(A.id).first() a1.data = "new data" async def async_main(): engine = create_async_engine( "postgresql+asyncpg://scott:tiger@localhost/test", echo=True, ) async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) await conn.run_sync(Base.metadata.create_all) async with AsyncSession(engine) as session: async with session.begin(): session.add_all( [ A(bs=[B(), B()], data="a1"), A(bs=[B()], data="a2"), A(bs=[B(), B()], data="a3"), ] ) await session.run_sync(fetch_and_update_objects) await session.commit() # for AsyncEngine created in function scope, close and # clean-up pooled connections await engine.dispose() asyncio.run(async_main())
在一个应用程序中运行某些函数在“同步”运行器中的上述方法与在一个基于事件的编程库(如gevent
)上运行 SQLAlchemy 应用程序有一些相似之处。区别如下:
- 与使用
gevent
不同,我们可以继续使用标准的 Python asyncio 事件循环,或任何自定义事件循环,而无需集成到gevent
事件循环中。 - 完全没有“monkeypatching”。上面的示例使用了一个真正的 asyncio 驱动程序,底层的 SQLAlchemy 连接池也使用 Python 内置的
asyncio.Queue
来池化连接。 - 程序可以自由地在 async/await 代码和使用同步代码的包含函数之间切换,几乎没有性能损失。没有“线程执行器”或任何额外的等待器或同步在使用。
- 底层网络驱动程序也使用纯 Python asyncio 概念,不使用
gevent
和eventlet
提供的第三方网络库。### 与并发任务一起使用 AsyncSession
AsyncSession
对象是一个可变的、有状态的对象,代表着正在进行的单个、有状态的数据库事务。使用 asyncio 进行并发任务,例如使用asyncio.gather()
等 API,应该为每个单独的任务使用一个独立的AsyncSession
。
请参阅会话是否线程安全?AsyncSession 是否可以在并发任务中共享?部分,了解关于Session
和AsyncSession
在处理并发工作负载时应如何使用的一般描述。
使用 AsyncSession 时防止隐式 IO
使用传统的 asyncio,应用程序需要避免发生可能导致 IO-on-attribute 访问的任何点。下面列出的技术可以帮助实现这一点,其中许多在前面的示例中有所说明。
- 延迟加载关系、延迟列或表达式,或在过期情况下访问的属性可以利用
AsyncAttrs
mixin。当将此 mixin 添加到特定类或更一般地添加到 DeclarativeBase
超类时,会提供一个访问器AsyncAttrs.awaitable_attrs
,将任何属性作为可等待对象传递:
from __future__ import annotations from typing import List from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import relationship class Base(AsyncAttrs, DeclarativeBase): pass class A(Base): __tablename__ = "a" # ... rest of mapping ... bs: Mapped[List[B]] = relationship() class B(Base): __tablename__ = "b" # ... rest of mapping ...
- 在不使用急加载时,访问新加载实例
A
上的A.bs
集合通常会使用延迟加载,为了成功,通常会向数据库发出 IO 请求,而在 asyncio 下会失败,因为不允许隐式 IO。要在 asyncio 下直接访问此属性而不需要任何先前的加载操作,可以通过指定AsyncAttrs.awaitable_attrs
前缀将属性访问为可等待对象:
a1 = (await session.scalars(select(A))).one() for b1 in await a1.awaitable_attrs.bs: print(b1)
AsyncAttrs
mixin 提供了一个简洁的外观,也是AsyncSession.run_sync()
方法使用的内部方法。
版本 2.0.13 中的新功能。
另请参见AsyncAttrs
- 集合可以被只写集合替换,永远不会隐式发出 IO,通过在 SQLAlchemy 2.0 中使用只写关系功能。使用此功能,集合永远不会被读取,只能使用显式 SQL 调用进行查询。请参见 Asyncio Integration 部分中的示例
async_orm_writeonly.py
,演示了在 asyncio 中使用只写集合的示例。
当使用只写集合时,程序在处理集合方面的行为简单且易于预测。然而,缺点是没有任何内置系统可以一次性加载许多这些集合,而是需要手动执行。因此,下面的许多要点涉及在使用传统的延迟加载关系与 asyncio 时需要更加小心的具体技术。 - 如果不使用
AsyncAttrs
,可以使用lazy="raise"
声明关系,这样默认情况下它们不会尝试发出 SQL。为了加载集合,将使用急加载。 - 最有用的急加载策略是
selectinload()
急加载器,在前面的示例中用于在await session.execute()
调用范围内急加载A.bs
集合:
stmt = select(A).options(selectinload(A.bs))
- 在构建新对象时,集合总是分配一个默认的空集合,例如上面示例中的列表:
A(bs=[], data="a2")
- 当
A
对象被刷新时,允许上述A
对象上的.bs
集合存在并可读;否则,当A
被刷新时,.bs
将被卸载并在访问时引发错误。 AsyncSession
配置为使用Session.expire_on_commit
设置为 False,这样我们可以在调用AsyncSession.commit()
后访问对象的属性,就像在最后一行访问属性时一样:
# create AsyncSession with expire_on_commit=False async_session = AsyncSession(engine, expire_on_commit=False) # sessionmaker version async_session = async_sessionmaker(engine, expire_on_commit=False) async with async_session() as session: result = await session.execute(select(A).order_by(A.id)) a1 = result.scalars().first() # commit would normally expire all attributes await session.commit() # access attribute subsequent to commit; this is what # expire_on_commit=False allows print(a1.data)
其他指南包括:
- 像
AsyncSession.expire()
这样的方法应该避免使用,而应该使用AsyncSession.refresh()
;如果绝对需要过期。通常情况下不应该需要过期,因为在使用 asyncio 时通常应将Session.expire_on_commit
设置为False
。 - 使用
AsyncSession.refresh()
可以显式加载懒加载关系在 asyncio 下,如果需要显式传递所需的属性名称给Session.refresh.attribute_names
,例如:
# assume a_obj is an A that has lazy loaded A.bs collection a_obj = await async_session.get(A, [1]) # force the collection to load by naming it in attribute_names await async_session.refresh(a_obj, ["bs"]) # collection is present print(f"bs collection: {a_obj.bs}")
- 当然最好一开始就使用急加载,以便在不需要懒加载的情况下已经设置好集合。
2.0.4 版本中新增:增加了对AsyncSession.refresh()
和底层的Session.refresh()
方法的支持,以强制延迟加载的关系加载,如果它们在Session.refresh.attribute_names
参数中明确命名。在先前的版本中,即使在参数中命名,关系也会被静默跳过。 - 避免使用 Cascades 文档中记录的
all
级联选项,而是明确列出所需的级联特性。all
级联选项暗示了 refresh-expire 设置,这意味着AsyncSession.refresh()
方法将使相关对象的属性过期,但不一定会刷新那些相关对象,假设未在relationship()
中配置急加载,它们将保持在过期状态。 - 如果使用
deferred()
列,应该使用适当的加载器选项,除了如上所述的relationship()
构造。有关延迟列加载的背景,请参阅限制使用列延迟加载。 - 在默认情况下,“动态”关系加载策略在动态关系加载器中描述,与 asyncio 方法不兼容。只有在
AsyncSession.run_sync()
方法中调用,或者通过使用其.statement
属性获取正常的 select 语句,才能直接使用它:
user = await session.get(User, 42) addresses = (await session.scalars(user.addresses.statement)).all() stmt = user.addresses.statement.where(Address.email_address.startswith("patrick")) addresses_filter = (await session.scalars(stmt)).all()
- 仅写入技术,在 SQLAlchemy 的 2.0 版本中引入,与 asyncio 完全兼容,应该优先考虑使用。
另请参见
“动态”关系加载器被“仅写入”取代 - 迁移到 2.0 风格的注意事项 - 如果在使用 asyncio 与不支持 RETURNING 的数据库(例如 MySQL 8)时,服务器默认值(例如生成的时间戳)将不会在新刷新的对象上可用,除非使用了
Mapper.eager_defaults
选项。在 SQLAlchemy 2.0 中,这种行为会自动应用于像 PostgreSQL、SQLite 和 MariaDB 这样使用 RETURNING 在插入行时获取新值的后端。
在 asyncio 下运行同步方法和函数
深度合成
这种方法实质上是公开了 SQLAlchemy 能够在第一时间提供 asyncio 接口的机制。虽然这样做没有技术问题,但总的来说,这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些核心理念,即任何可能导致 IO 调用的编程语句必须具有一个 await
调用,否则程序在 IO 可能发生的每一行都不会明确地表明。这种方法并没有改变这个一般性的想法,除了它允许一系列同步 IO 指令在函数调用的范围内被豁免这个规则,本质上被捆绑成一个可等待的。
作为在 asyncio 事件循环中集成传统 SQLAlchemy “懒加载”的另一种替代方法,提供了一种称为AsyncSession.run_sync()
的可选方法,该方法将在一个 greenlet 内运行任何 Python 函数,其中当它们到达数据库驱动程序时,传统的同步编程概念将被转换为使用 await
。这里的一个假设性方法是,一个以 asyncio 为导向的应用程序可以将数据库相关方法打包成函数,这些函数将使用AsyncSession.run_sync()
被调用。
修改上面的示例,如果我们不使用selectinload()
来处理 A.bs
集合,我们可以在一个单独的函数内完成对这些属性访问的处理:
import asyncio from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine def fetch_and_update_objects(session): """run traditional sync-style ORM code in a function that will be invoked within an awaitable. """ # the session object here is a traditional ORM Session. # all features are available here including legacy Query use. stmt = select(A) result = session.execute(stmt) for a1 in result.scalars(): print(a1) # lazy loads for b1 in a1.bs: print(b1) # legacy Query use a1 = session.query(A).order_by(A.id).first() a1.data = "new data" async def async_main(): engine = create_async_engine( "postgresql+asyncpg://scott:tiger@localhost/test", echo=True, ) async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) await conn.run_sync(Base.metadata.create_all) async with AsyncSession(engine) as session: async with session.begin(): session.add_all( [ A(bs=[B(), B()], data="a1"), A(bs=[B()], data="a2"), A(bs=[B(), B()], data="a3"), ] ) await session.run_sync(fetch_and_update_objects) await session.commit() # for AsyncEngine created in function scope, close and # clean-up pooled connections await engine.dispose() asyncio.run(async_main())
在“同步”运行器中运行某些函数的上述方法与在基于事件的编程库(例如 gevent
)上运行 SQLAlchemy 应用程序有一些相似之处。区别如下:
- 与使用
gevent
不同,我们可以继续使用标准的 Python asyncio 事件循环,或者任何自定义的事件循环,而无需将其集成到gevent
事件循环中。 - 完全没有“猴子补丁”。上面的示例利用了一个真正的 asyncio 驱动程序,底层的 SQLAlchemy 连接池也使用了 Python 内置的
asyncio.Queue
来池化连接。 - 程序可以自由在 async/await 代码和使用同步代码的包含函数之间切换,几乎没有性能损失。不使用“线程执行器”或任何额外的等待器或同步。
- 底层网络驱动程序也使用纯 Python asyncio 概念,不使用
gevent
和eventlet
等第三方网络库。
使用 asyncio 扩展与事件
SQLAlchemy 事件系统不会直接暴露给 asyncio 扩展,这意味着尚未有 SQLAlchemy 事件处理程序的“异步”版本。
然而,由于 asyncio 扩展包围了通常的同步 SQLAlchemy API,因此常规的“同步”风格事件处理程序可以自由使用,就像没有使用 asyncio 一样。
如下所述,目前有两种注册事件的策略,针对面向 asyncio 的 API:
- 事件可以在实例级别注册(例如特定的
AsyncEngine
实例),通过将事件与引用代理对象的sync
属性关联。例如,要针对AsyncEngine
实例注册PoolEvents.connect()
事件,请使用其AsyncEngine.sync_engine
属性作为目标。目标包括:
AsyncEngine.sync_engine
AsyncConnection.sync_connection
AsyncConnection.sync_engine
AsyncSession.sync_session
- 要在类级别注册事件,针对同一类型的所有实例(例如所有
AsyncSession
实例),请使用相应的同步风格类。例如,要针对AsyncSession
类注册SessionEvents.before_commit()
事件,请将Session
类作为目标。 - 要在
sessionmaker
级别注册,结合一个明确的sessionmaker
和一个async_sessionmaker
,使用async_sessionmaker.sync_session_class
,并将事件与sessionmaker
关联。
在异步上下文中工作的事件处理程序中,像Connection
这样的对象继续以通常的“同步”方式工作,而不需要await
或async
的使用;当消息最终被异步数据库适配器接收时,调用风格会透明地转换回异步调用风格。对于传递了 DBAPI 级别连接的事件,例如PoolEvents.connect()
,该对象是一个符合 pep-249 的“连接”对象,它将同步样式调用转换为异步驱动程序。
具有异步引擎/会话/会话制造器的事件监听器示例
一些与面向异步 API 构造相关的同步样式事件处理程序示例如下:
- 在 AsyncEngine 上的核心事件
在这个例子中,我们访问AsyncEngine.sync_engine
属性,作为ConnectionEvents
和PoolEvents
的目标:
import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.engine import Engine from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") # connect event on instance of Engine @event.listens_for(engine.sync_engine, "connect") def my_on_connect(dbapi_con, connection_record): print("New DBAPI connection:", dbapi_con) cursor = dbapi_con.cursor() # sync style API use for adapted DBAPI connection / cursor cursor.execute("select 'execute from event'") print(cursor.fetchone()[0]) # before_execute event on all Engine instances @event.listens_for(Engine, "before_execute") def my_before_execute( conn, clauseelement, multiparams, params, execution_options, ): print("before execute!") async def go(): async with engine.connect() as conn: await conn.execute(text("select 1")) await engine.dispose() asyncio.run(go())
- 输出:
New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>> execute from event before execute!
- 在 AsyncSession 上的 ORM 事件
在这个例子中,我们访问AsyncSession.sync_session
作为SessionEvents
的目标:
import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.orm import Session engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") session = AsyncSession(engine) # before_commit event on instance of Session @event.listens_for(session.sync_session, "before_commit") def my_before_commit(session): print("before commit!") # sync style API use on Session connection = session.connection() # sync style API use on Connection result = connection.execute(text("select 'execute from event'")) print(result.first()) # after_commit event on all Session instances @event.listens_for(Session, "after_commit") def my_after_commit(session): print("after commit!") async def go(): await session.execute(text("select 1")) await session.commit() await session.close() await engine.dispose() asyncio.run(go())
- 输出:
before commit! execute from event after commit!
- 在 async_sessionmaker 上的 ORM 事件
对于这种用例,我们将sessionmaker
作为事件目标,然后将其分配给async_sessionmaker
,使用async_sessionmaker.sync_session_class
参数:
import asyncio from sqlalchemy import event from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import sessionmaker sync_maker = sessionmaker() maker = async_sessionmaker(sync_session_class=sync_maker) @event.listens_for(sync_maker, "before_commit") def before_commit(session): print("before commit") async def main(): async_session = maker() await async_session.commit() asyncio.run(main())
- 输出:
before commit
在连接池和其他事件中使用仅可等待的驱动程序方法
如上节所述,事件处理程序(例如那些围绕 PoolEvents
的事件处理程序)接收到一个同步风格的“DBAPI”连接,这是由 SQLAlchemy asyncio 方言提供的包装对象,用于将底层 asyncio “驱动程序”连接适配成可以被 SQLAlchemy 内部使用的对象。当用户定义的实现需要直接使用最终的“驱动程序”连接,并在该驱动程序连接上使用仅可等待方法时,就会出现特殊的用例。其中一个例子是 asyncpg 驱动程序提供的 .set_type_codec()
方法。
为了适应这种用例,SQLAlchemy 的 AdaptedConnection
类提供了一个方法 AdaptedConnection.run_async()
,允许在事件处理程序或其他 SQLAlchemy 内部的“同步”上下文中调用可等待函数。这个方法直接对应于 AsyncConnection.run_sync()
方法,后者允许在异步环境中运行同步风格的方法。
AdaptedConnection.run_async()
应该传递一个函数,该函数将接受内部的“驱动程序”连接作为单个参数,并返回一个可等待对象,该对象将由 AdaptedConnection.run_async()
方法调用。给定的函数本身不需要声明为 async
;它可以是一个 Python 的 lambda:
,因为返回的可等待值将在返回后被调用:
from sqlalchemy import event from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine(...) @event.listens_for(engine.sync_engine, "connect") def register_custom_types(dbapi_connection, *args): dbapi_connection.run_async( lambda connection: connection.set_type_codec( "MyCustomType", encoder, decoder, # ... ) )
在上面的例子中,传递给 register_custom_types
事件处理程序的对象是 AdaptedConnection
的一个实例,它提供了对底层仅支持异步驱动程序级连接对象的类似 DBAPI 的接口。然后,AdaptedConnection.run_async()
方法提供了对一个可等待环境的访问,在该环境中可以对底层驱动程序级连接进行操作。
版本 1.4.30 中的新功能。
异步引擎 / 会话 / 会话工厂的事件监听器示例
下面给出了一些与异步 API 构造相关的同步风格事件处理程序的示例:
- 异步引擎的核心事件
在这个例子中,我们将AsyncEngine.sync_engine
属性作为ConnectionEvents
和PoolEvents
的目标:
import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.engine import Engine from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") # connect event on instance of Engine @event.listens_for(engine.sync_engine, "connect") def my_on_connect(dbapi_con, connection_record): print("New DBAPI connection:", dbapi_con) cursor = dbapi_con.cursor() # sync style API use for adapted DBAPI connection / cursor cursor.execute("select 'execute from event'") print(cursor.fetchone()[0]) # before_execute event on all Engine instances @event.listens_for(Engine, "before_execute") def my_before_execute( conn, clauseelement, multiparams, params, execution_options, ): print("before execute!") async def go(): async with engine.connect() as conn: await conn.execute(text("select 1")) await engine.dispose() asyncio.run(go())
- 输出:
New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>> execute from event before execute!
- AsyncSession 上的 ORM 事件
在这个例子中,我们将AsyncSession.sync_session
作为SessionEvents
的目标:
import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.orm import Session engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") session = AsyncSession(engine) # before_commit event on instance of Session @event.listens_for(session.sync_session, "before_commit") def my_before_commit(session): print("before commit!") # sync style API use on Session connection = session.connection() # sync style API use on Connection result = connection.execute(text("select 'execute from event'")) print(result.first()) # after_commit event on all Session instances @event.listens_for(Session, "after_commit") def my_after_commit(session): print("after commit!") async def go(): await session.execute(text("select 1")) await session.commit() await session.close() await engine.dispose() asyncio.run(go())
- 输出:
before commit! execute from event after commit!
- 异步会话工厂上的 ORM 事件
对于这种用例,我们将sessionmaker
作为事件目标,然后使用async_sessionmaker
并使用async_sessionmaker.sync_session_class
参数进行赋值:
import asyncio from sqlalchemy import event from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import sessionmaker sync_maker = sessionmaker() maker = async_sessionmaker(sync_session_class=sync_maker) @event.listens_for(sync_maker, "before_commit") def before_commit(session): print("before commit") async def main(): async_session = maker() await async_session.commit() asyncio.run(main())
- 输出:
before commit
在连接池和其他事件中使用仅可等待的驱动程序方法
如上节所述,事件处理程序(例如围绕PoolEvents
事件处理程序定位的事件处理程序)接收到一个同步风格的“DBAPI”连接,这是 SQLAlchemy asyncio 方言提供的包装对象,用于将底层的 asyncio“driver”连接适配为 SQLAlchemy 内部可以使用的连接。当用户定义的实现需要直接使用最终的“driver”连接时,使用该驱动连接上的仅可等待方法时会出现特殊的用例。一个这样的例子是 asyncpg 驱动程序提供的.set_type_codec()
方法。
为了适应这种用例,SQLAlchemy 的AdaptedConnection
类提供了一个方法AdaptedConnection.run_async()
,允许在事件处理程序或其他 SQLAlchemy 内部的“同步”上下文中调用可等待函数。这个方法直接类似于AsyncConnection.run_sync()
方法,允许同步风格的方法在异步下运行。
AdaptedConnection.run_async()
应该传递一个接受最内层的“driver”连接作为单个参数的函数,并返回一个由AdaptedConnection.run_async()
方法调用的可等待对象。给定函数本身不需要声明为async
;它完全可以是一个 Python 的lambda:
,因为返回的可等待值将在返回后被调用:
from sqlalchemy import event from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine(...) @event.listens_for(engine.sync_engine, "connect") def register_custom_types(dbapi_connection, *args): dbapi_connection.run_async( lambda connection: connection.set_type_codec( "MyCustomType", encoder, decoder, # ... ) )
上面,传递给register_custom_types
事件处理程序的对象是AdaptedConnection
的一个实例,它提供了对底层仅异步驱动程序级连接对象的类似 DBAPI 的接口。然后,AdaptedConnection.run_async()
方法提供了对可等待环境的访问,其中底层驱动程序级连接可以被操作。
1.4.30 版本中新增。
使用多个 asyncio 事件循环
使用多个事件循环的应用程序,例如在将 asyncio 与多线程结合的不常见情况下,在使用默认池实现时不应该将同一个AsyncEngine
与不同的事件循环共享。
如果从一个事件循环传递AsyncEngine
到另一个事件循环,应该在它被重新使用于新的事件循环之前调用AsyncEngine.dispose()
方法。未这样做可能会导致类似于Task got Future attached to a different loop
的RuntimeError
。
如果同一个引擎必须在不同的循环之间共享,应该配置为禁用连接池,使用NullPool
,防止引擎使用任何连接超过一次:
from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.pool import NullPool engine = create_async_engine( "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool, )
SqlAlchemy 2.0 中文文档(二十八)(4)https://developer.aliyun.com/article/1560393