SqlAlchemy 2.0 中文文档(二十三)(3)

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: SqlAlchemy 2.0 中文文档(二十三)

SqlAlchemy 2.0 中文文档(二十三)(2)https://developer.aliyun.com/article/1560516


使用 SAVEPOINT

如果底层引擎支持,可以使用 Session.begin_nested() 方法来划定 SAVEPOINT 事务:

Session = sessionmaker()
with Session.begin() as session:
    session.add(u1)
    session.add(u2)
    nested = session.begin_nested()  # establish a savepoint
    session.add(u3)
    nested.rollback()  # rolls back u3, keeps u1 and u2
# commits u1 and u2

每次调用 Session.begin_nested(),都会在当前数据库事务的范围内向数据库发出一个新的“BEGIN SAVEPOINT”命令(如果尚未开始,则开始一个),并返回一个类型为 SessionTransaction 的对象,代表这个 SAVEPOINT 的句柄。当调用该对象的 .commit() 方法时,会向数据库发出“RELEASE SAVEPOINT”,如果调用 .rollback() 方法,则会发出“ROLLBACK TO SAVEPOINT”。封闭的数据库事务仍在进行中。

Session.begin_nested() 通常用作上下文管理器,可以捕获特定的每个实例错误,并在该事务状态的部分发出回滚,而不会回滚整个事务,如下例所示:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

当由 Session.begin_nested() 产生的上下文管理器完成时,“提交” savepoint,其中包括刷新所有挂起状态的常规行为。当出现错误时,保存点会被回滚,并且更改的对象的 Session 本地状态会过期。

这种模式非常适用于诸如使用 PostgreSQL 并捕获 IntegrityError 来检测重复行的情况;通常情况下,当出现此类错误时,PostgreSQL 会中止整个事务,但是使用 SAVEPOINT 时,外部事务会得以保留。在下面的示例中,将一系列数据持久化到数据库中,并且偶尔会跳过“重复的主键”记录,而无需回滚整个操作:

from sqlalchemy import exc
with session.begin():
    for record in records:
        try:
            with session.begin_nested():
                obj = SomeRecord(id=record["identifier"], name=record["name"])
                session.add(obj)
        except exc.IntegrityError:
            print(f"Skipped record {record} - row already exists")

当调用 Session.begin_nested() 时,Session 首先会将当前所有挂起的状态刷新到数据库;这种情况会无条件发生,不管 Session.autoflush 参数的值是什么,该参数通常用于禁用自动刷新。这种行为的原因是,当此嵌套事务发生回滚时,Session 可以使在 SAVEPOINT 范围内创建的任何内存状态过期,同时确保在刷新这些过期对象时,SAVEPOINT 开始之前的对象图状态将可用于重新从数据库加载。

在 SQLAlchemy 的现代版本中,当由 Session.begin_nested() 启动的 SAVEPOINT 被回滚时,自 SAVEPOINT 创建以来已修改的内存对象状态会过期,但自 SAVEPOINT 开始后未修改的其他对象状态将保持不变。这样,后续操作可以继续使用其它未受影响的数据,而无需从数据库刷新。

另请参见

Connection.begin_nested() - 核心 SAVEPOINT API ### 会话级别与引擎级别的事务控制

在核心中的Connection和 ORM 中的_session.Session具有等效的事务语义,无论是在sessionmakerEngine的级别,还是在SessionConnection的级别。以下部分根据以下方案详细说明这些情况:

ORM                                           Core
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:
边提交边进行

SessionConnection都具有Connection.commit()Connection.rollback()方法。使用 SQLAlchemy 2.0 风格的操作,这些方法在所有情况下都会影响最外层的事务。对于Session,假定Session.autobegin保持默认值True

Engine:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    conn.commit()

Session:

Session = sessionmaker(engine)
with Session() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    session.commit()
一次开始

sessionmakerEngine都具有Engine.begin()方法,该方法将获取一个新对象来执行 SQL 语句(分别是SessionConnection),然后返回一个上下文管理器,用于维护该对象的开始/提交/回滚上下文。

引擎:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
# commits and closes automatically

会话:

Session = sessionmaker(engine)
with Session.begin() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
# commits and closes automatically
嵌套事务

使用 SAVEPOINT 通过 Session.begin_nested()Connection.begin_nested() 方法时,必须使用返回的事务对象来提交或回滚 SAVEPOINT。 调用 Session.commit()Connection.commit() 方法将始终提交最外层事务;这是 SQLAlchemy 2.0 特定的行为,与 1.x 系列相反。

引擎:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
    savepoint = conn.begin_nested()
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    savepoint.commit()  # or rollback
# commits automatically

会话:

Session = sessionmaker(engine)
with Session.begin() as session:
    savepoint = session.begin_nested()
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    savepoint.commit()  # or rollback
# commits automatically
```### 显式开始
`Session` 具有“自动开始”行为,这意味着一旦操作开始进行,它就会确保存在一个 `SessionTransaction` 来跟踪正在进行的操作。 当调用 `Session.commit()` 时,此事务将完成。
通常希望,特别是在框架集成中,控制“开始”操作发生的时间点。 为此,`Session` 使用“自动开始”策略,使得可以直接调用 `Session.begin()` 方法,以便为尚未启动事务的 `Session` 调用:
```py
Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"
    session.commit()
except:
    session.rollback()
    raise

上述模式更常用地使用上下文管理器调用:

Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"

Session.begin() 方法和会话的“自动开始”过程使用相同的步骤序列开始事务。 这包括在发生时调用 SessionEvents.after_transaction_create() 事件;此挂钩被框架用于将其自己的事务处理过程与 ORM Session 集成。

对于支持两阶段操作的后端(目前支持 MySQL 和 PostgreSQL),会话可以被指示使用两阶段提交语义。这将协调跨数据库的事务提交,以便在所有数据库中要么提交事务,要么回滚事务。您还可以Session.prepare() 会话以与 SQLAlchemy 不管理的事务进行交互。要使用两阶段事务,请在会话上设置标志 twophase=True

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")
Session = sessionmaker(twophase=True)
# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})
session = Session()
# .... work with accounts and users
# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()
```### 设置事务隔离级别 / DBAPI 自动提交
大多数 DBAPI 支持可配置的事务隔离级别的概念。传统上有四个级别:“READ UNCOMMITTED”,“READ COMMITTED”,“REPEATABLE READ” 和 “SERIALIZABLE”。这些通常应用于 DBAPI 连接在开始新事务之前,注意大多数 DBAPI 在首次发出 SQL 语句时会隐式开始此事务。
支持隔离级别的 DBAPI 通常也支持真正的 “自动提交” 概念,这意味着 DBAPI 连接本身将被置于非事务自动提交模式。这通常意味着数据库自动发出 “BEGIN” 的典型 DBAPI 行为不再发生,但它也可能包括其他指令。在使用此模式时,**DBAPI 在任何情况下都不使用事务**。SQLAlchemy 方法如 `.begin()`、`.commit()` 和 `.rollback()` 将静默传递。
SQLAlchemy 的方言支持在每个 `Engine` 或每个 `Connection` 基础上设置可设置的隔离模式,使用 `create_engine()` 层级以及 `Connection.execution_options()` 层级的标志。
当使用 ORM `Session` 时,它充当引擎和连接的 *facade*,但不直接暴露事务隔离。因此,为了影响事务隔离级别,我们需要根据情况对 `Engine` 或 `Connection` 采取行动。
另请参阅
设置事务隔离级别包括 DBAPI 自动提交 - 请确保查看 SQLAlchemy `Connection` 对象的隔离级别工作方式。
#### 为 Sessionmaker / Engine 设置隔离
要为特定的隔离级别全局设置`Session`或`sessionmaker`,第一种技术是可以针对所有情况构建一个具有特定隔离级别的`Engine`,然后将其用作`Session`和/或`sessionmaker`的连接源:
```py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    isolation_level="REPEATABLE READ",
)
Session = sessionmaker(eng)

另一个选项,如果同时存在具有不同隔离级别的两个引擎,是使用Engine.execution_options()方法,该方法将生成一个原始Engine的浅拷贝,该浅拷贝与父引擎共享相同的连接池。当操作将被分成“事务”和“自动提交”操作时,这通常是更可取的:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)

在上述示例中,“eng”和"autocommit_engine"共享相同的方言和连接池。然而,当从autocommit_engine获取连接时,将设置“AUTOCOMMIT”模式。这两个sessionmaker对象“transactional_session”和“autocommit_session”在与数据库连接工作时继承这些特性。

autocommit_session仍然具有事务语义,包括Session.commit()Session.rollback()仍然认为自己在“提交”和“回滚”对象,然而事务将会默默地不存在。因此,通常情况下,尽管不是严格要求,使用 AUTOCOMMIT 隔离的会话应该以只读方式使用,即:

with autocommit_session() as session:
    some_objects = session.execute(text("<statement>"))
    some_other_objects = session.execute(text("<statement>"))
# closes connection
设置单个会话的隔离级别

当我们创建一个新的Session,可以直接使用构造函数,也可以在调用由sessionmaker生成的可调用对象时,直接传递bind参数,覆盖预先存在的绑定。例如,我们可以从默认的sessionmaker创建我们的Session并传递一个设置为自动提交的引擎:

plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")
# will normally use plain_engine
Session = sessionmaker(plain_engine)
# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session
    ...

对于Sessionsessionmaker配置了多个binds的情况,我们可以重新完整指定binds参数,或者如果我们只想替换特定的 binds,则可以使用Session.bind_mapper()Session.bind_table()方法:

with Session() as session:
    session.bind_mapper(User, autocommit_engine)
为个别事务设置隔离级别

关于隔离级别的一个关键警告是,在已经开始事务的Connection上不能安全地修改设置。数据库不能在进行中的事务中更改隔离级别,而一些 DBAPIs 和 SQLAlchemy 方言在这方面的行为不一致。

因此,最好使用一个提前绑定到具有所需隔离级别的引擎的Session。然而,通过在事务开始时使用Session.connection()方法可以影响每个连接的隔离级别:

from sqlalchemy.orm import Session
# assume session just constructed
sess = Session(bind=engine)
# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# ... work with session in SERIALIZABLE isolation level...
# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()
# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.

在上面的例子中,我们首先使用构造函数或sessionmaker生成一个Session。然后,我们通过调用Session.connection()显式设置数据库级事务的开始,该方法提供了在数据库级事务开始之前将传递给连接的执行选项。事务使用此选定的隔离级别进行。当事务完成时,隔离级别会重置为其默认值,然后将连接返回到连接池。

Session.begin()方法也可以用于开始Session级事务;在调用该方法后,可以使用Session.connection()来设置每个连接的事务隔离级别:

sess = Session(bind=engine)
with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
    # ... work with session in SERIALIZABLE isolation level...
# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.

使用事件跟踪事务状态

请参阅事务事件部分,了解有关会话事务状态更改的可用事件挂钩的概述。 ## 加入会话到外部事务(例如用于测试套件)

如果正在使用处于事务状态的 Connection(即已建立 Transaction),则可以通过将 Session 绑定到该 Connection 来使 Session 参与该事务。通常的理由是允许 ORM 代码自由地与 Session 一起工作,包括调用 Session.commit(),之后整个数据库交互都被回滚。

在 2.0 版本中更改:2.0 版本再次对“加入到外部事务”配方进行了改进;不再需要事件处理程序来“重置”嵌套事务。

该配方的工作方式是在事务内部建立一个 Connection,可选地建立一个 SAVEPOINT,然后将其传递给 Session 作为“bind”;Session.join_transaction_mode 参数传递了设置为 "create_savepoint",表示应该创建新的 SAVEPOINT 来实现 Session 的 BEGIN/COMMIT/ROLLBACK,这将使外部事务处于传递时的相同状态。

当测试拆解时,外部事务会被回滚,以便将测试中的任何数据更改还原:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase
# global application scope.  create Session class, engine
Session = sessionmaker()
engine = create_engine("postgresql+psycopg2://...")
class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()
        # begin a non-ORM transaction
        self.trans = self.connection.begin()
        # bind an individual Session to the connection, selecting
        # "create_savepoint" join_transaction_mode
        self.session = Session(
            bind=self.connection, join_transaction_mode="create_savepoint"
        )
    def test_something(self):
        # use the session in tests.
        self.session.add(Foo())
        self.session.commit()
    def test_something_with_rollbacks(self):
        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()
        self.session.add(Foo())
        self.session.commit()
    def tearDown(self):
        self.session.close()
        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()
        # return connection to the Engine
        self.connection.close()

上述配方是 SQLAlchemy 自己的 CI 的一部分,以确保它仍然按预期工作。 ## 管理事务

在 1.4 版本中更改:会话事务管理已经进行了修改,使其更清晰、更易于使用。特别是,现在它具有“自动开始”操作,这意味着可以控制事务开始的时间点,而无需使用传统的“自动提交”模式。

Session 跟踪一次性的“虚拟”事务的状态,使用一个叫做 SessionTransaction 的对象。然后,该对象利用底层的 Engine 或引擎来启动使用 Connection 对象所需的真实连接级事务。

当需要时,这个“虚拟”事务会自动创建,或者可以使用 Session.begin() 方法手动开始。尽可能大程度地支持 Python 上下文管理器的使用,不仅在创建 Session 对象的级别上,还在维护 SessionTransaction 的范围上。

下面,假设我们从一个 Session 开始:

from sqlalchemy.orm import Session
session = Session(engine)

现在我们可以使用上下文管理器在标记的事务中运行操作:

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised

在上述上下文结束时,假设没有引发任何异常,任何待处理的对象都将被刷新到数据库,并且数据库事务将被提交。如果在上述块中引发了异常,则事务将被回滚。在这两种情况下,上述 Session 在退出块后都可以在后续事务中使用。

Session.begin() 方法是可选的,Session 也可以使用逐步提交的方法,在需要时自动开始事务;只需提交或回滚:

session = Session(engine)
session.add(some_object())
session.add(some_other_object())
session.commit()  # commits
# will automatically begin again
result = session.execute(text("< some select statement >"))
session.add_all([more_objects, ...])
session.commit()  # commits
session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()  # rolls back still_another_object

Session本身具有Session.close()方法。如果Session是在尚未提交或回滚的事务内开始的,则此方法将取消(即回滚)该事务,并且还将清除Session对象状态中包含的所有对象。如果Session的使用方式不保证调用Session.commit()Session.rollback()(例如不在上下文管理器或类似位置),则可以使用close方法确保释放所有资源:

# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()

最后,会话构建/关闭过程本身也可以通过上下文管理器运行。这是确保Session对象使用范围在固定块内的最佳方法。首先通过Session构造函数进行说明:

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())
    session.commit()  # commits
    session.add(still_another_object)
    session.flush()  # flush still_another_object
    session.commit()  # commits
    result = session.execute(text("<some SELECT statement>"))
# remaining transactional state from the .execute() call is
# discarded

同样,sessionmaker也可以以相同的方式使用:

Session = sessionmaker(engine)
with Session() as session:
    with session.begin():
        session.add(some_object)
    # commits
# closes the Session

sessionmaker本身包括一个sessionmaker.begin()方法,允许同时执行这两个操作:

with Session.begin() as session:
    session.add(some_object)


SqlAlchemy 2.0 中文文档(二十三)(4)https://developer.aliyun.com/article/1560521

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
4月前
|
SQL 数据库 Python
SqlAlchemy 2.0 中文文档(二十六)(4)
SqlAlchemy 2.0 中文文档(二十六)
66 2
|
4月前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(二十二)(3)
SqlAlchemy 2.0 中文文档(二十二)
25 5
|
4月前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(二十二)(2)
SqlAlchemy 2.0 中文文档(二十二)
43 3
|
4月前
|
SQL 存储 数据库连接
SqlAlchemy 2.0 中文文档(二十二)(1)
SqlAlchemy 2.0 中文文档(二十二)
53 2
|
4月前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(二十六)(1)
SqlAlchemy 2.0 中文文档(二十六)
32 2
|
4月前
|
自然语言处理 数据库 Python
SqlAlchemy 2.0 中文文档(二十六)(2)
SqlAlchemy 2.0 中文文档(二十六)
31 2
|
4月前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(二十六)(3)
SqlAlchemy 2.0 中文文档(二十六)
41 2
|
4月前
|
SQL 缓存 数据库连接
SqlAlchemy 2.0 中文文档(二十二)(5)
SqlAlchemy 2.0 中文文档(二十二)
30 1
|
4月前
|
缓存 自然语言处理 数据库
SqlAlchemy 2.0 中文文档(二十六)(5)
SqlAlchemy 2.0 中文文档(二十六)
38 1
|
4月前
|
SQL 缓存 API
SqlAlchemy 2.0 中文文档(二十八)(4)
SqlAlchemy 2.0 中文文档(二十八)
66 1