SQLAlchemy是Python SQL工具箱和ORM框架,它为应用程序开发人员提供了全面而灵活的SQL功能。它提供了一整套企业级持久化方案,旨在高效,高性能地访问数据库,并符合Pythonic之禅。项目代码量比较大,接近200个文件,7万行代码, 我们一起来挑战一下。由于篇幅原因,分成上下两篇,上篇我们学习了core部分的engine,dialect,connection和pool等部分,下篇主要学习core部分剩余的sql表达式和orm部分,包括如下内容:
- SQL-schema使用示例
- DDL(Data Definition Language)创建table
- DML(Data Manipulation Language)使用insert插入数据
- DQL(Data Query Language)使用select查询数据
- ORM示例
- model核心功能
- 小结
- 小技巧
SQL-schema使用示例
上篇中,我们使用的sql都是手工编写的语句,下面这样:
create table x (a integer, b integer) insert into x (a, b) values (1, 1) 复制代码
在sqlalchemy中可以通过定义schema的方式进行数据操作,完整的示例如下:
from sqlalchemy import create_engine from sqlalchemy import MetaData from sqlalchemy import Table from sqlalchemy import Column from sqlalchemy import Integer from sqlalchemy import String from sqlalchemy.sql import select engine = create_engine('sqlite:///:memory:', echo=True) metadata = MetaData() users = Table('users', metadata, Column('id', Integer, primary_key=True), Column('name', String), Column('fullname', String), ) metadata.create_all(engine) ins = users.insert().values(name='jack', fullname='Jack Jones') print(ins) result = engine.execute(ins) print(result, result.inserted_primary_key) s = select([users]) result = conn.execute(s) for row in result: print(row) result = engine.execute("select * from users") for row in result: print(row) 复制代码
示例程序的执行过程:
- 创建engine,用于数据库连接
- 创建metadata,用于管理schema
- 创建users表的Table,绑定到metadata;同时包括id,name和fullname三个column
- 将metadata提交到engine(创建表)
- 使用users插入数据
- 查询users的数据
- 使用普通sql的方式验证数据
下面是示例的执行日志,清晰展示了上面过程:
... 2021-04-19 10:02:09,166 INFO sqlalchemy.engine.base.Engine CREATE TABLE users ( id INTEGER NOT NULL, name VARCHAR, fullname VARCHAR, PRIMARY KEY (id) ) 2021-04-19 10:02:09,166 INFO sqlalchemy.engine.base.Engine () 2021-04-19 10:02:09,167 INFO sqlalchemy.engine.base.Engine COMMIT INSERT INTO users (name, fullname) VALUES (:name, :fullname) 2021-04-19 10:02:09,167 INFO sqlalchemy.engine.base.Engine INSERT INTO users (name, fullname) VALUES (?, ?) 2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine ('jack', 'Jack Jones') 2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine COMMIT <sqlalchemy.engine.result.ResultProxy object at 0x7ffca0607070> [1] 2021-04-27 11:38:19,134 INFO sqlalchemy.engine.base.Engine SELECT users.id, users.name, users.fullname FROM users 2021-04-27 11:38:19,134 INFO sqlalchemy.engine.base.Engine () (1, 'jack', 'Jack Jones') 2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine select * from users 2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine () (1, 'jack', 'Jack Jones') 复制代码
在开始之前,我们需要简单了解一下SQL语句的分类:
在我们的schema使用示例中,就包括了DDL,DML和DQL三种类型的语句,下面我们按照这3种类型,详细了解一下sqlalchemy的sql表达式部分。sql表达式主要在sql包中,部分文件的功能如下:
模块 | 描述 |
base.py | 基础类 |
compiler.py | sql编译 |
crud.py | crud的参数处理 |
ddl.py | DDL语句 |
default_comparator.py | 比较 |
dml.py | DML语句 |
elements.py | 基本类型 |
operators.py | sql操作符 |
schema.py | schema定义 |
selectable.py | DQL |
sqltypes.py&&type_api.py | sql数据类型 |
vistitors.py | 递归算法 |
DDL(Data Definition Language)创建table
首先了解一下schema的基础实现visitable:
class VisitableType(type): def __init__(cls, clsname, bases, clsdict): if clsname != "Visitable" and hasattr(cls, "__visit_name__"): _generate_dispatch(cls) super(VisitableType, cls).__init__(clsname, bases, clsdict) def _generate_dispatch(cls): if "__visit_name__" in cls.__dict__: visit_name = cls.__visit_name__ if isinstance(visit_name, str): getter = operator.attrgetter("visit_%s" % visit_name) def _compiler_dispatch(self, visitor, **kw): try: meth = getter(visitor) except AttributeError: raise exc.UnsupportedCompilationError(visitor, cls) else: return meth(self, **kw) cls._compiler_dispatch = _compiler_dispatch class Visitable(util.with_metaclass(VisitableType, object)): pass 复制代码
Visitable约定子类必须提供 visit_name 的类属性,用来绑定编译方法。参与sql的类都继承自Visitable:
class SchemaItem(SchemaEventTarget, visitors.Visitable): __visit_name__ = "schema_item" class MetaData(SchemaItem): __visit_name__ = "metadata" class Table(DialectKWArgs, SchemaItem, TableClause): __visit_name__ = "table" class Column(DialectKWArgs, SchemaItem, ColumnClause): __visit_name__ = "column" class TypeEngine(Visitable): ... class Integer(_LookupExpressionAdapter, TypeEngine): __visit_name__ = "integer" 复制代码
MetaData是schema的集合,记录了所有的Table定义, 通过 _add_table
函数用来添加表:
class MetaData(SchemaItem): def __init__( self, bind=None, reflect=False, schema=None, quote_schema=None, naming_convention=None, info=None, ): # table集合 self.tables = util.immutabledict() self.schema = quoted_name(schema, quote_schema) self._schemas = set() def _add_table(self, name, schema, table): key = _get_table_key(name, schema) dict.__setitem__(self.tables, key, table) if schema: self._schemas.add(schema) 复制代码
Table是column的集合,在创建table对象的时候,把自己添加到metadata中:
class Table(DialectKWArgs, SchemaItem, TableClause): def __new__(cls, *args, **kw): name, metadata, args = args[0], args[1], args[2:] schema = metadata.schema table = object.__new__(cls) # 添加到metadata metadata._add_table(name, schema, table) table._init(name, metadata, *args, **kw) return table def _init(self, name, metadata, *args, **kwargs): super(Table, self).__init__( quoted_name(name, kwargs.pop("quote", None)) ) self.metadata = metadata self.schema = metadata.schema # column集合 self._columns = ColumnCollection() self._init_items(*args) def _init_items(self, *args): # column for item in args: if item is not None: item._set_parent_with_dispatch(self) 复制代码
Column是通过下面的方法将column添加到table的colummns中:
class Column(DialectKWArgs, SchemaItem, ColumnClause): def __init__(self, *args, **kwargs): pass def _set_parent(self, table): table._columns.replace(self) class ColumnCollection(util.OrderedProperties): def replace(self, column): ... self._data[column.key] = column ... 复制代码
现阶段,我们大概厘清了metadata,table和column的数据结构:metadata持有table集合,table持有column集合。接下来我们看看这个数据结构如何转换成sql语句,API是通过 MetaData.create_all
函数实现:
class MetaData(SchemaItem): def create_all(self, bind=None, tables=None, checkfirst=True): bind._run_visitor( ddl.SchemaGenerator, self, checkfirst=checkfirst, tables=tables ) class Engine(Connectable, log.Identified): def _run_visitor( self, visitorcallable, element, connection=None, **kwargs ): with self._optional_conn_ctx_manager(connection) as conn: conn._run_visitor(visitorcallable, element, **kwargs) class Connection(Connectable): def _run_visitor(self, visitorcallable, element, **kwargs): visitorcallable(self.dialect, self, **kwargs).traverse_single(element) 复制代码
create-table的sql编译主要由ddl中的SchemaGenerator实现, 下面是SchemaGenerator的继承关系和核心的traverse_single函数:
class ClauseVisitor(object): def traverse_single(self, obj, **kw): # 遍历所有的visit实现 for v in self.visitor_iterator: meth = getattr(v, "visit_%s" % obj.__visit_name__, None) if meth: return meth(obj, **kw) @property def visitor_iterator(self): v = self while v: yield v v = getattr(v, "_next", None) class SchemaVisitor(ClauseVisitor): ... class DDLBase(SchemaVisitor): ... class SchemaGenerator(DDLBase): ... 复制代码
创建meta,table和columun的过程:
class SchemaGenerator(DDLBase): def visit_metadata(self, metadata): tables = list(metadata.tables.values()) collection = sort_tables_and_constraints( [t for t in tables if self._can_create_table(t)] ) for table, fkcs in collection: if table is not None: # 创建表 self.traverse_single( table, create_ok=True, include_foreign_key_constraints=fkcs, _is_metadata_operation=True, ) def visit_table( self, table, create_ok=False, include_foreign_key_constraints=None, _is_metadata_operation=False, ): for column in table.columns: if column.default is not None: # 创建column-DDLElement self.traverse_single(column.default) self.connection.execute( # fmt: off # 创建create-table-DDLElement CreateTable( table, include_foreign_key_constraints= # noqa include_foreign_key_constraints, ) # fmt: on ) 复制代码
CreateTableDDLElement和CreateColumnDDLElement的继承关系:
class _DDLCompiles(ClauseElement): def _compiler(self, dialect, **kw): return dialect.ddl_compiler(dialect, self, **kw) class DDLElement(Executable, _DDLCompiles): ... class _CreateDropBase(DDLElement): ... class CreateTable(_CreateDropBase): __visit_name__ = "create_table" def __init__( self, element, on=None, bind=None, include_foreign_key_constraints=None ): super(CreateTable, self).__init__(element, on=on, bind=bind) self.columns = [CreateColumn(column) for column in element.columns] class CreateColumn(_DDLCompiles): __visit_name__ = "create_column" def __init__(self, element): self.element = element 复制代码
最终这些DDLElement在compiler中被DDLCompiler编译成sql语句, CREATE TABLE
是这样被编译的:
def visit_create_table(self, create): table = create.element preparer = self.preparer text = "\nCREATE " if table._prefixes: text += " ".join(table._prefixes) + " " text += "TABLE " + preparer.format_table(table) + " " create_table_suffix = self.create_table_suffix(table) if create_table_suffix: text += create_table_suffix + " " text += "(" separator = "\n" # if only one primary key, specify it along with the column first_pk = False for create_column in create.columns: column = create_column.element try: processed = self.process( create_column, first_pk=column.primary_key and not first_pk ) if processed is not None: text += separator separator = ", \n" text += "\t" + processed if column.primary_key: first_pk = True except exc.CompileError as ce: ... const = self.create_table_constraints( table, _include_foreign_key_constraints=create.include_foreign_key_constraints, # noqa ) if const: text += separator + "\t" + const text += "\n)%s\n\n" % self.post_create_table(table) return text def visit_create_column(self, create, first_pk=False): column = create.element text = self.get_column_specification(column, first_pk=first_pk) const = " ".join( self.process(constraint) for constraint in column.constraints ) if const: text += " " + const return text 复制代码
在前面column介绍中,我们略过了数据类型。大家都知道sql的数据类型和python数据类型有差异, 下面是一些常见的SQL数据类型:
class TypeEngine(Visitable): ... class Integer(_LookupExpressionAdapter, TypeEngine): __visit_name__ = "integer" ... class String(Concatenable, TypeEngine): __visit_name__ = "string" ... class CHAR(String): __visit_name__ = "CHAR" ... class VARCHAR(String): __visit_name__ = "VARCHAR" ... 复制代码
数据类型由GenericTypeCompiler进行编译:
class TypeCompiler(util.with_metaclass(util.EnsureKWArgType, object)): def process(self, type_, **kw): return type_._compiler_dispatch(self, **kw) class GenericTypeCompiler(TypeCompiler): def visit_INTEGER(self, type_, **kw): return "INTEGER" def visit_string(self, type_, **kw): return self.visit_VARCHAR(type_, **kw) def visit_VARCHAR(self, type_, **kw): return self._render_string_type(type_, "VARCHAR") def _render_string_type(self, type_, name): text = name if type_.length: text += "(%d)" % type_.length if type_.collation: text += ' COLLATE "%s"' % type_.collation return text 复制代码
DML(Data Manipulation Language)使用insert插入数据
数据插入的API由TableClause提供的insert函数:
class TableClause(Immutable, FromClause): @util.dependencies("sqlalchemy.sql.dml") def insert(self, dml, values=None, inline=False, **kwargs): return dml.Insert(self, values=values, inline=inline, **kwargs) 复制代码
dml中提供了Insert类的实现:
class UpdateBase( HasCTE, DialectKWArgs, HasPrefixes, Executable, ClauseElement ): ... class ValuesBase(UpdateBase): ... class Insert(ValuesBase): __visit_name__ = "insert" ... 复制代码
按照ddl的经验,我们查找insert语句的编译方法,在SQLCompiler中:
class SQLCompiler(Compiled): def visit_insert(self, insert_stmt, asfrom=False, **kw): crud_params = crud._setup_crud_params( self, insert_stmt, crud.ISINSERT, **kw ) if insert_stmt._has_multi_parameters: crud_params_single = crud_params[0] else: crud_params_single = crud_params preparer = self.preparer supports_default_values = self.dialect.supports_default_values text = "INSERT " text += "INTO " table_text = preparer.format_table(insert_stmt.table) if crud_params_single or not supports_default_values: text += " (%s)" % ", ".join( [preparer.format_column(c[0]) for c in crud_params_single] ) ... if insert_stmt.select is not None: select_text = self.process(self._insert_from_select, **kw) if self.ctes and toplevel and self.dialect.cte_follows_insert: text += " %s%s" % (self._render_cte_clause(), select_text) else: text += " %s" % select_text elif not crud_params and supports_default_values: text += " DEFAULT VALUES" elif insert_stmt._has_multi_parameters: text += " VALUES %s" % ( ", ".join( "(%s)" % (", ".join(c[1] for c in crud_param_set)) for crud_param_set in crud_params ) ) else: text += " VALUES (%s)" % ", ".join([c[1] for c in crud_params]) return text 复制代码
可以看到insert语句就是对Insert对象,通过字符串模版拼接而来。
DQL(Data Query Language)使用select查询数据
数据查询select语句也都有特定的数据结构Select,继承关系如下:
class SelectBase(HasCTE, Executable, FromClause): ... class GenerativeSelect(SelectBase): ... class Select(HasPrefixes, HasSuffixes, GenerativeSelect): __visit_name__ = "select" def __init__( self, columns=None, whereclause=None, from_obj=None, distinct=False, having=None, correlate=True, prefixes=None, suffixes=None, **kwargs ): GenerativeSelect.__init__(self, **kwargs) ... 复制代码
select的编译语句也在SQLCompiler中:
class SQLCompiler(Compiled): def visit_select( self, select, asfrom=False, parens=True, fromhints=None, compound_index=0, nested_join_translation=False, select_wraps_for=None, lateral=False, **kwargs ): ... froms = self._setup_select_stack(select, entry, asfrom, lateral) column_clause_args = kwargs.copy() column_clause_args.update( {"within_label_clause": False, "within_columns_clause": False} ) text = "SELECT " # we're off to a good start ! text += self.get_select_precolumns(select, **kwargs) # the actual list of columns to print in the SELECT column list. inner_columns = [ c for c in [ self._label_select_column( select, column, populate_result_map, asfrom, column_clause_args, name=name, ) for name, column in select._columns_plus_names ] if c is not None ] ... text = self._compose_select_body( text, select, inner_columns, froms, byfrom, kwargs ) if select._statement_hints: per_dialect = [ ht for (dialect_name, ht) in select._statement_hints if dialect_name in ("*", self.dialect.name) ] if per_dialect: text += " " + self.get_statement_hint_text(per_dialect) if self.ctes and toplevel: text = self._render_cte_clause() + text if select._suffixes: text += " " + self._generate_prefixes( select, select._suffixes, **kwargs ) self.stack.pop(-1) if (asfrom or lateral) and parens: return "(" + text + ")" else: return text 复制代码
select语句一样是采用字符串拼接得到。
ORM 示例
orm的使用和schema使用方式略有不同, 下面是orm的示例:
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy.orm import sessionmaker engine = create_engine('sqlite:///:memory:', echo=True) Model = declarative_base() class User(Model): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String) fullname = Column(String) nickname = Column(String) def __repr__(self): return "<User(name='%s', fullname='%s', nickname='%s')>" % ( self.name, self.fullname, self.nickname) Model.metadata.create_all(engine) print("="*10) Session = sessionmaker(bind=engine) session = Session() ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname') session.add(ed_user) session.commit() print(ed_user.id) result = engine.execute("select * from users") for row in result: print(row) 复制代码
对比schema和orm的差异,可以得到下表:
schema方式|orm方式 创建engine,用于数据库连接|- 创建metadata,用于管理schema|创建Model 创建users表的Table|创建User模型 将metadata提交到engine(创建表)|- -|创建session 使用users插入数据|使用session插入数据
总结一下主要就2点差异:
- orm时候不用显示的创建表的schema
- orm的数据处理都使用session来操作,而不是使用connection
model核心功能
Model类使用declarative_base动态创建:
class DeclarativeMeta(type): def __init__(cls, classname, bases, dict_): if "_decl_class_registry" not in cls.__dict__: _as_declarative(cls, classname, cls.__dict__) type.__init__(cls, classname, bases, dict_) def __setattr__(cls, key, value): _add_attribute(cls, key, value) def __delattr__(cls, key): _del_attribute(cls, key) def declarative_base( bind=None, metadata=None, mapper=None, cls=object, name="Base", constructor=_declarative_constructor, class_registry=None, metaclass=DeclarativeMeta, ): # 创建metadata lcl_metadata = metadata or MetaData() if class_registry is None: class_registry = weakref.WeakValueDictionary() bases = not isinstance(cls, tuple) and (cls,) or cls class_dict = dict( _decl_class_registry=class_registry, metadata=lcl_metadata ) # 构造函数 if constructor: class_dict["__init__"] = constructor if mapper: class_dict["__mapper_cls__"] = mapper # class-meta return metaclass(name, bases, class_dict) 复制代码
关于如何动态创建类,在小技巧中进行介绍。declarative_base主要定义了Model类的几个特性:
- Model类的构造函数
__init__
使用_declarative_constructor - Model类的子类在构造的时候会调用_as_declarative
- model对象会使用_add_attribute进行赋值
先从构造函数_declarative_constructor开始:
def _declarative_constructor(self, **kwargs): cls_ = type(self) for k in kwargs: if not hasattr(cls_, k): raise TypeError( "%r is an invalid keyword argument for %s" % (k, cls_.__name__) ) setattr(self, k, kwargs[k]) _declarative_constructor.__name__ = "__init__" 复制代码
看起来非常简单,但是这里做了一个类和对象实例之间的校验转换。我们先看一段演示代码:
class DummyModel(object): name = ["dummy_model"] # 引用类型 a = DummyModel() b = DummyModel() assert id(a.name) == id(b.name) == id(DummyModel.name) a.name.append("a") assert id(a.name) == id(b.name) == id(DummyModel.name) 复制代码
DummyModel的类属性name和a对象的name属性都是同一个引用。如果使用Model类:
Model = declarative_base() class UserModel(Model): __tablename__ = 'user' # 必须字段 id = Column(Integer, primary_key=True) # 必须字段 name = Column(String) c = UserModel() c.name = "c" d = UserModel() d.name = "d" # 注意并不是Column assert isinstance(UserModel.name, InstrumentedAttribute) assert isinstance(c.name, str) assert d.name == "d" assert id(c.name) != id(d.name) != id(UserModel.name) 复制代码
可以发现UserModel的类属性name和d对象的name属性完全不一样,类定义的是Cloumn(InstrumentedAttribute),对象变成了str。这个就是orm模型的特性之一,Model是定义格式模版,对象实例化后转化为普通数据。
Model的另外一个功能是隐式创建Table对象,在_as_declarative函数中通过_MapperConfig实现
class _MapperConfig(object): def setup_mapping(cls, cls_, classname, dict_): cfg_cls = _MapperConfig cfg_cls(cls_, classname, dict_) def __init__(self, cls_, classname, dict_): ... self._setup_table() ... def _setup_table(self): ... table_cls = Table args, table_kw = (), {} if table_args: if isinstance(table_args, dict): table_kw = table_args elif isinstance(table_args, tuple): if isinstance(table_args[-1], dict): args, table_kw = table_args[0:-1], table_args[-1] else: args = table_args autoload = dict_.get("__autoload__") if autoload: table_kw["autoload"] = True cls.__table__ = table = table_cls( tablename, cls.metadata, *(tuple(declared_columns) + tuple(args)), **table_kw ) ... 复制代码
而Column是通过下面的函数实现:
def _add_attribute(cls, key, value): if "__mapper__" in cls.__dict__: if isinstance(value, Column): _undefer_column_name(key, value) cls.__table__.append_column(value) cls.__mapper__.add_property(key, value) ... else: type.__setattr__(cls, key, value) 复制代码
Model通过上面的方式,隐式创建了Schema(Table),实际使用过程中只需要使用Model类,不用关注Schema的定义。
session的源码由于篇幅和时间有限,留待以后再行分析
小结
sqlalchemy可以在低层次上提供了sql语句的方式使用;在次层次上提供定义schema方式使用;在高层次上提供orm的实现,让应用可以根据项目的特点自主选择不同层级的API。
使用schema时候,主要使用Metadata,Table和Column等定义Schema数据结构,使用编译器自动将schema转换成合法的sql语句。
使用orm的时候,则是创建特定的数据模型,模型对象会隐式创建schema,通过session方式进行数据访问。
最后再回顾一下sqlalchemy的架构图:
小技巧
sqlalchemy中提供了一个动态创建类的方式,主要在declarative_base和DeclarativeMeta中实现。我参考这个实现方式做了一个类工厂:
class DeclarativeMeta(type): def __init__(cls, klass_name, bases, dict_): print("class_init", klass_name, bases, dict_) type.__init__(cls, klass_name, bases, dict_) def get_attr(self, key): print("getattr", self, key) return self.__dict__[key] def constructor(self, *args, **kwargs): print("constructor", self, args, kwargs) for k, v in kwargs.items(): setattr(self, k, v) def dynamic_class(name): class_dict = { "__init__": constructor, "__getattr__": get_attr } return DeclarativeMeta(name, (object,), class_dict) DummyModel = dynamic_class("Dummy") dummy = DummyModel(1, name="hello", age=18) print(dummy, type(dummy), dummy.name, dummy.age) # class_init Dummy (<class 'object'>,) {'__init__': <function test_dynamic_class.<locals>.constructor at 0x7f898827ef70>, '__getattr__': <function test_dynamic_class.<locals>.get_attr at 0x7f89882105e0>} # constructor <sample.Dummy object at 0x7f89882a5820> (1,) {'name': 'hello', 'age': 18} # <sample.Dummy object at 0x7f89882a5820> <class 'sample.Dummy'> hello 18 复制代码
示例中我动态创建了一个DummyModel类,type(dummy)
可以看到,这个类名是 Dummy。这个类可以的构造函数可以接受name和age两个属性。这种创建方式和collections.namedtuple有点类似。
一点感悟
sqlalchemy的源码非常复杂,前前后后一共准备了一个月,形成的2篇文档仅仅涉及核心流程和用法,细节部分缺失较多,以后有机会还需要继续阅读。在这一个月中,克服了工作较忙,没有时间写稿的烦躁;克服了阅读进入困境,一度想放弃的心理障碍;克服了deadline临近,文稿还只是一个雏形,使用存稿顶替的羞愧;克服了笔记软件故障,写完的文稿丢失,完全重写的懊恼。战胜这些困难,最终还是得以完成,心理上有大满足。当然最大的收获还是对ORM中间件有了初步的了解,也希望梳理的ORM流程对大家有一定的帮助,如果获得大家的支持会更加满意♥️。