SqlAlchemy 2.0 中文文档(二十八)(1)

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
日志服务 SLS,月写入数据量 50GB 1个月
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: SqlAlchemy 2.0 中文文档(二十八)


原文:docs.sqlalchemy.org/en/20/contents.html

ORM 异常

原文:docs.sqlalchemy.org/en/20/orm/exceptions.html

SQLAlchemy ORM 异常。

对象名称 描述
ConcurrentModificationError StaleDataError的别名
NO_STATE 可能由仪器实现引发的异常类型。
attribute sqlalchemy.orm.exc.ConcurrentModificationError

StaleDataError的别名

exception sqlalchemy.orm.exc.DetachedInstanceError

尝试访问已分离的映射实例上未加载的属性。

类签名

sqlalchemy.orm.exc.DetachedInstanceError (sqlalchemy.exc.SQLAlchemyError)

exception sqlalchemy.orm.exc.FlushError

在 flush() 过程中检测到无效条件。

类签名

sqlalchemy.orm.exc.FlushError (sqlalchemy.exc.SQLAlchemyError)

exception sqlalchemy.orm.exc.LoaderStrategyException

一个属性的加载策略不存在。

类签名

sqlalchemy.orm.exc.LoaderStrategyException (sqlalchemy.exc.InvalidRequestError)

method __init__(applied_to_property_type: Type[Any], requesting_property: MapperProperty[Any], applies_to: Type[MapperProperty[Any]] | None, actual_strategy_type: Type[LoaderStrategy] | None, strategy_key: Tuple[Any, ...])
sqlalchemy.orm.exc.NO_STATE = (<class 'AttributeError'>, <class 'KeyError'>)

可能由仪器实现引发的异常类型。

exception sqlalchemy.orm.exc.ObjectDeletedError

刷新操作未能检索到与对象已知主键标识符对应的数据库行。

当在对象上访问过期属性或使用Query.get()检索到被检测为过期的对象时,刷新操作会进行。基于主键发出目标行的 SELECT;如果没有返回行,则引发此异常。

这个异常的真正含义只是与持久对象关联的主键标识符对应的行不存在。该行可能已被删除,或在某些情况下,主键已更新为新值,超出了 ORM 对目标对象的管理。

类签名

sqlalchemy.orm.exc.ObjectDeletedError (sqlalchemy.exc.InvalidRequestError)

method __init__(state: InstanceState[Any], msg: str | None = None)
exception sqlalchemy.orm.exc.ObjectDereferencedError

由于对象被垃圾回收而无法完成操作。

类签名

sqlalchemy.orm.exc.ObjectDereferencedErrorsqlalchemy.exc.SQLAlchemyError

exception sqlalchemy.orm.exc.StaleDataError

遇到了未被考虑到的数据库状态操作。

导致此情况发生的条件包括:

  • 一个刷新可能已尝试更新或删除行,并且在 UPDATE 或 DELETE 语句期间匹配到了意外数量的行。请注意,当使用 version_id_col 时,UPDATE 或 DELETE 语句中的行也将与当前已知的版本标识符匹配。
  • 一个带有 version_id_col 的映射对象被刷新,而从数据库返回的版本号与对象本身的版本号不匹配。
  • 一个对象从其父对象中分离出来,然而该对象以前附加到了另一个父标识,该父标识已被垃圾收集,并且无法确定新的父标识是否真的是最新的“父”。

类签名

sqlalchemy.orm.exc.StaleDataErrorsqlalchemy.exc.SQLAlchemyError

exception sqlalchemy.orm.exc.UnmappedClassError

请求了一个未知类的映射操作。

类签名

sqlalchemy.orm.exc.UnmappedClassErrorsqlalchemy.orm.exc.UnmappedError

method __init__(cls: Type[_T], msg: str | None = None)
exception sqlalchemy.orm.exc.UnmappedColumnError

请求了一个未知列的映射操作。

类签名

sqlalchemy.orm.exc.UnmappedColumnErrorsqlalchemy.exc.InvalidRequestError

exception sqlalchemy.orm.exc.UnmappedError

引发涉及未出现预期映射的异常的基类。

类签名

sqlalchemy.orm.exc.UnmappedErrorsqlalchemy.exc.InvalidRequestError

exception sqlalchemy.orm.exc.UnmappedInstanceError

请求了一个未知实例的映射操作。

类签名

sqlalchemy.orm.exc.UnmappedInstanceErrorsqlalchemy.orm.exc.UnmappedError

method __init__(obj: object, msg: str | None = None)

ORM 扩展

原文:docs.sqlalchemy.org/en/20/orm/extensions/index.html

SQLAlchemy 有各种 ORM 扩展可用,这些扩展为核心行为添加了额外的功能。

这些扩展几乎完全建立在公共核心和 ORM API 上,鼓励用户阅读它们的源代码以进一步了解它们的行为。特别是“水平分片”、“混合属性”和“变动追踪”扩展非常简洁。

  • 异步 I/O(asyncio)
  • 关联代理
  • 自动映射
  • 烘焙查询
  • 声明式扩展
  • Mypy / Pep-484 对 ORM 映射的支持
  • 变动追踪
  • 排序列表
  • 水平分片
  • 混合属性
  • 可索引
  • 替代类仪器化

异步 I/O(asyncio)

原文:docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html

支持 Python asyncio。包括对 Core 和 ORM 使用的支持,使用与 asyncio 兼容的方言。

版本 1.4 中的新功能。

警告

请阅读 Asyncio 平台安装说明(包括 Apple M1)以获取许多平台的重要平台安装说明,包括Apple M1 架构

另请参阅

Core 和 ORM 的异步 IO 支持 - 初始功能公告

异步 IO 集成 - 展示了在 asyncio 扩展中使用 Core 和 ORM 的示例脚本。

Asyncio 平台安装说明(包括 Apple M1)

asyncio 扩展仅支持 Python 3。它还依赖于greenlet库。这个依赖在常见的机器平台上默认安装,包括:

x86_64 aarch64 ppc64le amd64 win32

对于上述平台,greenlet已知提供预构建的 wheel 文件。对于其他平台,greenlet 不会默认安装;可以在Greenlet - Download Files查看当前的文件列表。请注意有许多架构被省略,包括 Apple M1

要安装 SQLAlchemy 并确保greenlet依赖存在,无论使用什么平台,可以按照以下方式安装[asyncio] setuptools extra,这也会指示pip安装greenlet

pip install sqlalchemy[asyncio]

请注意,在没有预构建 wheel 文件的平台上安装greenlet意味着greenlet将从源代码构建,这要求 Python 的开发库也存在。

概要 - Core

对于核心用法,create_async_engine()函数创建一个AsyncEngine的实例,然后提供传统Engine API 的异步版本。AsyncEngine通过其AsyncEngine.connect()AsyncEngine.begin()方法提供一个AsyncConnection,这两个方法都提供异步上下文管理器。AsyncConnection然后可以使用AsyncConnection.execute()方法来执行语句以提供一个缓冲的Result,或者使用AsyncConnection.stream()方法来提供一个流式的服务器端AsyncResult

>>> import asyncio
>>> from sqlalchemy import Column
>>> from sqlalchemy import MetaData
>>> from sqlalchemy import select
>>> from sqlalchemy import String
>>> from sqlalchemy import Table
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> meta = MetaData()
>>> t1 = Table("t1", meta, Column("name", String(50), primary_key=True))
>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(meta.drop_all)
...         await conn.run_sync(meta.create_all)
...
...         await conn.execute(
...             t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
...         )
...
...     async with engine.connect() as conn:
...         # select a Result, which will be delivered with buffered
...         # results
...         result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
...
...         print(result.fetchall())
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()
>>> asyncio.run(async_main())
BEGIN  (implicit)
...
CREATE  TABLE  t1  (
  name  VARCHAR(50)  NOT  NULL,
  PRIMARY  KEY  (name)
)
...
INSERT  INTO  t1  (name)  VALUES  (?)
[...]  [('some name 1',),  ('some name 2',)]
COMMIT
BEGIN  (implicit)
SELECT  t1.name
FROM  t1
WHERE  t1.name  =  ?
[...]  ('some name 1',)
[('some name 1',)]
ROLLBACK 

上面,AsyncConnection.run_sync()方法可用于调用特殊的 DDL 函数,例如MetaData.create_all(),这些函数不包括可等待的钩子。

提示

在使用AsyncEngine对象的范围内调用await来调用AsyncEngine.dispose()方法是明智的,如上例中的async_main函数所示。这确保了连接池保持的任何连接在可等待的上下文中被正确处理。与使用阻塞 IO 不同,SQLAlchemy 无法在__del__或弱引用终结器等方法中正确处理这些连接,因为没有机会调用await。当引擎超出范围时未显式处理引擎可能导致发出到标准输出的警告,类似于RuntimeError: Event loop is closed的形式在垃圾回收中。

AsyncConnection 还提供了一个“流式” API,通过 AsyncConnection.stream() 方法返回一个 AsyncResult 对象。该结果对象使用服务器端游标并提供了一个 async/await API,比如一个异步迭代器:

async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))
    async for row in async_result:
        print("row: %s" % (row,))

概要 - ORM

使用 2.0 风格 查询,AsyncSession 类提供了完整的 ORM 功能。

在默认使用模式下,必须特别小心,以避免涉及 ORM 关系和列属性的 惰性加载 或其他已过期的属性访问;下一节 在使用 AsyncSession 时防止隐式 IO 对此进行了详细说明。

警告

一个 AsyncSession 实例不能安全地用于多个并发任务。请参阅章节 在并发任务中使用 AsyncSession 和 会话是线程安全的吗? AsyncSession 是否安全用于共享在并发任务中? 了解背景信息。

下面的示例演示了一个完整的示例,包括映射器和会话配置:

>>> from __future__ import annotations
>>> import asyncio
>>> import datetime
>>> from typing import List
>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy import func
>>> from sqlalchemy import select
>>> from sqlalchemy.ext.asyncio import AsyncAttrs
>>> from sqlalchemy.ext.asyncio import async_sessionmaker
>>> from sqlalchemy.ext.asyncio import AsyncSession
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from sqlalchemy.orm import DeclarativeBase
>>> from sqlalchemy.orm import Mapped
>>> from sqlalchemy.orm import mapped_column
>>> from sqlalchemy.orm import relationship
>>> from sqlalchemy.orm import selectinload
>>> class Base(AsyncAttrs, DeclarativeBase):
...     pass
>>> class B(Base):
...     __tablename__ = "b"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
...     data: Mapped[str]
>>> class A(Base):
...     __tablename__ = "a"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     data: Mapped[str]
...     create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
...     bs: Mapped[List[B]] = relationship()
>>> async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
...     async with async_session() as session:
...         async with session.begin():
...             session.add_all(
...                 [
...                     A(bs=[B(data="b1"), B(data="b2")], data="a1"),
...                     A(bs=[], data="a2"),
...                     A(bs=[B(data="b3"), B(data="b4")], data="a3"),
...                 ]
...             )
>>> async def select_and_update_objects(
...     async_session: async_sessionmaker[AsyncSession],
... ) -> None:
...     async with async_session() as session:
...         stmt = select(A).order_by(A.id).options(selectinload(A.bs))
...
...         result = await session.execute(stmt)
...
...         for a in result.scalars():
...             print(a, a.data)
...             print(f"created at: {a.create_date}")
...             for b in a.bs:
...                 print(b, b.data)
...
...         result = await session.execute(select(A).order_by(A.id).limit(1))
...
...         a1 = result.scalars().one()
...
...         a1.data = "new data"
...
...         await session.commit()
...
...         # access attribute subsequent to commit; this is what
...         # expire_on_commit=False allows
...         print(a1.data)
...
...         # alternatively, AsyncAttrs may be used to access any attribute
...         # as an awaitable (new in 2.0.13)
...         for b1 in await a1.awaitable_attrs.bs:
...             print(b1, b1.data)
>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     # async_sessionmaker: a factory for new AsyncSession objects.
...     # expire_on_commit - don't expire objects after transaction commit
...     async_session = async_sessionmaker(engine, expire_on_commit=False)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(Base.metadata.create_all)
...
...     await insert_objects(async_session)
...     await select_and_update_objects(async_session)
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()
>>> asyncio.run(async_main())
BEGIN  (implicit)
...
CREATE  TABLE  a  (
  id  INTEGER  NOT  NULL,
  data  VARCHAR  NOT  NULL,
  create_date  DATETIME  DEFAULT  (CURRENT_TIMESTAMP)  NOT  NULL,
  PRIMARY  KEY  (id)
)
...
CREATE  TABLE  b  (
  id  INTEGER  NOT  NULL,
  a_id  INTEGER  NOT  NULL,
  data  VARCHAR  NOT  NULL,
  PRIMARY  KEY  (id),
  FOREIGN  KEY(a_id)  REFERENCES  a  (id)
)
...
COMMIT
BEGIN  (implicit)
INSERT  INTO  a  (data)  VALUES  (?)  RETURNING  id,  create_date
[...]  ('a1',)
...
INSERT  INTO  b  (a_id,  data)  VALUES  (?,  ?)  RETURNING  id
[...]  (1,  'b2')
...
COMMIT
BEGIN  (implicit)
SELECT  a.id,  a.data,  a.create_date
FROM  a  ORDER  BY  a.id
[...]  ()
SELECT  b.a_id  AS  b_a_id,  b.id  AS  b_id,  b.data  AS  b_data
FROM  b
WHERE  b.a_id  IN  (?,  ?,  ?)
[...]  (1,  2,  3)
<A  object  at  ...>  a1
created  at:  ...
<B  object  at  ...>  b1
<B  object  at  ...>  b2
<A  object  at  ...>  a2
created  at:  ...
<A  object  at  ...>  a3
created  at:  ...
<B  object  at  ...>  b3
<B  object  at  ...>  b4
SELECT  a.id,  a.data,  a.create_date
FROM  a  ORDER  BY  a.id
LIMIT  ?  OFFSET  ?
[...]  (1,  0)
UPDATE  a  SET  data=?  WHERE  a.id  =  ?
[...]  ('new data',  1)
COMMIT
new  data
<B  object  at  ...>  b1
<B  object  at  ...>  b2 

在上面的示例中,使用可选的 async_sessionmaker 助手实例化了 AsyncSession,该助手提供了一个带有固定参数集的新 AsyncSession 对象的工厂,其中包括将其与特定数据库 URL 关联。然后将其传递给其他方法,在那里它可以在 Python 异步上下文管理器(即 async with: 语句)中使用,以便在块结束时自动关闭;这相当于调用 AsyncSession.close() 方法。

在并发任务中使用 AsyncSession

AsyncSession 对象是一个可变的,有状态的对象,代表了正在进行的单个,有状态的数据库事务。使用 asyncio 的并发任务,例如使用 asyncio.gather() 等 API,应该每个个体任务使用单独的 AsyncSession

参见 Is the Session thread-safe? Is AsyncSession safe to share in concurrent tasks? 部分,了解关于 SessionAsyncSession 如何在并发工作负载中使用的一般描述。### 在使用 AsyncSession 时防止隐式 IO

使用传统 asyncio,应用程序需要避免发生任何可能导致 IO-on-attribute 访问的点。以下是可用于帮助此目的的技术,在前述示例中有很多技术。

  • 懒加载关系、延迟列或表达式的属性,或在到期情况下被访问的属性可以利用 AsyncAttrs mixin。当将此 mixin 添加到特定类或更一般地添加到 Declarative Base 超类时,它提供一个访问器 AsyncAttrs.awaitable_attrs,它将任何属性作为可等待对象提供:
from __future__ import annotations
from typing import List
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import relationship
class Base(AsyncAttrs, DeclarativeBase):
    pass
class A(Base):
    __tablename__ = "a"
    # ... rest of mapping ...
    bs: Mapped[List[B]] = relationship()
class B(Base):
    __tablename__ = "b"
    # ... rest of mapping ...
  • 在不使用急加载的情况下,访问新加载的 A 实例上的 A.bs 集合通常会使用 lazy loading,为了成功,通常会向数据库发出 IO,但在 asyncio 下会失败,因为不允许隐式 IO。在没有任何先前加载操作的情况下直接访问此属性,在 asyncio 下,该属性可以作为可等待对象进行访问,指示 AsyncAttrs.awaitable_attrs 前缀:
a1 = (await session.scalars(select(A))).one()
for b1 in await a1.awaitable_attrs.bs:
    print(b1)
  • AsyncAttrs mixin 提供了一个简洁的外观,它覆盖了内部方法,该方法也被 AsyncSession.run_sync() 方法使用。
    版本 2.0.13 中的新功能。
    另请参见
    AsyncAttrs
  • 使用 SQLAlchemy 2.0 中的 Write Only Relationships 特性,可以将集合替换为只写集合,这些集合永远不会隐式发出 IO,在此特性下,集合从不读取,只使用显式 SQL 调用查询。在 Asyncio Integration 部分的示例 async_orm_writeonly.py 中,可见一个使用 asyncio 的只写集合示例。
    当使用仅写集合时,程序的行为在关于集合方面是简单且易于预测的。然而,缺点是没有任何内置系统可以一次性加载许多这些集合,而是需要手动执行。因此,下面的许多要点涉及在使用 asyncio 时使用传统的懒加载关系时需要更加小心的具体技术。
  • 如果不使用AsyncAttrs,关系可以声明为lazy="raise",这样默认情况下它们不会尝试发出 SQL。为了加载集合,将使用 eager loading。
  • 最有用的急加载策略是selectinload()急加载器,在前面的例子中被用来在await session.execute()调用的范围内急加载A.bs集合:
stmt = select(A).options(selectinload(A.bs))
  • 当构建新对象时,集合总是被分配一个默认的空集合,比如上面的例子中的列表:
A(bs=[], data="a2")
  • 这允许在刷新A对象时,上述A对象上的.bs集合存在且可读;否则,当刷新A时,.bs将被卸载并在访问时引发错误。
  • AsyncSession是使用Session.expire_on_commit设置为 False 进行配置的,这样我们可以在调用AsyncSession.commit()之后访问对象的属性,就像在最后一行访问属性时一样:
# create AsyncSession with expire_on_commit=False
async_session = AsyncSession(engine, expire_on_commit=False)
# sessionmaker version
async_session = async_sessionmaker(engine, expire_on_commit=False)
async with async_session() as session:
    result = await session.execute(select(A).order_by(A.id))
    a1 = result.scalars().first()
    # commit would normally expire all attributes
    await session.commit()
    # access attribute subsequent to commit; this is what
    # expire_on_commit=False allows
    print(a1.data)

其他指导原则包括:

  • 应该避免使用类似AsyncSession.expire()的方法,而应该使用AsyncSession.refresh()如果绝对需要过期。通常情况下不应该需要过期,因为在使用 asyncio 时通常应该将Session.expire_on_commit设置为False
  • 使用AsyncSession.refresh()可以显式加载懒加载关系,如果所需的属性名称被显式传递给Session.refresh.attribute_names,例如:
# assume a_obj is an A that has lazy loaded A.bs collection
a_obj = await async_session.get(A, [1])
# force the collection to load by naming it in attribute_names
await async_session.refresh(a_obj, ["bs"])
# collection is present
print(f"bs collection: {a_obj.bs}")
  • 当然最好一开始就使用急加载,以便无需延迟加载即可设置集合。
    2.0.4 版中新增了对AsyncSession.refresh()和底层Session.refresh()方法的支持,以强制懒加载的关系加载,如果它们在Session.refresh.attribute_names参数中明确命名。在之前的版本中,即使在参数中命名了关系,也会被静默跳过。
  • 避免使用文档中记录的 Cascades 中的all级联选项,而是明确列出所需的级联特性。all级联选项暗示了 refresh-expire 设置,这意味着AsyncSession.refresh()方法将使相关对象上的属性过期,但不一定会刷新那些相关对象,假设未在relationship()内配置急加载,则将其保留在过期状态。
  • 如果使用,应该使用适当的加载器选项来为deferred()列进行延迟加载,除了上面注意到的relationship()结构。请参阅 Limiting which Columns Load with Column Deferral 了解延迟列加载的背景信息。
  • 在  Dynamic Relationship Loaders 中描述的“动态”关系加载器策略默认情况下不与 asyncio 方法兼容。只有在  Running Synchronous Methods and Functions under asyncio 中描述的AsyncSession.run_sync()方法内调用时,或者通过使用其.statement属性获取普通选择时,它才能直接使用:
user = await session.get(User, 42)
addresses = (await session.scalars(user.addresses.statement)).all()
stmt = user.addresses.statement.where(Address.email_address.startswith("patrick"))
addresses_filter = (await session.scalars(stmt)).all()
  • 引入 SQLAlchemy 2.0 版本的 write only 技术完全与 asyncio 兼容,并应优先使用。
    请参阅
    “动态”关系加载器被“只写”所取代 - 迁移到 2.0 样式的注意事项
  • 如果在不支持 RETURNING 的数据库(例如 MySQL 8)中使用 asyncio,那么新刷新的对象上将不会有服务器默认值,例如生成的时间戳,除非使用 Mapper.eager_defaults 选项。在 SQLAlchemy 2.0 中,这种行为会自动应用于像 PostgreSQL、SQLite 和 MariaDB 这样使用 RETURNING 在插入行时获取新值的后端。### 在 asyncio 下运行同步方法和函数

深度炼金术

这种方法本质上是公开了 SQLAlchemy 能够提供 asyncio 接口的机制。虽然这样做没有技术问题,但总体上这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些核心理念,即任何可能导致 IO 调用的编程语句必须有一个 await 调用,否则程序不会明确地指出每一行可能发生 IO 的地方。这种方法并没有改变这个一般观念,只是允许一系列同步 IO 指令在函数调用范围内免除这个规则,实质上被打包成一个可等待对象。

作为在 asyncio 事件循环中集成传统 SQLAlchemy “延迟加载”的另一种方法,提供了一种名为 AsyncSession.run_sync()可选方法,它将在一个 greenlet 中运行任何 Python 函数,传统的同步编程概念将在到达数据库驱动程序时转换为使用 await。这里的一个假设方法是,一个面向 asyncio 的应用程序可以将与数据库相关的方法打包到使用 AsyncSession.run_sync() 调用的函数中。

修改上面的示例,如果我们不为 A.bs 集合使用 selectinload(),我们可以在一个单独的函数中完成对这些属性访问的处理:

import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
def fetch_and_update_objects(session):
  """run traditional sync-style ORM code in a function that will be
 invoked within an awaitable.
 """
    # the session object here is a traditional ORM Session.
    # all features are available here including legacy Query use.
    stmt = select(A)
    result = session.execute(stmt)
    for a1 in result.scalars():
        print(a1)
        # lazy loads
        for b1 in a1.bs:
            print(b1)
    # legacy Query use
    a1 = session.query(A).order_by(A.id).first()
    a1.data = "new data"
async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)
    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )
        await session.run_sync(fetch_and_update_objects)
        await session.commit()
    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()
asyncio.run(async_main())

在“sync”运行器中运行某些函数的上述方法与在类似 gevent 的事件驱动编程库上运行 SQLAlchemy 应用程序的应用程序有一些相似之处。区别如下:

  1. 与使用 gevent 不同,我们可以继续使用标准的 Python asyncio 事件循环,或任何自定义事件循环,而无需集成到 gevent 事件循环中。
  2. 没有任何“猴子补丁”。上面的示例使用了真正的 asyncio 驱动程序,底层的 SQLAlchemy 连接池也使用了 Python 内置的 asyncio.Queue 来池化连接。
  3. 该程序可以自由地在异步/等待代码和使用同步代码的封装函数之间切换,几乎没有性能损失。没有使用“线程执行器”或任何额外的等待器或同步。
  4. 底层网络驱动程序也在使用纯 Python asyncio 概念,不使用geventeventlet提供的第三方网络库。## 使用与异步扩展的事件

SQLAlchemy 的事件系统未直接由异步扩展暴露,这意味着目前还没有“异步”版本的 SQLAlchemy 事件处理程序。

但是,由于异步扩展包围了通常的同步 SQLAlchemy API,因此常规的“同步”风格事件处理程序可自由使用,就像没有使用 asyncio 一样。

如下所述,目前有两种策略可以注册给予 asyncio-facing APIs 的事件:

  • 事件可以在实例级别(例如特定的AsyncEngine实例)上注册,方法是将事件与引用代理对象的sync属性关联起来。例如,要针对AsyncEngine实例注册PoolEvents.connect()事件,请使用其AsyncEngine.sync_engine属性作为目标。目标包括:

AsyncEngine.sync_engine

AsyncConnection.sync_connection

AsyncConnection.sync_engine

AsyncSession.sync_session

  • 要在类级别注册事件,针对同一类型的所有实例(例如所有AsyncSession实例),请使用相应的同步样式类。例如,要针对AsyncSession类注册SessionEvents.before_commit()事件,请使用Session类作为目标。
  • 要在sessionmaker级别注册,请使用async_sessionmaker.sync_session_class将显式sessionmakerasync_sessionmaker组合,并将事件与sessionmaker相关联。

当在异步 IO 上下文中的事件处理程序中工作时,例如Connection等对象将继续以通常的“同步”方式工作,而不需要awaitasync使用;当消息最终由异步 IO 数据库适配器接收时,调用样式将透明地转换回异步 IO 调用样式。对于传递了 DBAPI 级别连接的事件,例如PoolEvents.connect(),对象是一个符合 pep-249 的“连接”对象,它将同步样式调用适配为异步 IO 驱动程序。

带有异步引擎/会话/会话工厂的事件监听器示例

下面是一些与异步 API 构造相关的同步风格事件处理程序的示例:

  • AsyncEngine 上的核心事件
    在此示例中,我们将AsyncEngine.sync_engine属性作为ConnectionEventsPoolEvents的目标:
import asyncio
from sqlalchemy import event
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
# connect event on instance of Engine
@event.listens_for(engine.sync_engine, "connect")
def my_on_connect(dbapi_con, connection_record):
    print("New DBAPI connection:", dbapi_con)
    cursor = dbapi_con.cursor()
    # sync style API use for adapted DBAPI connection / cursor
    cursor.execute("select 'execute from event'")
    print(cursor.fetchone()[0])
# before_execute event on all Engine instances
@event.listens_for(Engine, "before_execute")
def my_before_execute(
    conn,
    clauseelement,
    multiparams,
    params,
    execution_options,
):
    print("before execute!")
async def go():
    async with engine.connect() as conn:
        await conn.execute(text("select 1"))
    await engine.dispose()
asyncio.run(go())
  • 输出:
New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>>
execute from event
before execute!
  • AsyncSession 上的 ORM 事件
    在此示例中,我们将AsyncSession.sync_session作为SessionEvents的目标:
import asyncio
from sqlalchemy import event
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session
engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
session = AsyncSession(engine)
# before_commit event on instance of Session
@event.listens_for(session.sync_session, "before_commit")
def my_before_commit(session):
    print("before commit!")
    # sync style API use on Session
    connection = session.connection()
    # sync style API use on Connection
    result = connection.execute(text("select 'execute from event'"))
    print(result.first())
# after_commit event on all Session instances
@event.listens_for(Session, "after_commit")
def my_after_commit(session):
    print("after commit!")
async def go():
    await session.execute(text("select 1"))
    await session.commit()
    await session.close()
    await engine.dispose()
asyncio.run(go())
  • 输出:
before commit!
execute from event
after commit!
  • async_sessionmaker 上的 ORM 事件
    对于这种用例,我们将sessionmaker作为事件目标,然后使用async_sessionmaker.sync_session_class参数将其分配给async_sessionmaker
import asyncio
from sqlalchemy import event
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import sessionmaker
sync_maker = sessionmaker()
maker = async_sessionmaker(sync_session_class=sync_maker)
@event.listens_for(sync_maker, "before_commit")
def before_commit(session):
    print("before commit")
async def main():
    async_session = maker()
    await async_session.commit()
asyncio.run(main())
  • 输出:
before commit

在连接池和其他事件中使用仅 awaitable 的驱动程序方法

如上一节所讨论的那样,诸如PoolEvents之类的事件处理程序接收到一个同步风格的“DBAPI”连接,这是  SQLAlchemy asyncio 方言提供的一个包装对象,用于将底层的 asyncio“driver”连接适配成 SQLAlchemy  内部可以使用的连接。当用户定义的事件处理程序需要直接使用最终的“driver”连接,并且只使用该驱动连接上的 awaitable  方法时,就会出现一种特殊的用例。其中一个例子是 asyncpg 驱动程序提供的.set_type_codec()方法。

为了适应这种用例,SQLAlchemy 的AdaptedConnection类提供了一个方法AdaptedConnection.run_async(),允许在事件处理程序或其他 SQLAlchemy 内部的“同步”上下文中调用一个 awaitable 函数。这个方法直接类似于AsyncConnection.run_sync()方法,它允许一个同步风格的方法在 async 下运行。

应该向AdaptedConnection.run_async()传递一个接受内部“driver”连接作为单个参数的函数,并返回一个 awaitable,该 awaitable 将由AdaptedConnection.run_async()方法调用。给定的函数本身不需要声明为async;它完全可以是 Python 的lambda:,因为返回的 awaitable 值将在返回后被调用:

from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(...)
@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, *args):
    dbapi_connection.run_async(
        lambda connection: connection.set_type_codec(
            "MyCustomType",
            encoder,
            decoder,  # ...
        )
    )

上面,传递给register_custom_types事件处理程序的对象是AdaptedConnection的一个实例,它提供了一个类似 DBAPI 的接口,用于访问底层的仅 async 的驱动级连接对象。然后,AdaptedConnection.run_async()方法提供了访问底层驱动程序级连接的 awaitable 环境。

版本 1.4.30 中的新功能。

使用多个 asyncio 事件循环

当一个应用程序同时使用多个事件循环时,例如在罕见的情况下将 asyncio 与多线程结合使用时,当使用默认的池实现时,不应该将相同的 AsyncEngine 与不同的事件循环共享。

如果一个 AsyncEngine 从一个事件循环传递到另一个事件循环,则在重新使用之前应调用 AsyncEngine.dispose() 方法。未能这样做可能会导致类似于 Task  got Future attached to a different loopRuntimeError

如果同一个引擎必须在不同的循环之间共享,则应配置为使用 NullPool 来禁用池,防止引擎重复使用任何连接:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/dbname",
    poolclass=NullPool,
)

使用 asyncio scoped session

在线程化的 SQLAlchemy 中使用的“scoped session”模式,使用适应版本称为 async_scoped_session,在 asyncio 中也是可用的。

提示

SQLAlchemy 通常不推荐为新开发使用“scoped”模式,因为它依赖于必须在线程或任务完成后显式清除的可变全局状态。特别是在使用 asyncio 时,直接将 AsyncSession 传递给需要它的可等待函数可能是一个更好的主意。

在使用 async_scoped_session 时,由于在 asyncio 上下文中没有“线程本地”概念,必须为构造函数提供“scopefunc”参数。下面的示例说明了使用 asyncio.current_task() 函数来实现此目的:

from asyncio import current_task
from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
)
async_session_factory = async_sessionmaker(
    some_async_engine,
    expire_on_commit=False,
)
AsyncScopedSession = async_scoped_session(
    async_session_factory,
    scopefunc=current_task,
)
some_async_session = AsyncScopedSession()

警告

async_scoped_session 使用的“scopefunc”在任务中被调用任意次数,每次访问底层 AsyncSession 时都会调用该函数。因此,该函数应该是幂等且轻量级的,并且不应尝试创建或改变任何状态,例如建立回调等。

警告

在作用域中使用 current_task() 作为“键”要求必须从最外层的可等待对象中调用 async_scoped_session.remove() 方法,以确保任务完成时从注册表中删除键,否则任务句柄和 AsyncSession 将继续驻留在内存中,从根本上创建了内存泄漏。请参阅以下示例,该示例说明了 async_scoped_session.remove() 的正确用法。

async_scoped_session 包含与 scoped_session 类似的 代理行为,这意味着它可以直接作为 AsyncSession 对待,需要注意通常需要使用 await 关键字,包括 async_scoped_session.remove() 方法:

async def some_function(some_async_session, some_object):
    # use the AsyncSession directly
    some_async_session.add(some_object)
    # use the AsyncSession via the context-local proxy
    await AsyncScopedSession.commit()
    # "remove" the current proxied AsyncSession for the local context
    await AsyncScopedSession.remove()

新版版本 1.4.19。 ## 使用 Inspector 检查模式对象

SQLAlchemy 尚未提供 Inspector 的 asyncio 版本(介绍请参见 使用 Inspector 进行细粒度反射),但是可以通过利用 AsyncConnection.run_sync() 方法来在 asyncio 上下文中使用现有接口:

import asyncio
from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")
def use_inspector(conn):
    inspector = inspect(conn)
    # use the inspector
    print(inspector.get_view_names())
    # return any value to the caller
    return inspector.get_table_names()
async def async_main():
    async with engine.connect() as conn:
        tables = await conn.run_sync(use_inspector)
asyncio.run(async_main())

另请参见

反射数据库对象

运行时检查 API

引擎 API 文档

对象名称 描述
async_engine_from_config(configuration[, prefix], **kwargs) 使用配置字典创建一个新的 AsyncEngine 实例。
AsyncConnection 一个用于Connection的 asyncio 代理。
AsyncEngine 一个用于Engine的 asyncio 代理。
AsyncTransaction Transaction的一个 asyncio 代理。
create_async_engine(url, **kw) 创建一个新的异步引擎实例。
create_async_pool_from_url(url, **kwargs) 创建一个新的异步引擎实例。
function sqlalchemy.ext.asyncio.create_async_engine(url: str | URL, **kw: Any) → AsyncEngine

创建一个新的异步引擎实例。

传递给create_async_engine()的参数基本与传递给create_engine()的参数相同。指定的方言必须是支持 asyncio 的方言,例如 asyncpg。

1.4 版的新功能。

参数:

async_creator

一个异步可调用函数,返回一个驱动级别的 asyncio 连接。如果给定,该函数不应该接受任何参数,并从底层的 asyncio 数据库驱动程序返回一个新的 asyncio 连接;连接将被包装在适当的结构中,以便与AsyncEngine一起使用。请注意,URL 中指定的参数在此处不适用,创建函数应该使用自己的连接参数。

此参数是create_engine()函数的 asyncio 等效参数。

2.0.16 版的新功能。

function sqlalchemy.ext.asyncio.async_engine_from_config(configuration: Dict[str, Any], prefix: str = 'sqlalchemy.', **kwargs: Any) → AsyncEngine

使用配置字典创建一个新的 AsyncEngine 实例。

这个函数类似于 SQLAlchemy 核心中的engine_from_config()函数,不同之处在于所请求的方言必须是类似于 asyncpg 这样的支持 asyncio 的方言。该函数的参数签名与engine_from_config()相同。

1.4.29 版的新功能。

function sqlalchemy.ext.asyncio.create_async_pool_from_url(url: str | URL, **kwargs: Any) → Pool

创建一个新的异步引擎实例。

传递给create_async_pool_from_url()的参数基本与传递给create_pool_from_url()的参数相同。指定的方言必须是支持 asyncio 的方言,例如 asyncpg。

2.0.10 版的新功能。

class sqlalchemy.ext.asyncio.AsyncEngine

一个Engine的 asyncio 代理。

AsyncEngine是使用create_async_engine()函数获取的:

from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

新版本 1.4 中新增。

成员

begin(), clear_compiled_cache(), connect(), dialect, dispose(),  driver, echo, engine, execution_options(), get_execution_options(),  name, pool, raw_connection(), sync_engine, update_execution_options(),  url

类签名

sqlalchemy.ext.asyncio.AsyncEngine (sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.AsyncConnectable)

method begin() → AsyncIterator[AsyncConnection]

返回一个上下文管理器,当进入时将提供一个已建立 AsyncTransactionAsyncConnection

例如:

async with async_engine.begin() as conn:
    await conn.execute(
        text("insert into table (x, y, z) values (1, 2, 3)")
    )
    await conn.execute(text("my_special_procedure(5)"))
method clear_compiled_cache() → None

清除与方言关联的编译缓存。

代表Engine类的代理,代表AsyncEngine类。

这仅适用于通过create_engine.query_cache_size参数建立的内置缓存。它不会影响通过Connection.execution_options.compiled_cache参数传递的任何字典缓存。

新版本 1.4 中新增。

method connect() → AsyncConnection

返回一个AsyncConnection对象。

当作为异步上下文管理器输入时,AsyncConnection将从底层连接池中获取数据库连接:

async with async_engine.connect() as conn:
    result = await conn.execute(select(user_table))

AsyncConnection也可以通过调用其AsyncConnection.start()方法在上下文管理器之外启动。

attribute dialect

代理AsyncEngine类的Engine.dialect属性。

method async dispose(close: bool = True) → None

处置此AsyncEngine使用的连接池。

参数:

关闭

如果将其默认值保留为True,则会完全关闭所有当前已签入的数据库连接。然而,仍在使用的连接将不会被关闭,但它们将不再与此Engine关联,因此当它们被单独关闭时,它们所关联的Pool最终将被垃圾回收,如果已经在签入时关闭,则将完全关闭。

如果设置为False,则前一个连接池将被取消引用,否则不会以任何方式触及。

另请参阅

Engine.dispose()

attribute driver

Engine正在使用的Dialect的驱动程序名称。

代理AsyncEngine类的Engine类。

attribute echo

当为True时,启用此元素的日志输出。

代理AsyncEngine类的Engine类。

这将设置此元素类和对象引用的命名空间的 Python 日志级别。布尔值True表示将为记录器设置日志级别logging.INFO,而字符串值debug将将日志级别设置为logging.DEBUG

attribute engine

返回此Engine

代理AsyncEngine类的Engine类。

用于接受同一变量内的Connection / Engine对象的传统方案。

method execution_options(**opt: Any) → AsyncEngine

返回一个新的 AsyncEngine,该引擎将以给定的执行选项提供 AsyncConnection 对象。

代理自 Engine.execution_options()。请参阅该方法了解详情。

method get_execution_options() → _ExecuteOptions

获取执行期间将生效的非 SQL 选项。

代表 AsyncEngine 类的 Engine 类的代理。

另请参阅

Engine.execution_options()

attribute name

Engine 使用的 Dialect 的字符串名称。

代表 AsyncEngine 类的 Engine 类的代理。

attribute pool

代表 AsyncEngine 类的 Engine.pool 属性的代理。

method async raw_connection() → PoolProxiedConnection

从连接池返回“原始” DBAPI 连接。

另请参阅

使用 Driver SQL 和原始 DBAPI 连接

attribute sync_engine: Engine

AsyncEngine 代理请求到同步样式的 Engine

此实例可用作事件目标。

另请参阅

与 asyncio 扩展一起使用事件

method update_execution_options(**opt: Any) → None

更新此 Engine 的默认执行选项字典。

代表 AsyncEngine 类的 Engine 类的代理。

**opt 中给定的键/值将添加到将用于所有连接的默认执行选项中。此字典的初始内容可以通过 execution_options 参数发送到 create_engine()

另请参阅

Connection.execution_options()

Engine.execution_options()

attribute url

代表AsyncEngine类的Engine.url属性的代理。

class sqlalchemy.ext.asyncio.AsyncConnection

一个Connection的 asyncio 代理。

AsyncConnection通过AsyncEngine.connect()方法获取:

from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
async with engine.connect() as conn:
    result = await conn.execute(select(table))

版本 1.4 中新增。

成员

aclose(),begin(),begin_nested(),close(),closed,commit(),connection,default_isolation_level,dialect,exec_driver_sql(),execute(),execution_options(),get_nested_transaction(),get_raw_connection(),get_transaction(),in_nested_transaction(),in_transaction(),info,invalidate(),invalidated,rollback(),run_sync(),scalar(),scalars(),start(),stream(),stream_scalars(),sync_connection,sync_engine

类签名

class sqlalchemy.ext.asyncio.AsyncConnection (sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.base.StartableContext, sqlalchemy.ext.asyncio.AsyncConnectable)

method async aclose() → None

AsyncConnection.close()的同义词。

AsyncConnection.aclose()名称专门用于支持 Python 标准库@contextlib.aclosing上下文管理器函数。

版本 2.0.20 中的新功能。

method begin() → AsyncTransaction

在自动开始之前开始事务。

method begin_nested() → AsyncTransaction

开始一个嵌套事务并返回事务句柄。

method async close() → None

关闭此AsyncConnection

这也会导致回滚事务(如果存在)。

attribute closed

如果此连接已关闭,则返回 True。

代理Connection类,代表AsyncConnection类。

method async commit() → None

提交当前正在进行的事务。

如果已启动事务,则此方法提交当前事务。如果未启动事务,则该方法不起作用,假定连接处于非失效状态。

每当首次执行语句或调用Connection.begin()方法时,都会自动在Connection上开始事务。

attribute connection

未实现异步;调用AsyncConnection.get_raw_connection()

attribute default_isolation_level

与正在使用的Dialect相关联的初始连接时间隔离级别。

代理Connection类,代表AsyncConnection类。

此值独立于Connection.execution_options.isolation_levelEngine.execution_options.isolation_level执行选项,并由Dialect在创建第一个连接时确定,通过针对数据库执行 SQL 查询以获取当前隔离级别,然后再发出任何其他命令。

调用此访问器不会触发任何新的 SQL 查询。

另请参阅

Connection.get_isolation_level() - 查看当前实际隔离级别

create_engine.isolation_level - 设置每个Engine的隔离级别

Connection.execution_options.isolation_level - 设置每个Connection的隔离级别

attribute dialect

代表AsyncConnection类的Connection.dialect属性的代理。

method async exec_driver_sql(statement: str, parameters: _DBAPIAnyExecuteParams | None = None, execution_options: CoreExecuteOptionsParameter | None = None) → CursorResult[Any]

执行驱动程序级别的 SQL 字符串并返回缓冲的Result

method async execute(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → CursorResult[Any]

执行 SQL 语句构造并返回一个缓冲的Result

参数:

  • object –要执行的语句。这始终是一个同时存在于ClauseElementExecutable层次结构中的对象,包括:
  • Select - Select操作
  • Insert, Update, Delete
  • TextClauseTextualSelect
  • DDL 和继承自ExecutableDDLElement的对象
  • parameters – 将绑定到语句中的参数。这可以是参数名称到值的字典,也可以是可变序列(例如列表)的字典。当传递一个字典列表时,底层语句执行将使用 DBAPI cursor.executemany()方法。当传递单个字典时,将使用 DBAPI cursor.execute()方法。
  • execution_options – 可选的执行选项字典,将与语句执行关联。该字典可以提供Connection.execution_options()接受的选项的子集。

返回:

一个Result对象。

method async execution_options(**opt: Any) → AsyncConnection

设置在执行期间生效的非 SQL 选项。

返回此AsyncConnection对象,其中添加了新选项。

有关此方法的完整详情,请参阅Connection.execution_options()

method get_nested_transaction() → AsyncTransaction | None

返回一个表示当前嵌套(保存点)事务的AsyncTransaction,如果有的话。

这将使用底层同步连接的Connection.get_nested_transaction()方法获取当前Transaction,然后在新的AsyncTransaction对象中进行代理。

新版本 1.4.0b2 中引入。

method async get_raw_connection() → PoolProxiedConnection

返回此AsyncConnection正在使用的池化 DBAPI 级连接。

这是一个 SQLAlchemy 连接池代理连接,然后具有属性_ConnectionFairy.driver_connection,该属性引用实际的驱动程序连接。其_ConnectionFairy.dbapi_connection则指代一个AdaptedConnection实例,将驱动程序连接适配为 DBAPI 协议。

method get_transaction() → AsyncTransaction | None

返回一个表示当前事务的AsyncTransaction,如果有的话。

这将使用底层同步连接的Connection.get_transaction()方法获取当前Transaction,然后在新的AsyncTransaction对象中进行代理。

新版本 1.4.0b2 中引入。

method in_nested_transaction() → bool

如果事务正在进行中,则返回 True。

新版本 1.4.0b2 中引入。

method in_transaction() → bool

如果事务正在进行中,则返回 True。

attribute info

返回底层ConnectionConnection.info字典。

此字典可自由编写,以将用户定义的状态与数据库连接关联起来。

仅当AsyncConnection当前已连接时才可用此属性。如果AsyncConnection.closed属性为True,则访问此属性将引发ResourceClosedError

新版本为 1.4.0b2。

method async invalidate(exception: BaseException | None = None) → None

使与此Connection相关联的基础 DBAPI 连接无效。

参见方法Connection.invalidate(),了解此方法的详细信息。

attribute invalidated

如果此连接已失效,则返回 True。

代理了AsyncConnection类的Connection类。

但这并不表示连接是否在池级别上失效。

method async rollback() → None

回滚当前正在进行的事务。

如果已启动事务,则此方法将回滚当前事务。如果未启动事务,则该方法不起作用。如果已启动事务并且连接处于无效状态,则使用此方法清除事务。

当首次执行语句或调用Connection.begin()方法时,将自动在Connection上启动事务。

method async run_sync(fn: ~typing.Callable[[~typing.Concatenate[~sqlalchemy.engine.base.Connection, ~_P]], ~sqlalchemy.ext.asyncio.engine._T], *arg: ~typing.~_P, **kw: ~typing.~_P) → _T

调用给定的同步(即非异步)可调用对象,并将同步风格的Connection作为第一个参数传递。

此方法允许在异步应用程序的上下文中运行传统的同步 SQLAlchemy 函数。

例如:

def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str:
  '''A synchronous function that does not require awaiting
 :param conn: a Core SQLAlchemy Connection, used synchronously
 :return: an optional return value is supported
 '''
    conn.execute(
        some_table.insert().values(int_col=arg1, str_col=arg2)
    )
    return "success"
async def do_something_async(async_engine: AsyncEngine) -> None:
  '''an async function that uses awaiting'''
    async with async_engine.begin() as async_conn:
        # run do_something_with_core() with a sync-style
        # Connection, proxied into an awaitable
        return_code = await async_conn.run_sync(do_something_with_core, 5, "strval")
        print(return_code)

通过在一个特别的被监控的 greenlet 中运行给定的可调用对象,此方法将一直维持 asyncio 事件循环直到数据库连接。

AsyncConnection.run_sync()的最基本用法是调用诸如MetaData.create_all()之类的方法,给定需要提供给MetaData.create_all()作为Connection对象的AsyncConnection

# run metadata.create_all(conn) with a sync-style Connection,
# proxied into an awaitable
with async_engine.begin() as conn:
    await conn.run_sync(metadata.create_all)

注意

提供的可调用对象在 asyncio 事件循环内联调用,并且将在传统 IO 调用上阻塞。此可调用对象内的 IO 应仅调用进入 SQLAlchemy 的 asyncio 数据库 API,这些 API 将被正确地适应到 greenlet 上下文中。

另请参阅

AsyncSession.run_sync()

在 asyncio 下运行同步方法和函数

method async scalar(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → Any

执行 SQL 语句构造并返回标量对象。

此方法是在调用Connection.execute()方法后调用Result.scalar()方法的简写。参数是等效的。

返回:

代表返回的第一行的第一列的标量 Python 值。

method async scalars(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → ScalarResult[Any]

执行 SQL 语句构造并返回标量对象。

此方法是在调用Connection.execute()方法后调用Result.scalars()方法的简写。参数是等效的。

返回:

一个ScalarResult对象。

版本 1.4.24 中的新功能。

method async start(is_ctxmanager: bool = False) → AsyncConnection

在使用 Python with: 块之外启动此AsyncConnection对象的上下文。

method stream(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → AsyncIterator[AsyncResult[Any]]

执行语句并返回一个产生AsyncResult对象的可等待对象。

例如:

result = await conn.stream(stmt):
async for row in result:
    print(f"{row}")

AsyncConnection.stream()方法支持可选的上下文管理器用法,针对AsyncResult对象,如下所示:

async with conn.stream(stmt) as result:
    async for row in result:
        print(f"{row}")

在上述模式中,即使迭代器被异常抛出中断,AsyncResult.close()方法也会无条件地被调用。然而,上下文管理器的使用仍然是可选的,并且该函数可以以async with fn():await fn()的方式调用。

新增于版本 2.0.0b3:增加了上下文管理器支持

返回:

将产生一个可等待对象,该对象将生成一个AsyncResult对象。

另见

AsyncConnection.stream_scalars()

method stream_scalars(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → AsyncIterator[AsyncScalarResult[Any]]

执行语句并返回一个可等待的AsyncScalarResult对象。

例如:

result = await conn.stream_scalars(stmt)
async for scalar in result:
    print(f"{scalar}")

此方法是在调用Connection.stream()方法后调用AsyncResult.scalars()方法的简写。参数是等效的。

AsyncConnection.stream_scalars()方法支持针对AsyncScalarResult对象的可选上下文管理器使用,如下所示:

async with conn.stream_scalars(stmt) as result:
    async for scalar in result:
        print(f"{scalar}")

在上述模式中,即使迭代器被异常抛出中断,AsyncScalarResult.close()方法也会无条件地被调用。然而,上下文管理器的使用仍然是可选的,并且该函数可以以async with fn():await fn()的方式调用。

新增于版本 2.0.0b3:增加了上下文管理器支持

返回:

将产生一个可等待对象,该对象将生成一个AsyncScalarResult对象。

新增于版本 1.4.24。

另见

AsyncConnection.stream()

attribute sync_connection: Connection | None

引用同步式Connection指向此AsyncConnection的请求代理。

此实例可用作事件目标。

另见

使用 asyncio 扩展的事件

attribute sync_engine: Engine

引用同步式Engine指向此AsyncConnection的关联,通过其基础的Connection

此实例可用作事件目标。

另见

使用 asyncio 扩展的事件

class sqlalchemy.ext.asyncio.AsyncTransaction

一个Transaction的 asyncio 代理。

成员

close(), commit(), rollback(), start()

类签名

sqlalchemy.ext.asyncio.AsyncTransactionsqlalchemy.ext.asyncio.base.ProxyComparablesqlalchemy.ext.asyncio.base.StartableContext

method async close() → None

关闭此AsyncTransaction

如果此事务是 begin/commit 嵌套中的基本事务,则事务将 rollback()。 否则,该方法返回。

此用于取消事务而不影响封闭事务范围的事务。

method async commit() → None

提交此AsyncTransaction

method async rollback() → None

回滚此AsyncTransaction

method async start(is_ctxmanager: bool = False) → AsyncTransaction

在不使用 Python with:块的情况下启动此AsyncTransaction对象的上下文。


SqlAlchemy 2.0 中文文档(二十八)(2)https://developer.aliyun.com/article/1560384

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
4月前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(二十九)(3)
SqlAlchemy 2.0 中文文档(二十九)
44 4
|
4月前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(二十九)(2)
SqlAlchemy 2.0 中文文档(二十九)
39 7
|
4月前
|
SQL 存储 关系型数据库
SqlAlchemy 2.0 中文文档(二十九)(1)
SqlAlchemy 2.0 中文文档(二十九)
51 4
|
4月前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(二十九)(4)
SqlAlchemy 2.0 中文文档(二十九)
43 4
|
4月前
|
SQL 缓存 前端开发
SqlAlchemy 2.0 中文文档(二十七)(5)
SqlAlchemy 2.0 中文文档(二十七)
46 2
|
4月前
|
SQL 前端开发 关系型数据库
SqlAlchemy 2.0 中文文档(二十七)(2)
SqlAlchemy 2.0 中文文档(二十七)
34 2
|
4月前
|
SQL 缓存 API
SqlAlchemy 2.0 中文文档(二十八)(4)
SqlAlchemy 2.0 中文文档(二十八)
70 1
|
4月前
|
关系型数据库 测试技术 API
SqlAlchemy 2.0 中文文档(二十八)(3)
SqlAlchemy 2.0 中文文档(二十八)
40 1
|
4月前
|
SQL 数据库 数据库管理
SqlAlchemy 2.0 中文文档(二十七)(3)
SqlAlchemy 2.0 中文文档(二十七)
31 1
|
4月前
|
SQL 前端开发 API
SqlAlchemy 2.0 中文文档(二十七)(1)
SqlAlchemy 2.0 中文文档(二十七)
58 1