SqlAlchemy 2.0 中文文档(二十三)(4)https://developer.aliyun.com/article/1560521
会话级与引擎级事务控制
Core 中的Connection
和 ORM 中的_session.Session
都具有等效的事务语义,无论是在sessionmaker
与Engine
之间,还是在Session
与Connection
之间。以下各节详细说明了这些情景,基于以下方案:
ORM Core ----------------------------------------- ----------------------------------- sessionmaker Engine Session Connection sessionmaker.begin() Engine.begin() some_session.commit() some_connection.commit() with some_sessionmaker() as session: with some_engine.connect() as conn: with some_sessionmaker.begin() as session: with some_engine.begin() as conn: with some_session.begin_nested() as sp: with some_connection.begin_nested() as sp:
边做边提交
Session
和 Connection
均提供了 Connection.commit()
和 Connection.rollback()
方法。使用 SQLAlchemy 2.0 风格的操作时,这些方法在所有情况下都会影响最外层的事务。对于 Session
,假定 Session.autobegin
保持默认值 True
。
Engine
:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.connect() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) conn.commit()
Session
:
Session = sessionmaker(engine) with Session() as session: session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) session.commit()
只初始化一次
sessionmaker
和 Engine
均提供了 Engine.begin()
方法,该方法将获取一个新对象来执行 SQL 语句(分别是 Session
和 Connection
),然后返回一个上下文管理器,用于维护该对象的开始/提交/回滚上下文。
引擎:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.begin() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) # commits and closes automatically
会话:
Session = sessionmaker(engine) with Session.begin() as session: session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) # commits and closes automatically
嵌套事务
当使用 SAVEPOINT 通过 Session.begin_nested()
或 Connection.begin_nested()
方法时,必须使用返回的事务对象来提交或回滚 SAVEPOINT。调用 Session.commit()
或 Connection.commit()
方法将始终提交最外层的事务;这是 SQLAlchemy 2.0 特有的行为,与 1.x 系列相反。
引擎:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.begin() as conn: savepoint = conn.begin_nested() conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) savepoint.commit() # or rollback # commits automatically
会话:
Session = sessionmaker(engine) with Session.begin() as session: savepoint = session.begin_nested() session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) savepoint.commit() # or rollback # commits automatically
边提交边执行
Session
和Connection
均提供Connection.commit()
和Connection.rollback()
方法。使用 SQLAlchemy 2.0 风格的操作,这些方法在所有情况下都会影响到最外层的事务。对于Session
,假定Session.autobegin
保持其默认值为True
。
Engine
:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.connect() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) conn.commit()
Session
:
Session = sessionmaker(engine) with Session() as session: session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) session.commit()
开始一次
sessionmaker
和Engine
均提供Engine.begin()
方法,该方法将获取一个新对象以执行 SQL 语句(分别是Session
和Connection
),然后返回一个上下文管理器,用于维护该对象的开始/提交/回滚上下文。
引擎:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.begin() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) # commits and closes automatically
会话:
Session = sessionmaker(engine) with Session.begin() as session: session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) # commits and closes automatically
嵌套事务
使用通过Session.begin_nested()
或Connection.begin_nested()
方法创建的 SAVEPOINT 时,必须使用返回的事务对象提交或回滚 SAVEPOINT。调用Session.commit()
或Connection.commit()
方法将始终提交最外层的事务;这是 SQLAlchemy 2.0 特定行为,与 1.x 系列相反。
引擎:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.begin() as conn: savepoint = conn.begin_nested() conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) savepoint.commit() # or rollback # commits automatically
会话:
Session = sessionmaker(engine) with Session.begin() as session: savepoint = session.begin_nested() session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) savepoint.commit() # or rollback # commits automatically
显式开始
Session
具有“自动开始”行为,这意味着一旦开始执行操作,它就会确保存在一个 SessionTransaction
来跟踪正在进行的操作。当调用 Session.commit()
时,此事务将被完成。
通常情况下,特别是在框架集成中,控制“开始”操作发生的时间点是可取的。为此,Session
使用“自动开始”策略,以便可以直接调用 Session.begin()
方法来启动一个尚未开始事务的 Session
:
Session = sessionmaker(bind=engine) session = Session() session.begin() try: item1 = session.get(Item, 1) item2 = session.get(Item, 2) item1.foo = "bar" item2.bar = "foo" session.commit() except: session.rollback() raise
上述模式通常使用上下文管理器更具惯用性:
Session = sessionmaker(bind=engine) session = Session() with session.begin(): item1 = session.get(Item, 1) item2 = session.get(Item, 2) item1.foo = "bar" item2.bar = "foo"
Session.begin()
方法和会话的“自动开始”过程使用相同的步骤序列来开始事务。这包括当它发生时调用 SessionEvents.after_transaction_create()
事件;此钩子由框架使用,以便将其自己的事务处理集成到 ORM Session
的事务处理中。
启用两阶段提交
对于支持两阶段操作的后端(目前是 MySQL 和 PostgreSQL),可以指示会话使用两阶段提交语义。这将协调跨数据库的事务提交,以便在所有数据库中要么提交事务,要么回滚事务。您还可以为与 SQLAlchemy 未管理的事务交互的会话 Session.prepare()
。要使用两阶段事务,请在会话上设置标志 twophase=True
:
engine1 = create_engine("postgresql+psycopg2://db1") engine2 = create_engine("postgresql+psycopg2://db2") Session = sessionmaker(twophase=True) # bind User operations to engine 1, Account operations to engine 2 Session.configure(binds={User: engine1, Account: engine2}) session = Session() # .... work with accounts and users # commit. session will issue a flush to all DBs, and a prepare step to all DBs, # before committing both transactions session.commit()
设置事务隔离级别 / DBAPI 自动提交
大多数 DBAPI 支持可配置的事务隔离级别的概念。传统上,这些级别是“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”。这些通常在 DBAPI 连接开始新事务之前应用,注意大多数 DBAPI 在首次发出 SQL 语句时会隐式地开始此事务。
支持隔离级别的 DBAPI 通常也支持真正的“自动提交”概念,这意味着 DBAPI 连接本身将被放置到非事务自动提交模式中。这通常意味着数据库自动不再发出“BEGIN”,但也可能包括其他指令。使用此模式时,DBAPI 在任何情况下都不使用事务。SQLAlchemy 方法如.begin()
、.commit()
和.rollback()
会悄无声息地传递。
SQLAlchemy 的方言支持在每个Engine
或每个Connection
上设置可设置的隔离模式,使用的标志既可以在create_engine()
级别,也可以在Connection.execution_options()
级别。
当使用 ORM Session
时,它充当引擎和连接的门面,但不直接暴露事务隔离。因此,为了影响事务隔离级别,我们需要在适当的时候对Engine
或Connection
进行操作。
也请参阅
设置事务隔离级别,包括 DBAPI 自动提交 - 一定要查看 SQLAlchemy Connection
对象级别上隔离级别的工作方式。
设置会话/引擎范围的隔离级别
要全局设置一个具有特定隔离级别的Session
或sessionmaker
,第一种技术是可以在所有情况下构造一个具有特定隔离级别的Engine
,然后将其用作Session
和/或sessionmaker
的连接来源:
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine( "postgresql+psycopg2://scott:tiger@localhost/test", isolation_level="REPEATABLE READ", ) Session = sessionmaker(eng)
另一个选项,如果同时存在两个具有不同隔离级别的引擎,可以使用Engine.execution_options()
方法,该方法将生成原始Engine
的浅拷贝,该浅拷贝与父引擎共享相同的连接池。当操作将被分为“事务”和“自动提交”操作时,通常更可取:
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT") transactional_session = sessionmaker(eng) autocommit_session = sessionmaker(autocommit_engine)
在上面的例子中,“eng
”和“autocommit_engine
”共享相同的方言和连接池。然而,当从autocommit_engine
获取连接时,将设置“AUTOCOMMIT”模式。然后,这两个sessionmaker
对象“transactional_session
”和“autocommit_session
”在与数据库连接一起工作时继承这些特性。
“autocommit_session
” 保持事务语义,包括Session.commit()
和Session.rollback()
仍然认为自己是“提交”和“回滚”对象,但事务将会静默不存在。因此,通常情况下,尽管不是严格要求,但一个具有 AUTOCOMMIT 隔离级别的 Session 应该以只读方式使用,即:
with autocommit_session() as session: some_objects = session.execute(text("<statement>")) some_other_objects = session.execute(text("<statement>")) # closes connection
为单个 Session 设置隔离级别
当我们创建一个新的Session
时,可以直接传递bind
参数,覆盖预先存在的绑定,无论是直接使用构造函数还是调用由sessionmaker
产生的可调用对象。例如,我们可以从默认的sessionmaker
创建我们的Session
并传递一个设置为自动提交的引擎:
plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT") # will normally use plain_engine Session = sessionmaker(plain_engine) # make a specific Session that will use the "autocommit" engine with Session(bind=autocommit_engine) as session: # work with session ...
对于配置有多个binds
的Session
或sessionmaker
的情况,我们可以重新指定完整的binds
参数,或者如果我们只想替换特定的绑定,我们可以使用Session.bind_mapper()
或Session.bind_table()
方法:
with Session() as session: session.bind_mapper(User, autocommit_engine)
为单个事务设置隔离级别
关于隔离级别的一个关键注意事项是,不能在已经启动事务的 Connection
上安全地修改设置。数据库无法更改正在进行的事务的隔离级别,并且一些 DBAPI 和 SQLAlchemy 方言在这个领域的行为不一致。
因此最好使用一个最初绑定到具有所需隔离级别的引擎的 Session
。但是,通过在事务开始时使用 Session.connection()
方法,可以影响每个连接的隔离级别:
from sqlalchemy.orm import Session # assume session just constructed sess = Session(bind=engine) # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a real # database transaction. sess.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # ... work with session in SERIALIZABLE isolation level... # commit transaction. the connection is released # and reverted to its previous isolation level. sess.commit() # subsequent to commit() above, a new transaction may be begun if desired, # which will proceed with the previous default isolation level unless # it is set again.
在上面的示例中,我们首先使用构造函数或 sessionmaker
生成一个 Session
。然后,通过调用 Session.connection()
明确设置数据库级事务的开始,该方法提供了将传递给连接的执行选项,在开始数据库级事务之前。事务使用此选择的隔离级别进行。当事务完成时,连接上的隔离级别将重置为默认值,然后将连接返回到连接池。
Session.begin()
方法也可用于开始 Session
级事务;在此调用之后调用 Session.connection()
可以用于设置每个连接的事务隔离级别:
sess = Session(bind=engine) with sess.begin(): # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a # real database transaction. sess.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # ... work with session in SERIALIZABLE isolation level... # outside the block, the transaction has been committed. the connection is # released and reverted to its previous isolation level.
为 Sessionmaker / Engine 设置隔离级别
要为 Session
或 sessionmaker
全局设置特定的隔离级别,第一种技术是可以在所有情况下构建一个针对特定隔离级别的 Engine
,然后将其用作 Session
和/或 sessionmaker
的连接来源:
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine( "postgresql+psycopg2://scott:tiger@localhost/test", isolation_level="REPEATABLE READ", ) Session = sessionmaker(eng)
另一个选项,如果同时有两个具有不同隔离级别的引擎,则可以使用Engine.execution_options()
方法,它将生成原始Engine
的浅拷贝,与父引擎共享相同的连接池。当操作将分成“事务”和“自动提交”操作时,这通常是首选:
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT") transactional_session = sessionmaker(eng) autocommit_session = sessionmaker(autocommit_engine)
在上述示例中,“eng
”和“autocommit_engine
”都共享相同的方言和连接池。然而,当从autocommit_engine
获取连接时,将设置“AUTOCOMMIT”模式。然后,当这两个sessionmaker
对象“transactional_session
”和“autocommit_session
”与数据库连接一起工作时,它们继承了这些特征。
“autocommit_session
”仍然具有事务语义,包括当它们从autocommit_engine
获取时,Session.commit()
和Session.rollback()
仍然认为自己是“提交”和“回滚”对象,但事务将默默地不存在。因此,通常,虽然不是严格要求,但具有 AUTOCOMMIT 隔离的会话应该以只读方式使用,即:
with autocommit_session() as session: some_objects = session.execute(text("<statement>")) some_other_objects = session.execute(text("<statement>")) # closes connection
为单个会话设置隔离
当我们创建一个新的Session
时,可以直接传递bind
参数,覆盖预先存在的绑定。例如,我们可以从默认的sessionmaker
创建我们的Session
,并传递设置为自动提交的引擎:
plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT") # will normally use plain_engine Session = sessionmaker(plain_engine) # make a specific Session that will use the "autocommit" engine with Session(bind=autocommit_engine) as session: # work with session ...
对于配置了多个binds
的Session
或sessionmaker
的情况,我们可以重新完全指定binds
参数,或者如果我们只想替换特定的绑定,我们可以使用Session.bind_mapper()
或Session.bind_table()
方法:
with Session() as session: session.bind_mapper(User, autocommit_engine)
为单个事务设置隔离
关于隔离级别的一个关键警告是,在已经开始事务的 Connection
上无法安全地修改设置。数据库无法更改正在进行的事务的隔离级别,并且一些 DBAPI 和 SQLAlchemy 方言在这个领域的行为不一致。
因此,最好使用一个明确绑定到具有所需隔离级别的引擎的 Session
。但是,可以通过在事务开始时使用 Session.connection()
方法来影响每个连接的隔离级别:
from sqlalchemy.orm import Session # assume session just constructed sess = Session(bind=engine) # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a real # database transaction. sess.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # ... work with session in SERIALIZABLE isolation level... # commit transaction. the connection is released # and reverted to its previous isolation level. sess.commit() # subsequent to commit() above, a new transaction may be begun if desired, # which will proceed with the previous default isolation level unless # it is set again.
在上面的例子中,我们首先使用构造函数或 sessionmaker
来生成一个 Session
。然后,我们通过调用 Session.connection()
显式设置数据库级事务的开始,该方法提供将在开始数据库级事务之前传递给连接的执行选项。事务会以此选定的隔离级别继续进行。当事务完成时,连接上的隔离级别将被重置为其默认值,然后连接将返回到连接池。
Session.begin()
方法也可用于开始 Session
级别的事务;在此调用之后调用 Session.connection()
可用于设置每个连接的事务隔离级别:
sess = Session(bind=engine) with sess.begin(): # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a # real database transaction. sess.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # ... work with session in SERIALIZABLE isolation level... # outside the block, the transaction has been committed. the connection is # released and reverted to its previous isolation level.
使用事件跟踪事务状态
请参阅 事务事件 部分,了解会话事务状态更改的可用事件挂钩的概述。
将会话加入到外部事务(例如用于测试套件)
如果正在使用的Connection
已经处于事务状态(即已建立Transaction
),则可以通过将Session
绑定到该Connection
来使Session
参与该事务。通常的理由是一个测试套件允许 ORM 代码自由地与Session
一起工作,包括能够调用Session.commit()
,之后整个数据库交互都被回滚。
从版本 2.0 开始更改:在 2.0 中,“加入外部事务”配方再次得到了改进;不再需要事件处理程序来“重置”嵌套事务。
该配方通过在事务内部建立Connection
(可选地建立 SAVEPOINT),然后将其传递给Session
作为“bind”来实现;传递了Session.join_transaction_mode
参数,设置为"create_savepoint"
,这表示应创建新的 SAVEPOINT 以实现Session
的 BEGIN/COMMIT/ROLLBACK,这将使外部事务保持与传递时相同的状态。
当测试拆卸时,外部事务将被回滚,以便撤消测试期间的任何数据更改:
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from unittest import TestCase # global application scope. create Session class, engine Session = sessionmaker() engine = create_engine("postgresql+psycopg2://...") class SomeTest(TestCase): def setUp(self): # connect to the database self.connection = engine.connect() # begin a non-ORM transaction self.trans = self.connection.begin() # bind an individual Session to the connection, selecting # "create_savepoint" join_transaction_mode self.session = Session( bind=self.connection, join_transaction_mode="create_savepoint" ) def test_something(self): # use the session in tests. self.session.add(Foo()) self.session.commit() def test_something_with_rollbacks(self): self.session.add(Bar()) self.session.flush() self.session.rollback() self.session.add(Foo()) self.session.commit() def tearDown(self): self.session.close() # rollback - everything that happened with the # Session above (including calls to commit()) # is rolled back. self.trans.rollback() # return connection to the Engine self.connection.close()
上述配方是 SQLAlchemy 自身 CI 的一部分,以确保其按预期工作。
ession.connection()` 显式设置数据库级事务的开始,该方法提供将在开始数据库级事务之前传递给连接的执行选项。事务会以此选定的隔离级别继续进行。当事务完成时,连接上的隔离级别将被重置为其默认值,然后连接将返回到连接池。
Session.begin()
方法也可用于开始 Session
级别的事务;在此调用之后调用 Session.connection()
可用于设置每个连接的事务隔离级别:
sess = Session(bind=engine) with sess.begin(): # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a # real database transaction. sess.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # ... work with session in SERIALIZABLE isolation level... # outside the block, the transaction has been committed. the connection is # released and reverted to its previous isolation level.
使用事件跟踪事务状态
请参阅 事务事件 部分,了解会话事务状态更改的可用事件挂钩的概述。
将会话加入到外部事务(例如用于测试套件)
如果正在使用的Connection
已经处于事务状态(即已建立Transaction
),则可以通过将Session
绑定到该Connection
来使Session
参与该事务。通常的理由是一个测试套件允许 ORM 代码自由地与Session
一起工作,包括能够调用Session.commit()
,之后整个数据库交互都被回滚。
从版本 2.0 开始更改:在 2.0 中,“加入外部事务”配方再次得到了改进;不再需要事件处理程序来“重置”嵌套事务。
该配方通过在事务内部建立Connection
(可选地建立 SAVEPOINT),然后将其传递给Session
作为“bind”来实现;传递了Session.join_transaction_mode
参数,设置为"create_savepoint"
,这表示应创建新的 SAVEPOINT 以实现Session
的 BEGIN/COMMIT/ROLLBACK,这将使外部事务保持与传递时相同的状态。
当测试拆卸时,外部事务将被回滚,以便撤消测试期间的任何数据更改:
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from unittest import TestCase # global application scope. create Session class, engine Session = sessionmaker() engine = create_engine("postgresql+psycopg2://...") class SomeTest(TestCase): def setUp(self): # connect to the database self.connection = engine.connect() # begin a non-ORM transaction self.trans = self.connection.begin() # bind an individual Session to the connection, selecting # "create_savepoint" join_transaction_mode self.session = Session( bind=self.connection, join_transaction_mode="create_savepoint" ) def test_something(self): # use the session in tests. self.session.add(Foo()) self.session.commit() def test_something_with_rollbacks(self): self.session.add(Bar()) self.session.flush() self.session.rollback() self.session.add(Foo()) self.session.commit() def tearDown(self): self.session.close() # rollback - everything that happened with the # Session above (including calls to commit()) # is rolled back. self.trans.rollback() # return connection to the Engine self.connection.close()
上述配方是 SQLAlchemy 自身 CI 的一部分,以确保其按预期工作。