SqlAlchemy 2.0 中文文档(二十三)(4)

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: SqlAlchemy 2.0 中文文档(二十三)

SqlAlchemy 2.0 中文文档(二十三)(3)https://developer.aliyun.com/article/1560520


使用 SAVEPOINT

如果底层引擎支持 SAVEPOINT 事务,则可以使用Session.begin_nested()方法来区分 SAVEPOINT 事务:

Session = sessionmaker()
with Session.begin() as session:
    session.add(u1)
    session.add(u2)
    nested = session.begin_nested()  # establish a savepoint
    session.add(u3)
    nested.rollback()  # rolls back u3, keeps u1 and u2
# commits u1 and u2

每次调用Session.begin_nested()时,都会在当前数据库事务的范围内(如果尚未开始,则开始一个新的事务)向数据库发送新的“BEGIN SAVEPOINT”命令,并返回一个类型为SessionTransaction的对象,该对象表示对此 SAVEPOINT 的句柄。当调用该对象的.commit()方法时,将向数据库发出“RELEASE SAVEPOINT”命令;如果调用.rollback()方法,则发出“ROLLBACK TO SAVEPOINT”命令。封闭的数据库事务保持进行中。

Session.begin_nested()通常用作上下文管理器,其中可以捕获特定的每个实例错误,与事务状态的部分回滚一起使用,而不是回滚整个事务,如下例所示:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

当由Session.begin_nested()生成的上下文管理器完成时,它“提交”了保存点,其中包括刷新所有待定状态的通常行为。当发生错误时,保存点被回滚,并且被更改的对象的Session本地状态会被过期。

这种模式非常适合诸如使用 PostgreSQL 并捕获IntegrityError以检测重复行的情况;当出现此类错误时,PostgreSQL 通常会中止整个事务,但是在使用 SAVEPOINT 时,外部事务会被维持。在下面的示例中,一组数据被持久化到数据库中,偶尔会跳过“重复的主键”记录,而不会回滚整个操作:

from sqlalchemy import exc
with session.begin():
    for record in records:
        try:
            with session.begin_nested():
                obj = SomeRecord(id=record["identifier"], name=record["name"])
                session.add(obj)
        except exc.IntegrityError:
            print(f"Skipped record {record} - row already exists")

当调用Session.begin_nested()时,Session首先将当前所有待定状态刷新到数据库;这是无条件发生的,不管Session.autoflush参数的值如何,该参数通常用于禁用自动刷新。这种行为的理由是,当在这个嵌套事务上发生回滚时,Session可以使在 SAVEPOINT 范围内创建的任何内存状态过期,同时确保当这些过期对象被刷新时,SAVEPOINT 开始之前的对象图状态可重新从数据库加载。

在现代版本的 SQLAlchemy 中,当由Session.begin_nested()发起的 SAVEPOINT 被回滚时,自从创建 SAVEPOINT 以来被修改的内存对象状态会被过期,然而自 SAVEPOINT 开始以来未被改变的其他对象状态会被保留。这样,后续操作可以继续使用未受影响的数据,而无需从数据库中刷新。

另请参阅

Connection.begin_nested() - 核心 SAVEPOINT API ### 会话级别 vs. 引擎级别的事务控制

在核心中的连接和 ORM 中的_session.Session具有等效的事务语义,都在sessionmaker引擎的级别,以及会话连接的级别。以下各节根据以下方案详细说明了这些情况:

ORM                                           Core
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:
边做边提交

会话连接都具有Connection.commit()Connection.rollback()方法。使用 SQLAlchemy 2.0 风格的操作,这些方法在所有情况下都会影响最外层的事务。对于会话,假定Session.autobegin保持默认值True

引擎

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    conn.commit()

会话

Session = sessionmaker(engine)
with Session() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    session.commit()
单次开始

sessionmaker引擎都具有Engine.begin()方法,该方法将获取一个用于执行 SQL 语句的新对象(分别是会话连接),然后返回一个上下文管理器,该管理器将为该对象维护一个开始/提交/回滚的上下文。

引擎:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
# commits and closes automatically

会话:

Session = sessionmaker(engine)
with Session.begin() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
# commits and closes automatically
嵌套事务

当使用 SAVEPOINT 通过 Session.begin_nested()Connection.begin_nested() 方法时,返回的事务对象必须用于提交或回滚 SAVEPOINT。调用 Session.commit()Connection.commit() 方法将始终提交最外层事务;这是 SQLAlchemy 2.0 特定于行为的,与 1.x 系列相反。

引擎:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
    savepoint = conn.begin_nested()
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    savepoint.commit()  # or rollback
# commits automatically

会话:

Session = sessionmaker(engine)
with Session.begin() as session:
    savepoint = session.begin_nested()
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    savepoint.commit()  # or rollback
# commits automatically
```### 显式开始
`Session` 具有“自动开始”行为,这意味着一旦操作开始进行,它就会确保存在一个用于跟踪正在进行的操作的 `SessionTransaction`。当调用 `Session.commit()` 时,该事务将被完成。
通常希望特别是在框架集成中,控制“开始”操作发生的时机。为此,`Session` 使用“自动开始”策略,使得可以直接调用 `Session.begin()` 方法来为尚未启动事务的 `Session` 开始事务:
```py
Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"
    session.commit()
except:
    session.rollback()
    raise

上述模式更惯用地使用上下文管理器调用:

Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"

Session.begin() 方法和会话的“自动开始”过程使用相同的步骤序列开始事务。这包括当 SessionEvents.after_transaction_create() 事件发生时调用;此钩子被框架用于将其自己的事务处理过程与 ORM Session 集成。 ### 启用两阶段提交

对于支持两阶段操作的后端(当前为 MySQL 和 PostgreSQL),可以指示会话使用两阶段提交语义。这将协调跨数据库的事务提交,以便在所有数据库中提交或回滚事务。您还可以Session.prepare() 会话以与 SQLAlchemy 未管理的事务进行交互。要使用两阶段事务,请在会话上设置标志 twophase=True

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")
Session = sessionmaker(twophase=True)
# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})
session = Session()
# .... work with accounts and users
# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()
```### 设置事务隔离级别 / DBAPI AUTOCOMMIT
大多数 DBAPI 支持可配置的事务隔离级别概念。传统上有四个级别:“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”。这些通常在 DBAPI 连接开始新事务之前应用,需要注意的是,大多数 DBAPI 在首次发出 SQL 语句时会隐式开始此事务。
支持隔离级别的 DBAPI 通常还支持真实的“自动提交”概念,这意味着 DBAPI 连接本身将被放置在非事务自动提交模式中。这通常意味着自动向数据库发出“BEGIN”的典型 DBAPI 行为不再发生,但也可能包括其他指令。在使用此模式时,**DBAPI 在任何情况下都不使用事务**。SQLAlchemy 方法如`.begin()`、`.commit()` 和 `.rollback()` 会静默通过。
SQLAlchemy 的方言支持在每个`Engine` 或每个`Connection` 的基础上设置隔离模式,使用`create_engine()` 层次上以及 `Connection.execution_options()` 层次上的标志。
当使用 ORM `Session` 时,它充当引擎和连接的*外观*,但不直接暴露事务隔离。因此,为了影响事务隔离级别,我们需要在适当时对`Engine` 或 `Connection` 进行操作。
另请参阅
设置事务隔离级别,包括 DBAPI 自动提交 - 一定要查看 SQLAlchemy `Connection` 对象级别的隔离级别是如何工作的。
#### 为 Sessionmaker / Engine 设置隔离级别
要为 `Session` 或 `sessionmaker` 设置特定的隔离级别,全局首选技巧是可以始终根据特定的隔离级别构建一个 `Engine`,然后将其用作 `Session` 和/或 `sessionmaker` 的连接源:
```py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    isolation_level="REPEATABLE READ",
)
Session = sessionmaker(eng)

另一个选项,如果同时存在具有不同隔离级别的两个引擎,是使用 Engine.execution_options() 方法,该方法将产生原始 Engine 的浅拷贝,该拷贝与父引擎共享相同的连接池。当操作将被分为“事务性”和“自动提交”操作时,通常最好使用此方法:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)

在上述情况中,“eng” 和 "autocommit_engine" 共享相同的方言和连接池。但是,当从 autocommit_engine 获取连接时,将设置“AUTOCOMMIT”模式。当这两个 sessionmaker 对象“transactional_session” 和 “autocommit_session" 与数据库连接一起工作时,它们会继承这些特性。

autocommit_session” 仍然具有事务语义,包括 Session.commit()Session.rollback() 仍然将其自己视为“提交”和“回滚”对象,但是事务将会默默消失。因此,通常情况下,虽然不是严格要求,但 AUTOCOMMIT 隔离级别的会话应该以只读方式使用,也就是:

with autocommit_session() as session:
    some_objects = session.execute(text("<statement>"))
    some_other_objects = session.execute(text("<statement>"))
# closes connection
设置单独会话的隔离级别

当我们创建一个新的 Session,无论是直接使用构造函数还是调用由 sessionmaker 生成的可调用函数时,我们都可以直接传递 bind 参数,覆盖预先存在的绑定。例如,我们可以从默认的 sessionmaker 创建我们的 Session,并传递一个设置为自动提交的引擎:

plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")
# will normally use plain_engine
Session = sessionmaker(plain_engine)
# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session
    ...

对于配置了多个“绑定”(Sessionsessionmaker)的情况,我们可以重新完全指定binds参数,或者如果我们只想替换特定的绑定,则可以使用Session.bind_mapper()Session.bind_table()方法:

with Session() as session:
    session.bind_mapper(User, autocommit_engine)
为单个事务设置隔离级别

关于隔离级别的一个关键注意事项是,不能在已经开始事务的Connection上安全地修改设置。数据库不能更改正在进行的事务的隔离级别,并且一些 DBAPIs 和 SQLAlchemy 方言在这个领域的行为不一致。

因此,最好使用一个与所需隔离级别的引擎直接绑定的Session。然而,可以通过在事务开始时使用Session.connection()方法来影响每个连接的隔离级别:

from sqlalchemy.orm import Session
# assume session just constructed
sess = Session(bind=engine)
# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# ... work with session in SERIALIZABLE isolation level...
# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()
# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.

在上面的例子中,我们首先使用构造函数或sessionmaker生成一个Session。然后,我们通过调用Session.connection()显式设置数据库级别事务的开始,该方法提供了将传递给连接的执行选项,在开始数据库级别事务之前进行设置。事务使用所选的隔离级别进行。事务完成后,将在将连接返回到连接池之前将连接上的隔离级别重置为其默认值。

Session.begin()方法也可以用于开始Session级别的事务;在调用该方法后,可以使用Session.connection()设置每个连接的事务隔离级别:

sess = Session(bind=engine)
with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
    # ... work with session in SERIALIZABLE isolation level...
# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.

使用事件跟踪事务状态

请参阅事务事件部分,了解会话事务状态更改的可用事件挂钩的概述。

使用保存点

如果底层引擎支持 SAVEPOINT 事务,则可以使用Session.begin_nested()方法进行分割:

Session = sessionmaker()
with Session.begin() as session:
    session.add(u1)
    session.add(u2)
    nested = session.begin_nested()  # establish a savepoint
    session.add(u3)
    nested.rollback()  # rolls back u3, keeps u1 and u2
# commits u1 and u2

每次调用Session.begin_nested()时,都会在当前数据库事务的范围内(如果尚未开始,则开始一个)向数据库发出新的“BEGIN SAVEPOINT”命令,并返回一个SessionTransaction类型的对象,该对象表示对此保存点的句柄。当调用此对象的.commit()方法时,将向数据库发出“RELEASE SAVEPOINT”,如果调用.rollback()方法,则会发出“ROLLBACK TO SAVEPOINT”。外层数据库事务仍然在进行中。

Session.begin_nested()通常用作上下文管理器,其中可以捕获特定的每个实例错误,在此事务状态的一部分上发出回滚,而无需回滚整个事务,就像下面的示例中一样:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

当由Session.begin_nested()生成的上下文管理器完成时,它会“提交”保存点,其中包括刷新所有待处理状态的通常行为。当出现错误时,保存点会被回滚,并且对已更改的对象的Session的状态将被过期。

此模式非常适合于使用 PostgreSQL 并捕获IntegrityError以检测重复行的情况;当引发此类错误时,PostgreSQL 通常会中止整个事务,但是当使用 SAVEPOINT 时,外部事务会得以保留。在下面的示例中,将一系列数据持久化到数据库中,偶尔会跳过“重复主键”记录,而不会回滚整个操作:

from sqlalchemy import exc
with session.begin():
    for record in records:
        try:
            with session.begin_nested():
                obj = SomeRecord(id=record["identifier"], name=record["name"])
                session.add(obj)
        except exc.IntegrityError:
            print(f"Skipped record {record} - row already exists")

当调用Session.begin_nested()时,Session首先会将当前所有待定状态刷新到数据库;无论Session.autoflush参数的值是什么,这都是无条件的,通常可以用来禁用自动刷新。这种行为的原因是当此嵌套事务上发生回滚时,Session可以使在保存点范围内创建的任何内存状态过期,同时确保在刷新这些过期对象时,保存点开始前的对象图状态将可用于重新从数据库加载。

在现代版本的 SQLAlchemy 中,当由Session.begin_nested()初始化的保存点被回滚时,自从保存点创建以来被修改的内存对象状态将会被过期,但是其他自保存点开始时未改变的对象状态将会被保留。这样做是为了让后续操作可以继续使用那些未受影响的数据,而无需从数据库中刷新。

另请参阅

Connection.begin_nested() - 核心保存点 API


SqlAlchemy 2.0 中文文档(二十三)(5)https://developer.aliyun.com/article/1560525

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
3月前
|
SQL 数据库 Python
SqlAlchemy 2.0 中文文档(二十六)(4)
SqlAlchemy 2.0 中文文档(二十六)
43 2
|
3月前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(二十二)(3)
SqlAlchemy 2.0 中文文档(二十二)
19 5
|
3月前
|
数据库 Python 容器
SqlAlchemy 2.0 中文文档(十四)(4)
SqlAlchemy 2.0 中文文档(十四)
26 1
|
3月前
|
API 数据库 C++
SqlAlchemy 2.0 中文文档(十四)(1)
SqlAlchemy 2.0 中文文档(十四)
22 1
|
3月前
|
API 数据库 Python
SqlAlchemy 2.0 中文文档(十四)(5)
SqlAlchemy 2.0 中文文档(十四)
35 1
|
3月前
|
API 数据库 Python
SqlAlchemy 2.0 中文文档(十四)(2)
SqlAlchemy 2.0 中文文档(十四)
28 1
|
3月前
|
数据库 Python 容器
SqlAlchemy 2.0 中文文档(十四)(3)
SqlAlchemy 2.0 中文文档(十四)
25 1
|
3月前
|
SQL 关系型数据库 数据库
SqlAlchemy 2.0 中文文档(十七)(1)
SqlAlchemy 2.0 中文文档(十七)
21 1
|
3月前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(二十二)(2)
SqlAlchemy 2.0 中文文档(二十二)
32 3
|
3月前
|
SQL 存储 数据库连接
SqlAlchemy 2.0 中文文档(二十二)(1)
SqlAlchemy 2.0 中文文档(二十二)
36 2