Python 使用SQLAlchemy数据库模块

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: SQLAlchemy 是用Python编程语言开发的一个开源项目,它提供了SQL工具包和ORM对象关系映射工具,使用MIT许可证发行,SQLAlchemy 提供高效和高性能的数据库访问,实现了完整的企业级持久模型。ORM(对象关系映射)是一种编程模式,用于将对象与关系型数据库中的表和记录进行映射,从而实现通过面向对象的方式进行数据库操作。ORM 的目标是在编程语言中使用类似于面向对象编程的语法,而不是使用传统的 SQL 查询语言,来操作数据库。

SQLAlchemy 是用Python编程语言开发的一个开源项目,它提供了SQL工具包和ORM对象关系映射工具,使用MIT许可证发行,SQLAlchemy 提供高效和高性能的数据库访问,实现了完整的企业级持久模型。

ORM(对象关系映射)是一种编程模式,用于将对象与关系型数据库中的表和记录进行映射,从而实现通过面向对象的方式进行数据库操作。ORM 的目标是在编程语言中使用类似于面向对象编程的语法,而不是使用传统的 SQL 查询语言,来操作数据库。

主要思想是将数据库表的结构映射到程序中的对象,通过对对象的操作来实现对数据库的操作,而不是直接编写 SQL 查询。ORM 工具负责将数据库记录转换为程序中的对象,反之亦然。

ORM 的核心概念包括:

  1. 实体(Entity): 在 ORM 中,实体是指映射到数据库表的对象。每个实体对应数据库中的一条记录。
  2. 属性(Attribute): 实体中的属性对应数据库表中的列。每个属性表示一个字段。
  3. 关系(Relationship): ORM 允许定义实体之间的关系,例如一对多、多对一、多对多等。这种关系会映射到数据库表之间的关系。
  4. 映射(Mapping): ORM 负责将实体的属性和方法映射到数据库表的列和操作。
  5. 会话(Session): ORM 提供了会话来管理对象的生命周期,包括对象的创建、更新和删除。
  6. 查询语言: ORM 通常提供一种查询语言,允许开发者使用面向对象的方式编写查询,而不是直接使用 SQL。

对象映射ROM模型可连接任何关系数据库,连接方法大同小异,以下总结了如何连接常用的几种数据库方式。

# sqlite 创建数据库连接
engine = create_engine('sqlite:///database.db', echo=False)

# sqlite 创建内存数据库
engine = create_engine('sqlite://')
engine = create_engine('sqlite:///:memory:', echo=True)

# PostgreSQL 创建数据库连接
engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')                # default
engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')       # psycopg2
engine = create_engine('postgresql+pg8000://scott:tiger@localhost/mydatabase')         # pg8000

# MySQL 创建数据库连接
engine = create_engine('mysql://scott:tiger@localhost/foo')                  # default
engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')          # mysql-python
engine = create_engine('mysql+mysqlconnector://scott:tiger@localhost/foo')   # MySQL-connector-python
engine = create_engine('mysql+oursql://scott:tiger@localhost/foo')           # OurSQL

# Oracle 创建数据库连接
engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname')

# MSSQL 创建数据库连接
engine = create_engine('mssql+pyodbc://scott:tiger@mydsn')                   # pyodbc
engine = create_engine('mssql+pymssql://scott:tiger@hostname:port/dbname')   # pymssql

数据表创建

简单的创建一个User映射类,映射到UserDB库上,分别增加几个常用的数据库字段,并插入一些测试数据。

import sqlite3,datetime,time
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, Date, Time, Boolean, DECIMAL, Enum, Text

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'

    # 主键 primary_key | 自动增长 autoincrement | 不为空 nullable | 唯一性约束 unique
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)

    # 字符串类型
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")

    # 姓名字段默认值是0
    age = Column(Integer,nullable=False, default=0)

    # 增加创建日期 [日期:时间]
    create_time = Column(DateTime, default=datetime.datetime.now)

    # onupdate=datetime.now 每次更新数据的时候都要更新该字段值
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)

    # 增加用户分数
    user_value = Column(Float, default=0.0)

    # 枚举类型定义
    # tag = Column(Enum("python",'flask','django'))

    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写
    def __repr__(self):
        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)

if __name__ == "__main__":
    print("当前表名: {}".format(User.__table__))

    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表
    Base.metadata.create_all(engine, checkfirst=True)

    # 逐条增加新记录
    insert_user = User(username='lyshark', password='123456', age=24, user_value=12.5)
    session.add(insert_user)

    insert_user = User(username='sqlalchemy', password='123', age=34, user_value=45.8)
    session.add(insert_user)

    # 插入多条记录
    session.add_all(
        [
         User(username="admin", password="123123", age=54, user_value=66.9),
         User(username="root", password="3456576", age=67, user_value=98.4)
        ]
    )

    # 提交事务
    session.commit()

数据库查询

演示了通过ORM关系映射实现对单表的简单查询与筛选过滤功能。

import sqlite3,time,datetime
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float,DateTime

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")
    age = Column(Integer,nullable=False, default=0)
    create_time = Column(DateTime, default=datetime.datetime.now)
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)
    user_value = Column(Float, default=0.0)

    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写
    def __repr__(self):
        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # 查询所有字段
    all_value = session.query(User).all()
    for item in all_value:
        print("ID: {} --> 用户: {}".format(item.id, item.username))

    # 查询指定字段
    key_value = session.query(User.username,User.password).all()
    print(key_value)

    # 查询第一条
    first_value = session.query(User).first()
    print("第一条记录: {} {}".format(first_value.username, first_value.password))

    # 使用过滤器 [ 过滤出age>20的用户,输出其(id,username)字段 ]
    filter_value = session.query(User.id,User.username).filter(User.age > 20).all()
    print("过滤结果: {}".format(filter_value))

    # 排序输出 [ 正序/倒序 ]
    sort_value = session.query(User.username,User.age).order_by(User.age).all()
    print("正序排列: {}".format(sort_value))

    sort_value = session.query(User.username,User.age).order_by(User.age.desc()).all()
    print("倒序排列: {}".format(sort_value))

    # 查询计数
    count_value = session.query(User).count()
    print("记录条数: {}".format(count_value))

    # and/or 条件过滤 默认为and 在filter()中用,分隔多个条件表示,如果是or则需增加or_连接多个条件
    and_value = session.query(User.username,User.age).filter(User.age >= 20, User.age <= 40).all()
    print("与查询: {}".format(and_value))

    or_value = session.query(User.username,User.age).filter(or_(User.age >= 20, User.age <= 40)).all()
    print("或查询: {}".format(or_value))

    # 等于查询
    equal_value = session.query(User.username,User.password).filter(User.age == 67).all()
    print("等于查询: {}".format(equal_value))

    not_equal_value = session.query(User.username,User.password).filter(User.age != 67).all()
    print("不等于查询: {}".format(not_equal_value))

    # like模糊匹配
    like_value = session.query(User.username,User.create_time).filter(User.username.like("%ly%")).all()
    print("模糊匹配: {}".format(like_value))

    # in查询范围
    in_value = session.query(User.username,User.password).filter(User.age.in_([24,34])).all()
    print("查询两者: {}".format(in_value))

    not_in_value = session.query(User.username,User.password).filter(User.age.notin_([24,34])).all()
    print("查询非两者: {}".format(not_in_value))

    # op正则匹配查询
    op_value = session.query(User.username).filter(User.username.op("regexp")("^a")).all()
    print("正则匹配: {}".format(op_value))

    # 调用数据库内置函数
    func_value = session.query(func.count(User.age)).one()
    print("调用函数: {}".format(func_value))

    # 数据切片
    cat_value = session.query(User.username).all()[:2]
    print("输出前两条: {}".format(cat_value))

    cat_value = session.query(User.username).offset(5).limit(3).all()
    print("第6行开始显示前3个: {}".format(cat_value))

    cat_value = session.query(User.username).order_by(User.id.desc())[0:10]
    print("输出最后10个: {}".format(cat_value))

    # 非空查询
    isnot_value = session.query(User).filter(User.username.isnot(None)).all()
    print("非空显示: {}".format(isnot_value))

    null_value = session.query(User).filter(User.username.is_(None)).all()
    print("为空显示: {}".format(null_value))

    # 分组测试
    group_by = session.query(User.username,func.count(User.id)).group_by(User.age).all()
    print("根据年龄分组: {}".format(group_by))

    # 进一步过滤查询
    having_by = session.query(User.username,User.age).group_by(User.age).having(User.age > 30).all()
    print("以age分组,并查询age大于30的记录: {}".format(having_by))

数据库修改

演示了修改数据库参数以及对数据库指定记录的删除功能。

import sqlite3,time,datetime
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float,DateTime

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")
    age = Column(Integer,nullable=False, default=0)
    create_time = Column(DateTime, default=datetime.datetime.now)
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)
    user_value = Column(Float, default=0.0)

    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写
    def __repr__(self):
        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # 修改数据: 先查询在修改
    select_update = session.query(User).filter_by(username="lyshark").first()
    select_update.password = "test1234"
    session.commit()

    # 修改数据: 直接修改
    session.query(User).filter_by(username="lyshark").update({
   User.password: 'abcd'})
    session.commit()

    session.query(User).filter_by(username="lyshark").update({
   "password": '123456'})
    session.commit()

    # 删除数据: 先查询在删除
    del_ptr = session.query(User).filter_by(username="lyshark").first()
    session.delete(del_ptr)
    session.commit()

    # 删除数据: 直接删除
    session.query(User).filter(User.username=="sqlalchemy").delete()
    session.commit()

数据库查询转字典

将从数据库中过滤查询指定的记录,并将该记录转换为字典JSON格式,利于解析。

import sqlite3,time,datetime,json
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float,DateTime

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")
    age = Column(Integer,nullable=False, default=0)
    create_time = Column(DateTime, default=datetime.datetime.now)
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)
    user_value = Column(Float, default=0.0)

    # 查询结果转字典 (保留数据类型)
    def single_to_dict(self):
        return {
   c.name: getattr(self, c.name) for c in self.__table__.columns}

    # 查询结果转字典 (全转为字符串)
    def dobule_to_dict(self):
        result = {
   }
        for key in self.__mapper__.c.keys():
            if getattr(self, key) is not None:
                result[key] = str(getattr(self, key))
            else:
                result[key] = getattr(self, key)
        return result

# 将查询结果转为JSON
def to_json(all_vendors):
    v = [ ven.dobule_to_dict() for ven in all_vendors ]
    return v

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # 查询结果转为字典(保持数据库格式)
    key_value = session.query(User).first()
    data = key_value.single_to_dict()
    print("转为字典: {}".format(data))

    # 查询结果转为字典(字符串格式)
    key_value = session.query(User).first()
    data = key_value.dobule_to_dict()
    print("转为字符串字典: {}".format(data))

    # 查询结果转为JSON格式
    key_value = session.query(User)
    data = to_json(key_value)
    print("转为JSON格式: {}".format(data))

数据库类内函数调用

用户在使用ORM模型定义类时,可以同时在该映射类中定义各种针对类模型的处理函数,实现对数据的动态处理

from werkzeug.security import generate_password_hash,check_password_hash
import sqlite3,datetime,time
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, Date, Time, Boolean, DECIMAL, Enum, Text

# 创建SQLITE数据库
engine = create_engine("sqlite:///:memory:", encoding='utf-8')
Base = declarative_base()                                       # 生成orm基类

# 创建会话
Session_class = sessionmaker(bind=engine)                       # 创建与数据库的会话session
session = Session_class()                                       # 生成session实例

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)
    username = Column(String(64))
    _password_hash_ = Column(String(256))                       # 加下划线作为私有函数,无法被外部访问
    email = Column(String(64))

    # 设置一个password字段用来设置密码
    @property
    def password(self):
        raise Exception("密码不能被读取")

    # 赋值password字段时,则自动加密存储
    @password.setter
    def password(self, value):
        self._password_hash_ = generate_password_hash(value)

    # 使用 check_password,进行密码校验 返回True False。
    def check_password(self, pasword):
        return check_password_hash(self._password_hash_, pasword)

    # 设置输出函数
    def print_function(self):
        # 密码不可读调用 self.password 会报错
        # print("用户: {} 密码: {}".format(self.username,self.password))

        print("用户: {} email: {}".format(self.username, self.email))
        return True

if __name__ == "__main__":
    print("当前表名: {}".format(User.__table__))

    # 创建数据表
    Base.metadata.create_all(engine, checkfirst=True)

    # 插入测试数据
    insert = User(username="lyshark",password="123123",email="lyshark@163.com")
    session.add(insert)

    insert = User(username="admin",password="556677",email="lyshark@163.com")
    session.add(insert)
    session.commit()

    # 查询测试
    tag = session.query(User).filter_by(username="lyshark").first()
    print("测试密码是否正确: {}".format(tag.check_password("123123")))

    # 调用函数验证当前用户
    tag = session.query(User).filter_by(username="admin").first()
    func = tag.print_function()
    print("输出测试: {}".format(func))

数据库聚合函数

通过func库调用数据库内的聚合函数,实现统计最大最小平均数等数据。

import sqlite3,datetime,time
from sqlalchemy import func
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, Date, Time, Boolean, DECIMAL, Enum, Text

# 建立基本映射类
Base = declarative_base()

# 创建SQLITE数据库
engine = create_engine('sqlite:///:memory:', echo=False)

# 创建映射类User
class User(Base):
    __tablename__ = 'UserDB'

    # 主键 primary_key | 自动增长 autoincrement | 不为空 nullable | 唯一性约束 unique
    id = Column(Integer, primary_key=True, autoincrement=True, nullable=True, unique=True)

    # 字符串类型
    username = Column(String(32), nullable=True, default="none")
    password = Column(String(32), nullable=True, default="none")

    # 姓名字段默认值是0
    age = Column(Integer,nullable=False, default=0)

    # 增加创建日期 [日期:时间]
    create_time = Column(DateTime, default=datetime.datetime.now)

    # onupdate=datetime.now 每次更新数据的时候都要更新该字段值
    update_time = Column(DateTime, onupdate=datetime.datetime.now, default=datetime.datetime.now)

    # 增加用户分数
    user_value = Column(Float, default=0.0)

    # 枚举类型定义
    # tag = Column(Enum("python",'flask','django'))

    # __repr__方法用于输出该类的对象被print()时输出的字符串,如果不想写可以不写
    def __repr__(self):
        return "<UserDB(username='%s',password='%s')>" % (self.username,self.password)

if __name__ == "__main__":
    print("当前表名: {}".format(User.__table__))

    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表
    Base.metadata.create_all(engine, checkfirst=True)

    # 统计总数
    count = session.query(func.count(User.id)).first()
    print("总记录: {}".format(count))

    # age 字段平均值
    age_avg = session.query(func.avg(User.age)).first()
    print("平均值: {}".format(age_avg))

    # age 字段最大值
    age_max = session.query(func.max(User.age)).first()
    print("最大值: {}".format(age_max))

    # age 最小值
    age_min = session.query(func.min(User.age)).one()
    print("最小值: {}".format(age_min))

    # age 求和 只求前三个的和
    age_sum = session.query(func.sum(User.age)).one()
    print("求总数: {}".format(age_sum))

    # 提交事务
    session.commit()

ORM定义一对多关系

SQLAlchemy提供了一个relationship,这个类可以定义属性,以后在访问相关联的表的时候就直接可以通过属性访问的方式就可以访问得到。

from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey

# 打开数据库
Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)

# 主表
class Author(Base):
    __tablename__ = 'author'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(10), nullable=False)

    # 定义外键关联到Book模型上面,主表是author
    books = relationship('Book', backref='author')

    def __repr__(self):
        return '<Author:(id={}, name={})>'.format(self.id, self.name)

# 从表
class Book(Base):
    __tablename__ = 'book'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(20), nullable=False)

    # 外键关联到主表author的id字段上
    author_id = Column(Integer, ForeignKey('author.id',ondelete="RESTRICT"))

    def __repr__(self):
        return '<Book:(id={}, name={}, author_id={})>'.format(self.id, self.name, self.author_id)

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # ------------------------------------------------------
    # 创建数据并插入
    author1 = Author(id=1, name="张三")
    author2 = Author(id=2, name="李四")

    book1 = Book(name="<Python 开发>", author_id=1)
    book2 = Book(name="<C++ 开发教程>", author_id=1)
    book3 = Book(name="<C# 从入门到精通>", author_id=1)

    book4 = Book(name="<渗透测试指南>", author_id=2)
    book5 = Book(name="<nmap 扫描工具指南>", author_id=2)

    session.add_all([author1,book1,book2,book3])
    session.add_all([author2,book4,book5])
    session.commit()

    # ------------------------------------------------------
    # 关联插入模式

    author1 = Author(id=3, name="王五")

    book1 = Book(name="<Python 开发>", author_id=1)
    book2 = Book(name="<C++ 开发教程>", author_id=1)
    book3 = Book(name="<C# 从入门到精通>", author_id=1)

    author1.books.append(book1)
    author1.books.append(book2)
    author1.books.append(book3)

    session.add(author1)
    session.commit()

    # ------------------------------------------------------
    # 一对多查询测试
    book = session.query(Book).get(1)
    print("书籍作者: {}".format(book.author.name))

    author = session.query(Author).get(1)
    print("书籍数量: {}".format(len(author.books)))

    for book_name in author.books:
        print("书籍: {}".format(book_name.name))

ORM定义一对一关系

如果想要将两个模型映射成一对一的关系,那么应该在父模型中,指定引用的时候,要传递一个uselist=False参数进去。就是告诉父模型,以后引用这个从模型的时候,不再是一个列表了,而是一个对象了。

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,relationship,backref
from sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey

# 打开数据库
Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)

# 主表
class User(Base):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(50), nullable=False)

    def __repr__(self):
        return 'User(username:%s)' % self.username

# 从表
class UserExtend(Base):
    __tablename__ = 'user_extend'
    id = Column(Integer,primary_key=True,autoincrement=True)
    school = Column(String(50))
    age = Column(String(32))
    sex = Column(String(32))

    uid = Column(Integer,ForeignKey('user.id'))
    user = relationship('User',backref=backref('extend', uselist=False))

    #uselist=False 告诉父模型 以后引用时不再是列表 而是对象
    def __repr__(self):
        return 'extend(school:%s)'%self.school

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # ------------------------------------------------------
    # 插入测试数据
    user = User(username="lyshark")
    extend = UserExtend(school="<家里蹲大学>", age="22", sex="M")
    extend.user = user

    session.add(extend)
    session.commit()

    # ------------------------------------------------------
    # 一对一关系测试
    user_ptr = session.query(User).first()
    print("用户名: {} --> 学校: {}".format(user_ptr.username,user_ptr.extend.school))

    extend_ptr = session.query(UserExtend).first()
    print("用户名: {} --> 学校: {} --> 年龄: {} --> 性别: {}".format(extend_ptr.user.username,extend_ptr.school,extend_ptr.age,extend_ptr.sex))

ORM定义多对多关系

多对多与上面的一对多,一对一不同,创建多对对必须使用中间表Table来解决查询问题。

  • 多对多的关系需要通过一张中间表来绑定他们之间的关系。
  • 先把两个需要做多对多的模型定义出来
  • 使用Table定义一个中间表,中间表一般就是包含两个模型的外键字段就可以了,并且让他们两个来作为一个“复合主键”。
  • 在两个需要做多对多的模型中随便选择一个模型,定义一个relationship属性,来绑定三者之间的关系,在使用relationship的时候,需要传入一个secondary=中间表。
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,relationship,backref
from sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey, Table
from sqlalchemy import Column,INT,VARCHAR,ForeignKey
from sqlalchemy.orm import relationship

# 打开数据库
Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)

# 女孩表
class Girls(Base):
    __tablename__ = "girl"
    id = Column(Integer,primary_key=True, autoincrement=True)
    name = Column(String(32))

    # 建立多对多关系
    g2b = relationship("Boys",backref="b2g",secondary="hotel")

# 男孩表
class Boys(Base):
    __tablename__ = "boy"
    id = Column(Integer,primary_key=True, autoincrement=True)
    name = Column(String(32))

# 映射关系表
class Table(Base):
    __tablename__ = "hotel"
    id = Column(Integer, primary_key=True, autoincrement=True)
    boy_id = Column(Integer,ForeignKey("boy.id"))
    girl_id = Column(Integer,ForeignKey("girl.id"))

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # ------------------------------------------------------
    # 增加数据 - relationship 正向
    girl_obj = Girls(name="女孩主")
    girl_obj.g2b = [Boys(name="男孩从1"),Boys(name="男孩从2")]
    session.add(girl_obj)
    session.commit()

    # 增加数据 - relationship 反向
    boy_obj = Boys(name="男孩主")
    boy_obj.b2g = [Girls(name="女孩从1"),Girls(name="女孩从2")]
    session.add(boy_obj)
    session.commit()

    # ------------------------------------------------------
    # 正向查询
    girl_obj_list = session.query(Girls).all()
    for girl_obj in girl_obj_list:
        for boy in girl_obj.g2b:
            print(girl_obj.name,boy.name)

    # 反向查询
    boy_obj_list = session.query(Boys).all()
    for boy in boy_obj_list:
        for girl in boy.b2g:
            print(girl.name,boy.name)

连接查询与子查询

连接查询通过JOIN语句实现,子查询则通过subquery实现,首先需要创建一对多关系然后才可使用子查询。

from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String, Text, func, ForeignKey

# 打开数据库
Base = declarative_base()
engine = create_engine('sqlite:///:memory:', echo=False)

# 主表
class Author(Base):
    __tablename__ = 'author'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(10), nullable=False)

    # 定义外键关联到Book模型上面,主表是author
    books = relationship('Book', backref='author')

    def __repr__(self):
        return '<Author:(id={}, name={})>'.format(self.id, self.name)

# 从表
class Book(Base):
    __tablename__ = 'book'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(20), nullable=False)

    # 外键关联到主表author的id字段上
    author_id = Column(Integer, ForeignKey('author.id',ondelete="RESTRICT"))

    def __repr__(self):
        return '<Book:(id={}, name={}, author_id={})>'.format(self.id, self.name, self.author_id)

if __name__ == "__main__":
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()

    # 创建数据表(存在则跳过)
    Base.metadata.create_all(engine, checkfirst=True)

    # ------------------------------------------------------
    # 创建数据并插入
    author1 = Author(id=1, name="张三")
    author2 = Author(id=2, name="李四")

    book1 = Book(name="<Python 开发>", author_id=1)
    book2 = Book(name="<C++ 开发教程>", author_id=1)
    book3 = Book(name="<C# 从入门到精通>", author_id=1)

    book4 = Book(name="<渗透测试指南>", author_id=2)
    book5 = Book(name="<nmap 扫描工具指南>", author_id=2)

    session.add_all([author1,book1,book2,book3])
    session.add_all([author2,book4,book5])
    session.commit()

    # ------------------------------------------------------
    # 连表查询
    no_join_select = session.query(Author).filter(Author.id == Book.id).filter(Author.name == "王五").all()
    print("查询主键==从键 并且 Author.name == 王五的记录: {}".format(no_join_select))

    # JOIN 连接查询
    join = session.query(Author).join(Book).filter(Book.name=="<nmap 扫描工具指南>").first().name
    print("查询主表Author中的Book书名的作者是: {}".format(join))

    join = session.query(Book).join(Author).filter(Author.name=="李四").all()
    for book in join:
        print("查询从表Book中的Author作者有哪些书: {}".format(book.name))

    # subquery 子查询
    sbq = session.query(Book.author_id,func.count('*').label("book_count")).group_by(Book.author_id).subquery()
    print("查询出书籍编号计数(子语句): {}".format(sbq))

    sub_join = session.query(Author.name,sbq.c.book_count).outerjoin(sbq,Author.id == sbq.c.author_id).all()
    print("查询用户有几本书(主语句): {}".format(sub_join))
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1天前
|
人工智能 安全 Java
Python 多线程编程实战:threading 模块的最佳实践
Python 多线程编程实战:threading 模块的最佳实践
10 5
|
1天前
|
人工智能 数据库 开发者
Python中的atexit模块:优雅地处理程序退出
Python中的atexit模块:优雅地处理程序退出
8 3
|
2天前
|
缓存 NoSQL 关系型数据库
在Python Web开发过程中:数据库与缓存,MySQL和NoSQL数据库的主要差异是什么?
MySQL与NoSQL的主要区别在于数据结构、查询语言和可扩展性。MySQL是关系型数据库,依赖预定义的数据表结构,使用SQL进行复杂查询,适合垂直扩展。而NoSQL提供灵活的存储方式(如JSON、哈希表),无统一查询语言,支持横向扩展,适用于处理大规模、非结构化数据和高并发场景。选择哪种取决于应用需求、数据模型及扩展策略。
10 0
|
3天前
|
SQL 关系型数据库 MySQL
第十三章 Python数据库编程
第十三章 Python数据库编程
|
3天前
|
存储 网络协议 关系型数据库
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
|
4天前
|
开发者 Python
Python的os模块详解
Python的os模块详解
15 0
|
7天前
|
数据挖掘 API 数据安全/隐私保护
python请求模块requests如何添加代理ip
python请求模块requests如何添加代理ip
|
7天前
|
NoSQL MongoDB Redis
Python与NoSQL数据库(MongoDB、Redis等)面试问答
【4月更文挑战第16天】本文探讨了Python与NoSQL数据库(如MongoDB、Redis)在面试中的常见问题,包括连接与操作数据库、错误处理、高级特性和缓存策略。重点介绍了使用`pymongo`和`redis`库进行CRUD操作、异常捕获以及数据一致性管理。通过理解这些问题、易错点及避免策略,并结合代码示例,开发者能在面试中展现其技术实力和实践经验。
129 8
Python与NoSQL数据库(MongoDB、Redis等)面试问答
|
7天前
|
SQL 关系型数据库 MySQL
Python与MySQL数据库交互:面试实战
【4月更文挑战第16天】本文介绍了Python与MySQL交互的面试重点,包括使用`mysql-connector-python`或`pymysql`连接数据库、执行SQL查询、异常处理、防止SQL注入、事务管理和ORM框架。易错点包括忘记关闭连接、忽视异常处理、硬编码SQL、忽略事务及过度依赖低效查询。通过理解这些问题和提供策略,可提升面试表现。
28 6
|
8天前
|
测试技术 Python
Python 有趣的模块之pynupt——通过pynput控制鼠标和键盘
Python 有趣的模块之pynupt——通过pynput控制鼠标和键盘