Flask全套知识点从入门到精通,学完可直接做项目(四)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: Flask全套知识点从入门到精通,学完可直接做项目(四)

filter过滤数据

image.png

过滤是数据提取的一个很重要的功能,以下对一些常用的过滤条件 进行解释,并且这些过滤条件都是只能通过filter方法实现的:

  • equals
news=session.query(News).filter(News.title =="title1").first()


  • not equals
query(User).filter(User.name != 'ed')
  • like & ilike [不区分大小写]
query(User).filter(User.name.like('%ed%'))
  • in
query(User).filter(User.name.in_(['ed','wendy','jack']))
  • not in
query(User).filter(~User.name.in_(['ed','wendy','jack']))
  • is null
1. query(User).filter(User.name==None)
2. # 或者是 query(User).filter(User.name.is_(None))
  • is not null
1. query(User).filter(User.name != None)
2. # 或者是query(User).filter(User.name.isnot(None))

and

query(User).filter(and_(User.name=='ed',User.fullname=='Ed Jones'))
# 或者是传递多个参数
query(User).filter(User.name=='ed',User.fullname=='Ed Jones')
# 或者是通过多次filter操作
query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones')
  • or
query(User).filter(or_(User.name=='ed',User.name=='wendy'))

如果想要查看orm底层转换的sql语句,可以在filter方法后面不要再 执行任何方法直接打印就可以看到了。比如:

news =session.query(News).filter(or_(News.title=='abc',News.content=='abc'))
print(news)
from random import randint
from uuid import uuid4
from sqlalchemy import
Column,Integer,String,Float,Text,and_,or_
from db_util import Base,Session
class Article(Base):
    __tablename__ = 't_article'
    id =Column(Integer,primary_key=True,autoincrement=True)
    title =Column(String(50),nullable=False)
    price = Column(Float,nullable=False)
    content = Column(Text)
    def __repr__(self):
        return f"<Article(title:{self.title} price:{self.price} content:{self.content})>"
def create_data():
     with Session() as ses:
        for i in range(10):
            if i%2 == 0:
                art = Article(title =f'title{i+1}',price=randint(1,100),content= uuid4())
            else:
                art = Article(title =f'TITLE{i+1}',price=randint(1,100))
            ses.add(art)
        ses.commit()
def query_data():
    with Session() as ses:
        # rs =ses.query(Article).filter_by(id=1).first()
        rs =ses.query(Article).filter(Article.id ==1).first()
        print(rs)
def query_data_equal():
    with Session() as ses:
        rs =ses.query(Article).filter(Article.title =='title2').first()
        print(rs)
def query_data_not_equal():
    with Session() as ses:
        rs =ses.query(Article).filter(Article.title !='title2').all()
        print(rs)
def query_data_like():
    with Session() as ses:
        # select * from t_article wheretitle like 'title%';
        rs =ses.query(Article).filter(Article.title.like('title%')).all()
        for r in rs:
            print(r)
def query_data_in():
    with Session() as ses:
      rs=ses.query(Article).filter(Article.title.in_(['title1','title3','title6'])).all()
        for r in rs:
            print(r)
def query_data_not_in():
    with Session() as ses:
     rs=ses.query(Article).filter(~Article.title.in_(['title1','title3','title6'])).all()
        for r in rs:
            print(r)
def query_data_null():
    with Session() as ses:
        rs =ses.query(Article).filter(Article.content== None).all()
        for r in rs:
            print(r)
def query_data_not_null():
    with Session() as ses:
        rs =ses.query(Article).filter(Article.content!= None).all()
        for r in rs:
            print(r)
def query_data_and():
    with Session() as ses:
        # rs =ses.query(Article).filter(Article.title!='title4' and Article.price>8).all()
        # rs =ses.query(Article).filter(Article.title!='title4',Article.price >50 ).all()
        rs =ses.query(Article).filter(and_(Article.title !='title4',Article.price >50)).all()
        for r in rs:
            print(r)
def query_data_or():
    with Session() as ses:
          rs =ses.query(Article).filter(or_(Article.title=='title4',Article.price >50) ).all()
        for r in rs:
            print(r)
if __name__ == '__main__':
    # Base.metadata.create_all()
    # create_data()
    # query_data()
    # query_data_equal()
    # query_data_not_equal()
    # query_data_like()
    # query_data_in()
    # query_data_not_in()
    # query_data_null()
    # query_data_not_null()
    # query_data_and()
    query_data_or()

表关系

表之间的关系存在三种:一对一、一对多、多对多。 而SQLAlchemy中的ORM也可以模拟这三种关系。 因为一对一其实在SQLAlchemy中底层是通过一对多的方式模拟 的,所以先来看下一对多的关系:

外键: 使用SQLAlchemy创建外键非常简单。在从表中增加一个字段,指 定这个字段外键的是哪个表的哪个字段就可以了。从表中外键的字 段,必须和主表的主键字段类型保持一致。

class User(Base):
    __tablename__ = 't_user'
    id =Column(Integer,primary_key=True,autoincrement=True)
    uname =Column(String(50),nullable=False,name='name')
class News(Base):
    __tablename__ = 't_news'
    id =Column(Integer,primary_key=True,autoincrement=True)
    title =Column(String(50),nullable=False)
    content = Column(Text,nullable=False)
    uid =Column(Integer,ForeignKey('t_user.id',)

外键约束有以下几项

RESTRICT:若子表中有父表对应的关联数据,删除父表对应数 据,会阻止删除。默认项

NO ACTION:在MySQL中,同RESTRICT。

CASCADE:级联删除。

SET NULL:父表对应数据被删除,子表对应数据项会设置为 NULL。

from sqlalchemy import
Column,Integer,String,Text,ForeignKey
from db_util import Base,Session
class User(Base):
    __tablename__ = 't_user'
    id =Column(Integer,primary_key=True,autoincrement=True)
    uname =Column(String(50),nullable=False,name='name')
class News(Base):
    __tablename__ = 't_news'
    id =Column(Integer,primary_key=True,autoincrement=True)
    title =Column(String(50),nullable=False)
    content = Column(Text,nullable=False)
    # uid =Column(Integer,ForeignKey('t_user.id')) # 默认不让删主表数据
    # uid =Column(Integer,ForeignKey('t_user.id',ondelete = 'RESTRICT')) # 默认的策略
    # uid =Column(Integer,ForeignKey('t_user.id',ondelete = 'NO ACTION')) # 默认的策略
    # uid =Column(Integer,ForeignKey('t_user.id',ondelete = 'CASCADE')) # 级联删除,发主表的数据被删除,子表的里数据也会删除
    uid =Column(Integer,ForeignKey('t_user.id',ondelete = 'SET NULL')) # 发现主表数据被删除时,子表的数据列会清空
def create_data():
    user = User(uname = 'zs')
    news1 =News(title='python',content='flask',uid = 1)
    news2 =News(title='MySQL',content='SQL',uid = 1)
    with Session() as ses:
        ses.add(user)
        ses.commit()
    with Session() as ses:
        ses.add(news1)
        ses.add(news2)
        ses.commit()
if __name__ == '__main__':
    Base.metadata.create_all()
    create_data()

ORM关系之一对多

image.png

mysql级别的外键,还不够爽,必须拿到一个表的外键,然后通过 这个外键再去另外一张表中查找,这样太麻烦了。


SQLAlchemy提供了一个 relationship ,这个类可以定义属性,以后在访 问相关联的表的时候就直接可以通过属性访问的方式就可以访问得 到了。 另外,可以通过 backref 来指定反向访问的属性名称。newss是指有多 篇新闻。他们之间的关系是一个“一对多”的关系

from sqlalchemy import
Column,Integer,String,Text,ForeignKey
from sqlalchemy.orm  import relationship
from db_util import Base,Session
class User(Base):
    __tablename__ = 't_user'
    id =Column(Integer,primary_key=True,autoincrement=True)
    uname =Column(String(50),nullable=False,name='name')
    # news = relationship('News') # 不友好
    def __repr__(self):
        return f'<User: id={self.id} uname={self.uname}>'
# 1对多 ForeignKey的关键字要建立在 多一边
class News(Base):
    __tablename__ = 't_news'
    id =Column(Integer,primary_key=True,autoincrement=True)
    title =Column(String(50),nullable=False)
    content = Column(Text,nullable=False)
    uid =Column(Integer,ForeignKey('t_user.id'))
    user =relationship('User',backref='news')  # 将主表的数据注入到这个字段
    def __repr__(self):
        return f'<News: id={self.id} title={self.title} content={self.content} uid={self.uid}>'
def create_data():
    user = User(uname = 'sxt')
    news1 =News(title='Python',content='flask',uid = 1)
    news2 =News(title='MySQL',content='SQL',uid = 1)
    with Session() as ses:
        ses.add(user)
        ses.commit()
    with Session() as ses:
        ses.add(news1)
        ses.add(news2)
        ses.commit()
def query_data():
    with Session() as ses:
        # news1 = ses.query(News).first()
        # print(news1)
        # select u.id u.uname from t_news n left join t_user u n.uid = u.id where n.id =
1;
        news1 = ses.query(News).first()
        uid = news1.uid
        user = ses.query(User).first()
        print(user)
def query_data2():
    # 通地子表查询主表的数据
    with Session() as ses:
        news1 = ses.query(News).first()
        print(news1.user)
def query_data3():
    # 通地主表查找子表的数据
    with Session() as ses:
        user1 = ses.query(User).first()
        print(user1.news)
if __name__ == '__main__':
    # Base.metadata.create_all()
    # create_data()
    # query_data()
    # query_data2()
    query_data3()

ORM关系之一对一

image.png

在sqlalchemy中,如果想要将两个模型映射成一对一的关系,那么 应该在父模型中,指定引用的时候,要传递一个 uselist=False 这个参数 进去。 就是告诉父模型,以后引用这个从模型的时候,不再是一个列表 了,而是一个对象了

class LoginUser(Base):
    __tablename__ = 't_user_login'
    id =Column(Integer,primary_key=True,autoincrement=True)
    uname =Column(String(32),nullable=False)
    passwd =Column(String(32),nullable=False)
# 创建1对1的关系, 创建一个字段来做别一个表的标识(外键)
class User(Base):
    __tablename__ = 't_user'
     id =Column(Integer,primary_key=True,autoincrement=True)
    name =Column(String(32),nullable=False,name='name')
    gender = Column(String(1))
    address = Column(String(64))
    login_id =Column(Integer,ForeignKey('t_user_login.id'))
    login_user =relationship('LoginUser',backref=backref('user',uselist=False))

ORM关系之多对多

image.png

多对多的关系需要通过一张中间表来绑定他们之间的关系。

先把两个需要做多对多的模型定义出来

使用Table定义一个中间表,中间表一般就是包含两个模型的外 键字段就可以了,并且让他们两个来作为一个“复合主键”

在两个需要做多对多的模型中随便选择一个模型,定义一个 relationship属性,来绑定三者之间的关系,在使用relationship 的时候,需要传入一个secondary=中间表对象名  

from sqlalchemy import
Column,Integer,String,ForeignKey
from sqlalchemy import Table
from sqlalchemy.orm  import
relationship,backref
from db_util import Base,Session
# 创建第3张表,来建立多对多关系
# 放到2个模型之上
news_tag = Table(
    't_news_tag',
    Base.metadata,
  Column('news_id',Integer,ForeignKey('t_news.id'),primary_key = True),
  Column('tag_id',Integer,ForeignKey('t_tag.id'),primary_key = True),)
class News(Base):
     __tablename__ = 't_news'
    id =Column(Integer,primary_key=True,autoincrement=True)
    title =Column(String(32),nullable=False)
    tags =relationship('Tag',backref='newss',secondary= news_tag)
    def __repr__(self):  
        return f'<News: id={self.id} title={self.title}>'
class Tag(Base):
    __tablename__ = 't_tag'
    id =Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(32),nullable=False)
    # news =relationship('News',backref='tags',secondary= news_tag)
    def __repr__(self):  
        return f'<Tag: id={self.id} name={self.name}>'
def create_data():
    news1 = News(title = 'Python更新了!')
    news2 = News(title = 'SQLAlchemy功能又强大了!')
    tag1 = Tag(name = 'IT新闻')
    tag2 = Tag(name = '科学技术')
    news1.tags.append(tag1)
    news1.tags.append(tag2)
    news2.tags.append(tag1)
    news2.tags.append(tag2)
    with  Session() as ses:
        ses.add(news1)
        ses.add(news2)
        ses.commit()
def query_data():
    with  Session() as ses:
        news = ses.query(News).first()
        print(news.tags)
if __name__ == '__main__':
    # Base.metadata.create_all()
    # create_data()
    query_data()

ORM层面删除数据注意事项

ORM层面删除数据,会无视mysql级别的外键约束。 直接会将对应的数据删除,然后将从表中的那个外键设置为NULL, 也就是数据库的 SET NULL 。 如果想要避免这种行为,应该将从表中的外键的 nullable=False 。

from sqlalchemy import Column, Integer,
String, ForeignKey
from sqlalchemy.orm import relationship
from db_util import Base, Session
class User(Base):
    __tablename__ = 't_user'
    id = Column(Integer, primary_key=True,autoincrement=True)
    name = Column(String(32))
class Article(Base):
    __tablename__ = 't_article'
    id = Column(Integer, primary_key=True,autoincrement=True)
    title = Column(String(32))
    uid = Column(Integer,ForeignKey("t_user.id"))
    # uid = Column(Integer,ForeignKey("t_user.id"),nullable = False)
    user =relationship('User',backref='articles')
def create_data():
    Base.metadata.drop_all() # 删除已有的表
    Base.metadata.create_all() # 创建表
    # 初始化数据
    user = User(name='zs')
    art1 = Article(title='Python', uid=1)
    art2 = Article(title='MySQL', uid=1)
    user.articles.append(art1)
    user.articles.append(art2)
    with Session() as ses:
        ses.add(user)
        ses.commit()
def delete_data():
    # 默认删除主表数据时,会将子表的引用主表数据的外键设置Null
    with Session() as ses:
        user = ses.query(User).first()
        ses.delete(user)
        ses.commit()
if __name__ == '__main__':
    # create_data()
    delete_data()

ORM层面的relationship方法中cascade


在SQLAlchemy,只要将一个数据添加到session中,和他相关联的 数据都可以一起存入到数据库中了。 这些是怎么设置的呢?其实是通过relationship的时候,有一个关键 字参数cascade可以设置这些属性,


cascade属性值为:


save-update:默认选项。在添加一条数据的时候,会把其他和他 相关联的数据都添加到数据库中。这种行为就是save-update属性 影响的。


delete:表示当删除某一个模型中的数据的时候,是否也删掉使用 relationship和他关联的数据。 delete-orphan:表示当对一个ORM对象解除了父表中的关联对象 的时候,自己便会被删除掉。当然如果父表中的数据被删除,自己 也会被删除。这个选项只能用在一对多上,并且还需要在子模型中 的relationship中,增加一个single_parent=True的参数。


merge:默认选项。当在使用session.merge,合并一个对象的时 候,会将使用了relationship相关联的对象也进行merge操作。


expunge:移除操作的时候,会将相关联的对象也进行移除。这个 操作只是从session中移除,并不会真正的从数据库中删除。


all:是对save-update, merge, refresh-expire, expunge, delete 几种的缩写。

from sqlalchemy import Column, Integer,
String, ForeignKey
from sqlalchemy.orm import
relationship,backref
from db_util import Base, Session
class User(Base):
    __tablename__ = 't_user'
    id = Column(Integer, primary_key=True,autoincrement=True)
    name = Column(String(32))
    # articles =relationship('Article',backref='user',cascade='')
    # articles =relationship('Article',backref='user',cascade='save-update') # 默认cascade的值是saveupdate
    # articles =relationship('Article',backref='user',cascade='save-update,delete') #delete可以帮助删除关联表的数据
    # articles =relationship('Article',backref='user',cascade='save-update,delete,deleteorphan',single_parent=True) # 当关联关系被解除时,子表数据会被清空
class Article(Base):
    __tablename__ = 't_article'
    id = Column(Integer, primary_key=True,autoincrement=True)
    title = Column(String(32))
    uid = Column(Integer,ForeignKey("t_user.id"))
    # user =relationship('User',backref='articles',cascade='save-update,delete') # 会把主表的数据删除
    user =relationship('User',backref=backref('articles',cascade='saveupdate,delete,deleteorphan'))
def create_data():
    Base.metadata.drop_all() # 删除已有的表
    Base.metadata.create_all() # 创建表
    # 初始化数据
    user = User(name='SXT')
    art1 = Article(title='Python', uid=1)
    art2 = Article(title='MySQL', uid=1)
    user.articles.append(art1)
    user.articles.append(art2)
    # 保存数据
    with Session() as ses:
        ses.add(user)
        ses.commit()
def delete_data():
    with Session() as ses:
        user = ses.query(User).first()
        ses.delete(user)
        ses.commit()
def delete_art():
    with Session() as ses:
        art = ses.query(Article).first()
        ses.delete(art)
        ses.commit()
def update_data():
    with Session() as ses:
        user = ses.query(User).first()
        user.articles = []
        ses.commit()
if __name__ == '__main__':
    # create_data()
    # delete_data()
    # update_data()
    delete_art()

排序

order_by方法排序:可以指定根据模型中某个属性进行排序,"模型 名.属性名.desc()"代表的是降序排序。relationship的方法中order_by属性:在指定relationship方法的时 候,添加order_by属性来指定排序的字段。

方式1:order_by方法指定

# 升序
users =ses.query(User).order_by(User.age).all()
# 降序
users =ses.query(User).order_by(User.age.desc()).all()

方式2:涉及两表时,定义模型时,用relationship方法中的 order_by属性指定排序方式

from random import randint
from sqlalchemy import
Column,Integer,String,ForeignKey
from sqlalchemy.orm import
relationship,backref
from db_util import Base,Session
class User(Base):
    __tablename__ = 't_user'
    id = Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(32))
    age = Column(Integer)
    def __repr__(self):
        return f'<User: id={self.id} name={self.name} age={self.age}>'  
class News(Base):
    __tablename__ = 't_news'
    id =Column(Integer,primary_key=True,autoincrement=True)
    title =Column(String(32),nullable=False)
    content =Column(String(32),nullable=False)
    read_count = Column(Integer)
    uid =Column(Integer,ForeignKey('t_user.id'))
    user =relationship('User',backref=backref('newss',order_by=read_count))
    def __repr__(self):
        return f'<User: id={self.id} title={self.title} content={self.content} read_count={self.read_count}>'  
def create_user():
    with Session() as ses:
        for i in range(10):
            user = User(name = f'用户{i}',age= randint(6,20))
            ses.add(user)
        for i in range(10):
            news = News(title = f'新闻{i}',content = '新闻',read_count =randint(1,1000))
            user.newss.append(news)
        ses.commit()
def query_user():
    with Session() as ses:
        users = ses.query(User).all()
        for i in users[-1].newss:
            print(i)
if __name__ == '__main__':
    # Base.metadata.drop_all()
    # Base.metadata.create_all()
    # create_user()
    query_user()


注意 __mapper_args__ 参数的1.1版本已被抛弃

limit、offset、slice使用

image.png

limit:可以限制查询的时候只查询前几条数据。 属top-N查询

offset:可以限制查找数据的时候过滤掉前面多少条。可指定开 始查询时的偏移量。

切片:可以对Query对象使用切片操作,来获取想要的数据。

可以使用 slice(start,stop) 方法来做切片操作。

也可以使用 [start:stop] 的方式来进行切片操作。

一般在实际开发中,中括号的形式是用得比较多的。  

from random import randint
from sqlalchemy import Column,Integer,String
from db_util import Base,Session
class News(Base):
    __tablename__ = 't_news'
    id =Column(Integer,primary_key=True,autoincrement=True)
    title =Column(String(32),nullable=False)
    content =Column(String(32),nullable=False)
    read_count = Column(Integer)
    def __repr__(self):
        return f'<User: id={self.id} title={self.title} content={self.content} read_count={self.read_count}>'  
def create_data():
    Base.metadata.drop_all()
    Base.metadata.create_all()
    with Session() as ses:
        for i in range(10):
            news = News(title=f'title{i}',content=f'info{i}',read_count= randint(0,1000))
            ses.add(news)
        ses.commit()
def query_by_limit():
    with Session() as ses:
        newss = ses.query(News).limit(3).all()
        for n in newss:
            print(n)
def query_by_offset():
    with Session() as ses:
        newss = ses.query(News).offset(3).all()
     for n in newss:
            print(n)        
def query_by_page():
    # limit topN数据
    # offset 跳过n数据
    # 分页效果 1-3 4-6 7-9  
    # 3 0     1 (pagenum-1)*pagesize
    # 3 3     2   (2-1)*3 = 3
    # 3 6     3   (3-1)*3 = 6
    # 3 9     4   (4-1)*3 = 6
    with Session() as ses:
        # (pagenum-1)*pagesize
        newss = ses.query(News).limit(3).offset(3).all()
        for n in newss:
            print(n)
def query_by_slice():
    with Session() as ses:
        # 从哪个索引开始,到哪个索引结束
        newss = ses.query(News).slice(3,6).all()
        for n in newss:
            print(n)
def query_by_qiepian():
    with Session() as ses:
        # 从哪个索引开始,到哪个索引结束
        newss = ses.query(News).all()[3:6]
        for n in newss:
            print(n)
if __name__ == '__main__':
    # create_data()
    # query_by_limit()
    # query_by_offset()
    # query_by_page()
    # query_by_slice()
    query_by_qiepian()

懒加载

在一对多,或者多对多关系的时候,如果想要获取多的一方这一部 分的数据的时候,往往能通过一个属性就可以全部获取了。 如有一个作者,想要这个作者的所有文章,通过user.articles就可 以获取所有的 但有时候我们不想获取所有的数据,如只想获取这个作者今天发表 的文章,那么这时候我们可以给relationship方法添加属性 lazy='dynamic' ,以后通过 user.articles 获取到的就不是一个列表,而是一个 AppenderQuery对象了。这样就可以对这个对象再进行一层过滤和 排序等操作 通过 lazy='dynamic' ,获取出来的多的那一部分的数据,就是一个 AppenderQuery 对象了。这种对象既可以添加新数据,也可以跟 Query 一 样,可以再进行一层过滤.

lazy可用的选项:


1 select : (默认) 后台会用select语句一次性加载所有数据,即访问 到属性的时候,就会全部加载该属性的数据


2 joined - 数据会被JOIN语句加载,即对关联的两个表进行join操 作,从而获取到所有相关的对象 3 subquery - 数据被用subquery子查询SQL语句加载


4 dynamic :这个也是懒加载。在访问属性的时候,并不在内存中加 载数据,而是返回一个 AppenderQuery 对象, 需要执行相应方法才可 以获取对象。适用于数据量大的时候


注意


lazy="dynamic" 只可以用在一对多和多对对关系中,不可以用在一 对一和多对一中。 这样也合理:如果返回结果很少的话,就没必要延迟加载数据了。


from random import randint
from sqlalchemy import
Column,Integer,String,ForeignKey
from sqlalchemy.orm import
relationship,backref
from db_util import Base,Session
class User(Base):
    __tablename__ = 't_user'
    id = Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(32))
    age = Column(Integer)
    def __repr__(self):
        return f'<User: id={self.id} name={self.name} age={self.age}>'  
class News(Base):
    __tablename__ = 't_news'
    id =Column(Integer,primary_key=True,autoincrement=True)
    title =Column(String(32),nullable=False)
    content =Column(String(32),nullable=False)
    read_count = Column(Integer)
    uid =Column(Integer,ForeignKey('t_user.id'))
    user =relationship('User',backref=backref('newss',lazy='dynamic'))
    def __repr__(self):
        return f'<News: id={self.id} title={self.title} content={self.content} read_count={self.read_count}>'  
def create_data():
    with Session() as ses:
        for i in range(10):
            user = User(name =f'name{i}',age = randint(6, 30))
            ses.add(user)
        for i in range(10):
            news =News(title=f'title{i}',content=f'info{i}',read_count= randint(0,1000))
            user.newss.append(news)
        ses.commit()
def query_data():
    with Session() as ses:
        users = ses.query(User)
        print(users)
        print(type(users))
def query_data2():
    with Session() as ses:
        users = ses.query(User).all()
        print(users[-1].newss)
        print(type(users[-1].newss))
def query_data3():
    # 'lazy = select 默认不能2次过滤'
    with Session() as ses:
        users = ses.query(User).all()
        newss =users[-1].newss.filter(News.read_count >500).all()
        print(newss)
if __name__ == '__main__':
    # Base.metadata.drop_all()
    # Base.metadata.create_all()
    # create_data()
    # query_data2()
    query_data3()

分组group_by和过滤分组having

group_by

根据某个字段进行分组。如想要根据年龄进行分组,来统计每个分 组分别有多少人

r =session.query(User.age,func.count(User.id)).group_by(User.age).all()

having having

是对分组查找结果作进一步过滤。如只想要看未成年人的人 数, 那么可以首先对年龄进行分组统计人数,然后再对分组进行having 过滤。

r =session.query(User.age,func.count(User.id)).group_by(User.age).having(User.age < 18).all()
from random import randint
from sqlalchemy import
Column,Integer,String,ForeignKey,func
from sqlalchemy.orm import
relationship,backref
from db_util import Base,Session
class User(Base):
    __tablename__ = 't_user'
    id = Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(32))
    age = Column(Integer)
    def __repr__(self):
        return f'<User: id={self.id} name={self.name} age={self.age}>'
def create_data():
    Base.metadata.drop_all()
    Base.metadata.create_all()
    with Session() as ses:
        for i in range(10):
            user = User(name =f'name{i}',age = randint(6, 30))
            ses.add(user)
        ses.commit()
def query_by_age():
    # 统计 每个年龄的人数
    with Session() as ses:
         user =ses.query(User.age,func.count(User.id)).group_by(User.age).all()
        print(user)
def query_by_age_gt_18():
    #统计 每个年龄的人数,要求排除未成年人
    with Session() as ses:
        user =ses.query(User.age,func.count(User.id)).group_by(User.age).having(User.age>18).all()
        print(user)
def query_by_age_lt_18():
    # 统计 每个年龄的人数,要求未成年人
    with Session() as ses:
        user =ses.query(User.age,func.count(User.id)).group_by(User.age).having(User.age<18).all()
        print(user)
if __name__ =='__main__':
    # create_data()
    # query_data()
    # query_by_age_gt_18()
    query_by_age_lt_18()

Flask-SQLAlchemy的使用

image.png

Flask-SQLAlchemy是一个插件, Flask-SQLAlchemy是对SQLAlchemy进行了一个简单的封装的一个 插件, 使得我们在flask中使用sqlalchemy更加的简单。


安装 pip install flask-sqlalchemy


Flask-SQLAlchemy的使用要点


数据库连接


数据库初始化不再是通过create_engine。


1 跟sqlalchemy一样,定义好数据库连接字符串DB_URI。


2 将这个定义好的数据库连接字符串DB_URI,通过 SQLALCHEMY_DATABASE_URI 这个key名配置到 app.config 中。

app.config["SQLALCHEMY_DATABASE_URI"] =DB_URI

3 使用 flask_sqlalchemy.SQLAlchemy 这个类定义一个对象,并将 app 传入进 去。

db = SQLAlchemy(app)

创建ORM模型类


之前都是通过Base = declarative_base()来初始化一个基类,然后 再继承,在Flask-SQLAlchemy中更加简单了


1 还是跟使用sqlalchemy一样,定义模型。现在不再是需要使用 delarative_base 来创建一个基类。 而是使用 db.Model 来作为基类


2 在模型类中, Column 、 String 、 Integer 以及 relationship 等,都不需要导入了,直接使用 db 下面相应的属性名就可以了


3 在定义模型的时候,可以不写 __tablename__ ,那么 flask_sqlalchemy 会默认使用当前的模型的名 字转换成小写来作为表的名字


并且如果这个模型的名字用到了多个单词并且使用了驼峰命名 法,那么会在多个单词之间使用下划线来进行连接


将ORM模型映射到数据库表


写完模型类后,要将模型映射到数据库的表中,使用以下代码即可


1. 删除数据库表: db.drop_all()


2. 创建数据库表: db.create_all()


session的使用


以后session也不需要使用 sessionmaker 来创建了, 直接使用 db.session 就可以了, 操作这个session的时候就跟之前的 sqlalchemy 的 session 是一样一样的。


添加数据


这时候就可以在数据库中看到已经生成了对应表了 添加数据和之前的没有区别,只是session成为了一个db的属性


查询数据


1 单表查询:查询数据不再是之前的session.query方法了,而是将query属性 放在了db.Model上, 所以查询就是通过“模型名.query”的方式进行查询了, query 就跟 之前的sqlalchemy中的query方法是一样用的。


2 多表查询: 如果查找数据涉及多个模型,只能使用db.session.query(模型 名).all() 这种方式


修改数据 :修改数据和之前的没有区别,只是session成为了一个db的属性


删除数据: 删除数据跟添加数据和修改数据类似,只不过session是db的一 个属性而已

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app =  Flask(__name__)
# 数据库的变量
HOST = '192.168.30.151'  # 127.0.0.1/localhost
PORT = 3306
DATA_BASE = 'flask_db'
USER = 'root'
PWD = '123'
DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
app.config['SQLALCHEMY_DATABASE_URI'] =DB_URI
app.config['SQLALCHEMY_TRACK_MODIFICATIONS']= False
# 链接数据库
db =  SQLAlchemy(app)
# 创建模型类
class User(db.Model):
    __tablename__ = 't_user' # flask_alchemy 可以忽略不写
    id = db.Column(db.Integer,primary_key =True,autoincrement = True)
    name = db.Column(db.String(32))
    def __repr__(self):
        return f'<User id={self.id} name={self.name}>'
class News(db.Model):
    __tablename__ = 't_news' # flask_alchemy 可以忽略不写
    id = db.Column(db.Integer,primary_key =True,autoincrement = True)
    content = db.Column(db.String(100))
    uid =db.Column(db.Integer,db.ForeignKey('t_user.id'))
    user =db.relationship('User',backref='newss')
    def __repr__(self):
        return f'<News id={self.id} content={self.content}>'
# 删除表
# db.drop_all()
# 创建表
# db.create_all()
# 增加数据
def create_data():
    user = User(name = 'zs')
    news = News(content = 'Python内容')
    user.newss.append(news)
    db.session.add(user)
    db.session.commit()
# 查询单表数据
def query_data_one():
    users = User.query.all()
    print(users)
# 查询多表
def query_data_many():
    rs =db.session.query(User,News.content).join(News,News.uid == User.id).all()
    print(rs)
# 修改数据
def update_data():
    user = User.query.first()
    user.name = 'lisi'
    db.session.commit()
# 删除数据
def delete_data():
    news = News.query.first()
    db.session.delete(news)
    db.session.commit()
if __name__ == "__main__":
    # create_data()
    # query_data_one()
    # query_data_many()
    # update_data()
    delete_data()

Flask-Migrate

image.png

介绍


flask-migrate是flask的一个扩展模块,主要是扩展数据库表结构 的。 flask-migrate是基于Alembic进行的一个封装,并集成到Flask中, 所有的迁移操作其实都是Alembic做的,他能跟踪模型的变化,并 将变化映射到数据库中。


安装 pip install flask-migrate


使用方法 模型类

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app =  Flask(__name__)
# 数据库的变量
HOST = '192.168.30.151'  # 127.0.0.1/localhost
PORT = 3306
DATA_BASE = 'flask_db'
USER = 'root'
PWD = '123'
DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
# mysql+pymysql://root:123@192.168.30.151/flask_db
app.config['SQLALCHEMY_DATABASE_URI'] =DB_URI
app.config['SQLALCHEMY_TRACK_MODIFICATIONS']= False
db =  SQLAlchemy(app)
# 创建模型类
class User(db.Model):
    __tablename__ = 't_user'
    id = db.Column(db.Integer,primary_key =True,autoincrement = True)
    name = db.Column(db.String(32))
    age = db.Column(db.Integer)
    def __repr__(self):
        return f'<User id={self.id} name={self.name}>'
from flask_migrate import Migrate
Migrate(app,db)

注意 创建Migrate(app,db)对象

创建迁移仓库

这个命令会创建migrations文件夹,所有迁移文件都放在里面。

flask db init

生成脚本文件

flask db migrate

更新数据库

flask db upgrade

返回以前的版本

flask db downgrade version_

Flask项目结构重构

image.png

一个良好的项目结构努力可以清晰的看出来各个模块的作用,方便 扩展,易于修改 虽然Flask并没有强制要求开发者项目的目录层次结构应该是怎么样 的,但是如果我们以包和模块的形式组织项目的话,后期的开发会 非常的有条理。

项目结构

基本结构如下:可根据实际需求做微小调整。

|project_name
|--pro_name # 整个程序的包目录
|----__init__.py # 项目包文件
|----templates # 模板文件
|------common # 通用模板
|------errors # 错误页面
|------user # 用户模板
|------email # 邮件模板
|----static # 静态资源文件
|------js # JS脚本
|------css # 样式表
|------img # 图片
|------favicon.ico # 网站图表
|----user # 用户模块
|------__init__.py # 用户模块-包文件
|------views.py # 用户模块-视图文件
|----item # 产品模块
|------__init__.py # 产品模块-包文件
|------views.py # 产品模块-视图文件
|----models.py # 数据模型
|--app.py # 项目启动控制文件
|--config.py # 配置文件
|--requirements.txt # 依赖包列表
|--migrations # 数据库迁移目录

注意1


整个程序的包目录名不能为 app ,不然会报 Error: Failed to find Flask application or factory in module 'app'. Use 'FLASK_APP=app:name' to specify one.


注意2


项目启动控制文件名为 app.py ,不然会报 Error: Could not locate a Flask application. You did not provide the "FLASK_APP" environment variable, and a "wsgi.py" or "app.py" module was not found in the current directory.


解决方案2 使用.env文件解决

# .env
FLASK_APP=pro_name/__init__:create_app()


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
7月前
|
缓存 前端开发 JavaScript
flask各种版本的项目,终端命令运行方式的实现
flask各种版本的项目,终端命令运行方式的实现
331 4
|
7月前
|
数据可视化 API 开发者
通俗易懂:一步步教你 Flask 项目自动生成 API 文档
Flasgger,作为一款强大的 Flask 扩展,自动从 Flask 应用中提取并生成 OpenAPI 规范文档,配备 SwaggerUI,为开发者提供了一条快捷通道,让 API 的文档编制和交互式测试变得简单易行。Flasgger 的设计原则是简化开发流程,通过与 Flask 框架的无缝整合,让开发者可以更专注于应用逻辑的构建。
|
2月前
|
前端开发 JavaScript 数据库连接
一、Flask入门介绍
一、Flask入门介绍
49 1
|
4月前
|
Linux Python
【Azure 应用服务】Azure App Service For Linux 上实现 Python Flask Web Socket 项目 Http/Https
【Azure 应用服务】Azure App Service For Linux 上实现 Python Flask Web Socket 项目 Http/Https
|
5月前
|
安全 前端开发 API
震惊!掌握Django/Flask后,我竟然轻松征服了所有Web项目难题!
【7月更文挑战第15天】Python Web开发中,Django以其全面功能见长,如ORM、模板引擎,助你驾驭复杂需求;Flask则以轻量灵活取胜,适合快速迭代。两者结合使用,无论是数据库操作、用户认证还是API开发,都能让你应对Web挑战游刃有余。掌握这两者,Web项目难题变得易如反掌!
80 10
|
6月前
|
数据处理 Python
Flask 项目工程目录层级划分
本文介绍了如何将 Flask 项目工程目录层级按照主题分类划分,主要包括模型层、视图层、表单层、模板文件和静态文件。通过合理地组织项目文件,可以提高项目的可读性、可维护性和可扩展性。
104 5
|
7月前
|
前端开发 JavaScript 数据库
Flask狼书笔记 | 09_图片社交网站 - 大型项目的架构与需求(2)
9.8 收藏图片 前面已经学习过如何使用关联表来表示多对多关系,缺点是只能表示关系,不能存储数据(如我还想记录下收藏图片的时间戳)。这种情况下,我们可以使用关联模型来表示多对多关系。 在关联模型中,我们将Photo模型与User模型的多对多关系,分离成了User模型和Collect模型的一对多关系,和Photo模型与Collect模型的一对多关系。
154 0
|
7月前
|
数据库连接 Python
Flask 框架入门与实践:构建你的第一个 Web 应用
【5月更文挑战第18天】本文介绍了使用 Flask 框架构建第一个 Web 应用的步骤。首先通过 `pip install Flask` 安装框架,然后编写基本的 Python 代码创建应用,包括定义路由和响应。示例展示如何显示 &quot;Hello, World!&quot;,并扩展到显示用户信息的功能。利用模板(如 `index.html`)可使页面更丰富。随着学习深入,可以利用 Flask 的更多特性,如表单处理和数据库连接,来构建更复杂的 Web 应用。本文旨在激发读者对 Flask 和 Web 开发的兴趣,鼓励不断探索和实践。
160 7
|
6月前
|
前端开发 索引 Python
【已解决】Flask项目报错TypeError: tuple indices must be integers or slices, not str
【已解决】Flask项目报错TypeError: tuple indices must be integers or slices, not str
|
7月前
|
开发者 索引 Python
Flask环境搭建与项目初始化
【4月更文挑战第15天】本文指导如何搭建Flask开发环境并初始化项目。首先确保安装Python,然后通过pip安装Flask。创建名为`myflaskapp`的项目目录,包含`app.py`入口文件。在`app.py`中初始化Flask应用,定义路由和视图函数。运行`python app.py`启动开发服务器,访问`http://127.0.0.1:5000/`查看结果。完成基本设置后,可按需求扩展应用功能。