使用引擎和连接
本节详细介绍了 Engine
、Connection
和相关对象的直接用法。值得注意的是,在使用 SQLAlchemy ORM 时,通常不直接访问这些对象;相反,Session
对象用作与数据库的接口。但是,对于以直接使用文本 SQL 语句和/或 SQL 表达式构造为中心,而不涉及 ORM 的高级管理服务的应用程序,Engine
和 Connection
是王者(和女王?)- 继续阅读。
基本用法
从引擎配置中回想起,Engine
是通过 create_engine()
调用创建的:
engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test")
create_engine()
的典型用法是针对每个特定的数据库 URL,在单个应用程序进程的生命周期中全局持有一次。一个 Engine
管理了许多个体的 DBAPI 连接代表该进程,并且旨在以并发方式调用。Engine
与 DBAPI 的 connect()
函数不是同义词,后者只代表一个连接资源 - 当应用程序的模块级别仅创建一次时,Engine
在效率上最高,而不是每个对象或每个函数调用一次。
Engine
最基本的功能是提供对 Connection
的访问,然后可以调用 SQL 语句。向数据库发送文本语句的示例:
from sqlalchemy import text with engine.connect() as connection: result = connection.execute(text("select username from users")) for row in result: print("username:", row.username)
上面,Engine.connect()
方法返回一个Connection
对象,通过在 Python 上下文管理器中使用它(例如 with:
语句),Connection.close()
方法会在块结束时自动调用。Connection
是一个代理对象,用于实际的 DBAPI 连接。DBAPI 连接是在创建Connection
时从连接池中检索的。
返回的对象称为CursorResult
,它引用一个 DBAPI 游标并提供类似于 DBAPI 游标的获取行的方法。当所有结果行(如果有)耗尽时,CursorResult
将关闭 DBAPI 游标。一个不返回行的CursorResult
,例如没有返回行的 UPDATE 语句,会在构造时立即释放游标资源。
当Connection
在with:
块结束时关闭时,引用的 DBAPI 连接将被释放到连接池中。从数据库本身的角度来看,连接池实际上不会“关闭”连接,假设池有空间来存储此连接以供下次使用。当连接返回到池中以供重新使用时,池机制会对 DBAPI 连接发出rollback()
调用,以便删除任何事务状态或锁定(这被称为 Reset On Return),并且连接已准备好供下次使用。
我们上面的示例演示了执行文本 SQL 字符串,应该使用text()
构造来指示我们想要使用文本 SQL。Connection.execute()
方法当然可以容纳更多内容;请参阅 Working with Data 中的 SQLAlchemy Unified Tutorial 进行教程。
使用事务
注意
本节描述了在直接使用 Engine
和 Connection
对象时如何使用事务。当使用 SQLAlchemy ORM 时,事务控制的公共 API 是通过 Session
对象实现的,该对象在内部使用 Transaction
对象。有关更多信息,请参阅管理事务。
边做边提交
Connection
对象始终在事务块的上下文中发出 SQL 语句。第一次调用 Connection.execute()
方法执行 SQL 语句时,将自动开始此事务,使用的行为称为自动开始。事务保持在 Connection
对象的范围内,直到调用 Connection.commit()
或 Connection.rollback()
方法。在事务结束后,Connection
等待再次调用 Connection.execute()
方法,此时它会再次自动开始。
这种调用风格被称为边做边提交,在下面的示例中进行了说明:
with engine.connect() as connection: connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"}) connection.execute( some_other_table.insert(), {"q": 8, "p": "this is some more data"} ) connection.commit() # commit the transaction
在“边做边提交”风格中,我们可以在使用 Connection.execute()
发出的其他语句序列中自由调用 Connection.commit()
和 Connection.rollback()
方法;每次事务结束并发出新语句时,都会隐式开始新事务:
with engine.connect() as connection: connection.execute(text("<some statement>")) connection.commit() # commits "some statement" # new transaction starts connection.execute(text("<some other statement>")) connection.rollback() # rolls back "some other statement" # new transaction starts connection.execute(text("<a third statement>")) connection.commit() # commits "a third statement"
2.0 版本新增:“边做边提交”风格是 SQLAlchemy 2.0 的新功能。在使用“未来”风格引擎时,它也可在 SQLAlchemy 1.4 的“过渡”模式中使用。
一次开始
Connection
对象提供了一种更明确的事务管理风格,称为一次性开始。与“按照进度提交”相比,“一次性开始”允许显式声明事务的起始点,并允许事务本身可以被框定为上下文管理器块,以便事务的结束变得隐式。要使用“一次性开始”,使用Connection.begin()
方法,该方法返回一个表示 DBAPI 事务的Transaction
对象。此对象还通过其自身的Transaction.commit()
和Transaction.rollback()
方法支持显式管理,但作为首选实践,还支持上下文管理器接口,其中当块正常结束时,它将自行提交并在引发异常时发出回滚,然后将异常传播到外部。以下说明了“一次性开始”块的形式:
with engine.connect() as connection: with connection.begin(): connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"}) connection.execute( some_other_table.insert(), {"q": 8, "p": "this is some more data"} ) # transaction is committed
连接并从引擎一次性开始
上述“一次性开始”块的方便缩写形式是在原始Engine
对象的级别使用Engine.begin()
方法,而不是执行两个分开的步骤Engine.connect()
和Connection.begin()
;Engine.begin()
方法返回一个特殊的上下文管理器,该管理器内部同时维护Connection
的上下文管理器以及通常由Connection.begin()
方法返回的Transaction
的上下文管理器:
with engine.begin() as connection: connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"}) connection.execute( some_other_table.insert(), {"q": 8, "p": "this is some more data"} ) # transaction is committed, and Connection is released to the connection # pool
提示
在Engine.begin()
块中,我们可以调用Connection.commit()
或Connection.rollback()
方法,它们将提前结束由该块标记的事务。但是,如果我们这样做,直到该块结束之前,Connection
上将不会发出进一步的 SQL 操作:
>>> from sqlalchemy import create_engine >>> e = create_engine("sqlite://", echo=True) >>> with e.begin() as conn: ... conn.commit() ... conn.begin() 2021-11-08 09:49:07,517 INFO sqlalchemy.engine.Engine BEGIN (implicit) 2021-11-08 09:49:07,517 INFO sqlalchemy.engine.Engine COMMIT Traceback (most recent call last): ... sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager. Please complete the context manager before emitting further commands.
混合风格
在一个Engine.connect()
块中可以自由混合“随时提交”和“一次性开始”的风格,只要调用Connection.begin()
不会与“自动开始”行为冲突。为了实现这一点,Connection.begin()
应该在发出任何 SQL 语句之前或直接在前一次调用Connection.commit()
或Connection.rollback()
之后调用:
with engine.connect() as connection: with connection.begin(): # run statements in a "begin once" block connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"}) # transaction is committed # run a new statement outside of a block. The connection # autobegins connection.execute( some_other_table.insert(), {"q": 8, "p": "this is some more data"} ) # commit explicitly connection.commit() # can use a "begin once" block here with connection.begin(): # run more statements connection.execute(...)
当开发使用“一次性开始”(begin once)的代码时,如果事务已经“自动开始”,库将引发InvalidRequestError
。
设置事务隔离级别,包括 DBAPI 自动提交
大多数 DBAPI 都支持可配置的事务隔离级别的概念。传统上,这些级别有“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”四个级别。这些通常在 DBAPI 连接开始新事务之前应用,注意大多数 DBAPI 在首次发出 SQL 语句时会隐式开始这个事务。
支持隔离级别的 DBAPI 通常也支持真正的“自动提交”概念,这意味着 DBAPI 连接本身将被放置在非事务性的自动提交模式中。这通常意味着数据库自动不再发出“BEGIN”,但也可能包括其他指令。SQLAlchemy 将“自动提交”的概念视为任何其他隔离级别;因为它是一个不仅丢失“读取提交”而且丢失原子性的隔离级别。
提示
需要注意的是,正如将在下面的部分进一步讨论的那样,在理解 DBAPI 级别的自动提交隔离级别中,“自动提交”隔离级别不像任何其他隔离级别一样,不会影响Connection
对象的“事务”行为,该对象继续调用 DBAPI 的.commit()
和.rollback()
方法(它们在自动提交下没有任何效果),并且.begin()
方法假定 DBAPI 将隐式启动一个事务(这意味着 SQLAlchemy 的“begin”不会更改自动提交模式)。
SQLAlchemy 方言应尽可能支持这些隔离级别以及自动提交。
设置连接的隔离级别或 DBAPI 自动提交
对于从 Engine.connect()
获取的单个 Connection
对象,可以使用 Connection.execution_options()
方法设置该 Connection
对象的隔离级别。该参数被称为 Connection.execution_options.isolation_level
,其值是字符串,通常是以下名称的子集:
# possible values for Connection.execution_options(isolation_level="<value>") "AUTOCOMMIT" "READ COMMITTED" "READ UNCOMMITTED" "REPEATABLE READ" "SERIALIZABLE"
并非每个 DBAPI 都支持每个值;如果在某个后端使用不支持的值,则会引发错误。
例如,要在特定连接上强制执行 REPEATABLE READ,然后开始事务:
with engine.connect().execution_options( isolation_level="REPEATABLE READ" ) as connection: with connection.begin(): connection.execute(text("<statement>"))
提示
Connection.execution_options()
方法的返回值是调用该方法的相同 Connection
对象,这意味着它直接修改了 Connection
对象的状态。这是 SQLAlchemy 2.0 的新行为。这种行为不适用于 Engine.execution_options()
方法;该方法仍然返回一个 Engine
的副本,并且如下所述,可以用来构造具有不同执行选项的多个 Engine
对象,但仍共享相同的方言和连接池。
注
Connection.execution_options.isolation_level
参数在语句级别选项上不适用,例如 Executable.execution_options()
,如果在此级别设置将被拒绝。这是因为该选项必须在每个事务的 DBAPI 连接上设置。
设置引擎的隔离级别或 DBAPI 自动提交
Connection.execution_options.isolation_level
选项也可以在引擎范围内设置,通常更可取。这可以通过将 create_engine.isolation_level
参数传递给 create_engine()
来实现:
from sqlalchemy import create_engine eng = create_engine( "postgresql://scott:tiger@localhost/test", isolation_level="REPEATABLE READ" )
在上述设置下,每个新的 DBAPI 连接在创建时将被设置为使用"REPEATABLE READ"
隔离级别设置来进行所有后续操作。
为单个引擎维护多个隔离级别
隔离级别也可以针对每个引擎进行设置,使用create_engine.execution_options
参数或Engine.execution_options()
方法,后者将创建一个共享方言和连接池但具有自己的每个连接隔离级别设置的Engine
的副本:
from sqlalchemy import create_engine eng = create_engine( "postgresql+psycopg2://scott:tiger@localhost/test", execution_options={"isolation_level": "REPEATABLE READ"}, )
在上述设置下,每个新启动的事务都将 DBAPI 连接设置为使用"REPEATABLE READ"
隔离级别;但连接池中的连接将被重置为连接首次出现时存在的原始隔离级别。在create_engine()
的级别上,最终效果与使用create_engine.isolation_level
参数没有任何区别。
然而,经常选择在不同隔离级别中运行操作的应用程序可能希望为一个主Engine
创建多个“子引擎”,每个引擎都配置为不同的隔离级别。其中一种用例是具有“事务性”和“只读”操作的应用程序,可以将一个单独的Engine
从主引擎中分离出来,并使用"AUTOCOMMIT"
:
from sqlalchemy import create_engine eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
上述,Engine.execution_options()
方法创建了原始Engine
的浅复制。eng
和autocommit_engine
共享相同的方言和连接池。然而,当从autocommit_engine
获取连接时,将设置“AUTOCOMMIT”模式。
无论隔离级别设置为何种级别,当连接返回到连接池时,隔离级别都会无条件地恢复。
另请参阅
SQLite 事务隔离
PostgreSQL 事务隔离
MySQL 事务隔离
SQL Server 事务隔离
Oracle 事务隔离级别
设置事务隔离级别 / DBAPI 自动提交 - 用于 ORM
使用 DBAPI 自动提交允许进行只读版本的透明重新连接 - 一个使用 DBAPI 自动提交来透明重新连接到数据库以进行只读操作的示例 ### 理解 DBAPI 级别的自动提交隔离级别
在上一节中,我们介绍了 Connection.execution_options.isolation_level
参数的概念以及它如何用于设置数据库隔离级别,包括 SQLAlchemy 处理为另一个事务隔离级别的 DBAPI 级别的“自动提交”。在本节中,我们将尝试澄清这种方法的影响。
如果我们想要检出一个 Connection
对象并使用它的“自动提交”模式,我们将按以下步骤进行:
with engine.connect() as connection: connection.execution_options(isolation_level="AUTOCOMMIT") connection.execute(text("<statement>")) connection.execute(text("<statement>"))
上面演示了“DBAPI 自动提交”模式的正常用法。不需要使用 Connection.begin()
或 Connection.commit()
等方法,因为所有语句都会立即提交到数据库。当块结束时,Connection
对象将恢复“自动提交”隔离级别,并且 DBAPI 连接将释放到连接池,在连接池中通常会调用 DBAPI connection.rollback()
方法,但由于上述语句已经提交,此回滚对数据库状态没有影响。
注意,“自动提交”模式即使在调用 Connection.begin()
方法时仍然持续存在;DBAPI 不会向数据库发送任何 BEGIN 命令,也不会在调用 Connection.commit()
时提交。这种用法也不是错误情况,因为可以预期“自动提交”隔离级别可能应用于原本假定事务上下文的代码;毕竟,“隔离级别”就像事务本身的配置细节一样。
在下面的示例中,无论是否有 SQLAlchemy 级别的事务块,语句都会保持自动提交状态:
with engine.connect() as connection: connection = connection.execution_options(isolation_level="AUTOCOMMIT") # this begin() does not affect the DBAPI connection, isolation stays at AUTOCOMMIT with connection.begin() as trans: connection.execute(text("<statement>")) connection.execute(text("<statement>"))
当我们打开日志记录并运行像上面这样的块时,日志记录将尝试指示,尽管调用了 DBAPI 级别的 .commit()
,但由于自动提交模式,它可能不会起作用:
INFO sqlalchemy.engine.Engine BEGIN (implicit) ... INFO sqlalchemy.engine.Engine COMMIT using DBAPI connection.commit(), DBAPI should ignore due to autocommit mode
与此同时,即使我们使用“DBAPI 自动提交”,SQLAlchemy 的事务语义,即Connection.begin()
的 Python 内部行为以及“自动开始”的行为,仍然存在,即使这些不影响 DBAPI 连接本身。举例说明,下面的代码将引发错误,因为在自动开始已经发生后调用了Connection.begin()
:
with engine.connect() as connection: connection = connection.execution_options(isolation_level="AUTOCOMMIT") # "transaction" is autobegin (but has no effect due to autocommit) connection.execute(text("<statement>")) # this will raise; "transaction" is already begun with connection.begin() as trans: connection.execute(text("<statement>"))
上面的示例还展示了同样的主题,即“自动提交”隔离级别是底层数据库事务的配置细节,独立于 SQLAlchemy Connection 对象的开始/提交行为。 “自动提交”模式不会以任何方式与Connection.begin()
交互,Connection
在执行其自身与事务相关的状态更改时不会查询此状态(除了在引擎日志中建议这些块实际上没有提交)。这种设计的原因是为了保持与Connection
完全一致的使用模式,在此模式中,DBAPI 自动提交模式可以独立更改,而无需指示任何其他代码更改。
在隔离级别之间进行切换
当连接被释放回连接池时,隔离级别设置,包括自动提交模式,会自动重置。因此,最好避免尝试在单个Connection
对象上切换隔离级别,因为这会导致冗余。
为了说明如何在单个Connection
检出的范围内以即时模式使用“自动提交”,必须重新应用Connection.execution_options.isolation_level
参数,以恢复先前的隔离级别。上一节示例了在自动提交进行时调用Connection.begin()
以启动事务的尝试;我们可以通过在调用Connection.begin()
之前先恢复隔离级别来重写该示例:
# if we wanted to flip autocommit on and off on a single connection/ # which... we usually don't. with engine.connect() as connection: connection.execution_options(isolation_level="AUTOCOMMIT") # run statement(s) in autocommit mode connection.execute(text("<statement>")) # "commit" the autobegun "transaction" connection.commit() # switch to default isolation level connection.execution_options(isolation_level=connection.default_isolation_level) # use a begin block with connection.begin() as trans: connection.execute(text("<statement>"))
以上,在手动恢复隔离级别时,我们使用了Connection.default_isolation_level
来恢复默认隔离级别(假设这是我们想要的)。然而,更好的做法可能是与Connection
的架构一起工作,该架构已经在签入时自动处理重置隔离级别。编写上述内容的首选方法是使用两个块
# use an autocommit block with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as connection: # run statement in autocommit mode connection.execute(text("<statement>")) # use a regular block with engine.begin() as connection: connection.execute(text("<statement>"))
总而言之:
- “DBAPI 级别的自动提交”隔离级别与
Connection
对象的“开始”和“提交”概念完全独立。 - 每个隔离级别使用单独的
Connection
签出。避免在单个连接签出之间尝试来回切换“自动提交”;让引擎完成恢复默认隔离级别的工作 ## 使用服务器端游标(又名流式结果)
一些后端特性明确支持“服务器端游标”和“客户端游标”的概念。这里的客户端游标意味着数据库驱动在语句执行完成之前完全将结果集中的所有行都提取到内存中。例如,PostgreSQL 和 MySQL/MariaDB 的驱动通常默认使用客户端游标。相比之下,服务器端游标表示结果行在客户端消耗时仍保持在数据库服务器状态中。例如,Oracle 的驱动通常使用“服务器端”模型,而 SQLite 方言虽然没有使用真正的“客户端/服务器”架构,但仍然使用一种未缓冲的结果提取方法,在结果行被消耗之前会将其留在进程内存之外。
从这个基本架构可以得出结论,当提取非常大的结果集时,“服务器端游标”在内存效率上更高,同时可能会在客户端/服务器通信过程中引入更多复杂性,并且对于小结果集(通常少于 10000 行)效率较低。
对于那些有条件支持缓冲或未缓冲结果的方言,通常对使用“未缓冲”或服务器端游标模式会有一些注意事项。例如,当使用 psycopg2 方言时,如果使用服务器端游标与任何类型的 DML 或 DDL 语句,将会引发错误。当使用 MySQL 驱动程序与服务器端游标时,DBAPI 连接处于更脆弱的状态,并且无法从错误条件中优雅地恢复,也不会允许回滚进行,直到游标完全关闭。
由于这个原因,SQLAlchemy 的方言将始终默认使用游标的较少出错版本,这意味着对于 PostgreSQL 和 MySQL 方言,默认情况下使用缓冲的“客户端”游标,在从游标调用任何提取方法之前,会将完整结果集拉入内存。这种操作模式在绝大多数情况下都是适用的;未缓冲的游标通常不是很有用,除非是在应用程序以块的方式提取大量行的罕见情况下,这些行的处理可以在提取更多行之前完成。
对于提供客户端和服务器端游标选项的数据库驱动程序,Connection.execution_options.stream_results
和 Connection.execution_options.yield_per
执行选项提供了在每个Connection
或每个语句基础上访问“服务器端游标”的能力。在使用 ORM Session
时也存在类似的选项。
通过 yield_per 进行固定缓冲区流式处理
由于完全未缓冲的服务器端游标的单个行提取操作通常比一次提取批量行要昂贵,Connection.execution_options.yield_per
执行选项配置了一个Connection
或语句以利用服务器端游标(如果可用),同时配置了一个固定大小的行缓冲区,该缓冲区将在消耗时按批次从服务器检索行。此参数可以使用Connection.execution_options()
方法在Connection
上或使用Executable.execution_options()
方法在语句上设置为正整数值。
新版本 1.4.40 中的新功能:Connection.execution_options.yield_per
作为仅限核心的选项是 SQLAlchemy 1.4.40 中的新功能;对于先前的 1.4 版本,请直接结合使用 Connection.execution_options.stream_results
和 Result.yield_per()
。
使用此选项等同于手动设置Connection.execution_options.stream_results
选项,如下一节所述,并在给定整数值的Result
对象上调用Result.yield_per()
方法。在这两种情况下,这种组合的效果包括:
- 为给定后端选择了服务器端游标模式,如果可用且尚未是该后端的默认行为
- 当获取结果行时,它们将被分批缓冲,每个批次的大小直到最后一个批次将等于传递给
Connection.execution_options.yield_per
选项或Result.yield_per()
方法的整数参数;然后最后一个批次的大小将根据少于此大小的剩余行数确定 - 如果使用,
Result.partitions()
方法使用的默认分区大小也将设置为此整数大小。
以下示例说明了这三种行为:
with engine.connect() as conn: with conn.execution_options(yield_per=100).execute( text("select * from table") ) as result: for partition in result.partitions(): # partition is an iterable that will be at most 100 items for row in partition: print(f"{row}")
上面的示例说明了yield_per=100
与使用Result.partitions()
方法一起批量处理与从服务器获取的大小匹配的行的组合。使用Result.partitions()
是可选的,如果直接迭代Result
,每获取 100 行将缓冲一个新的批次行。不应使用诸如Result.all()
之类的方法,因为这将一次性完全获取所有剩余行,从而破坏使用yield_per
的目的。
提示
如上所示,Result
对象可以用作上下文管理器。在使用服务器端游标进行迭代时,这是确保Result
对象关闭的最佳方式,即使在迭代过程中引发异常。
Connection.execution_options.yield_per
选项也可以被 ORM 使用,被 Session
用于获取 ORM 对象,它还限制一次生成的 ORM 对象的数量。请参阅 使用 Connection.execution_options.yield_per
从大型结果集中提取 - 在 ORM 查询指南 中了解如何在 ORM 中使用 Connection.execution_options.yield_per
的更多背景信息。
新版本 1.4.40 中新增了Connection.execution_options.yield_per
,作为一个核心级别的执行选项,可以方便地设置流式结果、缓冲区大小和分区大小,一次性完成,这种方式可以转移到 ORM 的类似用例中。
SqlAlchemy 2.0 中文文档(四十四)(2)https://developer.aliyun.com/article/1563069