SqlAlchemy 2.0 中文文档(二十四)(2)https://developer.aliyun.com/article/1560545
使用 INSERT、UPDATE 和 ON CONFLICT(即 upsert)返回 ORM 对象
SQLAlchemy 2.0 包括增强功能,用于发出几种类型的启用 ORM 的 INSERT、UPDATE 和 upsert 语句。查看文档 ORM-Enabled INSERT, UPDATE, and DELETE statements 以获取文档。有关 upsert,请参见 ORM “upsert” Statements。
使用 PostgreSQL ON CONFLICT 与 RETURNING 返回 upserted ORM 对象
本节已移至 ORM “upsert” Statements。
使用 PostgreSQL ON CONFLICT 与 RETURNING 返回 upserted ORM 对象
本节已移至 ORM “upsert” Statements。
分区策略(例如,每个会话使用多个数据库后端)
简单的垂直分区
垂直分区通过配置Session
的Session.binds
参数,将不同的类、类层次结构或映射表放置在多个数据库中。此参数接收一个包含任意组合的 ORM 映射类、映射层次结构中的任意类(如声明性基类或混合类)、Table
对象和Mapper
对象作为键的字典,然后通常引用Engine
或较不常见的 Connection
对象作为目标。每当Session
需要代表特定类型的映射类发出 SQL 以定位适当的数据库连接源时,就会查询该字典:
engine1 = create_engine("postgresql+psycopg2://db1") engine2 = create_engine("postgresql+psycopg2://db2") Session = sessionmaker() # bind User operations to engine 1, Account operations to engine 2 Session.configure(binds={User: engine1, Account: engine2}) session = Session()
在上面,针对任一类的 SQL 操作都将使用与该类链接的Engine
。该功能涵盖了读写操作;针对映射到engine1
的实体的 Query
(通过查看请求的项目列表中的第一个实体确定)将使用engine1
来运行查询。刷新操作将根据每个类使用两个引擎,因为它刷新了User
和Account
类型的对象。
在更常见的情况下,通常有基础类或混合类可用于区分不同数据库连接的操作。Session.binds
参数可以容纳任何任意的 Python 类作为键,如果发现它在特定映射类的__mro__
(Python 方法解析顺序)中,则会使用该键。假设两个声明性基类分别表示两个不同的数据库连接:
from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Session class BaseA(DeclarativeBase): pass class BaseB(DeclarativeBase): pass class User(BaseA): ... class Address(BaseA): ... class GameInfo(BaseB): ... class GameStats(BaseB): ... Session = sessionmaker() # all User/Address operations will be on engine 1, all # Game operations will be on engine 2 Session.configure(binds={BaseA: engine1, BaseB: engine2})
在上面,从BaseA
和BaseB
继承的类将根据它们是否继承自任何超类来将它们的 SQL 操作路由到两个引擎中的一个。对于从多个“绑定”超类继承的类的情况,将选择目标类层次结构中最高的超类来表示应使用哪个引擎。
另请参阅
Session.binds
为多引擎会话协调事务
使用多个绑定引擎的一个注意事项是,如果提交操作在一个后端成功提交后在另一个后端失败,则可能会出现问题。这是一个一致性问题,在关系数据库中通过“两阶段事务”解决,它在提交序列中添加了一个额外的“准备”步骤,允许多个数据库在实际完成事务之前同意提交。
由于 DBAPI 中的支持有限,SQLAlchemy 对跨后端的两阶段事务的支持也有限。通常来说,它已知与 PostgreSQL 后端一起工作良好,与 MySQL 后端一起工作效果较差。然而,当后端支持时,Session
完全能够利用两阶段事务功能,方法是在 sessionmaker
或 Session
中设置 Session.use_twophase
标志。参见启用两阶段提交以获取示例。
自定义垂直分区
更全面的基于规则的类级分区可以通过覆盖 Session.get_bind()
方法来构建。下面我们演示了一个自定义的 Session
,它提供以下规则:
- 刷新操作以及批量的“更新”和“删除”操作都传递到名为
leader
的引擎。 - 所有子类为
MyOtherClass
的对象的操作都发生在other
引擎上。 - 对于所有其他类的读取操作都发生在随机选择的
follower1
或follower2
数据库上。
engines = { "leader": create_engine("sqlite:///leader.db"), "other": create_engine("sqlite:///other.db"), "follower1": create_engine("sqlite:///follower1.db"), "follower2": create_engine("sqlite:///follower2.db"), } from sqlalchemy.sql import Update, Delete from sqlalchemy.orm import Session, sessionmaker import random class RoutingSession(Session): def get_bind(self, mapper=None, clause=None): if mapper and issubclass(mapper.class_, MyOtherClass): return engines["other"] elif self._flushing or isinstance(clause, (Update, Delete)): # NOTE: this is for example, however in practice reader/writer # splits are likely more straightforward by using two distinct # Sessions at the top of a "reader" or "writer" operation. # See note below return engines["leader"] else: return engines[random.choice(["follower1", "follower2"])]
上述Session
类是通过 sessionmaker
的 class_
参数来插入的:
Session = sessionmaker(class_=RoutingSession)
这种方法可以与多个 MetaData
对象结合使用,使用类似于使用声明性 __abstract__
关键字的方法,如 abstract 中所述。
注意
上述示例说明了根据 SQL 语句是否期望写入数据将特定 SQL 语句路由到所谓的“主”或“从”数据库,但这可能不是一个实用的方法,因为它会导致在同一操作中读取和写入之间存在不协调的事务行为。实践中,最好在整个操作/事务进行的基础上,提前构建Session
作为“读取者”或“写入者”会话。这样,将要写入数据的操作也会在同一个事务范围内发出其读取查询。有关在sessionmaker
中设置“只读”操作的配方,使用自动提交连接,以及用于包含 DML/COMMIT 的“写入”操作的另一个配方,请参阅为 Sessionmaker / Engine 设置隔离级别的示例。
另请参阅
SQLAlchemy 中的 Django 风格数据库路由器 - 关于Session.get_bind()
更全面示例的博文
水平分区
水平分区将单个表(或一组表)的行分区到多个数据库中。SQLAlchemy 的Session
包含对这个概念的支持,但要完全使用它,需要使用Session
和Query
子类。这些子类的基本版本可在水平分区 ORM 扩展中找到。一个使用示例位于:水平分区。
简单的垂直分区
垂直分区将不同的类、类层次结构或映射表配置到多个数据库中,通过配置Session
的Session.binds
参数。该参数接收一个字典,其中包含任意组合的 ORM 映射类、映射层次结构内的任意类(例如声明基类或混合类)、Table
对象和Mapper
对象作为键,这些键通常引用Engine
或更少见的情况下引用Connection
对象作为目标。每当Session
需要代表特定类型的映射类发出 SQL 以定位数据库连接的适当源时,就会查询该字典:
engine1 = create_engine("postgresql+psycopg2://db1") engine2 = create_engine("postgresql+psycopg2://db2") Session = sessionmaker() # bind User operations to engine 1, Account operations to engine 2 Session.configure(binds={User: engine1, Account: engine2}) session = Session()
在上述情况下,针对任一类的 SQL 操作将使用与该类链接的Engine
。该功能在读写操作中都是全面的;针对映射到engine1
的实体的Query
(通过查看请求的项目列表中的第一个实体来确定)将使用engine1
来运行查询。刷新操作将基于每个类使用两个引擎,因为它会刷新User
和Account
类型的对象。
在更常见的情况下,通常有基类或混合类可用于区分命令操作的目标数据库连接。Session.binds
参数可以接受任何任意的 Python 类作为键,如果在特定映射类的__mro__
(Python 方法解析顺序)中找到,则会使用该键。假设有两个声明基类代表两个不同的数据库连接:
from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Session class BaseA(DeclarativeBase): pass class BaseB(DeclarativeBase): pass class User(BaseA): ... class Address(BaseA): ... class GameInfo(BaseB): ... class GameStats(BaseB): ... Session = sessionmaker() # all User/Address operations will be on engine 1, all # Game operations will be on engine 2 Session.configure(binds={BaseA: engine1, BaseB: engine2})
在上述情况下,从BaseA
和BaseB
继承的类将根据它们是否继承自其中任何一个超类而将其 SQL 操作路由到两个引擎中的一个。对于从多个“绑定”超类继承的类,将选择目标类层次结构中最高的超类来表示应该使用哪个引擎。
另请参阅
Session.binds
多引擎会话的事务协调
在使用多个绑定引擎的情况下,有一个需要注意的地方是,在一个提交操作在一个后端成功提交后,另一个后端可能失败。这是一个一致性问题,在关系型数据库中通过“两阶段事务”解决,该事务将一个额外的“准备”步骤添加到提交序列中,允许多个数据库在实际完成事务之前同意提交。
由于 DBAPI 的支持有限,SQLAlchemy 对跨后端的两阶段事务的支持也有限。最典型的是,它在 PostgreSQL 后端上运行良好,并且在 MySQL 后端上的支持较少。但是,当后端支持时,Session
完全能够利用两阶段事务功能,方法是在sessionmaker
或Session
中设置Session.use_twophase
标志。参见启用两阶段提交以获取示例。
自定义垂直分区
可以通过重写Session.get_bind()
方法来构建更全面的基于规则的类级分区。以下是一个自定义Session
的示例,提供以下规则:
- 刷新操作以及批量“更新”和“删除”操作将传送到名为
leader
的引擎。 - 所有子类化
MyOtherClass
的对象操作都发生在other
引擎上。 - 所有其他类的读取操作都在
follower1
或follower2
数据库的随机选择上进行。
engines = { "leader": create_engine("sqlite:///leader.db"), "other": create_engine("sqlite:///other.db"), "follower1": create_engine("sqlite:///follower1.db"), "follower2": create_engine("sqlite:///follower2.db"), } from sqlalchemy.sql import Update, Delete from sqlalchemy.orm import Session, sessionmaker import random class RoutingSession(Session): def get_bind(self, mapper=None, clause=None): if mapper and issubclass(mapper.class_, MyOtherClass): return engines["other"] elif self._flushing or isinstance(clause, (Update, Delete)): # NOTE: this is for example, however in practice reader/writer # splits are likely more straightforward by using two distinct # Sessions at the top of a "reader" or "writer" operation. # See note below return engines["leader"] else: return engines[random.choice(["follower1", "follower2"])]
上述Session
类是通过向sessionmaker
传递class_
参数来插入的:
Session = sessionmaker(class_=RoutingSession)
这种方法可以与多个MetaData
对象结合使用,例如使用声明性的__abstract__
关键字的方法,如在 abstract 中所述。
注意
尽管上面的示例说明了将特定的 SQL 语句路由到基于语句是否期望写入数据的所谓 “leader” 或 “follower” 数据库,但这可能不是一种实际的方法,因为它导致在同一操作中读取和写入之间的不协调事务行为。实际上,最好是根据正在进行的整体操作 / 事务,提前将 Session
构造为 “读取者” 或 “写入者” 会话。这样,将要写入数据的操作也会在同一个事务范围内发出其读取查询。请参阅 为 Sessionmaker / Engine 设置隔离 中的示例,该示例设置了一个用于 “只读” 操作的 sessionmaker
,使用自动提交连接,另一个用于包含 DML / COMMIT 的 “写入” 操作。
另请参阅
SQLAlchemy 中的 Django 风格数据库路由器 - 有关 Session.get_bind()
的更全面示例的博客文章
水平分区
水平分区将单个表(或一组表)的行跨多个数据库进行分区。SQLAlchemy Session
包含对此概念的支持,但要充分利用它,需要使用 Session
和 Query
的子类。这些子类的基本版本在 水平分片 ORM 扩展中可用。使用示例位于:水平分片。
批量操作
遗留特性
SQLAlchemy 2.0 将 Session
的“批量插入”和“批量更新”功能集成到了 2.0 风格的 Session.execute()
方法中,直接使用了 Insert
和 Update
构造。请参阅 ORM 启用的 INSERT、UPDATE 和 DELETE 语句 文档,包括 遗留 Session 批量 INSERT 方法 ,其中说明了从旧方法迁移到新方法的示例。
上下文/线程本地会话
回顾一下何时构建会话,何时提交,何时关闭?一节中,介绍了“会话范围”的概念,强调了在 Web 应用程序中链接Session
的范围与 Web 请求的范围之间的实践。大多数现代 Web 框架都包括集成工具,以便自动管理Session
的范围,并且应该使用这些工具,只要它们可用。
SQLAlchemy 包括其自己的辅助对象,它有助于建立用户定义的Session
范围。它也被第三方集成系统用于帮助构建它们的集成方案。
该对象是scoped_session
对象,它表示一组Session
对象的注册表。如果您对注册表模式不熟悉,可以在企业架构模式中找到一个很好的介绍。
警告
scoped_session
注册表默认使用 Python 的threading.local()
来跟踪Session
实例。这不一定与所有应用服务器兼容,特别是那些使用绿色线程或其他替代形式的并发控制的服务器,这可能导致在中高并发情况下使用时出现竞争条件(例如,随机发生的故障)。请阅读下面的线程局部范围和在 Web 应用程序中使用线程局部范围以更充分地理解使用threading.local()
来跟踪Session
对象的影响,并在使用不基于传统线程的应用服务器时考虑更明确的范围。
注意
scoped_session
对象是许多 SQLAlchemy 应用程序中非常流行和有用的对象。然而,重要的是要注意,它只提供了解决Session
管理问题的一个方法。如果你对 SQLAlchemy 还不熟悉,特别是如果“线程本地变量”这个术语对你来说很陌生,我们建议你如果可能的话,首先熟悉一下诸如Flask-SQLAlchemy或zope.sqlalchemy之类的现成集成系统。
通过调用它并传递一个可以创建新Session
对象的工厂来构造scoped_session
。工厂只是在调用时生成一个新对象的东西,在Session
的情况下,最常见的工厂是在本节前面介绍的sessionmaker
。下面我们举例说明这种用法:
>>> from sqlalchemy.orm import scoped_session >>> from sqlalchemy.orm import sessionmaker >>> session_factory = sessionmaker(bind=some_engine) >>> Session = scoped_session(session_factory)
我们创建的scoped_session
对象现在将在我们“调用”注册表时调用sessionmaker
:
>>> some_session = Session()
在上面,some_session
是Session
的一个实例,我们现在可以用它来与数据库交互。这个相同的Session
也存在于我们创建的scoped_session
注册表中。如果我们第二次调用注册表,我们会得到相同的Session
:
>>> some_other_session = Session() >>> some_session is some_other_session True
这种模式允许应用程序的不同部分调用全局的scoped_session
,这样所有这些区域就可以在不需要显式传递的情况下共享同一个会话。我们在注册表中建立的Session
将保持不变,直到我们显式告诉注册表将其销毁,方法是调用scoped_session.remove()
:
>>> Session.remove()
scoped_session.remove()
方法首先调用当前 Session
上的 Session.close()
,其效果是首先释放任何由 Session
拥有的连接/事务资源,然后丢弃 Session
本身。这里的“释放”意味着连接被返回到其连接池,并且任何事务状态都被回滚,最终使用底层 DBAPI 连接的 rollback()
方法。
此时,scoped_session
对象是“空的”,在再次调用时将创建一个新的 Session
。如下所示,这不是我们之前所拥有的相同 Session
:
>>> new_session = Session() >>> new_session is some_session False
上述一系列步骤简要说明了“注册表”模式的概念。有了这个基本概念,我们可以讨论这种模式如何进行的一些细节。
隐式方法访问
scoped_session
的工作很简单;为所有请求它的人保留一个 Session
。为了更透明地访问这个 Session
,scoped_session
还包括代理行为,这意味着注册表本身可以直接像 Session
一样对待;当在此对象上调用方法时,它们会代理到注册表维护的底层 Session
:
Session = scoped_session(some_factory) # equivalent to: # # session = Session() # print(session.scalars(select(MyClass)).all()) # print(Session.scalars(select(MyClass)).all())
上述代码实现了通过调用注册表获取当前 Session
然后使用该 Session
的相同任务。
线程本地作用域
熟悉多线程编程的用户会注意到,将任何东西表示为全局变量通常是一个坏主意,因为这意味着全局对象将被许多线程同时访问。Session
对象完全设计成以非并发方式使用,从多线程的角度来看,这意味着“一次只能在一个线程中”。因此,我们上面对 scoped_session
的使用示例,其中相同的 Session
对象在多次调用中保持不变,暗示着需要某种处理方式,以使多个线程中的多次调用实际上不会获取到同一个会话的句柄。我们称这个概念为线程本地存储,意思是,使用一个特殊的对象,它将为每个应用程序线程维护一个独立的对象。Python 通过 threading.local() 构造提供了这个功能。scoped_session
对象默认使用此对象作为存储,以便为所有调用 scoped_session
注册表的人维护一个单一的 Session
,但仅在单个线程范围内。在不同线程中调用注册表的调用者会获取一个仅限于该其他线程的 Session
实例。
使用这种技术,scoped_session
提供了一种快速且相对简单(如果熟悉线程本地存储的话)的方式,在应用程序中提供一个单一的全局对象,可以安全地从多个线程调用。
scoped_session.remove()
方法始终会删除与该线程关联的当前 Session
(如果有的话)。然而,threading.local()
对象的一个优点是,如果应用程序线程本身结束,那么该线程的“存储”也会被垃圾回收。因此,在一个产生并销毁线程的应用程序中使用线程局部范围实际上是“安全”的,而不需要调用 scoped_session.remove()
。然而,事务本身的范围,即通过 Session.commit()
或 Session.rollback()
结束它们,通常仍然是必须在适当的时候明确安排的东西,除非应用程序实际上将线程的寿命与事务的寿命绑定在一起。## 使用线程局部范围与 Web 应用程序
如在何时构建会话,何时提交它,何时关闭它?一节中所讨论的,一个 web 应用程序是围绕着网络请求的概念构建的,并且将这样的应用程序与 Session
集成通常意味着将 Session
与该请求相关联。事实证明,大多数 Python web 框架,特别是异步框架 Twisted 和 Tornado 之类的著名例外,以简单的方式使用线程,使得一个特定的网络请求在一个单独的工作线程的范围内接收、处理和完成。当请求结束时,工作线程被释放到一个工作线程池中,在那里它可以处理另一个请求。
这种简单的网络请求和线程的对应关系意味着,将 Session
关联到一个线程意味着它也与在该线程内运行的网络请求关联,反之亦然,前提是 Session
只在网络请求开始后创建并在网络请求结束前被销毁。因此,将 scoped_session
用作将 Session
与 web 应用程序集成的快速方法是一种常见做法。下面的顺序图说明了这个流程:
Web Server Web Framework SQLAlchemy ORM Code -------------- -------------- ------------------------------ startup -> Web framework # Session registry is established initializes Session = scoped_session(sessionmaker()) incoming web request -> web request -> # The registry is *optionally* starts # called upon explicitly to create # a Session local to the thread and/or request Session() # the Session registry can otherwise # be used at any time, creating the # request-local Session() if not present, # or returning the existing one Session.execute(select(MyClass)) # ... Session.add(some_object) # ... # if data was modified, commit the # transaction Session.commit() web request ends -> # the registry is instructed to # remove the Session Session.remove() sends output <- outgoing web <- response
使用上述流程,将 Session
与 Web 应用程序集成的过程只有两个要求:
- 在 Web 应用程序首次启动时创建一个单一的
scoped_session
注册表,确保此对象可被应用程序的其余部分访问。 - 确保在 Web 请求结束时调用
scoped_session.remove()
,通常是通过与 Web 框架的事件系统集成来建立“请求结束时”事件。
如前所述,上述模式只是整合 Session
到 Web 框架的一种潜在方式,特别是假定Web 框架将 Web 请求与应用线程关联。然而,强烈建议使用 Web 框架本身提供的集成工具,如果有的话,而不是 scoped_session
。
特别是,虽然使用线程本地存储很方便,但最好将 Session
直接与请求关联,而不是与当前线程关联。下一节关于自定义范围详细介绍了一种更高级的配置,可以将 scoped_session
的使用与直接基于请求的范围,或任何类型的范围结合起来。
使用自定义创建的范围
scoped_session
对象的默认行为“线程本地”范围只是如何“范围” Session
的许多选项之一。可以根据任何现有的“我们正在处理的当前事物”的系统来定义自定义范围。
假设 Web 框架定义了一个库函数 get_current_request()
。使用此框架构建的应用程序可以随时调用此函数,结果将是表示正在处理的当前请求的某种 Request
对象。如果 Request
对象是可哈希的,那么此函数可以很容易地与 scoped_session
集成,以将 Session
与请求关联起来。下面我们结合 Web 框架提供的假设事件标记器 on_request_end
,说明了这一点,该标记器允许在请求结束时调用代码:
from my_web_framework import get_current_request, on_request_end from sqlalchemy.orm import scoped_session, sessionmaker Session = scoped_session(sessionmaker(bind=some_engine), scopefunc=get_current_request) @on_request_end def remove_session(req): Session.remove()
在上述情况中,我们以通常的方式实例化scoped_session
,唯一的区别是我们将请求返回函数作为“scopefunc”传递。这指示scoped_session
在每次调用注册表返回当前Session
时使用此函数生成字典键。在这种情况下,我们特别需要确保实现可靠的“删除”系统,因为否则此字典不会自行管理。
SqlAlchemy 2.0 中文文档(二十四)(4)https://developer.aliyun.com/article/1560553