SqlAlchemy 2.0 中文文档(二十三)(3)https://developer.aliyun.com/article/1560520
使用 SAVEPOINT
如果底层引擎支持 SAVEPOINT 事务,则可以使用Session.begin_nested()
方法来区分 SAVEPOINT 事务:
Session = sessionmaker() with Session.begin() as session: session.add(u1) session.add(u2) nested = session.begin_nested() # establish a savepoint session.add(u3) nested.rollback() # rolls back u3, keeps u1 and u2 # commits u1 and u2
每次调用Session.begin_nested()
时,都会在当前数据库事务的范围内(如果尚未开始,则开始一个新的事务)向数据库发送新的“BEGIN SAVEPOINT”命令,并返回一个类型为SessionTransaction
的对象,该对象表示对此 SAVEPOINT 的句柄。当调用该对象的.commit()
方法时,将向数据库发出“RELEASE SAVEPOINT”命令;如果调用.rollback()
方法,则发出“ROLLBACK TO SAVEPOINT”命令。封闭的数据库事务保持进行中。
Session.begin_nested()
通常用作上下文管理器,其中可以捕获特定的每个实例错误,与事务状态的部分回滚一起使用,而不是回滚整个事务,如下例所示:
for record in records: try: with session.begin_nested(): session.merge(record) except: print("Skipped record %s" % record) session.commit()
当由Session.begin_nested()
生成的上下文管理器完成时,它“提交”了保存点,其中包括刷新所有待定状态的通常行为。当发生错误时,保存点被回滚,并且被更改的对象的Session
本地状态会被过期。
这种模式非常适合诸如使用 PostgreSQL 并捕获IntegrityError
以检测重复行的情况;当出现此类错误时,PostgreSQL 通常会中止整个事务,但是在使用 SAVEPOINT 时,外部事务会被维持。在下面的示例中,一组数据被持久化到数据库中,偶尔会跳过“重复的主键”记录,而不会回滚整个操作:
from sqlalchemy import exc with session.begin(): for record in records: try: with session.begin_nested(): obj = SomeRecord(id=record["identifier"], name=record["name"]) session.add(obj) except exc.IntegrityError: print(f"Skipped record {record} - row already exists")
当调用Session.begin_nested()
时,Session
首先将当前所有待定状态刷新到数据库;这是无条件发生的,不管Session.autoflush
参数的值如何,该参数通常用于禁用自动刷新。这种行为的理由是,当在这个嵌套事务上发生回滚时,Session
可以使在 SAVEPOINT 范围内创建的任何内存状态过期,同时确保当这些过期对象被刷新时,SAVEPOINT 开始之前的对象图状态可重新从数据库加载。
在现代版本的 SQLAlchemy 中,当由Session.begin_nested()
发起的 SAVEPOINT 被回滚时,自从创建 SAVEPOINT 以来被修改的内存对象状态会被过期,然而自 SAVEPOINT 开始以来未被改变的其他对象状态会被保留。这样,后续操作可以继续使用未受影响的数据,而无需从数据库中刷新。
另请参阅
Connection.begin_nested()
- 核心 SAVEPOINT API ### 会话级别 vs. 引擎级别的事务控制
在核心中的连接
和 ORM 中的_session.Session
具有等效的事务语义,都在sessionmaker
与引擎
的级别,以及会话
与连接
的级别。以下各节根据以下方案详细说明了这些情况:
ORM Core ----------------------------------------- ----------------------------------- sessionmaker Engine Session Connection sessionmaker.begin() Engine.begin() some_session.commit() some_connection.commit() with some_sessionmaker() as session: with some_engine.connect() as conn: with some_sessionmaker.begin() as session: with some_engine.begin() as conn: with some_session.begin_nested() as sp: with some_connection.begin_nested() as sp:
边做边提交
会话
和连接
都具有Connection.commit()
和Connection.rollback()
方法。使用 SQLAlchemy 2.0 风格的操作,这些方法在所有情况下都会影响最外层的事务。对于会话
,假定Session.autobegin
保持默认值True
。
引擎
:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.connect() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) conn.commit()
会话
:
Session = sessionmaker(engine) with Session() as session: session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) session.commit()
单次开始
sessionmaker
和引擎
都具有Engine.begin()
方法,该方法将获取一个用于执行 SQL 语句的新对象(分别是会话
和连接
),然后返回一个上下文管理器,该管理器将为该对象维护一个开始/提交/回滚的上下文。
引擎:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.begin() as conn: conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) # commits and closes automatically
会话:
Session = sessionmaker(engine) with Session.begin() as session: session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) # commits and closes automatically
嵌套事务
当使用 SAVEPOINT 通过 Session.begin_nested()
或 Connection.begin_nested()
方法时,返回的事务对象必须用于提交或回滚 SAVEPOINT。调用 Session.commit()
或 Connection.commit()
方法将始终提交最外层事务;这是 SQLAlchemy 2.0 特定于行为的,与 1.x 系列相反。
引擎:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname") with engine.begin() as conn: savepoint = conn.begin_nested() conn.execute( some_table.insert(), [ {"data": "some data one"}, {"data": "some data two"}, {"data": "some data three"}, ], ) savepoint.commit() # or rollback # commits automatically
会话:
Session = sessionmaker(engine) with Session.begin() as session: savepoint = session.begin_nested() session.add_all( [ SomeClass(data="some data one"), SomeClass(data="some data two"), SomeClass(data="some data three"), ] ) savepoint.commit() # or rollback # commits automatically ```### 显式开始 `Session` 具有“自动开始”行为,这意味着一旦操作开始进行,它就会确保存在一个用于跟踪正在进行的操作的 `SessionTransaction`。当调用 `Session.commit()` 时,该事务将被完成。 通常希望特别是在框架集成中,控制“开始”操作发生的时机。为此,`Session` 使用“自动开始”策略,使得可以直接调用 `Session.begin()` 方法来为尚未启动事务的 `Session` 开始事务: ```py Session = sessionmaker(bind=engine) session = Session() session.begin() try: item1 = session.get(Item, 1) item2 = session.get(Item, 2) item1.foo = "bar" item2.bar = "foo" session.commit() except: session.rollback() raise
上述模式更惯用地使用上下文管理器调用:
Session = sessionmaker(bind=engine) session = Session() with session.begin(): item1 = session.get(Item, 1) item2 = session.get(Item, 2) item1.foo = "bar" item2.bar = "foo"
Session.begin()
方法和会话的“自动开始”过程使用相同的步骤序列开始事务。这包括当 SessionEvents.after_transaction_create()
事件发生时调用;此钩子被框架用于将其自己的事务处理过程与 ORM Session
集成。 ### 启用两阶段提交
对于支持两阶段操作的后端(当前为 MySQL 和 PostgreSQL),可以指示会话使用两阶段提交语义。这将协调跨数据库的事务提交,以便在所有数据库中提交或回滚事务。您还可以Session.prepare()
会话以与 SQLAlchemy 未管理的事务进行交互。要使用两阶段事务,请在会话上设置标志 twophase=True
:
engine1 = create_engine("postgresql+psycopg2://db1") engine2 = create_engine("postgresql+psycopg2://db2") Session = sessionmaker(twophase=True) # bind User operations to engine 1, Account operations to engine 2 Session.configure(binds={User: engine1, Account: engine2}) session = Session() # .... work with accounts and users # commit. session will issue a flush to all DBs, and a prepare step to all DBs, # before committing both transactions session.commit() ```### 设置事务隔离级别 / DBAPI AUTOCOMMIT 大多数 DBAPI 支持可配置的事务隔离级别概念。传统上有四个级别:“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”。这些通常在 DBAPI 连接开始新事务之前应用,需要注意的是,大多数 DBAPI 在首次发出 SQL 语句时会隐式开始此事务。 支持隔离级别的 DBAPI 通常还支持真实的“自动提交”概念,这意味着 DBAPI 连接本身将被放置在非事务自动提交模式中。这通常意味着自动向数据库发出“BEGIN”的典型 DBAPI 行为不再发生,但也可能包括其他指令。在使用此模式时,**DBAPI 在任何情况下都不使用事务**。SQLAlchemy 方法如`.begin()`、`.commit()` 和 `.rollback()` 会静默通过。 SQLAlchemy 的方言支持在每个`Engine` 或每个`Connection` 的基础上设置隔离模式,使用`create_engine()` 层次上以及 `Connection.execution_options()` 层次上的标志。 当使用 ORM `Session` 时,它充当引擎和连接的*外观*,但不直接暴露事务隔离。因此,为了影响事务隔离级别,我们需要在适当时对`Engine` 或 `Connection` 进行操作。 另请参阅 设置事务隔离级别,包括 DBAPI 自动提交 - 一定要查看 SQLAlchemy `Connection` 对象级别的隔离级别是如何工作的。 #### 为 Sessionmaker / Engine 设置隔离级别 要为 `Session` 或 `sessionmaker` 设置特定的隔离级别,全局首选技巧是可以始终根据特定的隔离级别构建一个 `Engine`,然后将其用作 `Session` 和/或 `sessionmaker` 的连接源: ```py from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine( "postgresql+psycopg2://scott:tiger@localhost/test", isolation_level="REPEATABLE READ", ) Session = sessionmaker(eng)
另一个选项,如果同时存在具有不同隔离级别的两个引擎,是使用 Engine.execution_options()
方法,该方法将产生原始 Engine
的浅拷贝,该拷贝与父引擎共享相同的连接池。当操作将被分为“事务性”和“自动提交”操作时,通常最好使用此方法:
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT") transactional_session = sessionmaker(eng) autocommit_session = sessionmaker(autocommit_engine)
在上述情况中,“eng
” 和 "autocommit_engine"
共享相同的方言和连接池。但是,当从 autocommit_engine
获取连接时,将设置“AUTOCOMMIT”模式。当这两个 sessionmaker
对象“transactional_session
” 和 “autocommit_session"
与数据库连接一起工作时,它们会继承这些特性。
“autocommit_session
” 仍然具有事务语义,包括 Session.commit()
和 Session.rollback()
仍然将其自己视为“提交”和“回滚”对象,但是事务将会默默消失。因此,通常情况下,虽然不是严格要求,但 AUTOCOMMIT 隔离级别的会话应该以只读方式使用,也就是:
with autocommit_session() as session: some_objects = session.execute(text("<statement>")) some_other_objects = session.execute(text("<statement>")) # closes connection
设置单独会话的隔离级别
当我们创建一个新的 Session
,无论是直接使用构造函数还是调用由 sessionmaker
生成的可调用函数时,我们都可以直接传递 bind
参数,覆盖预先存在的绑定。例如,我们可以从默认的 sessionmaker
创建我们的 Session
,并传递一个设置为自动提交的引擎:
plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT") # will normally use plain_engine Session = sessionmaker(plain_engine) # make a specific Session that will use the "autocommit" engine with Session(bind=autocommit_engine) as session: # work with session ...
对于配置了多个“绑定”(Session
或sessionmaker
)的情况,我们可以重新完全指定binds
参数,或者如果我们只想替换特定的绑定,则可以使用Session.bind_mapper()
或Session.bind_table()
方法:
with Session() as session: session.bind_mapper(User, autocommit_engine)
为单个事务设置隔离级别
关于隔离级别的一个关键注意事项是,不能在已经开始事务的Connection
上安全地修改设置。数据库不能更改正在进行的事务的隔离级别,并且一些 DBAPIs 和 SQLAlchemy 方言在这个领域的行为不一致。
因此,最好使用一个与所需隔离级别的引擎直接绑定的Session
。然而,可以通过在事务开始时使用Session.connection()
方法来影响每个连接的隔离级别:
from sqlalchemy.orm import Session # assume session just constructed sess = Session(bind=engine) # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a real # database transaction. sess.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # ... work with session in SERIALIZABLE isolation level... # commit transaction. the connection is released # and reverted to its previous isolation level. sess.commit() # subsequent to commit() above, a new transaction may be begun if desired, # which will proceed with the previous default isolation level unless # it is set again.
在上面的例子中,我们首先使用构造函数或sessionmaker
生成一个Session
。然后,我们通过调用Session.connection()
显式设置数据库级别事务的开始,该方法提供了将传递给连接的执行选项,在开始数据库级别事务之前进行设置。事务使用所选的隔离级别进行。事务完成后,将在将连接返回到连接池之前将连接上的隔离级别重置为其默认值。
Session.begin()
方法也可以用于开始Session
级别的事务;在调用该方法后,可以使用Session.connection()
设置每个连接的事务隔离级别:
sess = Session(bind=engine) with sess.begin(): # call connection() with options before any other operations proceed. # this will procure a new connection from the bound engine and begin a # real database transaction. sess.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # ... work with session in SERIALIZABLE isolation level... # outside the block, the transaction has been committed. the connection is # released and reverted to its previous isolation level.
使用事件跟踪事务状态
请参阅事务事件部分,了解会话事务状态更改的可用事件挂钩的概述。
使用保存点
如果底层引擎支持 SAVEPOINT 事务,则可以使用Session.begin_nested()
方法进行分割:
Session = sessionmaker() with Session.begin() as session: session.add(u1) session.add(u2) nested = session.begin_nested() # establish a savepoint session.add(u3) nested.rollback() # rolls back u3, keeps u1 and u2 # commits u1 and u2
每次调用Session.begin_nested()
时,都会在当前数据库事务的范围内(如果尚未开始,则开始一个)向数据库发出新的“BEGIN SAVEPOINT”命令,并返回一个SessionTransaction
类型的对象,该对象表示对此保存点的句柄。当调用此对象的.commit()
方法时,将向数据库发出“RELEASE SAVEPOINT”,如果调用.rollback()
方法,则会发出“ROLLBACK TO SAVEPOINT”。外层数据库事务仍然在进行中。
Session.begin_nested()
通常用作上下文管理器,其中可以捕获特定的每个实例错误,在此事务状态的一部分上发出回滚,而无需回滚整个事务,就像下面的示例中一样:
for record in records: try: with session.begin_nested(): session.merge(record) except: print("Skipped record %s" % record) session.commit()
当由Session.begin_nested()
生成的上下文管理器完成时,它会“提交”保存点,其中包括刷新所有待处理状态的通常行为。当出现错误时,保存点会被回滚,并且对已更改的对象的Session
的状态将被过期。
此模式非常适合于使用 PostgreSQL 并捕获IntegrityError
以检测重复行的情况;当引发此类错误时,PostgreSQL 通常会中止整个事务,但是当使用 SAVEPOINT 时,外部事务会得以保留。在下面的示例中,将一系列数据持久化到数据库中,偶尔会跳过“重复主键”记录,而不会回滚整个操作:
from sqlalchemy import exc with session.begin(): for record in records: try: with session.begin_nested(): obj = SomeRecord(id=record["identifier"], name=record["name"]) session.add(obj) except exc.IntegrityError: print(f"Skipped record {record} - row already exists")
当调用Session.begin_nested()
时,Session
首先会将当前所有待定状态刷新到数据库;无论Session.autoflush
参数的值是什么,这都是无条件的,通常可以用来禁用自动刷新。这种行为的原因是当此嵌套事务上发生回滚时,Session
可以使在保存点范围内创建的任何内存状态过期,同时确保在刷新这些过期对象时,保存点开始前的对象图状态将可用于重新从数据库加载。
在现代版本的 SQLAlchemy 中,当由Session.begin_nested()
初始化的保存点被回滚时,自从保存点创建以来被修改的内存对象状态将会被过期,但是其他自保存点开始时未改变的对象状态将会被保留。这样做是为了让后续操作可以继续使用那些未受影响的数据,而无需从数据库中刷新。
另请参阅
Connection.begin_nested()
- 核心保存点 API
SqlAlchemy 2.0 中文文档(二十三)(5)https://developer.aliyun.com/article/1560525