SqlAlchemy 2.0 中文文档(四十二)(4)https://developer.aliyun.com/article/1563015
应用 SQL 级别的绑定/结果处理
如在扩展现有类型部分所示,SQLAlchemy 允许在向语句发送参数以及从数据库加载结果行时调用 Python 函数,以对值进行转换,使其在发送到数据库时或从数据库加载时进行转换。还可以定义 SQL 级别的转换。这里的理念是,当只有关系数据库包含特定系列的函数时,这些函数对于在应用程序和持久性格式之间转换传入和传出数据是必要的。示例包括使用数据库定义的加密/解密函数,以及处理地理数据的存储过程。
任何TypeEngine
、UserDefinedType
或TypeDecorator
子类都可以包含TypeEngine.bind_expression()
和/或TypeEngine.column_expression()
的实现,当定义为返回非None
值时,应返回一个要注入到 SQL 语句中的ColumnElement
表达式,无论是围绕绑定参数还是列表达式。例如,为了构建一个Geometry
类型,该类型将对所有传出值应用 PostGIS 函数ST_GeomFromText
,对所有传入数据应用函数ST_AsText
,我们可以创建自己的UserDefinedType
子类,该子类提供这些方法与func
一起使用:
from sqlalchemy import func from sqlalchemy.types import UserDefinedType class Geometry(UserDefinedType): def get_col_spec(self): return "GEOMETRY" def bind_expression(self, bindvalue): return func.ST_GeomFromText(bindvalue, type_=self) def column_expression(self, col): return func.ST_AsText(col, type_=self)
我们可以将Geometry
类型应用到Table
元数据中,并在select()
构造中使用它:
geometry = Table( "geometry", metadata, Column("geom_id", Integer, primary_key=True), Column("geom_data", Geometry), ) print( select(geometry).where( geometry.c.geom_data == "LINESTRING(189412 252431,189631 259122)" ) )
结果的 SQL 嵌入了两个函数。ST_AsText
应用于列子句,以便返回值在传递到结果集之前通过函数运行,而ST_GeomFromText
应用于绑定参数,以便传入值被转换:
SELECT geometry.geom_id, ST_AsText(geometry.geom_data) AS geom_data_1 FROM geometry WHERE geometry.geom_data = ST_GeomFromText(:geom_data_2)
TypeEngine.column_expression()
方法与编译器的机制交互,使得 SQL 表达式不会干扰包装表达式的标记。例如,如果我们针对我们表达式的label()
进行了select()
,字符串标签会移动到包装表达式的外部:
print(select(geometry.c.geom_data.label("my_data")))
输出:
SELECT ST_AsText(geometry.geom_data) AS my_data FROM geometry
另一个例子是我们装饰BYTEA
以提供PGPString
,这将利用 PostgreSQL 的pgcrypto
扩展来透明地加密/解密值:
from sqlalchemy import ( create_engine, String, select, func, MetaData, Table, Column, type_coerce, TypeDecorator, ) from sqlalchemy.dialects.postgresql import BYTEA class PGPString(TypeDecorator): impl = BYTEA cache_ok = True def __init__(self, passphrase): super(PGPString, self).__init__() self.passphrase = passphrase def bind_expression(self, bindvalue): # convert the bind's type from PGPString to # String, so that it's passed to psycopg2 as is without # a dbapi.Binary wrapper bindvalue = type_coerce(bindvalue, String) return func.pgp_sym_encrypt(bindvalue, self.passphrase) def column_expression(self, col): return func.pgp_sym_decrypt(col, self.passphrase) metadata_obj = MetaData() message = Table( "message", metadata_obj, Column("username", String(50)), Column("message", PGPString("this is my passphrase")), ) engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", echo=True) with engine.begin() as conn: metadata_obj.create_all(conn) conn.execute( message.insert(), {"username": "some user", "message": "this is my message"}, ) print( conn.scalar(select(message.c.message).where(message.c.username == "some user")) )
pgp_sym_encrypt
和pgp_sym_decrypt
函数应用于 INSERT 和 SELECT 语句:
INSERT INTO message (username, message) VALUES (%(username)s, pgp_sym_encrypt(%(message)s, %(pgp_sym_encrypt_1)s)) -- {'username': 'some user', 'message': 'this is my message', -- 'pgp_sym_encrypt_1': 'this is my passphrase'} SELECT pgp_sym_decrypt(message.message, %(pgp_sym_decrypt_1)s) AS message_1 FROM message WHERE message.username = %(username_1)s -- {'pgp_sym_decrypt_1': 'this is my passphrase', 'username_1': 'some user'}
重新定义和创建新操作符
SQLAlchemy 核心定义了一组固定的表达式操作符,可用于所有列表达式。其中一些操作具有重载 Python 内置操作符的效果;此类操作符的示例包括ColumnOperators.__eq__()
(table.c.somecolumn == 'foo'
)、ColumnOperators.__invert__()
(~table.c.flag
)和ColumnOperators.__add__()
(table.c.x + table.c.y
)。其他操作符作为列表达式上的显式方法公开,例如ColumnOperators.in_()
(table.c.value.in_(['x', 'y'])
)和ColumnOperators.like()
(table.c.value.like('%ed%')
)。
当需要使用 SQL 操作符而已直接支持的情况下,最方便的方法是在任何 SQL 表达式对象上使用Operators.op()
方法;该方法接受一个表示要呈现的 SQL 操作符的字符串,并返回一个接受任意右侧表达式的 Python 可调用对象:
>>> from sqlalchemy import column >>> expr = column("x").op(">>")(column("y")) >>> print(expr) x >> y
当使用自定义 SQL 类型时,还有一种实现自定义操作符的方法,就像上面提到的那样,这些操作符在使用该列类型的任何列表达式上自动存在,而无需在每次使用操作符时直接调用Operators.op()
。
为了实现这一点,SQL 表达式构造会参考与构造关联的TypeEngine
对象,以确定内置运算符的行为,并寻找可能已被调用的新方法。TypeEngine
定义了一个由Comparator
类实现的“比较”对象,为 SQL 运算符提供基本行为,许多具体类型提供了它们自己的此类的子实现。用户定义的Comparator
实现可以直接构建到特定类型的简单子类中,以覆盖或定义新操作。下面,我们创建一个Integer
子类,它重写了ColumnOperators.__add__()
运算符,而该运算符又使用Operators.op()
来生成自定义的 SQL 代码:
from sqlalchemy import Integer class MyInt(Integer): class comparator_factory(Integer.Comparator): def __add__(self, other): return self.op("goofy")(other)
上述配置创建了一个新的类MyInt
,它将TypeEngine.comparator_factory
属性设置为引用一个新的类,该类是与Integer
类型关联的Comparator
类的子类。
使用方法:
>>> sometable = Table("sometable", metadata, Column("data", MyInt)) >>> print(sometable.c.data + 5) sometable.data goofy :data_1
对于ColumnOperators.__add__()
的实现是由拥有的 SQL 表达式进行参考的,通过使用自身作为expr
属性来实例化Comparator
。当实现需要直接引用原始ColumnElement
对象时,可以使用此属性:
from sqlalchemy import Integer class MyInt(Integer): class comparator_factory(Integer.Comparator): def __add__(self, other): return func.special_addition(self.expr, other)
对于 Comparator
添加的新方法,通过动态查找方案暴露在拥有 SQL 表达式对象上,这样可以将添加到 Comparator
的方法暴露到拥有的 ColumnElement
表达式构造上。例如,要为整数添加一个 log()
函数:
from sqlalchemy import Integer, func class MyInt(Integer): class comparator_factory(Integer.Comparator): def log(self, other): return func.log(self.expr, other)
使用上述类型:
>>> print(sometable.c.data.log(5)) log(:log_1, :log_2)
在使用 Operators.op()
进行返回布尔结果的比较操作时,应将 Operators.op.is_comparison
标志设置为 True
:
class MyInt(Integer): class comparator_factory(Integer.Comparator): def is_frobnozzled(self, other): return self.op("--is_frobnozzled->", is_comparison=True)(other)
一元操作也是可能的。例如,要添加 PostgreSQL 阶乘运算符的实现,我们结合 UnaryExpression
构造以及 custom_op
来生成阶乘表达式:
from sqlalchemy import Integer from sqlalchemy.sql.expression import UnaryExpression from sqlalchemy.sql import operators class MyInteger(Integer): class comparator_factory(Integer.Comparator): def factorial(self): return UnaryExpression( self.expr, modifier=operators.custom_op("!"), type_=MyInteger )
使用上述类型:
>>> from sqlalchemy.sql import column >>> print(column("x", MyInteger).factorial()) x !
另请参阅
Operators.op()
TypeEngine.comparator_factory
创建新类型
UserDefinedType
类被提供作为定义全新数据库类型的简单基类。使用它来表示 SQLAlchemy 不知道的本地数据库类型。如果只需要 Python 转换行为,请改用 TypeDecorator
。
对象名称 | 描述 |
UserDefinedType | 用户定义类型的基础。 |
class sqlalchemy.types.UserDefinedType
用户定义类型的基础。
这应该是新类型的基础。请注意,对于大多数情况,TypeDecorator
可能更合适:
import sqlalchemy.types as types class MyType(types.UserDefinedType): cache_ok = True def __init__(self, precision = 8): self.precision = precision def get_col_spec(self, **kw): return "MYTYPE(%s)" % self.precision def bind_processor(self, dialect): def process(value): return value return process def result_processor(self, dialect, coltype): def process(value): return value return process
一旦类型被创建,它立即可用:
table = Table('foo', metadata_obj, Column('id', Integer, primary_key=True), Column('data', MyType(16)) )
get_col_spec()
方法在大多数情况下将接收一个关键字参数 type_expression
,该参数指的是类型的拥有表达式在编译时,例如 Column
或 cast()
构造。仅当方法接受关键字参数(例如 **kw
)时才发送此关键字;用于检查此函数的传统形式的内省。
UserDefinedType.cache_ok
类级标志指示此自定义 UserDefinedType
是否安全用作缓存键的一部分。此标志默认为 None
,当 SQL 编译器尝试为使用此类型的语句生成缓存键时,最初会生成警告。如果不能保证 UserDefinedType
每次产生相同的绑定/结果行为和 SQL 生成,应将此标志设置为 False
;否则,如果类每次产生相同的行为,则可以设置为 True
。有关此功能的详细说明,请参阅 UserDefinedType.cache_ok
。
新版本 1.4.28 中:将 ExternalType.cache_ok
标志泛化,以便它同时适用于 TypeDecorator
和 UserDefinedType
。
成员
cache_ok,coerce_compared_value(),ensure_kwarg
类签名
类 sqlalchemy.types.UserDefinedType
(sqlalchemy.types.ExternalType
,sqlalchemy.types.TypeEngineMixin
,sqlalchemy.types.TypeEngine
,sqlalchemy.util.langhelpers.EnsureKWArg
)
attribute cache_ok: bool | None = None
继承自 ExternalType.cache_ok
属性的 ExternalType
指示使用此 ExternalType
的语句是否“可缓存”。
默认值None
将发出警告,然后不允许缓存包含此类型的语句。将其设置为False
以完全禁用使用此类型的语句的缓存,而无需警告。当设置为True
时,对象的类和其状态的选定元素将用作缓存键的一部分。例如,使用TypeDecorator
:
class MyType(TypeDecorator): impl = String cache_ok = True def __init__(self, choices): self.choices = tuple(choices) self.internal_only = True
上述类型的缓存键将等同于:
>>> MyType(["a", "b", "c"])._static_cache_key (<class '__main__.MyType'>, ('choices', ('a', 'b', 'c')))
缓存方案将从与__init__()
方法中的参数名称相对应的类型中提取属性。在上面的例子中,“choices”属性成为缓存键的一部分,但“internal_only”不是,因为没有名为“internal_only”的参数。
可缓存元素的要求是它们是可哈希的,并且还要表明对于给定缓存值,每次使用此类型的表达式渲染的 SQL 都相同。
为了适应引用不可哈希结构的数据类型,如字典、集合和列表的数据类型,可以通过将可哈希结构分配给名称与参数名称对应的属性来使这些对象“可缓存”。例如,接受查找值字典的数据类型可以将其发布为排序的元组系列。给定一个以前不可缓存的类型如下:
class LookupType(UserDefinedType): '''a custom type that accepts a dictionary as a parameter. this is the non-cacheable version, as "self.lookup" is not hashable. ''' def __init__(self, lookup): self.lookup = lookup def get_col_spec(self, **kw): return "VARCHAR(255)" def bind_processor(self, dialect): # ... works with "self.lookup" ...
“查找”是一个字典。该类型将无法生成缓存键:
>>> type_ = LookupType({"a": 10, "b": 20}) >>> type_._static_cache_key <stdin>:1: SAWarning: UserDefinedType LookupType({'a': 10, 'b': 20}) will not produce a cache key because the ``cache_ok`` flag is not set to True. Set this flag to True if this type object's state is safe to use in a cache key, or False to disable this warning. symbol('no_cache')
如果我们确实设置了这样的缓存键,它将无法使用。我们将得到一个包含字典的元组结构,这个字典本身不能作为“缓存字典”中的键使用,例如 SQLAlchemy 的语句缓存,因为 Python 字典不是可哈希的:
>>> # set cache_ok = True >>> type_.cache_ok = True >>> # this is the cache key it would generate >>> key = type_._static_cache_key >>> key (<class '__main__.LookupType'>, ('lookup', {'a': 10, 'b': 20})) >>> # however this key is not hashable, will fail when used with >>> # SQLAlchemy statement cache >>> some_cache = {key: "some sql value"} Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: unhashable type: 'dict'
可通过将排序的元组分配给“.lookup”属性来使上述类型可缓存:
class LookupType(UserDefinedType): '''a custom type that accepts a dictionary as a parameter. The dictionary is stored both as itself in a private variable, and published in a public variable as a sorted tuple of tuples, which is hashable and will also return the same value for any two equivalent dictionaries. Note it assumes the keys and values of the dictionary are themselves hashable. ''' cache_ok = True def __init__(self, lookup): self._lookup = lookup # assume keys/values of "lookup" are hashable; otherwise # they would also need to be converted in some way here self.lookup = tuple( (key, lookup[key]) for key in sorted(lookup) ) def get_col_spec(self, **kw): return "VARCHAR(255)" def bind_processor(self, dialect): # ... works with "self._lookup" ...
在上面的情况下,LookupType({"a": 10, "b": 20})
的缓存键将为:
>>> LookupType({"a": 10, "b": 20})._static_cache_key (<class '__main__.LookupType'>, ('lookup', (('a', 10), ('b', 20))))
新版本 1.4.14 中新增了:- 添加了cache_ok
标志,允许对TypeDecorator
类的缓存进行一些可配置性。
新版本 1.4.28 中新增了:- 添加了ExternalType
混合类型,它将cache_ok
标志推广到了TypeDecorator
和UserDefinedType
类。
另请参阅
SQL 编译缓存
method coerce_compared_value(op: OperatorType | None, value: Any) → TypeEngine[Any
为表达式中的“强制转换”Python 值建议一种类型。
UserDefinedType
的默认行为与 TypeDecorator
的默认行为相同;默认情况下,它返回 self
,假设比较的值应该被强制转换为与此相同的类型。有关更多详细信息,请参见 TypeDecorator.coerce_compared_value()
。
attribute ensure_kwarg: str = 'get_col_spec'
一个用于指示方法名称的正则表达式,该方法应接受**kw
参数。
类将扫描匹配名称模板的方法,并在必要时装饰它们,以确保接受**kw
参数。
使用自定义类型和反射
需要注意的是,被修改以具有额外的 Python 行为的数据库类型,包括基于TypeDecorator
的类型以及其他用户定义的数据类型子类,在数据库模式中没有任何表示。当使用数据库中描述的反射功能时,SQLAlchemy 使用一个固定的映射,将数据库服务器报告的数据类型信息链接到一个 SQLAlchemy 数据类型对象上。例如,如果我们在 PostgreSQL 模式中查看特定数据库列的定义,可能会收到字符串"VARCHAR"
。SQLAlchemy 的 PostgreSQL 方言有一个硬编码的映射,将字符串名称"VARCHAR"
链接到 SQLAlchemy VARCHAR
类,这就是当我们发出像Table('my_table', m, autoload_with=engine)
这样的语句时,其中的 Column
对象内会有一个 VARCHAR
的实例存在的原因。
这意味着如果一个 Table
对象使用的类型对象不直接对应于数据库本机类型名称,如果我们在其他地方使用反射为此数据库表创建新的 Table
对象,则它将没有此数据类型。例如:
>>> from sqlalchemy import ( ... Table, ... Column, ... MetaData, ... create_engine, ... PickleType, ... Integer, ... ) >>> metadata = MetaData() >>> my_table = Table( ... "my_table", metadata, Column("id", Integer), Column("data", PickleType) ... ) >>> engine = create_engine("sqlite://", echo="debug") >>> my_table.create(engine) INFO sqlalchemy.engine.base.Engine CREATE TABLE my_table ( id INTEGER, data BLOB )
在上面,我们使用了PickleType
,它是一个作用于LargeBinary
数据类型之上的TypeDecorator
,在 SQLite 中对应着数据库类型BLOB
。在 CREATE TABLE 中,我们可以看到使用了BLOB
数据类型。SQLite 数据库对我们使用的PickleType
一无所知。
如果我们看一下my_table.c.data.type
的数据类型,因为这是我们直接创建的 Python 对象,它是PickleType
:
>>> my_table.c.data.type PickleType()
然而,如果我们使用反射创建另一个Table
实例,我们创建的 SQLite 数据库中不会反映出PickleType
的使用;相反,我们得到的是BLOB
:
>>> metadata_two = MetaData() >>> my_reflected_table = Table("my_table", metadata_two, autoload_with=engine) INFO sqlalchemy.engine.base.Engine PRAGMA main.table_info("my_table") INFO sqlalchemy.engine.base.Engine () DEBUG sqlalchemy.engine.base.Engine Col ('cid', 'name', 'type', 'notnull', 'dflt_value', 'pk') DEBUG sqlalchemy.engine.base.Engine Row (0, 'id', 'INTEGER', 0, None, 0) DEBUG sqlalchemy.engine.base.Engine Row (1, 'data', 'BLOB', 0, None, 0) >>> my_reflected_table.c.data.type BLOB()
通常,当应用程序使用自定义类型定义明确的Table
元数据时,不需要使用表反射,因为必要的Table
元数据已经存在。然而,对于一个应用程序或一组应用程序需要同时使用包含自定义 Python 级数据类型的明确Table
元数据以及设置其Column
对象作为从数据库反映的Table
对象的情况,仍然需要展示自定义数据类型的附加 Python 行为,必须采取额外的步骤来允许这种情况。
最直接的方法是按照覆盖反射列中描述的覆盖特定列。在这种技术中,我们只需将反射与那些我们想要使用自定义或装饰数据类型的列的显式Column
对象结合使用:
>>> metadata_three = MetaData() >>> my_reflected_table = Table( ... "my_table", ... metadata_three, ... Column("data", PickleType), ... autoload_with=engine, ... )
上面的my_reflected_table
对象被反映出来,并将从 SQLite 数据库加载“id”列的定义。但对于“data”列,我们用一个显式的Column
定义来覆盖了反射对象,其中包括我们想要的 Python 数据类型,PickleType
。反射过程将保留此Column
对象不变:
>>> my_reflected_table.c.data.type PickleType()
从数据库本地类型对象转换为自定义数据类型的更详细的方法是使用DDLEvents.column_reflect()
事件处理程序。例如,如果我们知道我们想要的所有BLOB
数据类型实际上都是PickleType
,我们可以设置一个跨越整个的规则:
from sqlalchemy import BLOB from sqlalchemy import event from sqlalchemy import PickleType from sqlalchemy import Table @event.listens_for(Table, "column_reflect") def _setup_pickletype(inspector, table, column_info): if isinstance(column_info["type"], BLOB): column_info["type"] = PickleType()
当上述代码在任何表反射发生之前调用(还要注意它应该在应用程序中仅调用一次,因为它是一个全局规则)时,对于包含具有BLOB
数据类型列的任何Table
,结果数据类型将存储在Column
对象中作为PickleType
。
实际上,上述基于事件的方法可能会有额外的规则,以便仅影响那些数据类型很重要的列,例如表名和可能列名的查找表,或者其他启发式方法,以准确确定应该用 Python 数据类型建立哪些列。
eSQL 方言有一个硬编码的映射,将字符串名称"VARCHAR"
链接到 SQLAlchemy VARCHAR
类,这就是当我们发出像Table('my_table', m, autoload_with=engine)
这样的语句时,其中的 Column
对象内会有一个 VARCHAR
的实例存在的原因。
这意味着如果一个 Table
对象使用的类型对象不直接对应于数据库本机类型名称,如果我们在其他地方使用反射为此数据库表创建新的 Table
对象,则它将没有此数据类型。例如:
>>> from sqlalchemy import ( ... Table, ... Column, ... MetaData, ... create_engine, ... PickleType, ... Integer, ... ) >>> metadata = MetaData() >>> my_table = Table( ... "my_table", metadata, Column("id", Integer), Column("data", PickleType) ... ) >>> engine = create_engine("sqlite://", echo="debug") >>> my_table.create(engine) INFO sqlalchemy.engine.base.Engine CREATE TABLE my_table ( id INTEGER, data BLOB )
在上面,我们使用了PickleType
,它是一个作用于LargeBinary
数据类型之上的TypeDecorator
,在 SQLite 中对应着数据库类型BLOB
。在 CREATE TABLE 中,我们可以看到使用了BLOB
数据类型。SQLite 数据库对我们使用的PickleType
一无所知。
如果我们看一下my_table.c.data.type
的数据类型,因为这是我们直接创建的 Python 对象,它是PickleType
:
>>> my_table.c.data.type PickleType()
然而,如果我们使用反射创建另一个Table
实例,我们创建的 SQLite 数据库中不会反映出PickleType
的使用;相反,我们得到的是BLOB
:
>>> metadata_two = MetaData() >>> my_reflected_table = Table("my_table", metadata_two, autoload_with=engine) INFO sqlalchemy.engine.base.Engine PRAGMA main.table_info("my_table") INFO sqlalchemy.engine.base.Engine () DEBUG sqlalchemy.engine.base.Engine Col ('cid', 'name', 'type', 'notnull', 'dflt_value', 'pk') DEBUG sqlalchemy.engine.base.Engine Row (0, 'id', 'INTEGER', 0, None, 0) DEBUG sqlalchemy.engine.base.Engine Row (1, 'data', 'BLOB', 0, None, 0) >>> my_reflected_table.c.data.type BLOB()
通常,当应用程序使用自定义类型定义明确的Table
元数据时,不需要使用表反射,因为必要的Table
元数据已经存在。然而,对于一个应用程序或一组应用程序需要同时使用包含自定义 Python 级数据类型的明确Table
元数据以及设置其Column
对象作为从数据库反映的Table
对象的情况,仍然需要展示自定义数据类型的附加 Python 行为,必须采取额外的步骤来允许这种情况。
最直接的方法是按照覆盖反射列中描述的覆盖特定列。在这种技术中,我们只需将反射与那些我们想要使用自定义或装饰数据类型的列的显式Column
对象结合使用:
>>> metadata_three = MetaData() >>> my_reflected_table = Table( ... "my_table", ... metadata_three, ... Column("data", PickleType), ... autoload_with=engine, ... )
上面的my_reflected_table
对象被反映出来,并将从 SQLite 数据库加载“id”列的定义。但对于“data”列,我们用一个显式的Column
定义来覆盖了反射对象,其中包括我们想要的 Python 数据类型,PickleType
。反射过程将保留此Column
对象不变:
>>> my_reflected_table.c.data.type PickleType()
从数据库本地类型对象转换为自定义数据类型的更详细的方法是使用DDLEvents.column_reflect()
事件处理程序。例如,如果我们知道我们想要的所有BLOB
数据类型实际上都是PickleType
,我们可以设置一个跨越整个的规则:
from sqlalchemy import BLOB from sqlalchemy import event from sqlalchemy import PickleType from sqlalchemy import Table @event.listens_for(Table, "column_reflect") def _setup_pickletype(inspector, table, column_info): if isinstance(column_info["type"], BLOB): column_info["type"] = PickleType()
当上述代码在任何表反射发生之前调用(还要注意它应该在应用程序中仅调用一次,因为它是一个全局规则)时,对于包含具有BLOB
数据类型列的任何Table
,结果数据类型将存储在Column
对象中作为PickleType
。
实际上,上述基于事件的方法可能会有额外的规则,以便仅影响那些数据类型很重要的列,例如表名和可能列名的查找表,或者其他启发式方法,以准确确定应该用 Python 数据类型建立哪些列。