FastAPI(44)- 操作关系型数据库(下)

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: FastAPI(44)- 操作关系型数据库(下)

ItemBase、UserBase

基类,声明在创建或读取数据时共有的属性

 

ItemCreate、UserCreate

创建数据时使用的 Model

 

Item、User

读取数据时使用的 Model

 

orm_mode

class Config:

  orm_mode = True

  • 这是一个 Pydantic 配置项
  • orm_mode 会告诉 Pydantic 模型读取数据,即使它不是字典,而是 ORM 模型(或任何其他具有属性的任意对象)

# 正常情况

id = data["id"]


# 还会尝试从对象获取属性

id = data.id

设置了 orm_mode,Pydantic 模型与 ORM 就兼容了,只需在路径操作的 response_model 参数中声明它即可

 

orm_mode 的技术细节

  • SQLAlchemy 默认情况下 lazy loading 懒加载,即需要获取数据时,才会主动从数据库中获取对应的数据
  • 比如获取属性 current_user.items ,SQLAlchemy 会从 items 表中获取该用户的 item 数据,但在这之前不会主动获取

 

如果没有 orm_mode

  • 从路径操作中返回一个 SQLAlchemy 模型,它将不会包括关系数据(比如 user 中有 item,则不会返回 item,后面再讲实际的栗子)
  • 在 orm_mode 下,Pydantic 会尝试从属性访问它要的数据,可以声明要返回的特定数据,它甚至可以从 ORM 中获取它

 

curd.py 代码


作用

  • 主要用来编写与数据库交互的函数,增删改查,方便整个项目不同地方都能进行复用
  • 并且给这些函数添加专属的单元测试

 

实际代码

代码只实现了查询和创建

  1. 根据 id 查询 user
  2. 根据 email 查询 user
  3. 查询所有 user
  4. 创建 user
  5. 查询所有 item
  6. 创建 item


from sqlalchemy.orm import Session
from .models import User, Item
from .schemas import UserCreate, ItemCreate
# 根据 id 获取 user
def get_user(db: Session, user_id: int):
    return db.query(User).filter(User.id == user_id).first()
# 根据 email 获取 user
def get_user_by_email(db: Session, email: str):
    return db.query(User).filter(User.email == email).first()
# 获取所有 user
def get_users(db: Session, size: int = 0, limit: int = 100):
    return db.query(User).offset(size).limit(limit).all()
# 创建 user,user 类型是 Pydantic Model
def create_user(db: Session, user: UserCreate):
    fake_hashed_password = user.password + "superpolo"
    # 1、使用传进来的数据创建 SQLAlchemy Model 实例对象
    db_user = User(email=user.email, hashed_password=fake_hashed_password)
    # 2、将实例对象添加到数据库会话 Session 中
    db.add(db_user)
    # 3、将更改提交到数据库
    db.commit()
    # 4、刷新实例,方便它包含来自数据库的任何新数据,比如生成的 ID
    db.refresh(db_user)
    return db_user
# 获取所有 item
def get_items(db: Session, size: int = 0, limit: int = 100):
    return db.query(Item).offset(size).limit(limit).all()
# 创建 item,item 类型是 Pydantic Model
def create_item(db: Session, item: ItemCreate, user_id: int):
    db_item = Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item


create_user、create_item

函数内的操作步骤如下

# 1、使用传进来的数据创建 SQLAlchemy Model 实例对象
db_user = User(email=user.email, hashed_password=fake_hashed_password)
# 2、将实例对象添加到数据库会话 Session 中
db.add(db_user)
# 3、将更改提交到数据库
db.commit()
# 4、刷新实例,方便它包含来自数据库的任何新数据,比如生成的 ID
db.refresh(db_user)


main.py 代码

from typing import List
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, status, Path, Query, Body
from sqlalchemy.orm import Session
from models import Base
from schemas import User, UserCreate, ItemCreate, Item
from database import SessionLocal, engine
import curd
Base.metadata.create_all(bind=engine)
app = FastAPI()
# 依赖项,获取数据库会话对象
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
# 创建用户
@app.post("/users", response_model=User)
async def create_user(user: UserCreate, db: Session = Depends(get_db)):
    # 1、先查询用户是否有存在
    db_user = curd.get_user_by_email(db, user.email)
    if db_user:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="user 已存在")
    res_user = curd.create_user(db, user)
    return res_user
# 根据 user_id 获取用户
@app.get("/user_id/{user_id}", response_model=User)
async def get_user(user_id: int = Path(...), db: Session = Depends(get_db)):
    return curd.get_user(db, user_id)
# 根据 email 获取用户
@app.get("/user_email/{email}", response_model=User)
async def get_user_by_email(email: str = Path(...), db: Session = Depends(get_db)):
    return curd.get_user_by_email(db, email)
# 获取所有用户
@app.get("/users_all/", response_model=List[User])
async def get_users(skip: int = Query(0),
                    limit: int = Query(100),
                    db: Session = Depends(get_db)):
    return curd.get_users(db, skip, limit)
# 创建 item
@app.post("/users/{user_id}/items", response_model=Item)
async def get_user_item(user_id: int = Path(...), item: ItemCreate = Body(...), db: Session = Depends(get_db)):
    return curd.create_user_item(db, item, user_id)
# 获取所有 item
@app.get("/items/", response_model=List[Item])
async def get_items(skip: int = Query(0),
                    limit: int = Query(100),
                    db: Session = Depends(get_db)):
    return curd.get_items(db, skip, limit)
if __name__ == "__main__":
    uvicorn.run(app="main:app", host="127.0.0.1", port=8080, reload=True, debug=True)


依赖项

def get_db():

   db = SessionLocal()

   try:

       yield db

   finally:

       db.close()

  • 每个请求都有一个独立的数据库会话(SessionLocal)
  • 在请求完成后会自动关闭它
  • 然后下一个请求来的时候,会创建一个新会话

 

声明依赖项

async def create_user(user: UserCreate, db: Session = Depends(get_db))

  • SessionLocal 是 sessionmaker() 创建的,是 SQLAlchemy Session 的代理
  • 通过声明 db: Session ,IDE 就可以提供智能代码提示啦

 

使用中间件 middleware 代替依赖项声明数据库会话

# 中间件
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    # 默认响应
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        # 关闭数据库会话
        request.state.db.close()
    return response
# 依赖项,获取数据库会话对象
def get_db(request: Request):
    return request.state.db


request.state

  • request.state 是每个 Request 对象的一个属性
  • 它用于存储附加到请求本身的任意对象,例如本例中的数据库会话 db
  • 也就是说,我不叫 db,叫 sqlite_db 也可以,只是一个属性名

 

使用中间件 middleware 和使用 yield 的依赖项的区别

  • 中间件需要更多的代码,而且稍微有点复杂
  • 中间件必须是一个 async 函数,而且需要有 await 的代码,可能会阻塞程序并稍稍降低性能
  • 每个请求运行的时候都会先运行中间件,所以会为每个请求都创建一个数据库连接,即使某个请求的路径操作函数并不需要和数据库交互

 

建议

  • 创建数据库连接对象最好还是用带有 yield 的依赖项来完成
  • 在其他使用场景也是,能满足需求的前提下,最好用带有 yield 的依赖项来完成
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
6月前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之在执行语句时遇到语法错误,是由什么导致的
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。
|
6月前
|
SQL 关系型数据库 分布式数据库
PolarDB操作报错合集之在调用ModifySecurityIps修改白名单时遇到错误码,是什么导致的
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。
|
6月前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之在进行批量导出数据时,如何过滤掉视图并只导出表
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。
|
4月前
|
安全 关系型数据库 数据库
FastAPI数据库操作秘籍:如何通过高效且安全的数据库访问策略,使你的Web应用飞速运转并保持数据完整性?
【8月更文挑战第31天】在构建现代Web应用时,数据库操作至关重要。FastAPI不仅简化了API创建,还提供了高效数据库交互的方法。本文探讨如何在FastAPI中实现快速、安全的数据处理。FastAPI支持多种数据库,如SQLite、PostgreSQL和MySQL;选择合适的数据库可显著提升性能。通过安装相应驱动并配置连接参数,结合ORM库(如Tortoise-ORM或SQLAlchemy),可以简化数据库操作。使用索引、批量操作及异步处理等最佳实践可进一步提高效率。同时,确保使用参数化查询防止SQL注入,并从环境变量中读取敏感信息以增强安全性。
186 1
|
5月前
|
Oracle 关系型数据库 数据库
|
5月前
|
消息中间件 关系型数据库 数据库
实时计算 Flink版操作报错合集之在使用RDS数据库作为源端,遇到只能同步21个任务,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 存储 关系型数据库
|
6月前
|
运维 关系型数据库 分布式数据库
PolarDB产品使用问题之RENAME TABLE操作的速度与表的大小是否有关
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
6月前
|
网络协议 关系型数据库 分布式数据库
PolarDB操作报错合集之Quartz+polardb去执行定时任务报错,该怎么办
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。
|
6月前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之遇到报错:Nested transactions are not supported,该怎么办
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。
103 1