SQLAlchemy参考

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: 1. 基本流程 # -*- coding: utf-8 -*- from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy import Column from sqlalchemy.

1. 基本流程

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column
from sqlalchemy.types import String, Integer
from sqlalchemy.ext.declarative import declarative_base

engine = create_engine('postgresql://test@localhost:5432/test')
DBSession = sessionmaker(engine)
session = DBSession()

BaseModel = declarative_base()

class User(BaseModel):
    __tablename__ = 'user'

    id = Column(String, primary_key=True)
    username = Column(String, index=True)

class Session(BaseModel):
    __tablename__ = 'session'

    id = Column(String, primary_key=True)
    user = Column(String, index=True)
    ip = Column(String)

query = session.query(Session, User.username).join(User, User.id == Session.user)
for i in query:
    print dir(i)

2. 创建连接

SQLAlchemy 的连接创建是 Lazy 的方式, 即在需要使用时才会去真正创建. 之前做的工作, 全是"定义".

连接的定义是在 engine 中做的.

2.1. Engine

engine 的定义包含了三部分的内容, 一是具体数据库类型的实现, 二是连接池, 三是策略(即engine 自己的实现).

所谓的数据库类型即是 MYSQL , Postgresql , SQLite 这些不同的数据库.

一般创建 engine 是使用 create_engine 方法:

engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')

参数字符串的各部分的意义:

dialect+driver://username:password@host:port/database

对于这个字符串, SQLAlchemy 提供了工具可用于处理它:

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy.engine.url import make_url
from sqlalchemy.engine.url import URL

s = 'postgresql://test@localhost:5432/bbcustom'
url = make_url(s)
s = URL(drivername='postgresql', username='test', password="",
        host="localhost", port=5432, database="bbcustom")

engine = create_engine(url)
engine = create_engine(s)

print engine.execute('select id from "user"').fetchall()

create_engine 函数有很多的控制参数, 这个后面再详细说.

2.2. Engine的策略

create_engine 的调用, 实际上会变成 strategy.create 的调用. 而 strategy 就是 engine 的实现细节. strategy 可以在 create_engine 调用时通过 strategy 参数指定, 目前官方的支持有三种:

  • plain, 默认的
  • threadlocal, 连接是线程局部的
  • mock, 所有的 SQL 语句的执行会使用指定的函数

mock 这个实现, 会把所有的 SQL 语句的执行交给指定的函数来做, 这个函数是由 create_engine的 executor 参数指定:

def f(sql, *args, **kargs):
    print sql, args, kargs

s = 'postgresql://test@localhost:5432/bbcustom'
engine = create_engine(s, strategy='mock', executor=f)

print engine.execute('select id from "user"')

2.3. 各数据库实现

各数据库的实现在 SQLAlchemy 中分成了两个部分, 一是数据库的类型, 二是具体数据库中适配的客户端实现. 比如对于 Postgresql 的访问, 可以使用 psycopg2 , 也可以使用 pg8000 :

s = 'postgresql+psycopg2://test@localhost:5432/bbcustom'
s = 'postgresql+pg8000://test@localhost:5432/bbcustom'
engine = create_engine(s)

具体的适配工作, 是需要在代码中实现一个 Dialect 类来完成的. 官方的实现在 dialects 目录下.

获取具体的 Dialect 的行为, 则是前面提到的 URL 对象的 get_dialect 方法. create_engine 时你单传一个字符串, SQLAlchemy 自己也会使用 make_url 得到一个 URL 的实例).

2.4. 连接池

SQLAlchemy 支持连接池, 在 create_engine 时添加相关参数即可使用.

  • pool_size 连接数
  • max_overflow 最多多几个连接
  • pool_recycle 连接重置周期
  • pool_timeout 连接超时时间

连接池效果:

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy.engine.url import make_url
from sqlalchemy.engine.url import URL

s = 'postgresql://test@localhost:5432/bbcustom'
engine = create_engine(s, pool_size=2, max_overflow=0)


from threading import Thread

def f():
    print engine.execute('select pg_sleep(5)').fetchall()


p = []
for i in range(3):
    p.append(Thread(target=f))

for t in p:
    t.start()

连接池的实现, 在 create_engine 调用时也可以指定:

from sqlalchemy.pool import QueuePool
engine = create_engine('sqlite:///file.db', poolclass=QueuePool)

还有:

from sqlalchemy.pool import NullPool
engine = create_engine(
         'postgresql+psycopg2://scott:tiger@localhost/test',
          poolclass=NullPool)

或者仅仅是获取连接的方法:

import sqlalchemy.pool as pool
import psycopg2

def getconn():
    c = psycopg2.connect(username='ed', host='127.0.0.1', dbname='test')
    # do things with 'c' to set up
    return c

engine = create_engine('postgresql+psycopg2://', creator=getconn)

连接池可以被单独使用:

import sqlalchemy.pool as pool
import psycopg2

def getconn():
    c = psycopg2.connect(username='ed', host='127.0.0.1', dbname='test')
    return c

mypool = pool.QueuePool(getconn, max_overflow=10, pool_size=5)

conn = mypool.connect()
cursor = conn.cursor()
cursor.execute("select foo")

连接池可以被多个 engine 共享使用:

e = create_engine('postgresql://', pool=mypool)

3. 模型使用

3.1. 模型定义

对于 Table 的定义, 本来是直接的实例化调用, 通过 declarative 的包装, 可以像"定义类"这样的更直观的方式来完成.

user = Table('user', metadata,
    Column('user_id', Integer, primary_key = True),
    Column('user_name', String(16), nullable = False),
    Column('email_address', String(60)),
    Column('password', String(20), nullable = False)
)
# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy.types import String, Integer, CHAR, BIGINT
from sqlalchemy.ext.declarative import declarative_base
BaseModel = declarative_base()
Engine = create_engine('postgresql://test@localhost:5432/test', echo=True)


class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(CHAR(32), primary_key=True)
    title = Column(String(64), server_default='', nullable=False)
    text = Column(String, server_default='', nullable=False)
    user = Column(CHAR(32), index=True, server_default='', nullable=False)
    create = Column(BIGINT, index=True, server_default='0', nullable=False)


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(CHAR(32), primary_key=True)
    name = Column(String(32), server_default='', nullable=False)
    username = Column(String(32), index=True, server_default='', nullable=False)
    password = Column(String(64), server_default='', nullable=False)


def init_db():
    BaseModel.metadata.create_all(Engine)

def drop_db():
    BaseModel.metadata.drop_all(Engine)


if __name__ == '__main__':
    #init_db()
    drop_db()
    #BaseModel.metadata.tables['user'].create(Engine, checkfirst=True)
    #BaseModel.metadata.tables['user'].drop(Engine, checkfirst=False)
    pass

3.2. 创建

session = Session()
session.add(User(id=uuid.uuid4().hex))
session.add(Blog(id=uuid.uuid4().hex))
session.add_all([
    User(id=uuid.uuid4().hex),
    Blog(id=uuid.uuid4().hex)
])
session.commit()

执行的顺序并不一定会和代码顺序一致, SQLAlchemy 自己会整合逻辑再执行.

3.3. 查询

SQLAlchemy 实现的查询非常强大, 写起来有一种随心所欲的感觉.

查询的结果, 有几种不同的类型, 这个需要注意, 像是:

  • instance
  • instance of list
  • keyed tuple of list
  • value of list
基本查询
session.query(User).filter_by(username='abc').all()
session.query(User).filter(User.username=='abc').all()
session.query(Blog).filter(Blog.create >= 0).all()
session.query(Blog).filter(Blog.create >= 0).first()
session.query(Blog).filter(Blog.create >= 0 | Blog.title == 'A').first()
session.query(Blog).filter(Blog.create >= 0 & Blog.title == 'A').first()
session.query(Blog).filter(Blog.create >= 0).offset(1).limit(1).scalar()
session.query(User).filter(User.username ==  'abc').scalar()
session.query(User.id).filter(User.username ==  'abc').scalar()
session.query(Blog.id).filter(Blog.create >= 0).all()
session.query(Blog.id).filter(Blog.create >= 0).all()[0].id
dict(session.query(Blog.id, Blog.title).filter(Blog.create >= 0).all())
session.query(Blog.id, Blog.title).filter(Blog.create >= 0).first().title
session.query(User.id).order_by('id desc').all()
session.query(User.id).order_by('id').first()
session.query(User.id).order_by(User.id).first()
session.query(User.id).order_by(-User.id).first()
session.query('id', 'username').select_from(User).all()
session.query(User).get('16e19a64d5874c308421e1a835b01c69')

多表查询
session.query(Blog, User).filter(Blog.user == User.id).first().User.username
session.query(Blog, User.id, User.username).filter(Blog.user == User.id).first().id
session.query(Blog.id,
              User.id,
              User.username).filter(Blog.user == User.id).first().keys()

条件查询
from sqlalchemy import or_, not_

session.query(User).filter(or_(User.id == '',
                               User.id == '16e19a64d5874c308421e1a835b01c69')).all()
session.query(User).filter(not_(User.id == '16e19a64d5874c308421e1a835b01c69')).all()
session.query(User).filter(User.id.in_(['16e19a64d5874c308421e1a835b01c69'])).all()
session.query(User).filter(User.id.like('16e19a%')).all()
session.query(User).filter(User.id.startswith('16e19a')).all()
dir(User.id)

函数
from sqlalchemy import func
session.query(func.count('1')).select_from(User).scalar()
session.query(func.count('1'), func.max(User.username)).select_from(User).first()
session.query(func.count('1')).select_from(User).scalar()
session.query(func.md5(User.username)).select_from(User).all()
session.query(func.current_timestamp()).scalar()
session.query(User).count()

3.4. 修改

还是通常的两种方式:

session.query(User).filter(User.username == 'abc').update({'name': '123'})
session.commit()

user = session.query(User).filter_by(username='abc').scalar()
user.name = '223'
session.commit()

如果涉及对属性原值的引用, 则要考虑 synchronize_session 这个参数.

  • 'evaluate' 默认值, 会同时修改当前 session 中的对象属性.
  • 'fetch' 修改前, 会先通过 select 查询条目的值.
  • False 不修改当前 session 中的对象属性.

在默认情况下, 因为会有修改当前会话中的对象属性, 所以如果语句中有 SQL 函数, 或者"原值引用", 那是无法完成的操作, 自然也会报错, 比如:

from sqlalchemy import func
session.query(User).update({User.name: func.trim('123 ')})
session.query(User).update({User.name: User.name + 'x'})

这种情况下, 就不能要求 SQLAlchemy 修改当前 session 的对象属性了, 而是直接进行数据库的交互, 不管当前会话值:

session.query(User).update({User.name: User.name + 'x'}, synchronize_session=False)

是否修改当前会话的对象属性, 涉及到当前会话的状态. 如果当前会话过期, 那么在获取相关对象的属性值时, SQLAlchemy 会自动作一次数据库查询, 以便获取正确的值:

user = session.query(User).filter_by(username='abc').scalar()
print user.name
session.query(User).update({User.name: 'new'}, synchronize_session=False)
print user.name
session.commit()
print user.name

执行了 update 之后, 虽然相关对象的实际的属性值已变更, 但是当前会话中的对象属性值并没有改变. 直到 session.commit() 之后, 当前会话变成"过期"状态, 再次获取 user.name 时, SQLAlchemy 通过 user 的 id 属性, 重新去数据库查询了新值. (如果 user 的 id 变了呢? 那就会出事了啊.)

synchronize_session 设置成 'fetch' 不会有这样的问题, 因为在做 update 时已经修改了当前会话中的对象了.

不管 synchronize_session 的行为如何, commit 之后 session 都会过期, 再次获取相关对象值时, 都会重新作一次查询.

3.5. 删除

session.query(User).filter_by(username='abc').delete()

user = session.query(User).filter_by(username='abc').first()
session.delete(user)

删除同样有像修改一样的 synchronize_session 参数的问题, 影响当前会话的状态.

3.6. JOIN

SQLAlchemy 可以很直观地作 join 的支持:

r = session.query(Blog, User).join(User, Blog.user == User.id).all()
for blog, user in r:
    print blog.id, blog.user, user.id


r = session.query(Blog, User.name, User.username).join(User, Blog.user == User.id).all()
print r

4. 外键和关系定义

4.1. 外键约束

使用 ForeignKey 来定义一个外键约定:

from sqlalchemy import Column, ForeignKey
from sqlalchemy.types import String, Integer, CHAR, BIGINT

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)
    text = Column(String, server_default='', nullable=False)
    user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)
    create = Column(BIGINT, index=True, server_default='0', nullable=False)


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)
    username = Column(String(32), index=True, server_default='', nullable=True)
    password = Column(String(64), server_default='', nullable=False)

创建时:

session = Session()
user = User(name='first', username=u'新的')
session.add(user)
session.flush()
blog = Blog(title=u'第一个', user=user.id)
session.add(blog)
session.commit()

session.flush() 是进行数据库交互, 但是事务并没有提交. 进行数据库交互之后, user.id 才有值.

定义了外键, 对查询来说, 并没有影响. 外键只是单纯的一条约束而已. 当然, 可以在外键上定义一些关联的事件操作, 比如当外键条目被删除时, 字段置成 null , 或者关联条目也被删除等.

4.2. 关系定义

要定义关系, 必有使用 ForeignKey 约束. 当然, 这里说的只是在定义模型时必有要有, 至于数据库中是否真有外键约定, 这并不重要.

from sqlalchemy import Column, ForeignKey
from sqlalchemy.types import String, Integer, CHAR, BIGINT
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship


class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)
    user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)
    create = Column(BIGINT, index=True, server_default='0', nullable=False)

    user_obj = relationship('User')


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

    blog_list = relationship('Blog', order_by='Blog.create')

关系只是 SQLAlchemy 提供的工具, 与数据库无关, 所以任何时候添加都是可以的.

上面的 User-Blog 是一个"一对多"关系, 通过 Blog 的 user 这个 ForeignKey , SQLAlchemy 可以自动处理关系的定义. 在查询时, 返回的结果自然也是, 一个是列表, 一个是单个对象:

session = Session()
print session.query(Blog).get(1).user_obj
print session.query(User).get(1).blog_list

这种关系的定义, 并不影响查询并获取对象的行为, 不会添加额外的 join 操作. 在对象上取一个user_obj 或者取 blog_list 都是发生了一个新的查询操作.

上面的关系定义, 对应的属性是实际查询出的实例列表, 当条目数多的时候, 这样可能会有问题. 比如用户名下有成千上万的文章, 一次全取出就太暴力了. 关系对应的属性可以定义成一个 Query :

class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

    blog_list = relationship('Blog', order_by='Blog.create', lazy="dynamic")

这样在获取实例时就可以自由控制了:

session.query(User).get(1).blog_list.all()
session.query(User).get(1).blog_list.filter(Blog.title == 'abc').first()

4.3. 关系的查询

关系定义之后, 除了在查询时会有自动关联的效果, 在作查询时, 也可以对定义的关系做操作:

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(Integer, autoincrement=True, primary_key=True)
    title = Column(Unicode(32), server_default='')
    user = Column(Integer, ForeignKey('user.id'), index=True)

    user_obj = relationship('User')


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(Unicode(32), server_default='')

    blogs = relationship('Blog')

对于 一对多 的关系, 使用 any() 函数查询:

user = session.query(User).filter(User.blogs.any(Blog.title == u'A')).first()

SQLAlchemy 会使用 exists 条件, 类似于:

SELECT *
FROM user
WHERE EXISTS
    (SELECT 1
     FROM blog
     WHERE user.id = blog.user AND blog.title = ?)
 LIMIT ? OFFSET ?

反之, 如果是 多对一 的关系, 则使用 has() 函数查询:

blog = session.query(Blog).filter(Blog.user_obj.has(User.name == u'XX')).first()

最后的 SQL 语句都是一样的.

4.4. 关系的获取形式

前面介绍的关系定义中, 提到了两种关系的获取形式, 一种是:

user_obj = relationship('User')

这种是在对象上获取关系对象时, 再去查询.

另一种是:

blog_list = relationship('Blog', lazy="dynamic")

这种的结果, 是在对象上获取关系对象时, 只返回 Query , 而查询的细节由人为来控制.

总的来说, 关系的获取分成两种, Lazy 或 Eager . 在直接查询层面, 上面两种都属于 Lazy 的方式, 而 Eager 的一种, 就是在获取对象时的查询语句, 是直接带 join 的, 这样关系对象的数据在一个查询语句中就直接获取到了:

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)
    user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)

    user_obj = relationship('User', lazy='joined', cascade='all')


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

这样在查询时:

blog = session.query(Blog).first()
print blog.user_obj

便会多出 LEFT OUTER JOIN 的语句, 结果中直接获取到对应的 User 实例对象.

也可以把 joined 换成子查询, subquery :

class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

    blog_list = relationship('Blog', cascade='all', lazy='subquery')


if __name__ == '__main__':

    session = Session()
    user = session.query(User).first()
    session.commit()

子查询会用到临时表.

上面定义的:

blog_list = relationship('Blog', lazy="dynamic")
user_obj = relationship('User', lazy='joined')
blog_list = relationship('Blog', lazy='subquery')

都算是一种默认方式. 在具体使用查询时, 还可以通过 options() 方法定义关联的获取方式:

from sqlalchemy.orm import lazyload, joinedload, subqueryload
user = session.query(User).options(lazyload('blog_list')).first()
print user.blog_list

更多的用法:

session.query(Parent).options(
    joinedload('foo').joinedload('bar').joinedload('bat')
    ).all()

session.query(A).options(
    defaultload("atob").joinedload("btoc")
    ).all()

session.query(MyClass).options(lazyload('*'))

session.query(MyClass).options(
    lazyload('*'), joinedload(MyClass.widget)
    )

session.query(User, Address).options(Load(Address).lazyload('*'))

如果关联的定义之前是 Lazy 的, 但是实际使用中, 希望在手工 join 之后, 把关联对象直接包含进结果实例, 可以使用 contains_eager() 来包装一下:

from sqlalchemy.orm import contains_eager

blog = session.query(Blog).join(Blog.user_obj)\
              .options(contains_eager(Blog.user_obj)).first()
print blog.user_obj

4.5. 关系的表现形式

关系在对象属性中的表现, 默认是列表, 但是, 这不是唯一的形式. 根据需要, 可以作成 dictionary , set 或者其它你需要的对象.

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(Integer, autoincrement=True, primary_key=True)
    title = Column(Unicode(32), server_default='')
    user = Column(Integer, ForeignKey('user.id'), index=True)

    user_obj = relationship('User')


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(Unicode(32), server_default='')

    blogs = relationship('Blog')

对于上面的两个模型:

user = session.query(User).first()
print user.blogs

现在 user.blogs 是一个列表. 我们可以在 relationship() 调用时通过 collection_class 参数指定一个类, 来重新定义关系的表现形式:

user = User(name=u'XX')
session.add_all([Blog(title=u'A', user_obj=user), Blog(title=u'B', user_obj=user)])
session.commit()

user = session.query(User).first()
print user.blogs
set , 集合:

blogs = relationship('Blog', collection_class=set)

#InstrumentedSet([<__main__.Blog object at 0x1a58710>, <__main__.Blog object at 0x1a587d0>])

attribute_mapped_collection , 字典, 键值从属性取:

from sqlalchemy.orm.collections import attribute_mapped_collection

blogs = relationship('Blog', collection_class=attribute_mapped_collection('title'))

#{u'A': <__main__.Blog object at 0x20ed810>, u'B': <__main__.Blog object at 0x20ed8d0>}

如果  title 重复的话, 结果会覆盖.
mapped_collection , 字典, 键值自定义:

from sqlalchemy.orm.collections import mapped_collection

blogs = relationship('Blog', collection_class=mapped_collection(lambda blog: blog.title.lower()))

#{u'a': <__main__.Blog object at 0x1de4890>, u'b': <__main__.Blog object at 0x1de4950>}

4.6. 多对多关系

先考虑典型的多对多关系结构:

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)

    tag_list = relationship('Tag')
    tag_list = relationship('BlogAndTag')


class Tag(BaseModel):
    __tablename__ = 'tag'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(16), server_default='', nullable=False)


class BlogAndTag(BaseModel):
    __tablename__ = 'blog_and_tag'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    blog = Column(BIGINT, ForeignKey('blog.id'), index=True)
    tag = Column(BIGINT, ForeignKey('tag.id'), index=True)
    create = Column(BIGINT, index=True, server_default='0')

在 Blog 中的:

tag_list = relationship('Tag')

显示是错误的, 因为在 Tag 中并没有外键. 而:

tag_list = relationship('BlogAndTag')

这样虽然正确, 但是 tag_list 的关系只是到达 BlogAndTag 这一层, 并没有到达我们需要的 Tag .

这种情况下, 一个多对多关系是有三张表来表示的, 在定义 relationship 时, 就需要一个secondary 参数来指明关系表:

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)

    tag_list = relationship('Tag', secondary=lambda: BlogAndTag.__table__)


class Tag(BaseModel):
    __tablename__ = 'tag'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(16), server_default='', nullable=False)


class BlogAndTag(BaseModel):
    __tablename__ = 'blog_and_tag'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    blog = Column(BIGINT, ForeignKey('blog.id'), index=True)
    tag = Column(BIGINT, ForeignKey('tag.id'), index=True)
    create = Column(BIGINT, index=True, server_default='0')

这样, 在操作时可以直接获取到对应的实例列表:

blog = session.query(Blog).filter(Blog.title == 'a').one()
print blog.tag_list

访问 tag_list 时, SQLAlchemy 做的是一个普通的多表查询.

tag_list 属性同时支持赋值操作:

session = Session()
blog = session.query(Blog).filter(Blog.title == 'a').one()
blog.tag_list = [Tag(name='t1')]
session.commit()

提交时, SQLAlchemy 总是会创建 Tag , 及对应的关系 BlogAndTag .

而如果是:

session = Session()
blog = session.query(Blog).filter(Blog.title == 'a').one()
blog.tag_list = []
session.commit()

tag = session.query(Tag).filter(Tag.name == 'x').one()
blog.tag_list.remove(tag)
session.commit()

那么 SQLAlchemy 只会删除对应的关系 BlogAndTag , 不会删除实体 Tag .

如果你直接删除实体, 那么对应的关系是不会自动删除的:

session = Session()
blog = session.query(Blog).filter(Blog.title == 'a').one()
tag = Tag(name='ok')
blog.tag_list = [tag]
session.commit()

tag = session.query(Tag).filter(Tag.name == 'ok').one()
session.delete(tag)
session.commit()

4.7. Cascades 自动关系处理

前面提到的, 当操作关系, 实体时, 与其相关联的关系, 实体是否会被自动处理的问题, 在 SQLAlchemy 中是通过 Cascades 机制来定义和解决的. ( Cascades 这个词是来源于Hibernate .)

cascade 是一个 relationship 的参数, 其值是逗号分割的多个字符串, 以表示不同的行为. 默认值是 " save-updatemerge" , 稍后会介绍每个词项的作用.

这里的所有规则介绍, 只涉及从 Parent 到 Child , Parent 即定义 relationship的类. 不涉及backref .

cascade 所有的可选字符串项是:

  • all , 所有操作都会自动处理到关联对象上.
  • save-update , 关联对象自动添加到会话.
  • delete , 关联对象自动从会话中删除.
  • delete-orphan , 属性中去掉关联对象, 则会话中会自动删除关联对象.
  • merge , session.merge() 时会处理关联对象.
  • refresh-expire , session.expire() 时会处理关联对象.
  • expunge , session.expunge() 时会处理关联对象.
save-update
当一个对象被添加进 session 后, 此对象标记为  save-update 的  relationship 关系对象也会同时添加进这个 session .

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)
    user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

    blog_list = relationship('Blog', cascade='')
    blog_list_auto = relationship('Blog', cascade='save-update')


if __name__ == '__main__':

    session = Session()

    user = User(name=u'哈哈')
    blog = Blog(title=u'第一个')
    user.blog_list = [blog]
    #user.blog_list_auto = [blog]
    session.add(user)
    print blog in session
    session.commit()

delete
当一个对象在 session 中被标记为删除时, 其属性中  relationship 关联的对象也会被标记成删除, 否则, 关联对象中的对应外键字段会被改成  NULL , 不能为  NULL 则报错.

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)
    user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

    blog_list = relationship('Blog', cascade='save-update, delete')


if __name__ == '__main__':
    session = Session()

    #user = User(name=u'用户')
    #user.blog_list = [Blog(title=u'哈哈')]
    #session.add(user)
    user = session.query(User).first()
    session.delete(user)
    session.commit()

delete-orphan
当  relationship 属性变化时, 被 "去掉" 的对象会被自动删除. 比如之前是:

user.blog_list = [blog, blog2]

现在变成:

user.blog_list = [blog2]

那么  blog 这个关联实体是会自动删除的.

这各机制只适用于 "一对多" 的关系中, "多对多" 和反过来的 "多对一" 都不适用. 在 relationship 定义时, 可以添加  single_parent = True 参数来强制约束. 当然, 在实现上 SQLAlchemy 是会先查出所有关联实体, 然后计算差集确认哪些需要被删除.

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)
    user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

    blog_list = relationship('Blog', cascade='save-update, delete-orphan')


if __name__ == '__main__':

    session = Session()

    #user = User(name=u'用户')
    #blog = Blog(title=u'一')
    #blog2 = Blog(title=u'二')
    #user.blog_list = [blog, blog2]
    #session.add(user)
    user = session.query(User).first()
    blog2 =  session.query(Blog).filter(Blog.title == u'二').first()
    user.blog_list = [blog2]
    #session.delete(user)
    session.commit()

merge
这个选项是标识在  session.merge() 时处理关联对象.  session.merge() 的作用, 是把一个会话外的实例, "整合"进会话, 比如 "有则修改, 无则创建" 就是典型的一种 "整合":

user = User(id=1, name="1")
session.add(user)
session.commit()

user = User(id=1)
user = session.merge(user)
print user.name

user = User(id=1, name="2")
user = session.merge(user)
session.commit()

cascade 中的  merge 作用:

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)
    user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

    blog_list = relationship('Blog',
                             cascade='save-update, delete, delete-orphan, merge')


if __name__ == '__main__':

    session = Session()

    user = User(id=1, name='1')
    session.add(user)
    session.commit(user)

    user = User(id=1, blog_list=[Blog(title='哈哈')])
    session.merge(user)

    session.commit()

refresh-expire
当使用  session.expire() 标识一个对象过期时, 此对象的关联对象是否也被标识为过期(访问属性会重新查询数据库).

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)
    user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

    blog_list = relationship('Blog',
            cascade='save-update, delete, delete-orphan, merge, refresh-expire')


if __name__ == '__main__':

    session = Session()

    #user = User(id=1, name='1')
    #blog = Blog(title="abc")
    #user.blog_list = [blog]
    #session.add(user)

    user = session.query(User).first()
    blog = user.blog_list[0]
    print user.name
    print blog.title
    session.expire(user)
    print 'EXPIRE'
    print user.name
    print blog.title

    session.commit()

expunge
与  merge 相反, 当  session.expunge() 把对象从会话中去除的时候, 此对象的关联对象也同时从会话中消失.

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    title = Column(String(64), server_default='', nullable=False)
    user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(BIGINT, primary_key=True, autoincrement=True)
    name = Column(String(32), server_default='', nullable=False)

    blog_list = relationship('Blog', cascade='delete, delete-orphan, expunge')


if __name__ == '__main__':

    session = Session()
    user = User(name=u'用户')
    blog = Blog(title=u'第一个')
    user.blog_list = [blog]

    session.add(user)
    session.add(blog)

    session.expunge(user)
    print blog in session

    #session.commit()

4.8. 属性代理

考虑这样的情况, 关系是关联的整个模型对象的, 但是, 有时我们对于这个关系, 并不关心整个对象, 只关心其中的某个属性. 考虑下面的场景:

from sqlalchemy.ext.associationproxy import association_proxy

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(Integer, autoincrement=True, primary_key=True)
    title = Column(Unicode(32), nullable=False, server_default='')
    user = Column(Integer, ForeignKey('user.id'), index=True)


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(Unicode(32), nullable=False, server_default='')

    blog_list = relationship('Blog')
    blog_title_list = association_proxy('blog_list', 'title')

blog_list 是一个正确的一对多关系. 下面的 blog_title_list 就是这个关系上的一个属性代理.blog_title_list 只处理 blog_list 这个关系中对应的对象的 title 属性, 包括获取和设置两个方向.

session = Session()

user = User(name='xxx')
user.blog_list = [Blog(title='ABC')]
session.add(user)
session.commit()

user = session.query(User).first()
print user.blog_title_list

上面是获取属性的示例. 在"设置", 或者说"创建"时, 直接操作是有错的:

user = session.query(User).first()
user.blog_title_list = ['NEW']
session.add(user)
session.commit()

原因在于, 对于类 Blog 的初始化形式. association_proxy('blog_list', 'title') 中的 title 只是获取时的属性定义, 而在上面的设置过程中, 实际上的调用形式为:

Blog('NEW')

Blog 类没有明确定义 __init__() 方法, 所有这种形式的调用会报错. 可以把 __init__() 方法补上:

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(Integer, autoincrement=True, primary_key=True)
    title = Column(Unicode(32), nullable=False, server_default='')
    user = Column(Integer, ForeignKey('user.id'), index=True)

    def __init__(self, title):
        self.title = title

这样调用就没有问题了.

另一个方法, 是在调用 association_proxy() 时使用 creator 参数明确定义"值"和"实例"的关系:

class User(BaseModel):
    __tablename__ = 'user'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(Unicode(32), nullable=False, server_default='')

    blog_list = relationship('Blog')
    blog_title_list = association_proxy('blog_list', 'title',
                                        creator=lambda t: User(title=t))

creator 定义的方法, 返回的对象可以被对应的 blog_list 关系接收即可.

在查询方面, 多对一 的关系代理上, 可以直接使用属性:

class Blog(BaseModel):
    __tablename__ = 'blog'

    id = Column(Integer, autoincrement=True, primary_key=True)
    title = Column(Unicode(32), server_default='')
    user = Column(Integer, ForeignKey('user.id'), index=True)

    user_obj = relationship('User')
    user_name = association_proxy('user_obj', 'name')

查询:

blog = session.query(Blog).filter(Blog.user_name == u'XX').first()

反过来的 一对多 关系代理上, 可以使用 contains() 函数:

user = session.query(User).filter(User.blogs_title.contains('A')).first()

5. 会话与事务控制

5.1. 基本使用

SQLAlchemy 的 session 是用于管理数据库操作的一个像容器一样的东西. 模型实例对象本身独立存在, 而要让其修改(创建)生效, 则需要把它们加入某个 session . 同时你也可以把模型实例对象从 session 中去除. 被 session 管理的实例对象, 在 session.commit() 时被提交到数据库. 同时 session.rollback() 是回滚变更.

session.flush() 的作用是在事务管理内与数据库发生交互, 对应的实例状态被反映到数据库. 比如自增 ID 被填充上值.

user = User(name=u'名字')
session.add(user)
session.commit()
try:
    user = session.Query(User).first()
    user.name = u'改名字
    session.commit()
except:
    session.rollback()

5.2. for update

SQLAlchemy 的 Query 支持 select ... for update / share .

session.Query(User).with_for_update().first()
session.Query(User).with_for_update(read=True).first()

完整形式是:

with_for_update(read=False, nowait=False, of=None)
read
是标识加互斥锁还是共享锁. 当为  True 时, 即  for share 的语句, 是共享锁. 多个事务可以获取共享锁, 互斥锁只能一个事务获取. 有"多个地方"都希望是"这段时间我获取的数据不能被修改, 我也不会改", 那么只能使用共享锁.

nowait
其它事务碰到锁, 是否不等待直接"报错".

of
指明上锁的表, 如果不指明, 则查询中涉及的所有表(行)都会加锁.

5.3. 事务嵌套

SQLAlchemy 中的事务嵌套有两种情况. 一是在 session 中管理的事务, 本身有层次性. 二是 session 和原始的 connection 之间, 是一种层次关系, 在这 session , connection 两个概念之中的事务同样具有这样的层次.

session 中的事务, 可能通过 begin_nested() 方法做 savepoint :

session.add(u1)
session.add(u2)

session.begin_nested()
session.add(u3)
session.rollback() # rolls back u3, keeps u1 and u2

session.commit()

或者使用上下文对象:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print "Skipped record %s" % record
session.commit()

嵌套的事务的一个效果, 是最外层事务提交整个变更才会生效.

user = User(name='2')

session.begin_nested()
session.add(user)
session.commit()

session.rollback()

于是, 前面说的第二种情况有一种应用方式, 就是在 connection 上做一个事务, 最终也在 connection 上回滚这个事务, 如果 session 是 bind 到这个连接上的, 那么 session 上所做的更改全部不会生效:

conn = Engine.connect()
session = Session(bind=conn)
trans = conn.begin()

user = User(name='2')
session.begin_nested()
session.add(user)
session.commit()

session.commit()

trans.rollback()

在测试中这种方式可能会有用.

5.4. 二段式提交

二段式提交, Two-Phase, 是为解决分布式环境下多点事务控制的一套协议.

与一般事务控制的不同是, 一般事务是 begin, 之后 commit 结束.

而二段式提交的流程上, begin 之后, 是 prepare transaction 'transaction_id' , 这时相关事务数据已经持久化了. 之后, 再在任何时候(哪怕重启服务), 作 commit prepared 'transaction_id' 或者 rollback prepared 'transaction_id' .

从多点事务的控制来看, 应用层要做的事是, 先把任务分发出去, 然后收集"事务准备"的状态(prepare transaction 的结果). 根据收集的结果决定最后是 commit 还是 rollback .

简单来说, 就是事务先保存, 再说提交的事.

SQLAlchemy 中对这个机制的支持, 是在构建会话类是加入 twophase 参数:

Session = sessionmaker(twophase=True)

然后会话类可以根据一些策略, 绑定多个 Engine , 可以是多个数据库连接, 比如:

Session = sessionmaker(twophase=True)
Session.configure(binds={User: Engine, Blog: Engine2})

这样, 在获取一个会话实例之后, 就处在二段式提交机制的支持之下, SQLAlchemy 自己会作多点的协调了. 完整的流程:

Engine = create_engine('postgresql://test@localhost:5432/test', echo=True)
Engine2 = create_engine('postgresql://test@localhost:5432/test2', echo=True)

Session = sessionmaker(twophase=True)

Session.configure(binds={User: Engine, Blog: Engine2})
session = Session()

user = User(name=u'名字')
session.add(user)
session.commit()

对应的 SQL 大概就是:

begin;
insert into "user" (name) values (?);
prepare transaction 'xx';
commit prepared 'xx';

使用时, Postgresql 数据库需要把 max_prepared_transactions 这个配置项的值改成大于 0 .

6. 字段类型

6.1. 基本类型

字段类型是在定义模型时, 对每个 Column 的类型约定. 不同类型的字段类型在输入输出上, 及支持的操作方面, 有所区别.

这里只介绍 sqlalchemy.types.* 中的类型, SQL 标准类型方面, 是写什么最后生成的 DDL 语句就是什么, 比如 BIGINTBLOG 这些, 但是这些类型并不一定在所有数据库中都有支持. 除此而外, SQLAlchemy 也支持一些特定数据库的特定类型, 这些需要从具体的 dialects 实现里导入.

Integer/BigInteger/SmallInteger
整形.

Boolean
布尔类型. Python 中表现为  True/False , 数据库根据支持情况, 表现为  BOOLEAN 或  SMALLINT. 实例化时可以指定是否创建约束(默认创建).

Date/DateTime/Time  (timezone=False)
日期类型,  Time 和  DateTime 实例化时可以指定是否带时区信息.

Interval
时间偏差类型. 在 Python 中表现为  datetime.timedelta() , 数据库不支持此类型则存为日期.

Enum  (*enums, **kw)
枚举类型, 根据数据库支持情况, SQLAlchemy 会使用原生支持或者使用  VARCHAR 类型附加约束的方式实现. 原生支持中涉及新类型创建, 细节在实例化时控制.

Float
浮点小数.

Numeric  (precision=None, scale=None, decimal_return_scale=None, ...)
定点小数, Python 中表现为  Decimal .

LargeBinary  (length=None)
字节数据. 根据数据库实现, 在实例化时可能需要指定大小.

PickleType
Python 对象的序列化类型.

String  (length=None, collation=None, ...)
字符串类型, Python 中表现为  Unicode , 数据库表现为  VARCHAR , 通常都需要指定长度.

Unicode
类似与字符串类型, 在某些数据库实现下, 会明确表示支持非 ASCII 字符. 同时输入输出也强制是  Unicode 类型.

Text
长文本类型, Python 表现为  Unicode , 数据库表现为  TEXT .

UnicodeText
参考  Unicode .

7. 混合属性机制

7.1. 直接行为

混合属性, 官方文档中称之为 Hybrid Attributes . 这种机制表现为, 一个属性, 在  和层面, 和 实例 的层面, 其行为是不同的. 之所以需要关注这部分的差异, 原因源于 Python 上下文和 SQL 上下文的差异.

 层面经常是作为 SQL 查询时的一部分, 它面向的是 SQL 上下文. 而 实例 是已经得到或者创建的结果, 它面向的是 Python 上下文.

定义模型的 Column() 就是一个典型的混合属性. 作为实例属性时, 是具体的对象值访问, 而作为类属性时, 则有构成 SQL 语句表达式的功能.

class Interval(BaseModel):
    __tablename__ = 'interval'

    id = Column(Integer, autoincrement=True, primary_key=True)
    start = Column(Integer)
    end = Column(Integer)

session.add(Interval(start=0, end=100))
session.commit()

实例行为:

ins = session.query(Interval).first()
print ins.end - ins.start

类行为:

ins = session.query(Interval).filter(Interval.end - Interval.start > 10).first()

这种机制其实一直在被使用, 但是可能大家都没有留意一个属性在类和实例上的区别.

如果属性需要被进一步封装, 那么就需要明确声明 Hybrid Attributes 了:

from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method

class Interval(BaseModel):
    __tablename__ = 'interval'

    id = Column(Integer, autoincrement=True, primary_key=True)
    start = Column(Integer)
    end = Column(Integer)

    @hybrid_property
    def length(self):
        return self.end - self.start

    @hybrid_method
    def bigger(self, i):
        return self.length > i


session.add(Interval(start=0, end=100))
session.commit()

ins = session.query(Interval).filter(Interval.length > 10).first()
ins = session.query(Interval).filter(Interval.bigger(10)).first()
print ins.bigger(1)

setter 的定义同样使用对应的装饰器即可:

class Interval(BaseModel):
    __tablename__ = 'interval'

    id = Column(Integer, autoincrement=True, primary_key=True)
    start = Column(Integer)
    end = Column(Integer)

    @hybrid_property
    def length(self):
        return abs(self.end - self.start)

    @length.setter
    def length(self, l):
        self.end = self.start + l

7.2. 表达式行为

前面说的属性, 在类和实例上有不同行为, 可以看到, 在类上的行为, 其实就是生成 SQL 表达式时的行为. 上面的例子只是简单的运算, SQLAlchemy 可以自动处理好 Python 函数和 SQL 函数的区别. 但是如果是一些特性更强的 SQL 函数, 就需要手动指定了. 于时, 这时的情况变成, 实例行为是 Python 范畴的调用行为, 而类行为则是生成 SQL 函数的相关表达式.

同时是前面的例子, 对于 length 的定义, 更严格上来说, 应该是取绝对值的.

class Interval(BaseModel):
    __tablename__ = 'interval'

    id = Column(Integer, autoincrement=True, primary_key=True)
    start = Column(Integer)
    end = Column(Integer)

    @hybrid_property
    def length(self):
        return abs(self.end - self.start)

但是, 如果使用了 Python 的 abs() 函数, 在生成 SQL 表达式时显示有无法处理了. 所以, 需要手动定义:

from sqlalchemy import func

class Interval(BaseModel):
    __tablename__ = 'interval'

    id = Column(Integer, autoincrement=True, primary_key=True)
    start = Column(Integer)
    end = Column(Integer)

    @hybrid_property
    def length(self):
        return abs(self.end - self.start)

    @length.expression
    def length(self):
        return func.abs(self.end - self.start)

这样查询时就可以直接使用:

ins = session.query(Interval).filter(Interval.length > 1).first()

对应的 SQL :

SELECT *
FROM interval
WHERE abs(interval."end" - interval.start) > ?
 LIMIT ? OFFSET ?

7.3. 应用于关系

总体上没有特别之处:

class Account(BaseModel):
    __tablename__ = 'account'

    id = Column(Integer, autoincrement=True, primary_key=True)
    user = Column(Integer, ForeignKey('user.id'), index=True)
    balance = Column(Integer, server_default='0')


class User(BaseModel):
    __tablename__ = 'user'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(Unicode(32), nullable=False, server_default='')

    accounts = relationship('Account')
    #balance = association_proxy('accounts', 'balance')

    @hybrid_property
    def balance(self):
        return sum(x.balance for x in self.accounts)

查询时:

user = session.query(User).first()
print user.balance

这里涉及的东西都是 Python 自己的, 包括那个 sum() 函数, 和 SQL 没有关系.

如果想实现的是, 使用 SQL 的 sum() 函数, 取出指定用户的总账户金额数, 那么就要考虑把balance 作成表达式的形式:

from sqlalchemy import select

@hybrid_property
def balance(self):
    return select([func.sum(Account.balance)]).where(Account.user == self.id).label('balance_v')
    #return func.sum(Account.balance)

这样的话, User.balance 只是单纯的一个表达式了, 查询时指定字段:

user = session.query(User, User.balance).first()
print user.balance_v

注意, 如果写成:

session.query(User.balance).first()

意义就不再是"获取第一个用户的总金额", 而变成"获取总金额的第一个". 这里很坑吧.

像上面这样改, 实例层面就无法使用 balance 属性. 所以, 还是先前介绍的, 表达式可以单独处理:

@hybrid_property
def balance(self):
    return sum(x.balance for x in self.accounts)

@balance.expression
def balance(self):
    return select([func.sum(Account.balance)]).where(Account.user == self.id).label('balance_v')

定义了表达式的 balance , 这部分作为查询条件上当然也是可以的:

user = session.query(User).filter(User.balance > 1).first()

8. 示例: AdjacencyList, 单向链接列表

这里说的 AdjacencyList , 就是最常用来在关系数据库中表示树结构的, parent 方式:

id name parent
1 null
2 1
3 2

上面的数据, 表示的结构就是:

一
  |- 二
    |- 三

模型定义很好做:

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, ForeignKey
from sqlalchemy.types import Integer, Unicode
from sqlalchemy.orm import relationship, sessionmaker, joinedload

BaseModel = declarative_base()
Engine = create_engine('sqlite://', echo=True)
Session = sessionmaker(Engine)

class Node(BaseModel):
    __tablename__ = 'node'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(Unicode(32), nullable=False, server_default='')
    parent = Column(Integer, ForeignKey('node.id'), index=True,
                    nullable=False, server_default='0')

这里不让 parent 字段有 null , 而使用 0 代替.

这个例子在关系上, 有一个纠结的地方, 因为 node 这个表, 它是自关联的, 所以如果想要 children和 parent_obj 这两个关系时:

children = relationship('Node')
parent_obj = relationship('Node')

呃, 尴尬了.

如果是两个表, 那么 SQLAlchemy 可以通过外键在哪张表这个信息, 来确定关系的方向:

class Blog(BaseModel):
    ...
    user = Column(Integer, ForeignKey('user.id'))
    user_obj = relationship('User')

class User(BaseModel):
    ...
    blog_list = relationship('Blog')

因为外键在 Blog 中, 所以 Blog -> User 的 user_obj 是一个 N -> 1 关系.

反之, User -> Blog 的 blog_list 则是一个 1 -> N 的关系.

而自相关的 Node 无法直接判断方向, 所以 SQLAlchemy 会按 1 -> N 处理, 那么:

children = relationship('Node')
parent_obj = relationship('Node')

这两条之中, children 是正确的, 是我们想要的. 要定义 parent_obj 则需要在 relationship 中通过参数明确表示方向:

parent_obj = relationship('Node', remote_side=[id])

这种方式就定义了一个, "到 id" 的 N -> 1 关系.

现在完整的模型定义是:

class Node(BaseModel):
    __tablename__ = 'node'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(Unicode(32), nullable=False, server_default='')
    parent = Column(Integer, ForeignKey('node.id'), index=True,
                    nullable=False, server_default='0')

    children = relationship('Node') # 1 -> N
    parent_obj = relationship('Node', remote_side=[id])

查询方面没什么特殊的了, 不过我发现在自相关的模型关系, lazy 选项不起作用:

children = relationship('Node', lazy="joined")
parent_obj = relationship('Node', remote_side=[id], lazy="joined")

都是无效的, 只有在查询时, 手动使用 options() 定义:

n = session.query(Node).filter(Node.name==u'一')\
           .options(joinedload('parent_obj')).first()

如果要一次查出多级的子节点:

n = session.query(Node).filter(Node.name==u'一')\
           .options(joinedload('children').joinedload('children')).first()
print n.name, n.children, n.children[0].children

多个 joinedload() 串连的话, 可以使用 joinedload_all() 来整合:

from sqlalchemy.orm import joinedload_all

n = session.query(Node).filter(Node.name==u'一')\
           .options(joinedload_all('children', 'children')).first()

在修改方面, 删除的话, 配置了 cascade , 删除父节点, 则子节点也会自动删除:

children = relationship('Node', lazy='joined', cascade='all') # 1 -> N
node = session.query(Node).filter(Node.name == u'一').first()
session.delete(node)
session.commit()

如果只删除子节点, 那么 delete-orphan 选项就很好用了:

children = relationship('Node', lazy='joined', cascade='all, delete-orphan') # 1 -> N
node = session.query(Node).filter(Node.name == u'一').first()
node.children = []
session.commit()

9. 示例: 属性实体化建模

假设有这样的场景, 某实体在具体条目上, 其属性是不定的, 或者其属性是充分稀疏的:

id name attr_0 attr_1 attr_2 ... attr_n
1 foo 1 abc 33 ... any

这种情况下, 把属性看成是单独的实体, 是一个更好的建模方式:

id name
1 foo
id entity_id attr_name attr_value
1 1 attr_0 1
2 1 attr_1 33
... ... ... ...
n 1 attr_n any

这种模型下, ORM 层面我们考虑封装一个对操作更友好的上层操作接口, 比如:

obj = Entity()
obj['attr_0'] = '1'
obj['attr_1'] = '33'
session.add(obj)
session.commit()

实现上, 就是把对象的方法, 包装成 SQLAlchemy 的 ORM 中的对应的关系操作.

class BaseModel(declarative_base()):
    __abstract__ = True

class Entity(BaseModel):
    __tablename__ = 'entity'

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(Unicode(32), server_default='')

    _attributes = relationship('Attribute',
                               collection_class=attribute_mapped_collection('key'),
                               lazy='joined', cascade="all, delete-orphan")
    attributes = association_proxy('_attributes', 'value',
                                   creator=lambda k, v: Attribute(key=k, value=v))


class Attribute(BaseModel):
    __tablename__ = 'attribute'

    id = Column(Integer, autoincrement=True, primary_key=True)
    entity = Column(Integer, ForeignKey('entity.id'), index=True)
    key = Column(Unicode(32), server_default='')
    value = Column(UnicodeText, server_default='')


if __name__ == '__main__':
    BaseModel.metadata.create_all(Engine)

    session = Session()

    entity = Entity(name=u'哈哈')
    entity.attributes[u'first'] = u'abc'
    entity.attributes[u'sec'] = u'hoho'
    session.add(entity)
    session.commit()

    entity = session.query(Entity).first()
    print entity.attributes
    del entity.attributes['first']
    session.commit()

    entity = session.query(Entity).first()
    print entity.attributes

实现上就两点:

  • _attributes 关系中, 指定 collection_class , 于是就可以得到一个像 dict 的属性对象了.
  • association_proxy 从 dict 的属性对象中只抽出我们关心的 value 属性值.

这个场景中, 还可以再进一步, 在 Entity 类上实现 dict 的一些方法, 直接操作其 attributes 属性,association_proxy 就直接返回 Entity 的实例, 这样代码可以变成这样:

entity = Entity(name=u'ABC')
entity[u'first'] = u'a'
entity[u'sec'] = u'hoho'

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
6月前
|
SQL 数据库 Python
SqlAlchemy 2.0 中文文档(十一)(3)
SqlAlchemy 2.0 中文文档(十一)
57 11
|
6月前
|
SQL 存储 数据库
SqlAlchemy 2.0 中文文档(十一)(4)
SqlAlchemy 2.0 中文文档(十一)
62 11
|
6月前
|
存储 SQL Python
SqlAlchemy 2.0 中文文档(十一)(5)
SqlAlchemy 2.0 中文文档(十一)
49 10
|
6月前
|
SQL API 数据库
SqlAlchemy 2.0 中文文档(十一)(1)
SqlAlchemy 2.0 中文文档(十一)
51 2
|
6月前
|
存储 SQL 数据库
SqlAlchemy 2.0 中文文档(十一)(2)
SqlAlchemy 2.0 中文文档(十一)
40 2
|
7月前
|
SQL 存储 Go
【译】SQLAlchemy文档:SQLAlchemy 统一教程
【译】SQLAlchemy文档:SQLAlchemy 统一教程
102 1
|
6月前
|
SQL 关系型数据库 MySQL
SqlAlchemy 2.0 中文文档(四十六)(2)
SqlAlchemy 2.0 中文文档(四十六)
62 0
|
6月前
|
SQL 数据库 索引
SqlAlchemy 2.0 中文文档(四十六)(3)
SqlAlchemy 2.0 中文文档(四十六)
39 0
|
6月前
|
SQL 数据库 索引
SqlAlchemy 2.0 中文文档(四十六)(7)
SqlAlchemy 2.0 中文文档(四十六)
44 0
|
6月前
|
存储 关系型数据库 MySQL
SqlAlchemy 2.0 中文文档(四十六)(6)
SqlAlchemy 2.0 中文文档(四十六)
38 0

热门文章

最新文章