SqlAlchemy 2.0 中文文档(四十四)(7)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: SqlAlchemy 2.0 中文文档(四十四)

SqlAlchemy 2.0 中文文档(四十四)(6)https://developer.aliyun.com/article/1563073


设置事务隔离级别,包括 DBAPI 自动提交

大多数 DBAPI 支持可配置的事务隔离级别的概念。这些传统上是四个级别,“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ” 和 “SERIALIZABLE”。这些通常在 DBAPI 连接开始新事务之前应用,需要注意的是,当首次发出 SQL 语句时,大多数 DBAPI 会隐式开始此事务。

支持隔离级别的 DBAPI 通常也支持真正的“自动提交”概念,这意味着 DBAPI 连接本身将被放置在非事务性自动提交模式中。这通常意味着数据库自动发出“BEGIN”的典型 DBAPI 行为不再发生,但也可能包括其他指令。SQLAlchemy 将“自动提交”的概念视为任何其他隔离级别;因为它是一个不仅失去“读已提交”而且失去原子性的隔离级别。

提示

需要注意的是,如下面部分将在 理解 DBAPI 级别的自动提交隔离级别 中进一步讨论的那样,“自动提交”隔离级别像任何其他隔离级别一样不会影响 Connection 对象的“事务”行为,该对象继续调用 DBAPI 的 .commit().rollback() 方法(它们在自动提交模式下没有效果),并且 .begin() 方法假定 DBAPI 将隐式启动事务(这意味着 SQLAlchemy 的“begin”不会更改自动提交模式)。

SQLAlchemy 方言应尽可能支持这些隔离级别以及自动提交。

设置连接的隔离级别或 DBAPI 自动提交

对于从 Engine.connect() 获得的每个单独的 Connection 对象,可以使用 Connection.execution_options() 方法为该 Connection 对象设置隔离级别。该参数被称为 Connection.execution_options.isolation_level,其值通常为以下名称的子集:

# possible values for Connection.execution_options(isolation_level="<value>")
"AUTOCOMMIT"
"READ COMMITTED"
"READ UNCOMMITTED"
"REPEATABLE READ"
"SERIALIZABLE"

并非每个 DBAPI 都支持每个数值;如果在某个后端使用了不支持的值,将会引发错误。

例如,要强制在特定连接上使用 REPEATABLE READ,然后开始一个事务:

with engine.connect().execution_options(
    isolation_level="REPEATABLE READ"
) as connection:
    with connection.begin():
        connection.execute(text("<statement>"))

提示

Connection.execution_options() 方法的返回值是调用该方法的同一个 Connection 对象,这意味着它直接修改了该 Connection 对象的状态。这是 SQLAlchemy 2.0 新增的行为。这个行为不适用于 Engine.execution_options() 方法;该方法仍然返回一个 Engine 的副本,并且如下所述,可以用于构建具有不同执行选项的多个 Engine 对象,但它们仍然共享相同的方言和连接池。

注意

Connection.execution_options.isolation_level 参数不适用于语句级别选项,例如 Executable.execution_options() 的选项,并且如果在此级别设置,将被拒绝。这是因为该选项必须在每个事务基础上针对 DBAPI 连接进行设置。

设置引擎的隔离级别或 DBAPI 自动提交

Connection.execution_options.isolation_level 选项也可以设置为引擎范围内,通常更可取。这可以通过将 create_engine.isolation_level 参数传递给 create_engine() 来实现:

from sqlalchemy import create_engine
eng = create_engine(
    "postgresql://scott:tiger@localhost/test", isolation_level="REPEATABLE READ"
)

使用以上设置,每个新的 DBAPI 连接在创建时将被设置为在所有后续操作中使用"REPEATABLE READ"隔离级别设置。

维护单个引擎的多个隔离级别

隔离级别也可以针对每个引擎进行设置,使用create_engine.execution_options参数或Engine.execution_options()方法,后者将创建一个Engine的副本,该副本共享方言和连接池,但具有自己的每个连接的隔离级别设置:

from sqlalchemy import create_engine
eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    execution_options={"isolation_level": "REPEATABLE READ"},
)

使用以上设置,DBAPI 连接将被设置为在每个新的事务开始时使用"REPEATABLE READ"隔离级别设置;但是,作为池化的连接将被重置为在连接首次发生时存在的原始隔离级别。在create_engine()的级别上,最终效果与使用create_engine.isolation_level参数没有任何不同。

然而,经常选择在不同隔离级别中运行操作的应用程序可能希望创建多个“子引擎”来使用一个主Engine,其中每个引擎将配置为不同的隔离级别。一个这样的用例是一个应用程序,它有分为“事务性”和“只读”操作的操作,一个单独的Engine,使用"AUTOCOMMIT"可能被分离出来,从主引擎中分离出来:

from sqlalchemy import create_engine
eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

上面,Engine.execution_options()方法创建了原始Engine的浅层副本。engautocommit_engine共享相同的方言和连接池。然而,“AUTOCOMMIT”模式将在从autocommit_engine获取连接时设置。

无论设置的隔离级别是什么,在连接返回到连接池时都会无条件地恢复。

另见

SQLite 事务隔离

PostgreSQL 事务隔离

MySQL 事务隔离

SQL Server 事务隔离

Oracle 事务隔离

设置事务隔离级别 / DBAPI 自动提交 - 用于 ORM

使用 DBAPI 自动提交允许只读版本的透明重连接 - 一种利用 DBAPI 自动提交来对数据库进行透明重连接以进行只读操作的方法 ### 理解 DBAPI 级别的自动提交隔离级别

在上一部分中,我们介绍了 Connection.execution_options.isolation_level 参数的概念以及如何使用它来设置数据库隔离级别,包括 SQLAlchemy 将其视为另一个事务隔离级别的 DBAPI 级别的“自动提交”。在本节中,我们将试图澄清这种方法的含义。

如果我们想要检查一个 Connection 对象并使用它的“自动提交”模式,我们将按如下方式进行:

with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(text("<statement>"))
    connection.execute(text("<statement>"))

上面说明了“DBAPI 自动提交”模式的正常使用。无需使用 Connection.begin()Connection.commit() 等方法,因为所有语句都会立即提交到数据库。当块结束时,Connection 对象将恢复“自动提交”隔离级别,并且 DBAPI 连接将释放到连接池,其中 DBAPI 的 connection.rollback() 方法通常会被调用,但是由于上面的语句已经提交了,这个回滚对数据库状态没有任何改变。

需要注意的是,“自动提交”模式甚至在调用 Connection.begin() 方法时也会持续存在;DBAPI 不会向数据库发送任何 BEGIN,也不会在调用 Connection.commit() 时发送 COMMIT。这种用法也不是错误的情景,因为可以预期“自动提交”隔离级别可能被应用于原本假设处于事务上下文的代码;毕竟,“隔离级别”本身就是事务的配置细节,就像任何其他隔离级别一样。

在下面的示例中,无论是什么样的 SQLAlchemy 级别的事务块,语句都保持 自动提交

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    # this begin() does not affect the DBAPI connection, isolation stays at AUTOCOMMIT
    with connection.begin() as trans:
        connection.execute(text("<statement>"))
        connection.execute(text("<statement>"))

当我们像上面的示例一样运行一个带有日志记录的代码块时,日志记录将尝试指示,尽管调用了 DBAPI 级别的 .commit(),但由于自动提交模式,它可能没有任何效果:

INFO sqlalchemy.engine.Engine BEGIN (implicit)
...
INFO sqlalchemy.engine.Engine COMMIT using DBAPI connection.commit(), DBAPI should ignore due to autocommit mode

同时,尽管我们正在使用“DBAPI autocommit”,但 SQLAlchemy 的事务语义,即Connection.begin()的 Python 内部行为以及“autobegin”的行为,仍然存在,尽管这些不会影响 DBAPI 连接本身。为了说明,下面的代码将引发错误,因为在自动开始已经发生后调用了Connection.begin()

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    # "transaction" is autobegin (but has no effect due to autocommit)
    connection.execute(text("<statement>"))
    # this will raise; "transaction" is already begun
    with connection.begin() as trans:
        connection.execute(text("<statement>"))

上述示例还展示了“autocommit”隔离级别是底层数据库事务的配置细节,并且独立于 SQLAlchemy 连接对象的开始/提交行为。 “autocommit”模式不会以任何方式与Connection.begin()交互,并且在执行有关事务的自身状态更改时(除了在引擎日志中建议这些块实际上并没有提交之外),Connection不会查询此状态。这种设计的理念是保持与Connection完全一致的使用模式,其中 DBAPI 自动提交模式可以独立更改,而无需指示其他地方的任何代码更改。

在不同隔离级别之间切换

隔离级别设置,包括自动提交模式,在连接释放回连接池时会自动重置。因此,最好避免尝试在单个Connection对象上切换隔离级别,因为这会导致冗余性过高。

为了演示如何在单个Connection检出的范围内以临时模式使用“autocommit”,必须重新应用Connection.execution_options.isolation_level参数以恢复先前的隔离级别。前一节说明了在进行自动提交时尝试调用Connection.begin()来启动事务的尝试;我们可以通过在调用Connection.begin()之前先恢复隔离级别来重写该示例:

# if we wanted to flip autocommit on and off on a single connection/
# which... we usually don't.
with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    # run statement(s) in autocommit mode
    connection.execute(text("<statement>"))
    # "commit" the autobegun "transaction"
    connection.commit()
    # switch to default isolation level
    connection.execution_options(isolation_level=connection.default_isolation_level)
    # use a begin block
    with connection.begin() as trans:
        connection.execute(text("<statement>"))

上面,为了手动恢复隔离级别,我们利用了Connection.default_isolation_level来恢复默认的隔离级别(假设这是我们想要的)。然而,更好的做法可能是使用Connection的架构,该架构已经在检入时自动处理重置隔离级别。编写上述内容的首选方式是使用两个块。

# use an autocommit block
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as connection:
    # run statement in autocommit mode
    connection.execute(text("<statement>"))
# use a regular block
with engine.begin() as connection:
    connection.execute(text("<statement>"))

总结一下:

  1. “DBAPI 级别自动提交”隔离级别完全独立于Connection对象对“开始”和“提交”的概念。
  2. 使用单独的Connection检出每个隔离级别。避免在单个连接检出之间试图来回切换“自动提交”;让引擎来恢复默认的隔离级别。

设置连接的隔离级别或 DBAPI 自动提交

对于从Engine.connect()获取的单个Connection对象,可以使用Connection.execution_options()方法来设置该Connection对象的隔离级别。参数称为Connection.execution_options.isolation_level,其值通常是以下名称的子集:

# possible values for Connection.execution_options(isolation_level="<value>")
"AUTOCOMMIT"
"READ COMMITTED"
"READ UNCOMMITTED"
"REPEATABLE READ"
"SERIALIZABLE"

并非每个 DBAPI 都支持每个值;如果对于某个后端使用了不支持的值,则会引发错误。

例如,要在特定连接上强制使用可重复读,然后开始一个事务:

with engine.connect().execution_options(
    isolation_level="REPEATABLE READ"
) as connection:
    with connection.begin():
        connection.execute(text("<statement>"))

提示

Connection.execution_options() 方法的返回值是调用该方法的相同 Connection 对象,这意味着它会直接修改 Connection 对象的状态。这是 SQLAlchemy 2.0 的新行为。这种行为不适用于 Engine.execution_options() 方法;该方法仍然返回一个 Engine 的副本,并且如下所述,可以用于构建具有不同执行选项的多个 Engine 对象,但它们仍然共享相同的方言和连接池。

注意

Connection.execution_options.isolation_level 参数不适用于语句级别选项,例如 Executable.execution_options(),如果在此级别设置将会被拒绝。这是因为该选项必须在每个事务的 DBAPI 连接上设置。

设置引擎的隔离级别或 DBAPI 自动提交

Connection.execution_options.isolation_level 选项也可以设置为整个引擎范围内,通常更可取。这可以通过将 create_engine.isolation_level 参数传递给 create_engine() 来实现:

from sqlalchemy import create_engine
eng = create_engine(
    "postgresql://scott:tiger@localhost/test", isolation_level="REPEATABLE READ"
)

使用上述设置,每个新的 DBAPI 连接在创建时将被设置为对所有后续操作使用 "REPEATABLE READ" 隔离级别设置。

为单个引擎维护多个隔离级别

隔离级别也可以针对每个引擎进行设置,具有更大的灵活性,可以使用 create_engine.execution_options 参数传递给 create_engine()Engine.execution_options() 方法,后者将创建一个与原始引擎共享方言和连接池但具有自己的每个连接隔离级别设置的 Engine 的副本:

from sqlalchemy import create_engine
eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    execution_options={"isolation_level": "REPEATABLE READ"},
)

通过上述设置,每开始一个新事务时,DBAPI 连接将设置为使用 "REPEATABLE READ" 隔离级别设置;但是连接在被池化时将被重置为首次发生连接时存在的原始隔离级别。在 create_engine() 的级别上,最终效果与使用 create_engine.isolation_level 参数没有任何不同。

然而,频繁选择在不同隔离级别内运行操作的应用程序可能希望创建多个 “子引擎” 以及一个主 Engine,每个引擎都配置为不同的隔离级别。一个这样的用例是一个具有分为 “事务性” 和 “只读” 操作的应用程序,可以从主引擎中分离出一个单独的 Engine,该引擎使用 "AUTOCOMMIT"

from sqlalchemy import create_engine
eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

上述,Engine.execution_options() 方法创建原 Engine 的一个浅拷贝。engautocommit_engine 共享相同的方言和连接池。然而,当从 autocommit_engine 获取连接时,将设置 “AUTOCOMMIT” 模式。

无论隔离级别设置是哪一个,在连接返回到连接池时都会无条件地恢复。

另请参见

SQLite 事务隔离

PostgreSQL 事务隔离

MySQL 事务隔离

SQL Server 事务隔离

Oracle 事务隔离

设置事务隔离级别 / DBAPI AUTOCOMMIT - 用于 ORM

使用 DBAPI 自动提交允许用于只读版本的透明重新连接 - 一个使用 DBAPI 自动提交来透明地重新连接到数据库进行只读操作的示例

理解 DBAPI 级别的自动提交隔离级别

在上一节中,我们介绍了Connection.execution_options.isolation_level参数的概念以及如何使用它来设置数据库隔离级别,包括由 SQLAlchemy 视为另一种事务隔离级别的 DBAPI 级别“自动提交”。在本节中,我们将尝试澄清这种方法的含义。

如果我们想要检查一个Connection对象并将其用于“自动提交”模式,我们将按如下步骤进行:

with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(text("<statement>"))
    connection.execute(text("<statement>"))

上面说明了“DBAPI 自动提交”模式的正常用法。无需使用诸如Connection.begin()Connection.commit()等方法,因为所有语句都会立即提交到数据库。当块结束时,Connection对象将恢复“自动提交”隔离级别,并且 DBAPI 连接被释放到连接池,其中 DBAPI connection.rollback()方法通常会被调用,但由于上述语句已经提交,此回滚对数据库状态没有任何更改。

需要注意的是,“自动提交”模式在调用Connection.begin()方法时仍然持续存在;DBAPI 不会向数据库发出任何 BEGIN,也不会在调用Connection.commit()时发出 COMMIT。这种用法也不是错误场景,因为预期可能会将“自动提交”隔离级别应用于原本假定了事务上下文的代码;毕竟,“隔离级别”本身就像任何其他隔离级别一样,是事务本身的配置细节。

在下面的示例中,语句仍然自动提交,而不管 SQLAlchemy 级别的事务块如何:

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    # this begin() does not affect the DBAPI connection, isolation stays at AUTOCOMMIT
    with connection.begin() as trans:
        connection.execute(text("<statement>"))
        connection.execute(text("<statement>"))

当我们像上面那样带有日志记录的运行一个块时,日志记录将尝试指出,虽然调用了 DBAPI 级别的.commit(),但由于自动提交模式,它可能不会产生任何效果:

INFO sqlalchemy.engine.Engine BEGIN (implicit)
...
INFO sqlalchemy.engine.Engine COMMIT using DBAPI connection.commit(), DBAPI should ignore due to autocommit mode

与此同时,即使我们使用了“DBAPI 自动提交”,SQLAlchemy 的事务语义,即Connection.begin()的 Python 内部行为以及“autobegin”的行为仍然存在,即使这些行为不影响 DBAPI 连接本身。为了说明,下面的代码将引发错误,因为在自动开始已经发生后调用了Connection.begin()

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
    # "transaction" is autobegin (but has no effect due to autocommit)
    connection.execute(text("<statement>"))
    # this will raise; "transaction" is already begun
    with connection.begin() as trans:
        connection.execute(text("<statement>"))

上面的例子还演示了相同的主题,即“自动提交”隔离级别是底层数据库事务的配置细节,与 SQLAlchemy 连接对象的 begin/commit 行为无关。 “自动提交”模式不会以任何方式与Connection.begin()交互,且Connection在执行自身与事务相关的状态更改时不会查询此状态(除了在引擎日志中建议这些块实际上并未提交之外)。这种设计的理由是保持与Connection完全一致的使用模式,其中 DBAPI 自动提交模式可以独立更改,而无需指示其他位置的代码更改。

在不同隔离级别之间切换

隔离级别设置,包括自动提交模式,在连接释放回连接池时会自动重置。因此,最好避免尝试在单个Connection对象上切换隔离级别,因为这会导致冗余的繁琐。

为了说明如何在单个Connection检出的范围内以即时模式使用“自动提交”,必须重新应用Connection.execution_options.isolation_level参数以恢复先前的隔离级别。前一节演示了在自动提交进行时调用Connection.begin()以启动事务的尝试;我们可以通过在调用Connection.begin()之前先恢复隔离级别来重写该示例,以实际执行:

# if we wanted to flip autocommit on and off on a single connection/
# which... we usually don't.
with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    # run statement(s) in autocommit mode
    connection.execute(text("<statement>"))
    # "commit" the autobegun "transaction"
    connection.commit()
    # switch to default isolation level
    connection.execution_options(isolation_level=connection.default_isolation_level)
    # use a begin block
    with connection.begin() as trans:
        connection.execute(text("<statement>"))

在上面的例子中,为了手动恢复隔离级别,我们使用Connection.default_isolation_level来恢复默认的隔离级别(假设这是我们想要的)。然而,最好的做法可能是利用Connection的架构,该架构已经在检查时自动处理了隔离级别的重置。上述写法的首选方式是使用两个块。

# use an autocommit block
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as connection:
    # run statement in autocommit mode
    connection.execute(text("<statement>"))
# use a regular block
with engine.begin() as connection:
    connection.execute(text("<statement>"))

总结一下:

  1. “DBAPI 级别的自动提交”隔离级别完全独立于Connection对象对“begin”和“commit”的概念。
  2. 使用每个隔离级别的单独 Connection 检出。避免在单个连接检出上来回切换“自动提交”;让引擎来恢复默认的隔离级别。
在不同隔离级别之间切换

隔离级别设置,包括自动提交模式,在连接释放回连接池时会自动重置。因此,最好避免尝试在单个 Connection 对象上切换隔离级别,因为这会导致过多的冗余。

为了说明如何在单个 Connection 检出的范围内以临时模式使用“自动提交”,必须重新应用 Connection.execution_options.isolation_level 参数以前的隔离级别。上一节演示了在自动提交进行时调用 Connection.begin() 来启动事务的尝试;我们可以通过在调用 Connection.begin() 之前首先恢复隔离级别来重写该示例:

# if we wanted to flip autocommit on and off on a single connection/
# which... we usually don't.
with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    # run statement(s) in autocommit mode
    connection.execute(text("<statement>"))
    # "commit" the autobegun "transaction"
    connection.commit()
    # switch to default isolation level
    connection.execution_options(isolation_level=connection.default_isolation_level)
    # use a begin block
    with connection.begin() as trans:
        connection.execute(text("<statement>"))

上面,在手动恢复隔离级别时,我们使用了 Connection.default_isolation_level 来恢复默认的隔离级别(假设这是我们想要的)。然而,更好的做法可能是利用 Connection 的架构,该架构已经在检入时自动处理重置隔离级别。上述写法的首选方式是使用两个代码块。

# use an autocommit block
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as connection:
    # run statement in autocommit mode
    connection.execute(text("<statement>"))
# use a regular block
with engine.begin() as connection:
    connection.execute(text("<statement>"))

总结一下:

  1. “DBAPI 级别自动提交”隔离级别完全独立于 Connection 对象的“开始”和“提交”概念。
  2. 使用每个隔离级别的单独 Connection 检出。避免在单个连接检出上来回切换“自动提交”;让引擎来恢复默认的隔离级别。

使用服务器端游标(又名流式结果)

一些后端提供对“服务器端游标”与“客户端游标”的概念的明确支持。这里的客户端游标意味着数据库驱动程序在从语句执行返回之前完全将所有行从结果集中获取到内存中。例如,像 PostgreSQL 和 MySQL/MariaDB 这样的驱动程序通常默认使用客户端游标。相比之下,服务器端游标表示结果行在客户端消耗时保留在数据库服务器的状态中。例如,Oracle 的驱动程序通常使用“服务器端”模型,而 SQLite 方言虽然不使用真正的“客户端/服务器”架构,但仍使用一种未缓冲的结果获取方法,将结果行保留在进程内存之外,直到它们被消耗。

从这个基本架构可以得出,“服务器端游标”在获取非常大的结果集时更加内存高效,同时可能会在客户端/服务器通信过程中引入更多复杂性,并且对于小的结果集(通常少于 10000 行)效率更低。

对于那些有条件支持缓冲或未缓冲结果的方言,通常使用“未缓冲”或服务器端游标模式时会有注意事项。例如,当使用 psycopg2 方言时,如果使用服务器端游标与任何类型的 DML 或 DDL 语句,则会引发错误。当使用带有服务器端游标的 MySQL 驱动程序时,DBAPI 连接处于更脆弱的状态,并且不会像从容处理错误条件那样优雅,也不会允许回滚操作继续进行,直到游标完全关闭。

因此,SQLAlchemy 的方言总是默认为游标的较少错误版本,这意味着对于 PostgreSQL 和 MySQL 方言,默认情况下使用缓冲的“客户端”游标,在调用游标的任何获取方法之前将结果集完全拉入内存。这种操作模式在绝大多数情况下都是适当的;未缓冲的游标通常除了在应用程序以分块方式获取非常大量的行的罕见情况下,此类行的处理可以在获取更多行之前完成之外,一般没有用处。

对于提供客户端和服务器端游标选项的数据库驱动程序,Connection.execution_options.stream_resultsConnection.execution_options.yield_per 执行选项提供了在每个 Connection 或每个语句基础上访问“服务器端游标”的能力。在使用 ORM Session 时也存在类似的选项。

通过 yield_per 进行固定缓冲的流式处理

由于单独的行提取操作使用完全未缓冲的服务器端游标通常比一次提取多行的批次更昂贵,Connection.execution_options.yield_per执行选项配置了一个Connection或语句以使用可用的服务器端游标,同时配置了一个固定大小的行缓冲区,该缓冲区将在消耗时按批次从服务器检索行。使用Connection.execution_options()方法在Connection上或在语句上使用Executable.execution_options()方法可以将此参数设置为正整数值。

从 1.4.40 版本开始新增:Connection.execution_options.yield_per作为一个仅限核心的选项是自 SQLAlchemy 1.4.40 版本开始新增的;对于先前的 1.4 版本,请直接使用Connection.execution_options.stream_resultsResult.yield_per()方法的组合。

使用此选项相当于手动设置Connection.execution_options.stream_results选项,该选项在下一节中描述,并在给定整数值的情况下调用Result.yield_per()方法。在这两种情况下,这种组合的效果包括:

  • 如果可用且尚未是该后端的默认行为,则为给定后端选择了服务器端游标模式。
  • 当结果行被获取时,它们将被分批缓冲,直到最后一批,每个批次的大小将等于传递给Connection.execution_options.yield_per选项或Result.yield_per()方法的整数参数;然后最后一批会根据剩余行数小于此大小来确定大小。
  • 如果使用Result.partitions()方法,默认使用的分区大小将与此整数大小相等。

下面的示例说明了这三种行为:

with engine.connect() as conn:
    with conn.execution_options(yield_per=100).execute(
        text("select * from table")
    ) as result:
        for partition in result.partitions():
            # partition is an iterable that will be at most 100 items
            for row in partition:
                print(f"{row}")

上述示例说明了将 yield_per=100 与使用 Result.partitions() 方法结合起来,在与从服务器获取的大小相匹配的批次中处理行。使用 Result.partitions() 是可选的,如果直接迭代 Result,将为每获取 100 行新建一个行批次缓冲区。不应使用诸如 Result.all() 的方法,因为这会一次性获取所有剩余的行,并且会使使用 yield_per 的目的丧失。

提示

如上所示,Result 对象可以用作上下文管理器。当使用服务器端游标进行迭代时,这是确保 Result 对象关闭的最佳方法,即使在迭代过程中发生异常也是如此。

Connection.execution_options.yield_per 选项同样适用于 ORM,由 Session 使用以获取 ORM 对象,在这里它还限制了一次生成的 ORM 对象的数量。有关使用 Connection.execution_options.yield_per 与 ORM 的进一步背景,请参阅 ORM 查询指南 中的 使用 Yield Per 获取大型结果集 部分。

新版本 1.4.40 中新增了 Connection.execution_options.yield_per 作为核心级执行选项,方便设置流式结果、缓冲区大小和分区大小,以一种可转移至 ORM 的类似用例方式进行设置。

使用 stream_results 实现动态增长缓冲区进行流式处理

要在没有特定分区大小的情况下启用服务器端游标,可以使用Connection.execution_options.stream_results选项,类似于Connection.execution_options.yield_per,可以在Connection对象或语句对象上调用。

当使用Connection.execution_options.stream_results选项直接迭代传递的Result对象时,内部使用默认的缓冲方案来获取行,首先缓冲一小组行,然后在每次获取时缓冲越来越大的缓冲区,直到预先配置的 1000 行的限制。可以使用Connection.execution_options.max_row_buffer执行选项来影响此缓冲区的最大大小:

with engine.connect() as conn:
    with conn.execution_options(stream_results=True, max_row_buffer=100).execute(
        text("select * from table")
    ) as result:
        for row in result:
            print(f"{row}")

Connection.execution_options.stream_results选项与Result.partitions()方法结合时,应该向Result.partitions()传递特定的分区大小,以避免获取整个结果集。设置使用Result.partitions()方法时,通常更直接的方法是使用Connection.execution_options.yield_per选项。

另请参见

使用 Yield Per 获取大型结果集 - 在 ORM 查询指南中

Result.partitions()

Result.yield_per()

通过 yield_per 使用固定缓冲区进行流式传输

由于单独的行获取操作与完全无缓冲的服务器端游标通常比一次获取多行的批次更昂贵,Connection.execution_options.yield_per执行选项配置Connection或语句以利用服务器端可用的游标,同时配置一定大小的行缓冲区,以批次方式从服务器检索行,因为它们被使用。这个参数可以使用Connection.execution_options()方法设置为正整数值,也可以在语句上使用Executable.execution_options()方法设置。

新版本 1.4.40 中新增:Connection.execution_options.yield_per作为一个仅限于核心的选项是 SQLAlchemy 1.4.40 中的新功能;对于先前的 1.4 版本,请直接使用 Connection.execution_options.stream_resultsResult.yield_per() 结合使用。

使用此选项等效于手动设置下一节中描述的Connection.execution_options.stream_results选项,然后在给定的整数值上调用Result.yield_per()方法。在这两种情况下,此组合的效果包括:

  • 如果可用且尚未是该后端的默认行为,则为给定后端选择服务器端游标模式
  • 当结果行被获取时,它们将被分批缓冲,每个批次的大小直到最后一个批次将等于传递给Connection.execution_options.yield_per选项或Result.yield_per()方法的整数参数;然后最后一个批次根据少于这个大小的剩余行大小进行调整
  • 如果使用Result.partitions()方法,则默认分区大小也将设置为此整数大小。

这三种行为如下示例所示:

with engine.connect() as conn:
    with conn.execution_options(yield_per=100).execute(
        text("select * from table")
    ) as result:
        for partition in result.partitions():
            # partition is an iterable that will be at most 100 items
            for row in partition:
                print(f"{row}")

上述示例说明了使用yield_per=100与使用Result.partitions()方法一起以批量处理与从服务器获取的大小匹配的行的组合。使用Result.partitions()是可选的,如果直接迭代Result,则每 100 行获取一次新的行批量。不应使用诸如Result.all()之类的方法,因为这将一次性完全获取所有剩余的行,从而使使用yield_per的目的失效。

提示

Result对象可以像上面示例的那样用作上下文管理器。当使用服务器端游标进行迭代时,这是确保Result对象关闭的最佳方法,即使在迭代过程中引发异常。

Connection.execution_options.yield_per选项也可移植到 ORM,由Session用于获取 ORM 对象,在这里它还限制了一次生成的 ORM 对象的数量。有关在 ORM 中使用Connection.execution_options.yield_per的更多背景信息,请参阅使用 Yield Per 获取大结果集 - ORM 查询指南中的相应部分。

新版本 1.4.40 中新增了Connection.execution_options.yield_per作为核心级别的执行选项,可以方便地设置流式结果、缓冲区大小和分区大小,一次性完成,以便与 ORM 的类似用例相匹配。

使用 stream_results 进行动态增长缓冲区的流式传输

要在没有特定分区大小的情况下启用服务器端游标,可以使用 Connection.execution_options.stream_results 选项,这与 Connection.execution_options.yield_per 相似,可以在 Connection 对象或语句对象上调用。

当使用 Connection.execution_options.stream_results 选项直接迭代生成的 Result 对象时,内部使用默认的缓冲方案来获取行,该方案首先缓冲少量行,然后在每次获取时缓冲越来越多的行,直到预先配置的 1000 行的限制。可以使用 Connection.execution_options.max_row_buffer 执行选项来影响此缓冲区的最大大小:

with engine.connect() as conn:
    with conn.execution_options(stream_results=True, max_row_buffer=100).execute(
        text("select * from table")
    ) as result:
        for row in result:
            print(f"{row}")

Connection.execution_options.stream_results 选项与 Result.partitions() 方法结合使用时,应向 Result.partitions() 传递特定的分区大小,以避免获取整个结果集。通常,使用 Connection.execution_options.yield_per 选项设置使用 Result.partitions() 方法会更为直接。

另请参阅

使用 Yield Per 获取大型结果集 - 在 ORM 查询指南 中

Result.partitions()

Result.yield_per()


SqlAlchemy 2.0 中文文档(四十四)(8)https://developer.aliyun.com/article/1563075

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
4月前
|
SQL 缓存 关系型数据库
SqlAlchemy 2.0 中文文档(四十四)(2)
SqlAlchemy 2.0 中文文档(四十四)
81 4
|
4月前
|
SQL 存储 API
SqlAlchemy 2.0 中文文档(四十四)(6)
SqlAlchemy 2.0 中文文档(四十四)
85 4
|
4月前
|
存储 缓存 数据库
SqlAlchemy 2.0 中文文档(四十四)(5)
SqlAlchemy 2.0 中文文档(四十四)
90 4
|
4月前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(四十四)(9)
SqlAlchemy 2.0 中文文档(四十四)
50 3
|
4月前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(四十四)(4)
SqlAlchemy 2.0 中文文档(四十四)
53 3
|
4月前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(三十四)(5)
SqlAlchemy 2.0 中文文档(三十四)
40 0
|
4月前
|
SQL 存储 关系型数据库
SqlAlchemy 2.0 中文文档(三十四)(4)
SqlAlchemy 2.0 中文文档(三十四)
45 1
|
4月前
|
SQL 缓存 关系型数据库
SqlAlchemy 2.0 中文文档(四十四)(3)
SqlAlchemy 2.0 中文文档(四十四)
77 0
|
4月前
|
SQL 存储 缓存
SqlAlchemy 2.0 中文文档(四十四)(8)
SqlAlchemy 2.0 中文文档(四十四)
64 0
|
4月前
|
SQL 关系型数据库 MySQL
SqlAlchemy 2.0 中文文档(四十四)(1)
SqlAlchemy 2.0 中文文档(四十四)
100 0