SqlAlchemy 2.0 中文文档(二十三)(2)https://developer.aliyun.com/article/1560516
使用 SAVEPOINT
如果底层引擎支持,可以使用 Session.begin_nested()
方法来划定 SAVEPOINT 事务:
Session = sessionmaker() with Session.begin() as session: session.add(u1) session.add(u2) nested = session.begin_nested() # establish a savepoint session.add(u3) nested.rollback() # rolls back u3, keeps u1 and u2 # commits u1 and u2
每次调用 Session.begin_nested()
,都会在当前数据库事务的范围内向数据库发出一个新的“BEGIN SAVEPOINT”命令(如果尚未开始,则开始一个),并返回一个类型为 SessionTransaction
的对象,代表这个 SAVEPOINT 的句柄。当调用该对象的 .commit()
方法时,会向数据库发出“RELEASE SAVEPOINT”,如果调用 .rollback()
方法,则会发出“ROLLBACK TO SAVEPOINT”。封闭的数据库事务仍在进行中。
Session.begin_nested()
通常用作上下文管理器,可以捕获特定的每个实例错误,并在该事务状态的部分发出回滚,而不会回滚整个事务,如下例所示:
for record in records: try: with session.begin_nested(): session.merge(record) except: print("Skipped record %s" % record) session.commit()
当由 Session.begin_nested()
产生的上下文管理器完成时,“提交” savepoint,其中包括刷新所有挂起状态的常规行为。当出现错误时,保存点会被回滚,并且更改的对象的 Session
本地状态会过期。
这种模式非常适用于诸如使用 PostgreSQL 并捕获 IntegrityError
来检测重复行的情况;通常情况下,当出现此类错误时,PostgreSQL 会中止整个事务,但是使用 SAVEPOINT 时,外部事务会得以保留。在下面的示例中,将一系列数据持久化到数据库中,并且偶尔会跳过“重复的主键”记录,而无需回滚整个操作:
from sqlalchemy import exc with session.begin(): for record in records: try: with session.begin_nested(): obj = SomeRecord(id=record["identifier"], name=record["name"]) session.add(obj) except exc.IntegrityError: print(f"Skipped record {record} - row already exists")
当调用 Session.begin_nested()
时,Session
首先会将当前所有挂起的状态刷新到数据库;这种情况会无条件发生,不管 Session.autoflush
参数的值是什么,该参数通常用于禁用自动刷新。这种行为的原因是,当此嵌套事务发生回滚时,Session
可以使在 SAVEPOINT 范围内创建的任何内存状态过期,同时确保在刷新这些过期对象时,SAVEPOINT 开始之前的对象图状态将可用于重新从数据库加载。
在 SQLAlchemy 的现代版本中,当由 Session.begin_nested()
启动的 SAVEPOINT 被回滚时,自 SAVEPOINT 创建以来已修改的内存对象状态会过期,但自 SAVEPOINT 开始后未修改的其他对象状态将保持不变。这样,后续操作可以继续使用其它未受影响的数据,而无需从数据库刷新。
另请参见
Connection.begin_nested()
- 核心 SAVEPOINT API ### 会话级别与引擎级别的事务控制
在核心中的Connection
和 ORM 中的_session.Session
具有等效的事务语义,无论是在sessionmaker
与Engine
的级别,还是在Session
与Connection
的级别。以下部分根据以下方案详细说明这些情况:
ORM Core ----------------------------------------- ----------------------------------- sessionmaker Engine Session Connection sessionmaker.begin() Engine.begin() some_session.commit() some_connection.commit() with some_sessionmaker() as session: with some_engine.connect() as conn: with some_sessionmaker.begin() as session: with some_engine.begin() as conn: with some_session.begin_nested() as sp: with some_connection.begin_nested() as sp:
边提交边进行
Session
和Connection
都具有Connection.commit()
和Connection.rollback()
方法。使用 SQLAlchemy 2.0 风格的操作,这些方法在所有情况下都会影响最外层的事务。对于Session
,假定Session.autobegin
保持默认值True
。
Engine
:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.connect() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) conn.commit()
Session
:
Session = sessionmaker(engine) with Session() as session: session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) session.commit()
一次开始
sessionmaker
和Engine
都具有Engine.begin()
方法,该方法将获取一个新对象来执行 SQL 语句(分别是Session
和Connection
),然后返回一个上下文管理器,用于维护该对象的开始/提交/回滚上下文。
引擎:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.begin() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) # commits and closes automatically
会话:
Session = sessionmaker(engine) with Session.begin() as session: session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) # commits and closes automatically
嵌套事务
使用 SAVEPOINT 通过 Session.begin_nested()
或 Connection.begin_nested()
方法时,必须使用返回的事务对象来提交或回滚 SAVEPOINT。 调用 Session.commit()
或 Connection.commit()
方法将始终提交最外层事务;这是 SQLAlchemy 2.0 特定的行为,与 1.x 系列相反。
引擎:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.begin() as conn: savepoint = conn.begin_nested() conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) savepoint.commit() # or rollback # commits automatically
会话:
Session = sessionmaker(engine) with Session.begin() as session: savepoint = session.begin_nested() session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) savepoint.commit() # or rollback # commits automatically ```### 显式开始 `Session` 具有“自动开始”行为,这意味着一旦操作开始进行,它就会确保存在一个 `SessionTransaction` 来跟踪正在进行的操作。 当调用 `Session.commit()` 时,此事务将完成。 通常希望,特别是在框架集成中,控制“开始”操作发生的时间点。 为此,`Session` 使用“自动开始”策略,使得可以直接调用 `Session.begin()` 方法,以便为尚未启动事务的 `Session` 调用: ```py Session = sessionmaker(bind=engine) session = Session() session.begin() try: item1 = session.get(Item, 1) item2 = session.get(Item, 2) item1.foo = "bar" item2.bar = "foo" session.commit() except: session.rollback() raise
上述模式更常用地使用上下文管理器调用:
Session = sessionmaker(bind=engine) session = Session() with session.begin(): item1 = session.get(Item, 1) item2 = session.get(Item, 2) item1.foo = "bar" item2.bar = "foo"
Session.begin()
方法和会话的“自动开始”过程使用相同的步骤序列开始事务。 这包括在发生时调用 SessionEvents.after_transaction_create()
事件;此挂钩被框架用于将其自己的事务处理过程与 ORM Session
集成。
对于支持两阶段操作的后端(目前支持 MySQL 和 PostgreSQL),会话可以被指示使用两阶段提交语义。这将协调跨数据库的事务提交,以便在所有数据库中要么提交事务,要么回滚事务。您还可以Session.prepare()
会话以与 SQLAlchemy 不管理的事务进行交互。要使用两阶段事务,请在会话上设置标志 twophase=True
:
engine1 = create_engine("postgresql+psycopg2://db1") engine2 = create_engine("postgresql+psycopg2://db2") Session = sessionmaker(twophase=True) # bind User operations to engine 1, Account operations to engine 2 Session.configure(binds={User: engine1, Account: engine2}) session = Session() # .... work with accounts and users # commit. session will issue a flush to all DBs, and a prepare step to all DBs, # before committing both transactions session.commit() ```### 设置事务隔离级别 / DBAPI 自动提交 大多数 DBAPI 支持可配置的事务隔离级别的概念。传统上有四个级别:“READ UNCOMMITTED”,“READ COMMITTED”,“REPEATABLE READ” 和 “SERIALIZABLE”。这些通常应用于 DBAPI 连接在开始新事务之前,注意大多数 DBAPI 在首次发出 SQL 语句时会隐式开始此事务。 支持隔离级别的 DBAPI 通常也支持真正的 “自动提交” 概念,这意味着 DBAPI 连接本身将被置于非事务自动提交模式。这通常意味着数据库自动发出 “BEGIN” 的典型 DBAPI 行为不再发生,但它也可能包括其他指令。在使用此模式时,**DBAPI 在任何情况下都不使用事务**。SQLAlchemy 方法如 `.begin()`、`.commit()` 和 `.rollback()` 将静默传递。 SQLAlchemy 的方言支持在每个 `Engine` 或每个 `Connection` 基础上设置可设置的隔离模式,使用 `create_engine()` 层级以及 `Connection.execution_options()` 层级的标志。 当使用 ORM `Session` 时,它充当引擎和连接的 *facade*,但不直接暴露事务隔离。因此,为了影响事务隔离级别,我们需要根据情况对 `Engine` 或 `Connection` 采取行动。 另请参阅 设置事务隔离级别包括 DBAPI 自动提交 - 请确保查看 SQLAlchemy `Connection` 对象的隔离级别工作方式。 #### 为 Sessionmaker / Engine 设置隔离 要为特定的隔离级别全局设置`Session`或`sessionmaker`,第一种技术是可以针对所有情况构建一个具有特定隔离级别的`Engine`,然后将其用作`Session`和/或`sessionmaker`的连接源: ```py from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine( "postgresql+psycopg2://scott:tiger@localhost/test", isolation_level="REPEATABLE READ", ) Session = sessionmaker(eng)
另一个选项,如果同时存在具有不同隔离级别的两个引擎,是使用Engine.execution_options()
方法,该方法将生成一个原始Engine
的浅拷贝,该浅拷贝与父引擎共享相同的连接池。当操作将被分成“事务”和“自动提交”操作时,这通常是更可取的:
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT") transactional_session = sessionmaker(eng) autocommit_session = sessionmaker(autocommit_engine)
在上述示例中,“eng
”和"autocommit_engine"
共享相同的方言和连接池。然而,当从autocommit_engine
获取连接时,将设置“AUTOCOMMIT”模式。这两个sessionmaker
对象“transactional_session
”和“autocommit_session
”在与数据库连接工作时继承这些特性。
“autocommit_session
” 仍然具有事务语义,包括Session.commit()
和Session.rollback()
仍然认为自己在“提交”和“回滚”对象,然而事务将会默默地不存在。因此,通常情况下,尽管不是严格要求,使用 AUTOCOMMIT 隔离的会话应该以只读方式使用,即:
with autocommit_session() as session: some_objects = session.execute(text("<statement>")) some_other_objects = session.execute(text("<statement>")) # closes connection
设置单个会话的隔离级别
当我们创建一个新的Session
,可以直接使用构造函数,也可以在调用由sessionmaker
生成的可调用对象时,直接传递bind
参数,覆盖预先存在的绑定。例如,我们可以从默认的sessionmaker
创建我们的Session
并传递一个设置为自动提交的引擎:
plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT") # will normally use plain_engine Session = sessionmaker(plain_engine) # make a specific Session that will use the "autocommit" engine with Session(bind=autocommit_engine) as session: # work with session ...
对于Session
或sessionmaker
配置了多个binds
的情况,我们可以重新完整指定binds
参数,或者如果我们只想替换特定的 binds,则可以使用Session.bind_mapper()
或Session.bind_table()
方法:
with Session() as session: session.bind_mapper(User, autocommit_engine)
为个别事务设置隔离级别
关于隔离级别的一个关键警告是,在已经开始事务的Connection
上不能安全地修改设置。数据库不能在进行中的事务中更改隔离级别,而一些 DBAPIs 和 SQLAlchemy 方言在这方面的行为不一致。
因此,最好使用一个提前绑定到具有所需隔离级别的引擎的Session
。然而,通过在事务开始时使用Session.connection()
方法可以影响每个连接的隔离级别:
from sqlalchemy.orm import Session # assume session just constructed sess = Session(bind=engine) # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a real # database transaction. sess.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # ... work with session in SERIALIZABLE isolation level... # commit transaction. the connection is released # and reverted to its previous isolation level. sess.commit() # subsequent to commit() above, a new transaction may be begun if desired, # which will proceed with the previous default isolation level unless # it is set again.
在上面的例子中,我们首先使用构造函数或sessionmaker
生成一个Session
。然后,我们通过调用Session.connection()
显式设置数据库级事务的开始,该方法提供了在数据库级事务开始之前将传递给连接的执行选项。事务使用此选定的隔离级别进行。当事务完成时,隔离级别会重置为其默认值,然后将连接返回到连接池。
Session.begin()
方法也可以用于开始Session
级事务;在调用该方法后,可以使用Session.connection()
来设置每个连接的事务隔离级别:
sess = Session(bind=engine) with sess.begin(): # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a # real database transaction. sess.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # ... work with session in SERIALIZABLE isolation level... # outside the block, the transaction has been committed. the connection is # released and reverted to its previous isolation level.
使用事件跟踪事务状态
请参阅事务事件部分,了解有关会话事务状态更改的可用事件挂钩的概述。 ## 加入会话到外部事务(例如用于测试套件)
如果正在使用处于事务状态的 Connection
(即已建立 Transaction
),则可以通过将 Session
绑定到该 Connection
来使 Session
参与该事务。通常的理由是允许 ORM 代码自由地与 Session
一起工作,包括调用 Session.commit()
,之后整个数据库交互都被回滚。
在 2.0 版本中更改:2.0 版本再次对“加入到外部事务”配方进行了改进;不再需要事件处理程序来“重置”嵌套事务。
该配方的工作方式是在事务内部建立一个 Connection
,可选地建立一个 SAVEPOINT,然后将其传递给 Session
作为“bind”;Session.join_transaction_mode
参数传递了设置为 "create_savepoint"
,表示应该创建新的 SAVEPOINT 来实现 Session
的 BEGIN/COMMIT/ROLLBACK,这将使外部事务处于传递时的相同状态。
当测试拆解时,外部事务会被回滚,以便将测试中的任何数据更改还原:
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from unittest import TestCase # global application scope. create Session class, engine Session = sessionmaker() engine = create_engine("postgresql+psycopg2://...") class SomeTest(TestCase): def setUp(self): # connect to the database self.connection = engine.connect() # begin a non-ORM transaction self.trans = self.connection.begin() # bind an individual Session to the connection, selecting # "create_savepoint" join_transaction_mode self.session = Session( bind=self.connection, join_transaction_mode="create_savepoint" ) def test_something(self): # use the session in tests. self.session.add(Foo()) self.session.commit() def test_something_with_rollbacks(self): self.session.add(Bar()) self.session.flush() self.session.rollback() self.session.add(Foo()) self.session.commit() def tearDown(self): self.session.close() # rollback - everything that happened with the # Session above (including calls to commit()) # is rolled back. self.trans.rollback() # return connection to the Engine self.connection.close()
上述配方是 SQLAlchemy 自己的 CI 的一部分,以确保它仍然按预期工作。 ## 管理事务
在 1.4 版本中更改:会话事务管理已经进行了修改,使其更清晰、更易于使用。特别是,现在它具有“自动开始”操作,这意味着可以控制事务开始的时间点,而无需使用传统的“自动提交”模式。
Session
跟踪一次性的“虚拟”事务的状态,使用一个叫做 SessionTransaction
的对象。然后,该对象利用底层的 Engine
或引擎来启动使用 Connection
对象所需的真实连接级事务。
当需要时,这个“虚拟”事务会自动创建,或者可以使用 Session.begin()
方法手动开始。尽可能大程度地支持 Python 上下文管理器的使用,不仅在创建 Session
对象的级别上,还在维护 SessionTransaction
的范围上。
下面,假设我们从一个 Session
开始:
from sqlalchemy.orm import Session session = Session(engine)
现在我们可以使用上下文管理器在标记的事务中运行操作:
with session.begin(): session.add(some_object()) session.add(some_other_object()) # commits transaction at the end, or rolls back if there # was an exception raised
在上述上下文结束时,假设没有引发任何异常,任何待处理的对象都将被刷新到数据库,并且数据库事务将被提交。如果在上述块中引发了异常,则事务将被回滚。在这两种情况下,上述 Session
在退出块后都可以在后续事务中使用。
Session.begin()
方法是可选的,Session
也可以使用逐步提交的方法,在需要时自动开始事务;只需提交或回滚:
session = Session(engine) session.add(some_object()) session.add(some_other_object()) session.commit() # commits # will automatically begin again result = session.execute(text("< some select statement >")) session.add_all([more_objects, ...]) session.commit() # commits session.add(still_another_object) session.flush() # flush still_another_object session.rollback() # rolls back still_another_object
Session
本身具有Session.close()
方法。如果Session
是在尚未提交或回滚的事务内开始的,则此方法将取消(即回滚)该事务,并且还将清除Session
对象状态中包含的所有对象。如果Session
的使用方式不保证调用Session.commit()
或Session.rollback()
(例如不在上下文管理器或类似位置),则可以使用close
方法确保释放所有资源:
# expunges all objects, releases all transactions unconditionally # (with rollback), releases all database connections back to their # engines session.close()
最后,会话构建/关闭过程本身也可以通过上下文管理器运行。这是确保Session
对象使用范围在固定块内的最佳方法。首先通过Session
构造函数进行说明:
with Session(engine) as session: session.add(some_object()) session.add(some_other_object()) session.commit() # commits session.add(still_another_object) session.flush() # flush still_another_object session.commit() # commits result = session.execute(text("<some SELECT statement>")) # remaining transactional state from the .execute() call is # discarded
同样,sessionmaker
也可以以相同的方式使用:
Session = sessionmaker(engine) with Session() as session: with session.begin(): session.add(some_object) # commits # closes the Session
sessionmaker
本身包括一个sessionmaker.begin()
方法,允许同时执行这两个操作:
with Session.begin() as session: session.add(some_object)
SqlAlchemy 2.0 中文文档(二十三)(4)https://developer.aliyun.com/article/1560521