SqlAlchemy 2.0 中文文档(四十四)(3)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: SqlAlchemy 2.0 中文文档(四十四)

SqlAlchemy 2.0 中文文档(四十四)(2)https://developer.aliyun.com/article/1563069


将 RETURNING 行与参数集关联起来

新版本 2.0.10 中新增。

在前一节中示例的“批处理”模式查询不保证返回的记录顺序与输入数据的顺序相对应。当由 SQLAlchemy ORM 的 unit of work 过程使用时,以及对与输入数据相关的返回的服务器生成的值进行关联的应用程序时,Insert.returning()UpdateBase.return_defaults()方法包括一个选项Insert.returning.sort_by_parameter_order,指示“insertmanyvalues”模式应该保证这种对应关系。这与数据库后端实际 INSERT 的记录顺序无关,在任何情况下都不能假设;只有当收到返回的记录时应该有序排列,以与原始输入数据传递的顺序相对应。

Insert.returning.sort_by_parameter_order 参数存在时,对于使用服务器生成的整数主键值(如 IDENTITY、PostgreSQL SERIAL、MariaDB AUTO_INCREMENT 或 SQLite 的 ROWID 方案)的表,可能会选择使用更复杂的 INSERT…RETURNING 形式,并根据返回的值对行进行后执行排序,或者如果不存在这样的形式,则 “insertmanyvalues” 功能可能会优雅地降级到“非批处理”模式,为每个参数集运行单独的 INSERT 语句。

例如,在 SQL Server 中,当自动增量的 IDENTITY 列用作主键时,使用以下 SQL 形式:

INSERT  INTO  a  (data,  x,  y)
OUTPUT  inserted.id,  inserted.id  AS  id__1
SELECT  p0,  p1,  p2  FROM  (VALUES
  (?,  ?,  ?,  0),  (?,  ?,  ?,  1),  (?,  ?,  ?,  2),
  ...
  (?,  ?,  ?,  77)
)  AS  imp_sen(p0,  p1,  p2,  sen_counter)  ORDER  BY  sen_counter

对于 PostgreSQL 也是类似的形式,当主键列使用 SERIAL 或 IDENTITY 时。上述形式保证插入行的顺序。但它确保 IDENTITY 或 SERIAL 值将按照每个参数集的顺序创建[2]。然后,“insertmanyvalues” 功能通过递增整数标识对上述 INSERT 语句返回的行进行排序。

对于 SQLite 数据库,不存在适当的 INSERT 形式可以将新的 ROWID 值的生成与传递参数集的顺序进行关联。因此,在请求有序 RETURNING 时,使用服务器生成的主键值时,SQLite 后端将降级为“非批处理”模式。对于 MariaDB,默认的 INSERT 形式对 insertmanyvalues 足够,因为此数据库后端在使用 InnoDB 时会将 AUTO_INCREMENT 的顺序与输入数据的顺序对齐[3]

对于客户端生成的主键,例如当使用 Python 的 uuid.uuid4() 函数为 Uuid 列生成新值时,“insertmanyvalues” 功能会透明地将此列包含在 RETURNING 记录中,并将其值与给定输入记录的值进行关联,从而保持输入记录和结果行之间的对应关系。由此可见,当使用客户端生成的主键值时,所有后端都允许批处理,参数相关的 RETURNING 顺序。

“insertmanyvalues” 的 “batch” 模式确定了一列或多列用作输入参数和返回行之间对应点的列,这被称为插入标记,它是用于跟踪这些值的特定列。通常会自动选择“插入标记”,但也可以根据极端特殊情况进行用户配置;章节配置标记列对此进行了描述。

对于不提供适当的 INSERT 形式以确定地与输入值对齐的服务器生成值的后端,或者对于具有其他类型服务器生成的主键值的Table配置,“insertmanyvalues”模式将在保证请求 RETURNING 排序时使用非批处理模式。

另请参阅

对于Table配置,如果没有客户端主键值,并且提供服务器生成的主键值(或没有主键),而数据库无法根据多个参数集以确定性或可排序的方式调用,则“insertmanyvalues”功能在满足Insert.returning.sort_by_parameter_order要求的情况下,可能会选择使用非批处理模式

在这种模式下,保持原始的 INSERT SQL 形式,并且“insertmanyvalues”功能将为每个参数集单独运行给定的语句,将返回的行组织成完整的结果集。与以前的 SQLAlchemy 版本不同,它会在最小化 Python 开销的紧凑循环中执行。在某些情况下,例如在 SQLite 上,“非批处理”模式的性能与“批处理”模式完全相同。

语句执行模型

对于“批量”和“非批量”模式,该特性必然会使用 DBAPI cursor.execute() 方法调用多个 INSERT 语句,在单个对核心级 Connection.execute() 方法的调用范围内,每个语句包含多达固定数量的参数集。此限制可按下面的描述进行配置,位于 控制批量大小。对 cursor.execute() 的单独调用被单独记录,并且单独传递给事件侦听器,如 ConnectionEvents.before_cursor_execute()(请参阅下面的 日志和事件)。

配置哨兵列

在典型情况下,为了提供具有确定性行顺序的 INSERT…RETURNING,“insertmanyvalues” 特性将自动从给定表的主键中确定一个哨兵列,如果无法识别,则优雅地降级到“逐行”模式。作为一个完全可选的特性,为了获得对于具有服务器生成的主键的表的完整“insertmanyvalues”批量性能,其默认生成器函数与“哨兵”用例不兼容的情况,其他非主键列可以被标记为“哨兵”列,假设它们满足某些要求。一个典型的例子是具有客户端默认值的非主键 Uuid 列,如 Python 的 uuid.uuid4() 函数。还有一种构造方法可以创建具有面向“insertmanyvalues”用例的客户端整数计数器的简单整数列。

可以通过在合格的列上添加 Column.insert_sentinel 来指示哨兵列。最基本的“合格”列是一个非空且唯一的列,具有客户端默认值,例如 UUID 列如下所示:

import uuid
from sqlalchemy import Column
from sqlalchemy import FetchedValue
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Uuid
my_table = Table(
    "some_table",
    metadata,
    # assume some arbitrary server-side function generates
    # primary key values, so cannot be tracked by a bulk insert
    Column("id", String(50), server_default=FetchedValue(), primary_key=True),
    Column("data", String(50)),
    Column(
        "uniqueid",
        Uuid(),
        default=uuid.uuid4,
        nullable=False,
        unique=True,
        insert_sentinel=True,
    ),
)

在使用 ORM Declarative 模型时,可以使用 mapped_column 构造相同的形式:

import uuid
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
class Base(DeclarativeBase):
    pass
class MyClass(Base):
    __tablename__ = "my_table"
    id: Mapped[str] = mapped_column(primary_key=True, server_default=FetchedValue())
    data: Mapped[str] = mapped_column(String(50))
    uniqueid: Mapped[uuid.UUID] = mapped_column(
        default=uuid.uuid4, unique=True, insert_sentinel=True
    )

尽管默认生成器生成的值必须是唯一的,但上述“哨兵”列上的实际 UNIQUE 约束,由 unique=True 参数指示,本身是可选的,如果不需要,可以省略。

还有一种特殊的“插入标志”形式,它是一个专用的可空整数列,该列使用仅在“insertmanyvalues”操作期间使用的特殊默认整数计数器;作为额外的行为,该列将在 SQL 语句和结果集中省略自身,并以基本透明的方式行为。但是,它确实需要在实际的数据库表中物理存在。可以使用函数 insert_sentinel() 构建这种 Column 样式:

from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Uuid
from sqlalchemy import insert_sentinel
Table(
    "some_table",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String(50)),
    insert_sentinel("sentinel"),
)

在使用 ORM Declarative 时,有一个友好的版本 insert_sentinel() 称为 orm_insert_sentinel() 可供使用,它可以在 Base 类或 mixin 上使用;如果使用 declared_attr() 封装,该列将应用于所有包括在联合继承层次结构中的表绑定子类中:

from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import orm_insert_sentinel
class Base(DeclarativeBase):
    @declared_attr
    def _sentinel(cls) -> Mapped[int]:
        return orm_insert_sentinel()
class MyClass(Base):
    __tablename__ = "my_table"
    id: Mapped[str] = mapped_column(primary_key=True, server_default=FetchedValue())
    data: Mapped[str] = mapped_column(String(50))
class MySubClass(MyClass):
    __tablename__ = "sub_table"
    id: Mapped[str] = mapped_column(ForeignKey("my_table.id"), primary_key=True)
class MySingleInhClass(MyClass):
    pass

在上面的示例中,“my_table” 和 “sub_table” 都将有一个名为 “_sentinel” 的额外整数列,该列可供 ORM 使用的 “insertmanyvalues” 功能来帮助优化批量插入。 ### 控制批次大小

“insertmanyvalues”的一个关键特性是 INSERT 语句的大小受到固定最大数量的“values”子句以及方言特定的固定的一次性可在一个 INSERT 语句中表示的绑定参数总数的限制。当给定的参数字典数量超过固定限制时,或者当要在单个 INSERT 语句中呈现的绑定参数总数超过固定限制时(这两个固定限制是分开的),将在单个 Connection.execute() 调用范围内调用多个 INSERT 语句,其中每个 INSERT 语句都容纳一部分参数字典,称为“批次”。每个“批次”中表示的参数字典数量就是“批次大小”。例如,批次大小为 500 意味着每个发出的 INSERT 语句最多将插入 500 行。

能够调整批处理大小可能是很重要的,因为较大的批处理大小对于插入(INSERT)操作可能更有效率,其中值集本身相对较小,并且较小的批处理大小可能更适合使用非常大的值集的插入操作,其中渲染的 SQL 大小以及传递给一个语句的总数据大小可能受到基于后端行为和内存约束的特定大小的限制的影响。因此,批处理大小可以根据每个 Engine 和每个语句的基础进行配置。另一方面,参数限制是根据正在使用的数据库的已知特性固定的。

大多数后端的批处理大小默认为 1000,具有额外的每个方言的“最大参数数”限制因素,可能会进一步减小每个语句的批处理大小。参数的最大数目因方言和服务器版本而异;最大值为 32700(选择了一个距离 PostgreSQL 的限制 32767 和 SQLite 的现代限制 32766 很大的距离,同时为语句中的附加参数以及 DBAPI 的怪异性留出了空间)。旧版本的 SQLite(在 3.32.0 之前)将此值设置为 999。MariaDB 没有确定的限制,但 32700 仍然作为 SQL 消息大小的限制因素。

“批处理大小”的值可以通过 Enginecreate_engine.insertmanyvalues_page_size 参数进行全局设置。例如,要影响包含每个语句中的最多 100 个参数集的 INSERT 语句:

e = create_engine("sqlite://", insertmanyvalues_page_size=100)

批处理大小也可以使用 Connection.execution_options.insertmanyvalues_page_size 执行选项在每个语句的基础上进行影响,例如每次执行:

with e.begin() as conn:
    result = conn.execute(
        table.insert().returning(table.c.id),
        parameterlist,
        execution_options={"insertmanyvalues_page_size": 100},
    )

或者在语句本身上进行配置:

stmt = (
    table.insert()
    .returning(table.c.id)
    .execution_options(insertmanyvalues_page_size=100)
)
with e.begin() as conn:
    result = conn.execute(stmt, parameterlist)
```### 记录和事件
“insertmanyvalues” 功能与 SQLAlchemy 的语句记录以及游标事件完全集成,例如 `ConnectionEvents.before_cursor_execute()`。当参数列表被拆分成单独的批次时,**每个 INSERT 语句都将被单独记录并传递给事件处理程序**。这与之前 SQLAlchemy 1.x 系列中仅使用 psycopg2 的功能的工作方式相比是一个重大变化,之前的工作方式中,多个 INSERT 语句的生成对于记录和事件是隐藏的。日志显示会截断长参数列表以便阅读,并且还会指示每个语句的特定批次。下面的示例说明了此日志的摘录:
```py
INSERT INTO a (data, x, y) VALUES (?, ?, ?), ... 795 characters truncated ...  (?, ?, ?), (?, ?, ?) RETURNING id
[generated in 0.00177s (insertmanyvalues) 1/10 (unordered)] ('d0', 0, 0, 'd1',  ...
INSERT INTO a (data, x, y) VALUES (?, ?, ?), ... 795 characters truncated ...  (?, ?, ?), (?, ?, ?) RETURNING id
[insertmanyvalues 2/10 (unordered)] ('d100', 100, 1000, 'd101', ...
...
INSERT INTO a (data, x, y) VALUES (?, ?, ?), ... 795 characters truncated ...  (?, ?, ?), (?, ?, ?) RETURNING id
[insertmanyvalues 10/10 (unordered)] ('d900', 900, 9000, 'd901', ...

当 非批处理模式 发生时,日志记录将会指示这一点,并显示 insertmanyvalues 消息:

...
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 67/78 (ordered; batch not supported)] ('d66', 66, 66)
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 68/78 (ordered; batch not supported)] ('d67', 67, 67)
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 69/78 (ordered; batch not supported)] ('d68', 68, 68)
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 70/78 (ordered; batch not supported)] ('d69', 69, 69)
...

另请参阅

配置日志

Upsert 支持

PostgreSQL、SQLite 和 MariaDB 方言提供了特定于后端的“upsert”构造 insert()insert()insert(),它们分别是 Insert 构造,具有诸如 on_conflict_do_update() 或 ``on_duplicate_key()` 的附加方法。当这些构造与 RETURNING 一起使用时,它们还支持“insertmanyvalues”行为,允许高效地进行带 RETURNING 的 upsert 操作。 ## 引擎处理

Engine 指的是一个连接池,这意味着在正常情况下,当 Engine 对象仍然驻留在内存中时,存在着打开的数据库连接。当一个 Engine 被垃圾回收时,它的连接池将不再被该 Engine 引用,并且假设没有连接仍然被检出,那么连接池及其连接也将被垃圾回收,这将关闭实际的数据库连接。但是除此之外,Engine 将保持打开的数据库连接,假设它使用的是通常的默认连接池实现 QueuePool

Engine 通常应该是一个事先建立并在应用程序的整个生命周期中维护的永久性构造。它应该按照每个连接的方式创建和处理;相反,它是一个注册表,既维护着一组连接池,又维护着关于正在使用的数据库和 DBAPI 的配置信息,以及某种程度上的针对每个数据库资源的内部缓存。

然而,有许多情况下希望所有由Engine引用的连接资源都被完全关闭。通常不建议依赖 Python 垃圾回收来处理这些情况;相反,可以使用Engine.dispose()方法显式地释放Engine。这会处理引擎的底层连接池,并用一个空的新连接池替换它。只要此时丢弃了Engine并且不再使用它,它引用的所有检入连接也将被完全关闭。

调用Engine.dispose()的有效用例包括:

  • 当程序想要释放连接池中的所有剩余已检入连接,并且不再期望对该数据库进行任何未来操作时。
  • 当程序使用多进程或fork(),并且将Engine对象复制到子进程时,应调用Engine.dispose(),以便引擎在该 fork 中创建全新的数据库连接。数据库连接通常不会跨进程边界传输。在这种情况下,使用Engine.dispose.close参数设置为 False。有关此用例的更多背景信息,请参见使用连接池进行多进程或 os.fork()部分。
  • 在测试套件或多租户场景中,可能会创建和处理许多临时的短寿命Engine对象。

当引擎被处理或被垃圾回收时,已签出的连接不会被丢弃,因为这些连接仍然在应用程序的其他地方被强引用。但是,在调用Engine.dispose()之后,这些连接将不再与该Engine相关联;当它们关闭时,它们将返回到它们现在孤立的连接池中,该连接池最终会在所有引用它的连接都不再在任何地方被引用时被垃圾回收。由于这个过程不容易控制,强烈建议仅在所有签出的连接都已签入或以其他方式与它们的池解除关联后才调用Engine.dispose()

对于受到 Engine 对象的连接池使用影响的应用程序,另一种选择是完全禁用连接池。这通常只会对使用新连接产生轻微的性能影响,并且意味着当连接被检入时,它会完全关闭并且不会在内存中保留。请参阅 切换连接池实现 以获取有关如何禁用连接池的指南。

另请参阅

连接池

在多进程或 os.fork() 中使用连接池 ## 使用驱动程序 SQL 和原始 DBAPI 连接

在介绍使用 Connection.execute() 时,使用了 text() 构造来说明如何调用文本 SQL 语句。在使用 SQLAlchemy 时,文本 SQL 实际上更多地是例外而不是规范,因为核心表达式语言和 ORM 都将 SQL 的文本表示抽象化了。但是,text() 构造本身也提供了对文本 SQL 的一些抽象,它规范了如何传递绑定参数,以及支持参数和结果集行的数据类型行为。

直接调用驱动程序的 SQL 字符串

对于希望直接传递给底层驱动程序(称为 DBAPI)的文本 SQL 而不经过 text() 构造的用例,可以使用 Connection.exec_driver_sql() 方法:

with engine.connect() as conn:
    conn.exec_driver_sql("SET param='bar'")

从版本 1.4 开始:新增了 Connection.exec_driver_sql() 方法。

直接使用 DBAPI 游标

有些情况下,SQLAlchemy 并没有提供一种通用化的方法来访问一些 DBAPI 函数,例如调用存储过程以及处理多个结果集。在这些情况下,直接处理原始的 DBAPI 连接同样是一种方便的方式。

访问原始 DBAPI 连接的最常见方法是直接从已有的 Connection 对象中获取。这可以通过 Connection.connection 属性进行访问:

connection = engine.connect()
dbapi_conn = connection.connection

这里的 DBAPI 连接实际上是一个“代理”,就连接池的原始连接而言,然而这是一个大多数情况下可以忽略的实现细节。由于这个 DBAPI 连接仍然包含在一个拥有的Connection对象的范围内,最好使用Connection对象来进行大多数功能,如事务控制以及调用Connection.close()方法;如果这些操作直接在 DBAPI 连接上执行,拥有的Connection将不会意识到这些状态的变化。

为了克服由拥有的Connection维护的 DBAPI 连接所施加的限制,还可以使用 Engine.raw_connection() 方法获取一个 DBAPI 连接,而不需要先获取一个 Connection

dbapi_conn = engine.raw_connection()

这个 DBAPI 连接再次是一个“代理”形式,就像以前的情况一样。这种代理的目的现在显而易见,当我们调用此连接的 .close() 方法时,DBAPI 连接通常实际上不会关闭,而是被释放回引擎的连接池:

dbapi_conn.close()

虽然 SQLAlchemy 可能会在未来添加更多用于更多 DBAPI 使用案例的内置模式,但由于这些情况往往很少需要,并且它们也高度依赖于正在使用的 DBAPI 的类型,所以在任何情况下,直接使用 DBAPI 调用模式始终存在于那些需要的情况下。

另见

在使用 Engine 时,如何访问原始 DBAPI 连接? - 包括有关如何访问 DBAPI 连接以及在使用 asyncio 驱动程序时“驱动程序”连接的其他详细信息。

一些用于 DBAPI 连接的方法如下。### 调用存储过程和用户定义的函数

SQLAlchemy 支持以几种方式调用存储过程和用户定义的函数。请注意,所有的 DBAPI 都有不同的做法,因此您必须咨询底层 DBAPI 的文档以获取与您特定用途相关的具体信息。以下示例是假设性的,并且可能不适用于您的底层 DBAPI。

对于具有特殊语法或参数问题的存储过程或函数,可以使用 DBAPI 级别的 callproc,您的 DBAPI 可能可以使用。这种模式的一个例子是:

connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()
    cursor_obj.callproc("my_procedure", ["x", "y", "z"])
    results = list(cursor_obj.fetchall())
    cursor_obj.close()
    connection.commit()
finally:
    connection.close()

注意

并非所有的 DBAPI 都使用 callproc,总体使用细节会有所不同。上面的例子只是说明如何使用特定的 DBAPI 函数。

您的 DBAPI 可能没有callproc要求,可能要求使用另一种模式调用存储过程或用户定义的函数,例如正常的 SQLAlchemy 连接使用。这种用法模式的一个例子是,在撰写本文档时,使用 psycopg2 DBAPI 在 PostgreSQL 数据库中执行存储过程,应该使用正常的连接使用方式调用:

connection.execute("CALL my_procedure();")

上面的例子是假设性的。底层数据库不保证在这些情况下支持“CALL”或“SELECT”,关键字可能会根据函数是存储过程还是用户定义的函数而变化。在这些情况下,您应该查阅底层的 DBAPI 和数据库文档,以确定使用的正确语法和模式。

多结果集

多结果集支持可以通过原始 DBAPI 游标使用nextset方法获得:

connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()
    cursor_obj.execute("select * from table1; select * from table2")
    results_one = cursor_obj.fetchall()
    cursor_obj.nextset()
    results_two = cursor_obj.fetchall()
    cursor_obj.close()
finally:
    connection.close()

注册新方言

create_engine()函数调用通过 setuptools 入口点定位给定的方言。这些入口点可以在 setup.py 脚本中为第三方方言建立。例如,要创建一个名为“foodialect://”的新方言,步骤如下:

  1. 创建一个名为foodialect的包。
  2. 该包应该有一个包含方言类的模块,该类通常是sqlalchemy.engine.default.DefaultDialect的子类。在这个例子中,假设它被称为FooDialect,并且通过foodialect.dialect访问其模块。
  3. 入口点可以在setup.cfg中建立如下:
[options.entry_points]
sqlalchemy.dialects  =
  foodialect  =  foodialect.dialect:FooDialect

如果方言为现有的 SQLAlchemy 支持的数据库提供了对特定 DBAPI 的支持,则可以给出名称,包括数据库限定符。例如,如果FooDialect实际上是一个 MySQL 方言,可以像这样建立入口点:

[options.entry_points]
sqlalchemy.dialects
  mysql.foodialect  =  foodialect.dialect:FooDialect

上述入口点将被访问为create_engine("mysql+foodialect://")

在进程内注册方言

SQLAlchemy 还允许在当前进程中注册一个方言,绕过需要单独安装的必要性。使用register()函数如下:

from sqlalchemy.dialects import registry
registry.register("mysql.foodialect", "myapp.dialect", "MyMySQLDialect")

上述将响应create_engine("mysql+foodialect://")并从myapp.dialect模块加载MyMySQLDialect类。

连接/引擎 API

对象名称 描述
连接 为包装的 DB-API 连接提供高级功能。
CreateEnginePlugin 一组旨在根据 URL 中的入口点名称增强 Engine 对象构造的钩子。
Engine PoolDialect 连接在一起,提供数据库连接和行为的来源。
ExceptionContext 封装有关正在进行的错误条件的信息。
NestedTransaction 表示“嵌套”或 SAVEPOINT 事务。
RootTransaction 表示 Connection 上的“根”事务。
Transaction 表示正在进行的数据库事务。
TwoPhaseTransaction 表示两阶段事务。
class sqlalchemy.engine.Connection

为封装的 DB-API 连接提供高级功能。

通过调用 Engine.connect() 方法获取 Engine 对象的 Connection 对象,并提供执行 SQL 语句以及事务控制的服务。

Connection 对象不是线程安全的。虽然一个 Connection 可以通过正确同步的访问在线程之间共享,但底层的 DBAPI 连接可能不支持线程之间的共享访问。查看 DBAPI 文档以获取详细信息。

成员

init(), begin(), begin_nested(), begin_twophase(), close(), closed, commit(), connection, default_isolation_level, detach(), exec_driver_sql(), execute(), execution_options(), get_execution_options(), get_isolation_level(), get_nested_transaction(), get_transaction(), in_nested_transaction(), in_transaction(), info, invalidate(), invalidated, rollback(), scalar(), scalars(), schema_for_object()

连接对象表示从连接池中检出的单个 DBAPI 连接。在此状态下,连接池不会影响连接,包括其到期或超时状态。为了让连接池正确管理连接,连接应在未被使用时返回到连接池(即 connection.close())。

类签名

sqlalchemy.engine.Connection (sqlalchemy.engine.interfaces.ConnectionEventsTarget, sqlalchemy.inspection.Inspectable)

method __init__(engine: Engine, connection: PoolProxiedConnection | None = None, _has_events: bool | None = None, _allow_revalidate: bool = True, _allow_autobegin: bool = True)

构建一个新的连接。

method begin() → RootTransaction

开始一个事务,以便在自动开始之前进行。

例如:

with engine.connect() as conn:
    with conn.begin() as trans:
        conn.execute(table.insert(), {"username": "sandy"})

返回的对象是 RootTransaction 的实例。该对象表示事务的“范围”,当 Transaction.rollback()Transaction.commit() 方法被调用时完成事务;该对象还可作为上述示例中所示的上下文管理器。

Connection.begin() 方法开始一个事务,通常在连接首次用于执行语句时始终会开始。可能使用此方法的原因是在特定时间调用 ConnectionEvents.begin() 事件,或者在连接检出的范围内组织代码以利用上下文管理的块,例如:

with engine.connect() as conn:
    with conn.begin():
        conn.execute(...)
        conn.execute(...)
    with conn.begin():
        conn.execute(...)
        conn.execute(...)

上述代码在行为上与不使用 Connection.begin() 的以下代码基本没有区别;下面的样式称为“随时提交”样式:

with engine.connect() as conn:
    conn.execute(...)
    conn.execute(...)
    conn.commit()
    conn.execute(...)
    conn.execute(...)
    conn.commit()

从数据库的角度来看,Connection.begin() 方法不会发出任何 SQL 或以任何方式更改底层 DBAPI 连接的状态;Python DBAPI 没有任何显式事务开始的概念。

另请参阅

处理事务和 DBAPI - 在 SQLAlchemy 统一教程中

Connection.begin_nested() - 使用保存点

Connection.begin_twophase() - 使用两阶段 / XID 事务

Engine.begin() - 可从 Engine 使用的上下文管理器

method begin_nested() → NestedTransaction

开始一个嵌套事务(即保存点),并返回一个控制保存点范围的事务句柄。

例如:

with engine.begin() as connection:
    with connection.begin_nested():
        connection.execute(table.insert(), {"username": "sandy"})

返回的对象是NestedTransaction的实例,其中包括事务方法NestedTransaction.commit()NestedTransaction.rollback();对于嵌套事务,这些方法对应于操作“RELEASE SAVEPOINT ”和“ROLLBACK TO SAVEPOINT ”。保存点的名称是局限于NestedTransaction对象的,并且会自动生成。与任何其他Transaction一样,NestedTransaction可以用作上面示例中所说明的上下文管理器,该上下文管理器将“释放”或“回滚”相应于块内操作是否成功或引发异常。

嵌套事务要求底层数据库支持 SAVEPOINT,否则行为未定义。SAVEPOINT 通常用于在事务内运行可能失败的操作,同时继续外部事务。例如:

from sqlalchemy import exc
with engine.begin() as connection:
    trans = connection.begin_nested()
    try:
        connection.execute(table.insert(), {"username": "sandy"})
        trans.commit()
    except exc.IntegrityError:  # catch for duplicate username
        trans.rollback()  # rollback to savepoint
    # outer transaction continues
    connection.execute( ... )

如果在调用 Connection.begin_nested() 之前没有先调用 Connection.begin()Engine.begin(),则 Connection 对象将“自动开始”外部事务。这个外部事务可以使用“随时提交”样式提交,例如:

with engine.connect() as connection:  # begin() wasn't called
    with connection.begin_nested(): will auto-"begin()" first
        connection.execute( ... )
    # savepoint is released
    connection.execute( ... )
    # explicitly commit outer transaction
    connection.commit()
    # can continue working with connection here

2.0 版本更改:Connection.begin_nested() 现在将参与连接的“自动开始”行为,这是自 2.0 版本 / 1.4 版本“未来”风格连接的新功能。

另请参阅

Connection.begin()

使用 SAVEPOINT - 保存点的 ORM 支持

method begin_twophase(xid: Any | None = None) → TwoPhaseTransaction

开始一个两阶段或 XA 事务并返回一个事务句柄。

返回的对象是TwoPhaseTransaction的实例,除了由Transaction提供的方法之外,还提供了一个TwoPhaseTransaction.prepare()方法。

参数:

xid – 两阶段事务 id。如果未提供,将生成一个随机 id。

另请参阅

Connection.begin()

Connection.begin_twophase()

method close() → None

关闭此Connection

这会导致底层数据库资源的释放,即内部引用的 DBAPI 连接。DBAPI 连接通常会恢复到产生此ConnectionEngine所引用的连接持有Pool。无论是否存在任何与此Connection相关的Transaction对象,DBAPI 连接上的任何事务状态都将通过 DBAPI 连接的rollback()方法无条件释放。

如果存在任何事务,则此方法还会调用Connection.rollback()

在调用Connection.close()之后,Connection将永久处于关闭状态,不允许进行任何进一步的操作。

attribute closed

如果此连接已关闭,则返回 True。

method commit() → None

提交当前正在进行的事务。

如果已经开始了当前事务,则此方法提交当前事务。如果没有启动事务,则此方法不起作用,假设连接处于非无效状态。

每当首次执行语句或调用Connection.begin()方法时,Connection会自动开始事务。

注意

Connection.commit() 方法仅对链接到 Connection 对象的主数据库事务起作用。它不会操作从 Connection.begin_nested() 方法调用的 SAVEPOINT;要控制 SAVEPOINT,请对 Connection.begin_nested() 方法本身返回的 NestedTransaction 调用 NestedTransaction.commit()

attribute connection

此连接管理的底层 DB-API 连接。

这是一个 SQLAlchemy 连接池代理连接,然后具有属性 _ConnectionFairy.dbapi_connection,该属性引用实际的驱动程序连接。

另请参阅

使用 Driver SQL 和原始 DBAPI 连接

attribute default_isolation_level

与正在使用的 Dialect 关联的初始连接时间隔离级别。

此值独立于 Connection.execution_options.isolation_levelEngine.execution_options.isolation_level 执行选项,并且由 Dialect 在创建第一个连接时确定,通过针对数据库执行当前隔离级别的 SQL 查询,在发出任何其他命令之前。

调用此访问器不会触发任何新的 SQL 查询。

另请参阅

Connection.get_isolation_level() - 查看当前实际隔离级别

create_engine.isolation_level - 设置每个 Engine 的隔离级别

Connection.execution_options.isolation_level - 设置每个 Connection 的隔离级别

method detach() → None

从其连接池中分离底层 DB-API 连接。

例如:

with engine.connect() as conn:
    conn.detach()
    conn.execute(text("SET search_path TO schema1, schema2"))
    # work with connection
# connection is fully closed (since we used "with:", can
# also call .close())

Connection实例将保持可用。当关闭(或从上下文管理器上下文中退出)时,DB-API 连接将被真正关闭,不会返回到其原始池中。

此方法可用于隔离连接上的应用程序的其余部分的修改状态(例如事务隔离级别或类似内容)。

method exec_driver_sql(statement: str, parameters: _DBAPIAnyExecuteParams | None = None, execution_options: CoreExecuteOptionsParameter | None = None) → CursorResult[Any]

直接在 DBAPI 游标上执行字符串 SQL 语句,无需任何 SQL 编译步骤。

这可以用于直接将任何字符串传递给正在使用的 DBAPI 的cursor.execute()方法。

参数:

  • statement – 要执行的语句字符串。绑定参数必须使用底层 DBAPI 的 paramstyle,例如“qmark”,“pyformat”,“format”等。
  • parameters – 表示要在执行中使用的绑定参数值。格式之一:具有命名参数的字典,具有位置参数的元组,或包含用于多次执行支持的字典或元组的列表。

返回值:

CursorResult

例如,多个字典:

conn.exec_driver_sql(
    "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
    [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}]
)

单个字典:

conn.exec_driver_sql(
    "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
    dict(id=1, value="v1")
)

单个元组:

conn.exec_driver_sql(
    "INSERT INTO table (id, value) VALUES (?, ?)",
    (1, 'v1')
)

注意

Connection.exec_driver_sql()方法不参与ConnectionEvents.before_execute()ConnectionEvents.after_execute()事件。要拦截对Connection.exec_driver_sql()的调用,请使用ConnectionEvents.before_cursor_execute()ConnectionEvents.after_cursor_execute()

另请参阅

PEP 249

method execute(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → CursorResult[Any]

执行 SQL 语句构造并返回CursorResult

参数:

  • statement–要执行的语句。这始终是ClauseElementExecutable层次结构中的对象,包括:
  • Select
  • InsertUpdateDelete
  • TextClauseTextualSelect
  • DDL 和从 ExecutableDDLElement 继承的对象
  • parameters – 将绑定到语句中的参数。这可以是一个参数名到值的字典,或一个可变序列(例如列表)的字典。当传递一个字典列表时,底层语句执行将使用 DBAPI 的 cursor.executemany() 方法。当传递一个单一字典时,将使用 DBAPI 的 cursor.execute() 方法。
  • execution_options – 可选的执行选项字典,它将与语句执行相关联。该字典可以提供一组接受 Connection.execution_options() 的选项的子集。

返回:

一个 Result 对象。

method execution_options(**opt: Any) → Connection

设置在执行期间生效的连接非 SQL 选项。

此方法在原地修改了这个 Connection;返回值是调用该方法的相同 Connection 对象。请注意,这与其他对象(如 Engine.execution_options()Executable.execution_options())上的 execution_options 方法的行为相反。其原理是,许多此类执行选项在任何情况下都会修改基本 DBAPI 连接的状态,因此没有可行的方法将此类选项的效果局限于“子”连接。

2.0 版本中的变化:Connection.execution_options() 方法与具有此方法的其他对象不同,它会在原地修改连接而不创建副本。

如其他地方所述,Connection.execution_options() 方法接受任意的参数,包括用户定义的名称。所有给定的参数都可以以多种方式被使用,包括使用 Connection.get_execution_options() 方法。请参阅 Executable.execution_options()Engine.execution_options() 中的示例。

SQLAlchemy 自身当前识别的关键字包括所有 Executable.execution_options() 下列出的关键字,以及特定于 Connection 的其他关键字。

参数:

  • compiled_cache
    可用于:ConnectionEngine
    一个字典,当 Connection 将子句表达式编译为 Compiled 对象时,Compiled 对象将被缓存。这个字典将覆盖可能在 Engine 上配置的语句缓存。如果设置为 None,则禁用缓存,即使引擎配置了缓存大小。
    请注意,ORM 在某些操作中使用了自己的“已编译”缓存,包括 flush 操作。ORM 内部使用的缓存会覆盖此处指定的缓存字典。
  • logging_token
    可用于:ConnectionEngineExecutable
    在连接记录的日志消息中添加由括号括起的指定字符串令牌,即启用了 create_engine.echo 标志或通过 logging.getLogger("sqlalchemy.engine") 记录器启用的日志记录。这允许可用于调试并发连接场景的每个连接或每个子引擎令牌。
    版本 1.4.0b2 中的新功能。
    另请参阅
    设置每个连接/子引擎令牌 - 使用示例
    create_engine.logging_name - 为 Python 日志记录器对象本身添加一个名称。
  • isolation_level
    可用于:Connection, Engine.
    为此Connection对象的生命周期设置事务隔离级别。有效值包括create_engine()传递给create_engine.isolation_level参数接受的那些字符串值。这些级别是半数据库特定的;请参阅各个方言文档以获取有效级别。
    隔离级别选项通过在 DBAPI 连接上发出语句来应用隔离级别,并必然会影响原始 Connection 对象的整体。隔离级别将保持在给定设置,直到明确更改,或者当 DBAPI 连接本身被释放到连接池时,即调用Connection.close()方法时,此时事件处理程序将在 DBAPI 连接上发出附加语句,以恢复隔离级别更改。
    注意
    isolation_level执行选项只能在调用Connection.begin()方法之前建立,以及在发出任何否则会触发“自动开始”的 SQL 语句之前,或者在调用Connection.commit()Connection.rollback()之后直接调用。数据库无法更改进行中的事务的隔离级别。
    注意
    如果通过Connection.invalidate()方法使Connection无效,或者发生断开连接错误,则isolation_level执行选项会被隐式重置。在无效后产生的新连接将不会自动重新应用所选的隔离级别。
    另请参阅
    设置事务隔离级别,包括 DBAPI 自动提交
    Connection.get_isolation_level() - 查看当前实际级别
  • no_parameters
    可用于:ConnectionExecutable
    当为 True 时,如果最终的参数列表或字典完全为空,则会像 cursor.execute(statement) 那样在游标上调用语句,完全不传递参数集合。一些 DBAPI,如 psycopg2 和 mysql-python,只有在存在参数时才将百分号视为重要;这个选项允许代码生成包含百分号(可能还有其他字符)的 SQL,不管它是由 DBAPI 执行还是被管道传输到稍后由命令行工具调用的脚本中。
  • stream_results
    可用于:ConnectionExecutable
    如果可能的话,向方言指示结果应该是“流式”的,而不是预先缓冲的。对于诸如 PostgreSQL、MySQL 和 MariaDB 等后端,这表示使用“服务器端游标”而不是客户端游标。其他后端,如 Oracle 的后端,可能已经默认使用服务器端游标。
    通常,使用 Connection.execution_options.stream_results 与设置要以批次获取的固定行数结合使用,以便在同时不一次加载所有结果行到内存中的情况下有效迭代数据库行;可以在执行返回一个新的 Result 对象后,通过 Result.yield_per() 方法在 Result 对象上进行配置。如果未使用 Result.yield_per(),则 Connection.execution_options.stream_results 操作模式将使用一个动态大小的缓冲区,它会一次缓冲一组行,根据固定的增长大小在每个批次上增长,直到通过使用 Connection.execution_options.max_row_buffer 参数进行配置的限制为止。
    当使用 ORM 从结果中获取 ORM 映射对象时,应始终使用 Result.yield_per()Connection.execution_options.stream_results 一起使用,以便 ORM 不会一次将所有行都提取到新的 ORM 对象中。
    对于典型用法,应优先考虑 Connection.execution_options.yield_per 执行选项,该选项一次设置了 Connection.execution_options.stream_resultsResult.yield_per()。此选项在 Connection 以及 ORM Session 的核心级别都受支持;后者在 使用 Yield Per 获取大结果集 中进行了描述。
    另请参阅
    使用服务器端游标(也称为流式结果) - 关于 Connection.execution_options.stream_results 的背景信息
    Connection.execution_options.max_row_buffer
    Connection.execution_options.yield_per
    使用 Yield Per 获取大结果集 - 在 ORM 查询指南 中描述了 yield_per 的 ORM 版本
  • max_row_buffer
    可用于:ConnectionExecutable。当在支持服务器端游标的后端使用 Connection.execution_options.stream_results 执行选项时,设置要使用的最大缓冲区大小。如果未指定默认值,则默认值为 1000。
    另请参阅
    Connection.execution_options.stream_results
    使用服务器端游标(也称为流式结果)
  • yield_per
    可用于:ConnectionExecutable。设置 Connection.execution_options.stream_results 执行选项的整数值,并立即自动调用 Result.yield_per()。允许等效的功能,与使用此参数时与 ORM 存在的功能相同。
    版本 1.4.40 中的新增内容。
    另请参见
    使用服务器端游标(又名流式结果) - 关于在核心中使用服务器端游标的背景和示例。
    使用 yield_per 逐步获取大型结果集 - 在 ORM 查询指南 中描述了 ORM 版本的 yield_per
  • insertmanyvalues_page_size
    可用于:ConnectionEngine。将要格式化为 INSERT 语句的行数,当语句使用“insertmanyvalues”模式时,该模式是一种分页形式的批量插入,通常与 executemany 执行结合使用,用于许多后端,通常与 RETURNING 一起使用。默认为 1000。还可以使用 create_engine.insertmanyvalues_page_size 参数在每个引擎的基础上进行修改。
    版本 2.0 中的新增内容。
    另请参见
    插入语句的“插入多个值”行为
  • schema_translate_map
    可用于:ConnectionEngineExecutable
    将模式名称映射到模式名称的字典,将应用于编译 SQL 或 DDL 表达式元素为字符串时遇到的每个 TableTable.schema 元素;结果模式名称将根据原始名称在映射中的存在进行转换。
    另请参见
    模式名称的翻译
  • preserve_rowcount
    布尔值;当为 True 时,cursor.rowcount 属性将无条件地被记忆在结果中,并通过 CursorResult.rowcount 属性提供。通常,此属性仅对 UPDATE 和 DELETE 语句保留。使用此选项,可以访问 DBAPI 的 rowcount 值,以用于 INSERT 和 SELECT 等其他类型的语句,只要 DBAPI 支持这些语句。有关此属性行为的说明,请参阅 CursorResult.rowcount
    版本 2.0.28 中的新内容。


SqlAlchemy 2.0 中文文档(四十四)(4)https://developer.aliyun.com/article/1563071

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
4月前
|
SQL 存储 API
SqlAlchemy 2.0 中文文档(四十四)(6)
SqlAlchemy 2.0 中文文档(四十四)
85 4
|
4月前
|
SQL 缓存 关系型数据库
SqlAlchemy 2.0 中文文档(四十四)(2)
SqlAlchemy 2.0 中文文档(四十四)
81 4
|
4月前
|
存储 缓存 数据库
SqlAlchemy 2.0 中文文档(四十四)(5)
SqlAlchemy 2.0 中文文档(四十四)
90 4
|
4月前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(四十四)(9)
SqlAlchemy 2.0 中文文档(四十四)
50 3
|
4月前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(四十四)(4)
SqlAlchemy 2.0 中文文档(四十四)
53 3
|
4月前
|
SQL 存储 关系型数据库
SqlAlchemy 2.0 中文文档(三十四)(4)
SqlAlchemy 2.0 中文文档(三十四)
45 1
|
4月前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(三十四)(5)
SqlAlchemy 2.0 中文文档(三十四)
40 0
|
4月前
|
SQL 关系型数据库 MySQL
SqlAlchemy 2.0 中文文档(四十四)(7)
SqlAlchemy 2.0 中文文档(四十四)
48 0
|
4月前
|
SQL 关系型数据库 MySQL
SqlAlchemy 2.0 中文文档(四十四)(1)
SqlAlchemy 2.0 中文文档(四十四)
100 0
|
4月前
|
SQL 存储 缓存
SqlAlchemy 2.0 中文文档(四十四)(8)
SqlAlchemy 2.0 中文文档(四十四)
64 0