ORM 异常
SQLAlchemy ORM 异常。
对象名称 | 描述 |
ConcurrentModificationError | StaleDataError 的别名 |
NO_STATE | 可能由仪器实现引发的异常类型。 |
attribute sqlalchemy.orm.exc.ConcurrentModificationError
StaleDataError
的别名
exception sqlalchemy.orm.exc.DetachedInstanceError
尝试访问已分离的映射实例上未加载的属性。
类签名
类sqlalchemy.orm.exc.DetachedInstanceError
(sqlalchemy.exc.SQLAlchemyError
)
exception sqlalchemy.orm.exc.FlushError
在 flush() 过程中检测到无效条件。
类签名
类sqlalchemy.orm.exc.FlushError
(sqlalchemy.exc.SQLAlchemyError
)
exception sqlalchemy.orm.exc.LoaderStrategyException
一个属性的加载策略不存在。
类签名
类sqlalchemy.orm.exc.LoaderStrategyException
(sqlalchemy.exc.InvalidRequestError
)
method __init__(applied_to_property_type: Type[Any], requesting_property: MapperProperty[Any], applies_to: Type[MapperProperty[Any]] | None, actual_strategy_type: Type[LoaderStrategy] | None, strategy_key: Tuple[Any, ...])
sqlalchemy.orm.exc.NO_STATE = (<class 'AttributeError'>, <class 'KeyError'>)
可能由仪器实现引发的异常类型。
exception sqlalchemy.orm.exc.ObjectDeletedError
刷新操作未能检索到与对象已知主键标识符对应的数据库行。
当在对象上访问过期属性或使用Query.get()
检索到被检测为过期的对象时,刷新操作会进行。基于主键发出目标行的 SELECT;如果没有返回行,则引发此异常。
这个异常的真正含义只是与持久对象关联的主键标识符对应的行不存在。该行可能已被删除,或在某些情况下,主键已更新为新值,超出了 ORM 对目标对象的管理。
类签名
类sqlalchemy.orm.exc.ObjectDeletedError
(sqlalchemy.exc.InvalidRequestError
)
method __init__(state: InstanceState[Any], msg: str | None = None)
exception sqlalchemy.orm.exc.ObjectDereferencedError
由于对象被垃圾回收而无法完成操作。
类签名
类sqlalchemy.orm.exc.ObjectDereferencedError
(sqlalchemy.exc.SQLAlchemyError
)
exception sqlalchemy.orm.exc.StaleDataError
遇到了未被考虑到的数据库状态操作。
导致此情况发生的条件包括:
- 一个刷新可能已尝试更新或删除行,并且在 UPDATE 或 DELETE 语句期间匹配到了意外数量的行。请注意,当使用 version_id_col 时,UPDATE 或 DELETE 语句中的行也将与当前已知的版本标识符匹配。
- 一个带有 version_id_col 的映射对象被刷新,而从数据库返回的版本号与对象本身的版本号不匹配。
- 一个对象从其父对象中分离出来,然而该对象以前附加到了另一个父标识,该父标识已被垃圾收集,并且无法确定新的父标识是否真的是最新的“父”。
类签名
类sqlalchemy.orm.exc.StaleDataError
(sqlalchemy.exc.SQLAlchemyError
)
exception sqlalchemy.orm.exc.UnmappedClassError
请求了一个未知类的映射操作。
类签名
类sqlalchemy.orm.exc.UnmappedClassError
(sqlalchemy.orm.exc.UnmappedError
)
method __init__(cls: Type[_T], msg: str | None = None)
exception sqlalchemy.orm.exc.UnmappedColumnError
请求了一个未知列的映射操作。
类签名
类sqlalchemy.orm.exc.UnmappedColumnError
(sqlalchemy.exc.InvalidRequestError
)
exception sqlalchemy.orm.exc.UnmappedError
引发涉及未出现预期映射的异常的基类。
类签名
类sqlalchemy.orm.exc.UnmappedError
(sqlalchemy.exc.InvalidRequestError
)
exception sqlalchemy.orm.exc.UnmappedInstanceError
请求了一个未知实例的映射操作。
类签名
类sqlalchemy.orm.exc.UnmappedInstanceError
(sqlalchemy.orm.exc.UnmappedError
)
method __init__(obj: object, msg: str | None = None)
ORM 扩展
SQLAlchemy 有各种 ORM 扩展可用,这些扩展为核心行为添加了额外的功能。
这些扩展几乎完全建立在公共核心和 ORM API 上,鼓励用户阅读它们的源代码以进一步了解它们的行为。特别是“水平分片”、“混合属性”和“变动追踪”扩展非常简洁。
- 异步 I/O(asyncio)
- 关联代理
- 自动映射
- 烘焙查询
- 声明式扩展
- Mypy / Pep-484 对 ORM 映射的支持
- 变动追踪
- 排序列表
- 水平分片
- 混合属性
- 可索引
- 替代类仪器化
异步 I/O(asyncio)
支持 Python asyncio。包括对 Core 和 ORM 使用的支持,使用与 asyncio 兼容的方言。
版本 1.4 中的新功能。
警告
请阅读 Asyncio 平台安装说明(包括 Apple M1)以获取许多平台的重要平台安装说明,包括Apple M1 架构。
另请参阅
Core 和 ORM 的异步 IO 支持 - 初始功能公告
异步 IO 集成 - 展示了在 asyncio 扩展中使用 Core 和 ORM 的示例脚本。
Asyncio 平台安装说明(包括 Apple M1)
asyncio 扩展仅支持 Python 3。它还依赖于greenlet库。这个依赖在常见的机器平台上默认安装,包括:
x86_64 aarch64 ppc64le amd64 win32
对于上述平台,greenlet
已知提供预构建的 wheel 文件。对于其他平台,greenlet 不会默认安装;可以在Greenlet - Download Files查看当前的文件列表。请注意有许多架构被省略,包括 Apple M1。
要安装 SQLAlchemy 并确保greenlet
依赖存在,无论使用什么平台,可以按照以下方式安装[asyncio]
setuptools extra,这也会指示pip
安装greenlet
:
pip install sqlalchemy[asyncio]
请注意,在没有预构建 wheel 文件的平台上安装greenlet
意味着greenlet
将从源代码构建,这要求 Python 的开发库也存在。
概要 - Core
对于核心用法,create_async_engine()
函数创建一个AsyncEngine
的实例,然后提供传统Engine
API 的异步版本。AsyncEngine
通过其AsyncEngine.connect()
和AsyncEngine.begin()
方法提供一个AsyncConnection
,这两个方法都提供异步上下文管理器。AsyncConnection
然后可以使用AsyncConnection.execute()
方法来执行语句以提供一个缓冲的Result
,或者使用AsyncConnection.stream()
方法来提供一个流式的服务器端AsyncResult
:
>>> import asyncio >>> from sqlalchemy import Column >>> from sqlalchemy import MetaData >>> from sqlalchemy import select >>> from sqlalchemy import String >>> from sqlalchemy import Table >>> from sqlalchemy.ext.asyncio import create_async_engine >>> meta = MetaData() >>> t1 = Table("t1", meta, Column("name", String(50), primary_key=True)) >>> async def async_main() -> None: ... engine = create_async_engine("sqlite+aiosqlite://", echo=True) ... ... async with engine.begin() as conn: ... await conn.run_sync(meta.drop_all) ... await conn.run_sync(meta.create_all) ... ... await conn.execute( ... t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}] ... ) ... ... async with engine.connect() as conn: ... # select a Result, which will be delivered with buffered ... # results ... result = await conn.execute(select(t1).where(t1.c.name == "some name 1")) ... ... print(result.fetchall()) ... ... # for AsyncEngine created in function scope, close and ... # clean-up pooled connections ... await engine.dispose() >>> asyncio.run(async_main()) BEGIN (implicit) ... CREATE TABLE t1 ( name VARCHAR(50) NOT NULL, PRIMARY KEY (name) ) ... INSERT INTO t1 (name) VALUES (?) [...] [('some name 1',), ('some name 2',)] COMMIT BEGIN (implicit) SELECT t1.name FROM t1 WHERE t1.name = ? [...] ('some name 1',) [('some name 1',)] ROLLBACK
上面,AsyncConnection.run_sync()
方法可用于调用特殊的 DDL 函数,例如MetaData.create_all()
,这些函数不包括可等待的钩子。
提示
在使用AsyncEngine
对象的范围内调用await
来调用AsyncEngine.dispose()
方法是明智的,如上例中的async_main
函数所示。这确保了连接池保持的任何连接在可等待的上下文中被正确处理。与使用阻塞 IO 不同,SQLAlchemy 无法在__del__
或弱引用终结器等方法中正确处理这些连接,因为没有机会调用await
。当引擎超出范围时未显式处理引擎可能导致发出到标准输出的警告,类似于RuntimeError: Event loop is closed
的形式在垃圾回收中。
AsyncConnection
还提供了一个“流式” API,通过 AsyncConnection.stream()
方法返回一个 AsyncResult
对象。该结果对象使用服务器端游标并提供了一个 async/await API,比如一个异步迭代器:
async with engine.connect() as conn: async_result = await conn.stream(select(t1)) async for row in async_result: print("row: %s" % (row,))
概要 - ORM
使用 2.0 风格 查询,AsyncSession
类提供了完整的 ORM 功能。
在默认使用模式下,必须特别小心,以避免涉及 ORM 关系和列属性的 惰性加载 或其他已过期的属性访问;下一节 在使用 AsyncSession 时防止隐式 IO 对此进行了详细说明。
警告
一个 AsyncSession
实例不能安全地用于多个并发任务。请参阅章节 在并发任务中使用 AsyncSession 和 会话是线程安全的吗? AsyncSession 是否安全用于共享在并发任务中? 了解背景信息。
下面的示例演示了一个完整的示例,包括映射器和会话配置:
>>> from __future__ import annotations >>> import asyncio >>> import datetime >>> from typing import List >>> from sqlalchemy import ForeignKey >>> from sqlalchemy import func >>> from sqlalchemy import select >>> from sqlalchemy.ext.asyncio import AsyncAttrs >>> from sqlalchemy.ext.asyncio import async_sessionmaker >>> from sqlalchemy.ext.asyncio import AsyncSession >>> from sqlalchemy.ext.asyncio import create_async_engine >>> from sqlalchemy.orm import DeclarativeBase >>> from sqlalchemy.orm import Mapped >>> from sqlalchemy.orm import mapped_column >>> from sqlalchemy.orm import relationship >>> from sqlalchemy.orm import selectinload >>> class Base(AsyncAttrs, DeclarativeBase): ... pass >>> class B(Base): ... __tablename__ = "b" ... ... id: Mapped[int] = mapped_column(primary_key=True) ... a_id: Mapped[int] = mapped_column(ForeignKey("a.id")) ... data: Mapped[str] >>> class A(Base): ... __tablename__ = "a" ... ... id: Mapped[int] = mapped_column(primary_key=True) ... data: Mapped[str] ... create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now()) ... bs: Mapped[List[B]] = relationship() >>> async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None: ... async with async_session() as session: ... async with session.begin(): ... session.add_all( ... [ ... A(bs=[B(data="b1"), B(data="b2")], data="a1"), ... A(bs=[], data="a2"), ... A(bs=[B(data="b3"), B(data="b4")], data="a3"), ... ] ... ) >>> async def select_and_update_objects( ... async_session: async_sessionmaker[AsyncSession], ... ) -> None: ... async with async_session() as session: ... stmt = select(A).order_by(A.id).options(selectinload(A.bs)) ... ... result = await session.execute(stmt) ... ... for a in result.scalars(): ... print(a, a.data) ... print(f"created at: {a.create_date}") ... for b in a.bs: ... print(b, b.data) ... ... result = await session.execute(select(A).order_by(A.id).limit(1)) ... ... a1 = result.scalars().one() ... ... a1.data = "new data" ... ... await session.commit() ... ... # access attribute subsequent to commit; this is what ... # expire_on_commit=False allows ... print(a1.data) ... ... # alternatively, AsyncAttrs may be used to access any attribute ... # as an awaitable (new in 2.0.13) ... for b1 in await a1.awaitable_attrs.bs: ... print(b1, b1.data) >>> async def async_main() -> None: ... engine = create_async_engine("sqlite+aiosqlite://", echo=True) ... ... # async_sessionmaker: a factory for new AsyncSession objects. ... # expire_on_commit - don't expire objects after transaction commit ... async_session = async_sessionmaker(engine, expire_on_commit=False) ... ... async with engine.begin() as conn: ... await conn.run_sync(Base.metadata.create_all) ... ... await insert_objects(async_session) ... await select_and_update_objects(async_session) ... ... # for AsyncEngine created in function scope, close and ... # clean-up pooled connections ... await engine.dispose() >>> asyncio.run(async_main()) BEGIN (implicit) ... CREATE TABLE a ( id INTEGER NOT NULL, data VARCHAR NOT NULL, create_date DATETIME DEFAULT (CURRENT_TIMESTAMP) NOT NULL, PRIMARY KEY (id) ) ... CREATE TABLE b ( id INTEGER NOT NULL, a_id INTEGER NOT NULL, data VARCHAR NOT NULL, PRIMARY KEY (id), FOREIGN KEY(a_id) REFERENCES a (id) ) ... COMMIT BEGIN (implicit) INSERT INTO a (data) VALUES (?) RETURNING id, create_date [...] ('a1',) ... INSERT INTO b (a_id, data) VALUES (?, ?) RETURNING id [...] (1, 'b2') ... COMMIT BEGIN (implicit) SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id [...] () SELECT b.a_id AS b_a_id, b.id AS b_id, b.data AS b_data FROM b WHERE b.a_id IN (?, ?, ?) [...] (1, 2, 3) <A object at ...> a1 created at: ... <B object at ...> b1 <B object at ...> b2 <A object at ...> a2 created at: ... <A object at ...> a3 created at: ... <B object at ...> b3 <B object at ...> b4 SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id LIMIT ? OFFSET ? [...] (1, 0) UPDATE a SET data=? WHERE a.id = ? [...] ('new data', 1) COMMIT new data <B object at ...> b1 <B object at ...> b2
在上面的示例中,使用可选的 async_sessionmaker
助手实例化了 AsyncSession
,该助手提供了一个带有固定参数集的新 AsyncSession
对象的工厂,其中包括将其与特定数据库 URL 关联。然后将其传递给其他方法,在那里它可以在 Python 异步上下文管理器(即 async with:
语句)中使用,以便在块结束时自动关闭;这相当于调用 AsyncSession.close()
方法。
在并发任务中使用 AsyncSession
AsyncSession
对象是一个可变的,有状态的对象,代表了正在进行的单个,有状态的数据库事务。使用 asyncio 的并发任务,例如使用 asyncio.gather()
等 API,应该每个个体任务使用单独的 AsyncSession
。
参见 Is the Session thread-safe? Is AsyncSession safe to share in concurrent tasks? 部分,了解关于 Session
和 AsyncSession
如何在并发工作负载中使用的一般描述。### 在使用 AsyncSession 时防止隐式 IO
使用传统 asyncio,应用程序需要避免发生任何可能导致 IO-on-attribute 访问的点。以下是可用于帮助此目的的技术,在前述示例中有很多技术。
- 懒加载关系、延迟列或表达式的属性,或在到期情况下被访问的属性可以利用
AsyncAttrs
mixin。当将此 mixin 添加到特定类或更一般地添加到 DeclarativeBase
超类时,它提供一个访问器AsyncAttrs.awaitable_attrs
,它将任何属性作为可等待对象提供:
from __future__ import annotations from typing import List from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import relationship class Base(AsyncAttrs, DeclarativeBase): pass class A(Base): __tablename__ = "a" # ... rest of mapping ... bs: Mapped[List[B]] = relationship() class B(Base): __tablename__ = "b" # ... rest of mapping ...
- 在不使用急加载的情况下,访问新加载的
A
实例上的A.bs
集合通常会使用 lazy loading,为了成功,通常会向数据库发出 IO,但在 asyncio 下会失败,因为不允许隐式 IO。在没有任何先前加载操作的情况下直接访问此属性,在 asyncio 下,该属性可以作为可等待对象进行访问,指示AsyncAttrs.awaitable_attrs
前缀:
a1 = (await session.scalars(select(A))).one() for b1 in await a1.awaitable_attrs.bs: print(b1)
AsyncAttrs
mixin 提供了一个简洁的外观,它覆盖了内部方法,该方法也被AsyncSession.run_sync()
方法使用。
版本 2.0.13 中的新功能。
另请参见AsyncAttrs
- 使用 SQLAlchemy 2.0 中的 Write Only Relationships 特性,可以将集合替换为只写集合,这些集合永远不会隐式发出 IO,在此特性下,集合从不读取,只使用显式 SQL 调用查询。在 Asyncio Integration 部分的示例
async_orm_writeonly.py
中,可见一个使用 asyncio 的只写集合示例。
当使用仅写集合时,程序的行为在关于集合方面是简单且易于预测的。然而,缺点是没有任何内置系统可以一次性加载许多这些集合,而是需要手动执行。因此,下面的许多要点涉及在使用 asyncio 时使用传统的懒加载关系时需要更加小心的具体技术。 - 如果不使用
AsyncAttrs
,关系可以声明为lazy="raise"
,这样默认情况下它们不会尝试发出 SQL。为了加载集合,将使用 eager loading。 - 最有用的急加载策略是
selectinload()
急加载器,在前面的例子中被用来在await session.execute()
调用的范围内急加载A.bs
集合:
stmt = select(A).options(selectinload(A.bs))
- 当构建新对象时,集合总是被分配一个默认的空集合,比如上面的例子中的列表:
A(bs=[], data="a2")
- 这允许在刷新
A
对象时,上述A
对象上的.bs
集合存在且可读;否则,当刷新A
时,.bs
将被卸载并在访问时引发错误。 AsyncSession
是使用Session.expire_on_commit
设置为 False 进行配置的,这样我们可以在调用AsyncSession.commit()
之后访问对象的属性,就像在最后一行访问属性时一样:
# create AsyncSession with expire_on_commit=False async_session = AsyncSession(engine, expire_on_commit=False) # sessionmaker version async_session = async_sessionmaker(engine, expire_on_commit=False) async with async_session() as session: result = await session.execute(select(A).order_by(A.id)) a1 = result.scalars().first() # commit would normally expire all attributes await session.commit() # access attribute subsequent to commit; this is what # expire_on_commit=False allows print(a1.data)
其他指导原则包括:
- 应该避免使用类似
AsyncSession.expire()
的方法,而应该使用AsyncSession.refresh()
;如果绝对需要过期。通常情况下不应该需要过期,因为在使用 asyncio 时通常应该将Session.expire_on_commit
设置为False
。 - 使用
AsyncSession.refresh()
可以显式加载懒加载关系,如果所需的属性名称被显式传递给Session.refresh.attribute_names
,例如:
# assume a_obj is an A that has lazy loaded A.bs collection a_obj = await async_session.get(A, [1]) # force the collection to load by naming it in attribute_names await async_session.refresh(a_obj, ["bs"]) # collection is present print(f"bs collection: {a_obj.bs}")
- 当然最好一开始就使用急加载,以便无需延迟加载即可设置集合。
2.0.4 版中新增了对AsyncSession.refresh()
和底层Session.refresh()
方法的支持,以强制懒加载的关系加载,如果它们在Session.refresh.attribute_names
参数中明确命名。在之前的版本中,即使在参数中命名了关系,也会被静默跳过。 - 避免使用文档中记录的 Cascades 中的
all
级联选项,而是明确列出所需的级联特性。all
级联选项暗示了 refresh-expire 设置,这意味着AsyncSession.refresh()
方法将使相关对象上的属性过期,但不一定会刷新那些相关对象,假设未在relationship()
内配置急加载,则将其保留在过期状态。 - 如果使用,应该使用适当的加载器选项来为
deferred()
列进行延迟加载,除了上面注意到的relationship()
结构。请参阅 Limiting which Columns Load with Column Deferral 了解延迟列加载的背景信息。 - 在 Dynamic Relationship Loaders 中描述的“动态”关系加载器策略默认情况下不与 asyncio 方法兼容。只有在 Running Synchronous Methods and Functions under asyncio 中描述的
AsyncSession.run_sync()
方法内调用时,或者通过使用其.statement
属性获取普通选择时,它才能直接使用:
user = await session.get(User, 42) addresses = (await session.scalars(user.addresses.statement)).all() stmt = user.addresses.statement.where(Address.email_address.startswith("patrick")) addresses_filter = (await session.scalars(stmt)).all()
- 引入 SQLAlchemy 2.0 版本的 write only 技术完全与 asyncio 兼容,并应优先使用。
请参阅
“动态”关系加载器被“只写”所取代 - 迁移到 2.0 样式的注意事项 - 如果在不支持 RETURNING 的数据库(例如 MySQL 8)中使用 asyncio,那么新刷新的对象上将不会有服务器默认值,例如生成的时间戳,除非使用
Mapper.eager_defaults
选项。在 SQLAlchemy 2.0 中,这种行为会自动应用于像 PostgreSQL、SQLite 和 MariaDB 这样使用 RETURNING 在插入行时获取新值的后端。### 在 asyncio 下运行同步方法和函数
深度炼金术
这种方法本质上是公开了 SQLAlchemy 能够提供 asyncio 接口的机制。虽然这样做没有技术问题,但总体上这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些核心理念,即任何可能导致 IO 调用的编程语句必须有一个 await
调用,否则程序不会明确地指出每一行可能发生 IO 的地方。这种方法并没有改变这个一般观念,只是允许一系列同步 IO 指令在函数调用范围内免除这个规则,实质上被打包成一个可等待对象。
作为在 asyncio 事件循环中集成传统 SQLAlchemy “延迟加载”的另一种方法,提供了一种名为 AsyncSession.run_sync()
的可选方法,它将在一个 greenlet 中运行任何 Python 函数,传统的同步编程概念将在到达数据库驱动程序时转换为使用 await
。这里的一个假设方法是,一个面向 asyncio 的应用程序可以将与数据库相关的方法打包到使用 AsyncSession.run_sync()
调用的函数中。
修改上面的示例,如果我们不为 A.bs
集合使用 selectinload()
,我们可以在一个单独的函数中完成对这些属性访问的处理:
import asyncio from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine def fetch_and_update_objects(session): """run traditional sync-style ORM code in a function that will be invoked within an awaitable. """ # the session object here is a traditional ORM Session. # all features are available here including legacy Query use. stmt = select(A) result = session.execute(stmt) for a1 in result.scalars(): print(a1) # lazy loads for b1 in a1.bs: print(b1) # legacy Query use a1 = session.query(A).order_by(A.id).first() a1.data = "new data" async def async_main(): engine = create_async_engine( "postgresql+asyncpg://scott:tiger@localhost/test", echo=True, ) async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) await conn.run_sync(Base.metadata.create_all) async with AsyncSession(engine) as session: async with session.begin(): session.add_all( [ A(bs=[B(), B()], data="a1"), A(bs=[B()], data="a2"), A(bs=[B(), B()], data="a3"), ] ) await session.run_sync(fetch_and_update_objects) await session.commit() # for AsyncEngine created in function scope, close and # clean-up pooled connections await engine.dispose() asyncio.run(async_main())
在“sync”运行器中运行某些函数的上述方法与在类似 gevent
的事件驱动编程库上运行 SQLAlchemy 应用程序的应用程序有一些相似之处。区别如下:
- 与使用
gevent
不同,我们可以继续使用标准的 Python asyncio 事件循环,或任何自定义事件循环,而无需集成到gevent
事件循环中。 - 没有任何“猴子补丁”。上面的示例使用了真正的 asyncio 驱动程序,底层的 SQLAlchemy 连接池也使用了 Python 内置的
asyncio.Queue
来池化连接。 - 该程序可以自由地在异步/等待代码和使用同步代码的封装函数之间切换,几乎没有性能损失。没有使用“线程执行器”或任何额外的等待器或同步。
- 底层网络驱动程序也在使用纯 Python asyncio 概念,不使用
gevent
和eventlet
提供的第三方网络库。## 使用与异步扩展的事件
SQLAlchemy 的事件系统未直接由异步扩展暴露,这意味着目前还没有“异步”版本的 SQLAlchemy 事件处理程序。
但是,由于异步扩展包围了通常的同步 SQLAlchemy API,因此常规的“同步”风格事件处理程序可自由使用,就像没有使用 asyncio 一样。
如下所述,目前有两种策略可以注册给予 asyncio-facing APIs 的事件:
- 事件可以在实例级别(例如特定的
AsyncEngine
实例)上注册,方法是将事件与引用代理对象的sync
属性关联起来。例如,要针对AsyncEngine
实例注册PoolEvents.connect()
事件,请使用其AsyncEngine.sync_engine
属性作为目标。目标包括:
AsyncEngine.sync_engine
AsyncConnection.sync_connection
AsyncConnection.sync_engine
AsyncSession.sync_session
- 要在类级别注册事件,针对同一类型的所有实例(例如所有
AsyncSession
实例),请使用相应的同步样式类。例如,要针对AsyncSession
类注册SessionEvents.before_commit()
事件,请使用Session
类作为目标。 - 要在
sessionmaker
级别注册,请使用async_sessionmaker.sync_session_class
将显式sessionmaker
与async_sessionmaker
组合,并将事件与sessionmaker
相关联。
当在异步 IO 上下文中的事件处理程序中工作时,例如Connection
等对象将继续以通常的“同步”方式工作,而不需要await
或async
使用;当消息最终由异步 IO 数据库适配器接收时,调用样式将透明地转换回异步 IO 调用样式。对于传递了 DBAPI 级别连接的事件,例如PoolEvents.connect()
,对象是一个符合 pep-249 的“连接”对象,它将同步样式调用适配为异步 IO 驱动程序。
带有异步引擎/会话/会话工厂的事件监听器示例
下面是一些与异步 API 构造相关的同步风格事件处理程序的示例:
- AsyncEngine 上的核心事件
在此示例中,我们将AsyncEngine.sync_engine
属性作为ConnectionEvents
和PoolEvents
的目标:
import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.engine import Engine from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") # connect event on instance of Engine @event.listens_for(engine.sync_engine, "connect") def my_on_connect(dbapi_con, connection_record): print("New DBAPI connection:", dbapi_con) cursor = dbapi_con.cursor() # sync style API use for adapted DBAPI connection / cursor cursor.execute("select 'execute from event'") print(cursor.fetchone()[0]) # before_execute event on all Engine instances @event.listens_for(Engine, "before_execute") def my_before_execute( conn, clauseelement, multiparams, params, execution_options, ): print("before execute!") async def go(): async with engine.connect() as conn: await conn.execute(text("select 1")) await engine.dispose() asyncio.run(go())
- 输出:
New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>> execute from event before execute!
- AsyncSession 上的 ORM 事件
在此示例中,我们将AsyncSession.sync_session
作为SessionEvents
的目标:
import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.orm import Session engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") session = AsyncSession(engine) # before_commit event on instance of Session @event.listens_for(session.sync_session, "before_commit") def my_before_commit(session): print("before commit!") # sync style API use on Session connection = session.connection() # sync style API use on Connection result = connection.execute(text("select 'execute from event'")) print(result.first()) # after_commit event on all Session instances @event.listens_for(Session, "after_commit") def my_after_commit(session): print("after commit!") async def go(): await session.execute(text("select 1")) await session.commit() await session.close() await engine.dispose() asyncio.run(go())
- 输出:
before commit! execute from event after commit!
- async_sessionmaker 上的 ORM 事件
对于这种用例,我们将sessionmaker
作为事件目标,然后使用async_sessionmaker.sync_session_class
参数将其分配给async_sessionmaker
:
import asyncio from sqlalchemy import event from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import sessionmaker sync_maker = sessionmaker() maker = async_sessionmaker(sync_session_class=sync_maker) @event.listens_for(sync_maker, "before_commit") def before_commit(session): print("before commit") async def main(): async_session = maker() await async_session.commit() asyncio.run(main())
- 输出:
before commit
在连接池和其他事件中使用仅 awaitable 的驱动程序方法
如上一节所讨论的那样,诸如PoolEvents
之类的事件处理程序接收到一个同步风格的“DBAPI”连接,这是 SQLAlchemy asyncio 方言提供的一个包装对象,用于将底层的 asyncio“driver”连接适配成 SQLAlchemy 内部可以使用的连接。当用户定义的事件处理程序需要直接使用最终的“driver”连接,并且只使用该驱动连接上的 awaitable 方法时,就会出现一种特殊的用例。其中一个例子是 asyncpg 驱动程序提供的.set_type_codec()
方法。
为了适应这种用例,SQLAlchemy 的AdaptedConnection
类提供了一个方法AdaptedConnection.run_async()
,允许在事件处理程序或其他 SQLAlchemy 内部的“同步”上下文中调用一个 awaitable 函数。这个方法直接类似于AsyncConnection.run_sync()
方法,它允许一个同步风格的方法在 async 下运行。
应该向AdaptedConnection.run_async()
传递一个接受内部“driver”连接作为单个参数的函数,并返回一个 awaitable,该 awaitable 将由AdaptedConnection.run_async()
方法调用。给定的函数本身不需要声明为async
;它完全可以是 Python 的lambda:
,因为返回的 awaitable 值将在返回后被调用:
from sqlalchemy import event from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine(...) @event.listens_for(engine.sync_engine, "connect") def register_custom_types(dbapi_connection, *args): dbapi_connection.run_async( lambda connection: connection.set_type_codec( "MyCustomType", encoder, decoder, # ... ) )
上面,传递给register_custom_types
事件处理程序的对象是AdaptedConnection
的一个实例,它提供了一个类似 DBAPI 的接口,用于访问底层的仅 async 的驱动级连接对象。然后,AdaptedConnection.run_async()
方法提供了访问底层驱动程序级连接的 awaitable 环境。
版本 1.4.30 中的新功能。
使用多个 asyncio 事件循环
当一个应用程序同时使用多个事件循环时,例如在罕见的情况下将 asyncio 与多线程结合使用时,当使用默认的池实现时,不应该将相同的 AsyncEngine
与不同的事件循环共享。
如果一个 AsyncEngine
从一个事件循环传递到另一个事件循环,则在重新使用之前应调用 AsyncEngine.dispose()
方法。未能这样做可能会导致类似于 Task got Future attached to a different loop
的 RuntimeError
。
如果同一个引擎必须在不同的循环之间共享,则应配置为使用 NullPool
来禁用池,防止引擎重复使用任何连接:
from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.pool import NullPool engine = create_async_engine( "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool, )
使用 asyncio scoped session
在线程化的 SQLAlchemy 中使用的“scoped session”模式,使用适应版本称为 async_scoped_session
,在 asyncio 中也是可用的。
提示
SQLAlchemy 通常不推荐为新开发使用“scoped”模式,因为它依赖于必须在线程或任务完成后显式清除的可变全局状态。特别是在使用 asyncio 时,直接将 AsyncSession
传递给需要它的可等待函数可能是一个更好的主意。
在使用 async_scoped_session
时,由于在 asyncio 上下文中没有“线程本地”概念,必须为构造函数提供“scopefunc”参数。下面的示例说明了使用 asyncio.current_task()
函数来实现此目的:
from asyncio import current_task from sqlalchemy.ext.asyncio import ( async_scoped_session, async_sessionmaker, ) async_session_factory = async_sessionmaker( some_async_engine, expire_on_commit=False, ) AsyncScopedSession = async_scoped_session( async_session_factory, scopefunc=current_task, ) some_async_session = AsyncScopedSession()
警告
async_scoped_session
使用的“scopefunc”在任务中被调用任意次数,每次访问底层 AsyncSession
时都会调用该函数。因此,该函数应该是幂等且轻量级的,并且不应尝试创建或改变任何状态,例如建立回调等。
警告
在作用域中使用 current_task()
作为“键”要求必须从最外层的可等待对象中调用 async_scoped_session.remove()
方法,以确保任务完成时从注册表中删除键,否则任务句柄和 AsyncSession
将继续驻留在内存中,从根本上创建了内存泄漏。请参阅以下示例,该示例说明了 async_scoped_session.remove()
的正确用法。
async_scoped_session
包含与 scoped_session
类似的 代理行为,这意味着它可以直接作为 AsyncSession
对待,需要注意通常需要使用 await
关键字,包括 async_scoped_session.remove()
方法:
async def some_function(some_async_session, some_object): # use the AsyncSession directly some_async_session.add(some_object) # use the AsyncSession via the context-local proxy await AsyncScopedSession.commit() # "remove" the current proxied AsyncSession for the local context await AsyncScopedSession.remove()
新版版本 1.4.19。 ## 使用 Inspector 检查模式对象
SQLAlchemy 尚未提供 Inspector
的 asyncio 版本(介绍请参见 使用 Inspector 进行细粒度反射),但是可以通过利用 AsyncConnection.run_sync()
方法来在 asyncio 上下文中使用现有接口:
import asyncio from sqlalchemy import inspect from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test") def use_inspector(conn): inspector = inspect(conn) # use the inspector print(inspector.get_view_names()) # return any value to the caller return inspector.get_table_names() async def async_main(): async with engine.connect() as conn: tables = await conn.run_sync(use_inspector) asyncio.run(async_main())
另请参见
反射数据库对象
运行时检查 API
引擎 API 文档
对象名称 | 描述 |
async_engine_from_config(configuration[, prefix], **kwargs) | 使用配置字典创建一个新的 AsyncEngine 实例。 |
AsyncConnection | 一个用于Connection 的 asyncio 代理。 |
AsyncEngine | 一个用于Engine 的 asyncio 代理。 |
AsyncTransaction | Transaction 的一个 asyncio 代理。 |
create_async_engine(url, **kw) | 创建一个新的异步引擎实例。 |
create_async_pool_from_url(url, **kwargs) | 创建一个新的异步引擎实例。 |
function sqlalchemy.ext.asyncio.create_async_engine(url: str | URL, **kw: Any) → AsyncEngine
创建一个新的异步引擎实例。
传递给create_async_engine()
的参数基本与传递给create_engine()
的参数相同。指定的方言必须是支持 asyncio 的方言,例如 asyncpg。
1.4 版的新功能。
参数:
async_creator –
一个异步可调用函数,返回一个驱动级别的 asyncio 连接。如果给定,该函数不应该接受任何参数,并从底层的 asyncio 数据库驱动程序返回一个新的 asyncio 连接;连接将被包装在适当的结构中,以便与AsyncEngine
一起使用。请注意,URL 中指定的参数在此处不适用,创建函数应该使用自己的连接参数。
此参数是create_engine()
函数的 asyncio 等效参数。
2.0.16 版的新功能。
function sqlalchemy.ext.asyncio.async_engine_from_config(configuration: Dict[str, Any], prefix: str = 'sqlalchemy.', **kwargs: Any) → AsyncEngine
使用配置字典创建一个新的 AsyncEngine 实例。
这个函数类似于 SQLAlchemy 核心中的engine_from_config()
函数,不同之处在于所请求的方言必须是类似于 asyncpg 这样的支持 asyncio 的方言。该函数的参数签名与engine_from_config()
相同。
1.4.29 版的新功能。
function sqlalchemy.ext.asyncio.create_async_pool_from_url(url: str | URL, **kwargs: Any) → Pool
创建一个新的异步引擎实例。
传递给create_async_pool_from_url()
的参数基本与传递给create_pool_from_url()
的参数相同。指定的方言必须是支持 asyncio 的方言,例如 asyncpg。
2.0.10 版的新功能。
class sqlalchemy.ext.asyncio.AsyncEngine
一个Engine
的 asyncio 代理。
AsyncEngine
是使用create_async_engine()
函数获取的:
from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
新版本 1.4 中新增。
成员
begin(), clear_compiled_cache(), connect(), dialect, dispose(), driver, echo, engine, execution_options(), get_execution_options(), name, pool, raw_connection(), sync_engine, update_execution_options(), url
类签名
类sqlalchemy.ext.asyncio.AsyncEngine
(sqlalchemy.ext.asyncio.base.ProxyComparable
, sqlalchemy.ext.asyncio.AsyncConnectable
)
method begin() → AsyncIterator[AsyncConnection]
返回一个上下文管理器,当进入时将提供一个已建立 AsyncTransaction
的 AsyncConnection
。
例如:
async with async_engine.begin() as conn: await conn.execute( text("insert into table (x, y, z) values (1, 2, 3)") ) await conn.execute(text("my_special_procedure(5)"))
method clear_compiled_cache() → None
清除与方言关联的编译缓存。
代表Engine
类的代理,代表AsyncEngine
类。
这仅适用于通过create_engine.query_cache_size
参数建立的内置缓存。它不会影响通过Connection.execution_options.compiled_cache
参数传递的任何字典缓存。
新版本 1.4 中新增。
method connect() → AsyncConnection
返回一个AsyncConnection
对象。
当作为异步上下文管理器输入时,AsyncConnection
将从底层连接池中获取数据库连接:
async with async_engine.connect() as conn: result = await conn.execute(select(user_table))
AsyncConnection
也可以通过调用其AsyncConnection.start()
方法在上下文管理器之外启动。
attribute dialect
代理AsyncEngine
类的Engine.dialect
属性。
method async dispose(close: bool = True) → None
处置此AsyncEngine
使用的连接池。
参数:
关闭 –
如果将其默认值保留为True
,则会完全关闭所有当前已签入的数据库连接。然而,仍在使用的连接将不会被关闭,但它们将不再与此Engine
关联,因此当它们被单独关闭时,它们所关联的Pool
最终将被垃圾回收,如果已经在签入时关闭,则将完全关闭。
如果设置为False
,则前一个连接池将被取消引用,否则不会以任何方式触及。
另请参阅
Engine.dispose()
attribute driver
此Engine
正在使用的Dialect
的驱动程序名称。
代理AsyncEngine
类的Engine
类。
attribute echo
当为True
时,启用此元素的日志输出。
代理AsyncEngine
类的Engine
类。
这将设置此元素类和对象引用的命名空间的 Python 日志级别。布尔值True
表示将为记录器设置日志级别logging.INFO
,而字符串值debug
将将日志级别设置为logging.DEBUG
。
attribute engine
返回此Engine
。
代理AsyncEngine
类的Engine
类。
用于接受同一变量内的Connection
/ Engine
对象的传统方案。
method execution_options(**opt: Any) → AsyncEngine
返回一个新的 AsyncEngine
,该引擎将以给定的执行选项提供 AsyncConnection
对象。
代理自 Engine.execution_options()
。请参阅该方法了解详情。
method get_execution_options() → _ExecuteOptions
获取执行期间将生效的非 SQL 选项。
代表 AsyncEngine
类的 Engine
类的代理。
另请参阅
Engine.execution_options()
attribute name
此 Engine
使用的 Dialect
的字符串名称。
代表 AsyncEngine
类的 Engine
类的代理。
attribute pool
代表 AsyncEngine
类的 Engine.pool
属性的代理。
method async raw_connection() → PoolProxiedConnection
从连接池返回“原始” DBAPI 连接。
另请参阅
使用 Driver SQL 和原始 DBAPI 连接
attribute sync_engine: Engine
此 AsyncEngine
代理请求到同步样式的 Engine
。
此实例可用作事件目标。
另请参阅
与 asyncio 扩展一起使用事件
method update_execution_options(**opt: Any) → None
更新此 Engine
的默认执行选项字典。
代表 AsyncEngine
类的 Engine
类的代理。
**opt 中给定的键/值将添加到将用于所有连接的默认执行选项中。此字典的初始内容可以通过 execution_options
参数发送到 create_engine()
。
另请参阅
Connection.execution_options()
Engine.execution_options()
attribute url
代表AsyncEngine
类的Engine.url
属性的代理。
class sqlalchemy.ext.asyncio.AsyncConnection
一个Connection
的 asyncio 代理。
AsyncConnection
通过AsyncEngine.connect()
方法获取:
from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname") async with engine.connect() as conn: result = await conn.execute(select(table))
版本 1.4 中新增。
成员
aclose(),begin(),begin_nested(),close(),closed,commit(),connection,default_isolation_level,dialect,exec_driver_sql(),execute(),execution_options(),get_nested_transaction(),get_raw_connection(),get_transaction(),in_nested_transaction(),in_transaction(),info,invalidate(),invalidated,rollback(),run_sync(),scalar(),scalars(),start(),stream(),stream_scalars(),sync_connection,sync_engine
类签名
class sqlalchemy.ext.asyncio.AsyncConnection
(sqlalchemy.ext.asyncio.base.ProxyComparable
, sqlalchemy.ext.asyncio.base.StartableContext
, sqlalchemy.ext.asyncio.AsyncConnectable
)
method async aclose() → None
AsyncConnection.close()
的同义词。
AsyncConnection.aclose()
名称专门用于支持 Python 标准库@contextlib.aclosing
上下文管理器函数。
版本 2.0.20 中的新功能。
method begin() → AsyncTransaction
在自动开始之前开始事务。
method begin_nested() → AsyncTransaction
开始一个嵌套事务并返回事务句柄。
method async close() → None
关闭此AsyncConnection
。
这也会导致回滚事务(如果存在)。
attribute closed
如果此连接已关闭,则返回 True。
代理Connection
类,代表AsyncConnection
类。
method async commit() → None
提交当前正在进行的事务。
如果已启动事务,则此方法提交当前事务。如果未启动事务,则该方法不起作用,假定连接处于非失效状态。
每当首次执行语句或调用Connection.begin()
方法时,都会自动在Connection
上开始事务。
attribute connection
未实现异步;调用AsyncConnection.get_raw_connection()
。
attribute default_isolation_level
与正在使用的Dialect
相关联的初始连接时间隔离级别。
代理Connection
类,代表AsyncConnection
类。
此值独立于Connection.execution_options.isolation_level
和Engine.execution_options.isolation_level
执行选项,并由Dialect
在创建第一个连接时确定,通过针对数据库执行 SQL 查询以获取当前隔离级别,然后再发出任何其他命令。
调用此访问器不会触发任何新的 SQL 查询。
另请参阅
Connection.get_isolation_level()
- 查看当前实际隔离级别
create_engine.isolation_level
- 设置每个Engine
的隔离级别
Connection.execution_options.isolation_level
- 设置每个Connection
的隔离级别
attribute dialect
代表AsyncConnection
类的Connection.dialect
属性的代理。
method async exec_driver_sql(statement: str, parameters: _DBAPIAnyExecuteParams | None = None, execution_options: CoreExecuteOptionsParameter | None = None) → CursorResult[Any]
执行驱动程序级别的 SQL 字符串并返回缓冲的Result
。
method async execute(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → CursorResult[Any]
执行 SQL 语句构造并返回一个缓冲的Result
。
参数:
object
–要执行的语句。这始终是一个同时存在于ClauseElement
和Executable
层次结构中的对象,包括:
Select
-Select
操作Insert
,Update
,Delete
TextClause
和TextualSelect
DDL
和继承自ExecutableDDLElement
的对象
parameters
– 将绑定到语句中的参数。这可以是参数名称到值的字典,也可以是可变序列(例如列表)的字典。当传递一个字典列表时,底层语句执行将使用 DBAPIcursor.executemany()
方法。当传递单个字典时,将使用 DBAPIcursor.execute()
方法。execution_options
– 可选的执行选项字典,将与语句执行关联。该字典可以提供Connection.execution_options()
接受的选项的子集。
返回:
一个Result
对象。
method async execution_options(**opt: Any) → AsyncConnection
设置在执行期间生效的非 SQL 选项。
返回此AsyncConnection
对象,其中添加了新选项。
有关此方法的完整详情,请参阅Connection.execution_options()
。
method get_nested_transaction() → AsyncTransaction | None
返回一个表示当前嵌套(保存点)事务的AsyncTransaction
,如果有的话。
这将使用底层同步连接的Connection.get_nested_transaction()
方法获取当前Transaction
,然后在新的AsyncTransaction
对象中进行代理。
新版本 1.4.0b2 中引入。
method async get_raw_connection() → PoolProxiedConnection
返回此AsyncConnection
正在使用的池化 DBAPI 级连接。
这是一个 SQLAlchemy 连接池代理连接,然后具有属性_ConnectionFairy.driver_connection
,该属性引用实际的驱动程序连接。其_ConnectionFairy.dbapi_connection
则指代一个AdaptedConnection
实例,将驱动程序连接适配为 DBAPI 协议。
method get_transaction() → AsyncTransaction | None
返回一个表示当前事务的AsyncTransaction
,如果有的话。
这将使用底层同步连接的Connection.get_transaction()
方法获取当前Transaction
,然后在新的AsyncTransaction
对象中进行代理。
新版本 1.4.0b2 中引入。
method in_nested_transaction() → bool
如果事务正在进行中,则返回 True。
新版本 1.4.0b2 中引入。
method in_transaction() → bool
如果事务正在进行中,则返回 True。
attribute info
返回底层Connection
的Connection.info
字典。
此字典可自由编写,以将用户定义的状态与数据库连接关联起来。
仅当AsyncConnection
当前已连接时才可用此属性。如果AsyncConnection.closed
属性为True
,则访问此属性将引发ResourceClosedError
。
新版本为 1.4.0b2。
method async invalidate(exception: BaseException | None = None) → None
使与此Connection
相关联的基础 DBAPI 连接无效。
参见方法Connection.invalidate()
,了解此方法的详细信息。
attribute invalidated
如果此连接已失效,则返回 True。
代理了AsyncConnection
类的Connection
类。
但这并不表示连接是否在池级别上失效。
method async rollback() → None
回滚当前正在进行的事务。
如果已启动事务,则此方法将回滚当前事务。如果未启动事务,则该方法不起作用。如果已启动事务并且连接处于无效状态,则使用此方法清除事务。
当首次执行语句或调用Connection.begin()
方法时,将自动在Connection
上启动事务。
method async run_sync(fn: ~typing.Callable[[~typing.Concatenate[~sqlalchemy.engine.base.Connection, ~_P]], ~sqlalchemy.ext.asyncio.engine._T], *arg: ~typing.~_P, **kw: ~typing.~_P) → _T
调用给定的同步(即非异步)可调用对象,并将同步风格的Connection
作为第一个参数传递。
此方法允许在异步应用程序的上下文中运行传统的同步 SQLAlchemy 函数。
例如:
def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str: '''A synchronous function that does not require awaiting :param conn: a Core SQLAlchemy Connection, used synchronously :return: an optional return value is supported ''' conn.execute( some_table.insert().values(int_col=arg1, str_col=arg2) ) return "success" async def do_something_async(async_engine: AsyncEngine) -> None: '''an async function that uses awaiting''' async with async_engine.begin() as async_conn: # run do_something_with_core() with a sync-style # Connection, proxied into an awaitable return_code = await async_conn.run_sync(do_something_with_core, 5, "strval") print(return_code)
通过在一个特别的被监控的 greenlet 中运行给定的可调用对象,此方法将一直维持 asyncio 事件循环直到数据库连接。
AsyncConnection.run_sync()
的最基本用法是调用诸如MetaData.create_all()
之类的方法,给定需要提供给MetaData.create_all()
作为Connection
对象的AsyncConnection
:
# run metadata.create_all(conn) with a sync-style Connection, # proxied into an awaitable with async_engine.begin() as conn: await conn.run_sync(metadata.create_all)
注意
提供的可调用对象在 asyncio 事件循环内联调用,并且将在传统 IO 调用上阻塞。此可调用对象内的 IO 应仅调用进入 SQLAlchemy 的 asyncio 数据库 API,这些 API 将被正确地适应到 greenlet 上下文中。
另请参阅
AsyncSession.run_sync()
在 asyncio 下运行同步方法和函数
method async scalar(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → Any
执行 SQL 语句构造并返回标量对象。
此方法是在调用Connection.execute()
方法后调用Result.scalar()
方法的简写。参数是等效的。
返回:
代表返回的第一行的第一列的标量 Python 值。
method async scalars(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → ScalarResult[Any]
执行 SQL 语句构造并返回标量对象。
此方法是在调用Connection.execute()
方法后调用Result.scalars()
方法的简写。参数是等效的。
返回:
一个ScalarResult
对象。
版本 1.4.24 中的新功能。
method async start(is_ctxmanager: bool = False) → AsyncConnection
在使用 Python with:
块之外启动此AsyncConnection
对象的上下文。
method stream(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → AsyncIterator[AsyncResult[Any]]
执行语句并返回一个产生AsyncResult
对象的可等待对象。
例如:
result = await conn.stream(stmt): async for row in result: print(f"{row}")
AsyncConnection.stream()
方法支持可选的上下文管理器用法,针对AsyncResult
对象,如下所示:
async with conn.stream(stmt) as result: async for row in result: print(f"{row}")
在上述模式中,即使迭代器被异常抛出中断,AsyncResult.close()
方法也会无条件地被调用。然而,上下文管理器的使用仍然是可选的,并且该函数可以以async with fn():
或await fn()
的方式调用。
新增于版本 2.0.0b3:增加了上下文管理器支持
返回:
将产生一个可等待对象,该对象将生成一个AsyncResult
对象。
另见
AsyncConnection.stream_scalars()
method stream_scalars(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → AsyncIterator[AsyncScalarResult[Any]]
执行语句并返回一个可等待的AsyncScalarResult
对象。
例如:
result = await conn.stream_scalars(stmt) async for scalar in result: print(f"{scalar}")
此方法是在调用Connection.stream()
方法后调用AsyncResult.scalars()
方法的简写。参数是等效的。
AsyncConnection.stream_scalars()
方法支持针对AsyncScalarResult
对象的可选上下文管理器使用,如下所示:
async with conn.stream_scalars(stmt) as result: async for scalar in result: print(f"{scalar}")
在上述模式中,即使迭代器被异常抛出中断,AsyncScalarResult.close()
方法也会无条件地被调用。然而,上下文管理器的使用仍然是可选的,并且该函数可以以async with fn():
或await fn()
的方式调用。
新增于版本 2.0.0b3:增加了上下文管理器支持
返回:
将产生一个可等待对象,该对象将生成一个AsyncScalarResult
对象。
新增于版本 1.4.24。
另见
AsyncConnection.stream()
attribute sync_connection: Connection | None
引用同步式Connection
指向此AsyncConnection
的请求代理。
此实例可用作事件目标。
另见
使用 asyncio 扩展的事件
attribute sync_engine: Engine
引用同步式Engine
指向此AsyncConnection
的关联,通过其基础的Connection
。
此实例可用作事件目标。
另见
使用 asyncio 扩展的事件
class sqlalchemy.ext.asyncio.AsyncTransaction
一个Transaction
的 asyncio 代理。
成员
close(), commit(), rollback(), start()
类签名
类sqlalchemy.ext.asyncio.AsyncTransaction
(sqlalchemy.ext.asyncio.base.ProxyComparable
,sqlalchemy.ext.asyncio.base.StartableContext
)
method async close() → None
关闭此AsyncTransaction
。
如果此事务是 begin/commit 嵌套中的基本事务,则事务将 rollback()。 否则,该方法返回。
此用于取消事务而不影响封闭事务范围的事务。
method async commit() → None
提交此AsyncTransaction
。
method async rollback() → None
回滚此AsyncTransaction
。
method async start(is_ctxmanager: bool = False) → AsyncTransaction
在不使用 Python with:
块的情况下启动此AsyncTransaction
对象的上下文。
SqlAlchemy 2.0 中文文档(二十八)(2)https://developer.aliyun.com/article/1560384