ItemBase、UserBase
基类,声明在创建或读取数据时共有的属性
ItemCreate、UserCreate
创建数据时使用的 Model
Item、User
读取数据时使用的 Model
orm_mode
class Config:
orm_mode = True
- 这是一个 Pydantic 配置项
- orm_mode 会告诉 Pydantic 模型读取数据,即使它不是字典,而是 ORM 模型(或任何其他具有属性的任意对象)
# 正常情况
id = data["id"]
# 还会尝试从对象获取属性
id = data.id
设置了 orm_mode,Pydantic 模型与 ORM 就兼容了,只需在路径操作的 response_model 参数中声明它即可
orm_mode 的技术细节
- SQLAlchemy 默认情况下 lazy loading 懒加载,即需要获取数据时,才会主动从数据库中获取对应的数据
- 比如获取属性 current_user.items ,SQLAlchemy 会从 items 表中获取该用户的 item 数据,但在这之前不会主动获取
如果没有 orm_mode
- 从路径操作中返回一个 SQLAlchemy 模型,它将不会包括关系数据(比如 user 中有 item,则不会返回 item,后面再讲实际的栗子)
- 在 orm_mode 下,Pydantic 会尝试从属性访问它要的数据,可以声明要返回的特定数据,它甚至可以从 ORM 中获取它
curd.py 代码
作用
- 主要用来编写与数据库交互的函数,增删改查,方便整个项目不同地方都能进行复用
- 并且给这些函数添加专属的单元测试
实际代码
代码只实现了查询和创建
- 根据 id 查询 user
- 根据 email 查询 user
- 查询所有 user
- 创建 user
- 查询所有 item
- 创建 item
from sqlalchemy.orm import Session from .models import User, Item from .schemas import UserCreate, ItemCreate # 根据 id 获取 user def get_user(db: Session, user_id: int): return db.query(User).filter(User.id == user_id).first() # 根据 email 获取 user def get_user_by_email(db: Session, email: str): return db.query(User).filter(User.email == email).first() # 获取所有 user def get_users(db: Session, size: int = 0, limit: int = 100): return db.query(User).offset(size).limit(limit).all() # 创建 user,user 类型是 Pydantic Model def create_user(db: Session, user: UserCreate): fake_hashed_password = user.password + "superpolo" # 1、使用传进来的数据创建 SQLAlchemy Model 实例对象 db_user = User(email=user.email, hashed_password=fake_hashed_password) # 2、将实例对象添加到数据库会话 Session 中 db.add(db_user) # 3、将更改提交到数据库 db.commit() # 4、刷新实例,方便它包含来自数据库的任何新数据,比如生成的 ID db.refresh(db_user) return db_user # 获取所有 item def get_items(db: Session, size: int = 0, limit: int = 100): return db.query(Item).offset(size).limit(limit).all() # 创建 item,item 类型是 Pydantic Model def create_item(db: Session, item: ItemCreate, user_id: int): db_item = Item(**item.dict(), owner_id=user_id) db.add(db_item) db.commit() db.refresh(db_item) return db_item
create_user、create_item
函数内的操作步骤如下
# 1、使用传进来的数据创建 SQLAlchemy Model 实例对象 db_user = User(email=user.email, hashed_password=fake_hashed_password) # 2、将实例对象添加到数据库会话 Session 中 db.add(db_user) # 3、将更改提交到数据库 db.commit() # 4、刷新实例,方便它包含来自数据库的任何新数据,比如生成的 ID db.refresh(db_user)
main.py 代码
from typing import List import uvicorn from fastapi import Depends, FastAPI, HTTPException, status, Path, Query, Body from sqlalchemy.orm import Session from models import Base from schemas import User, UserCreate, ItemCreate, Item from database import SessionLocal, engine import curd Base.metadata.create_all(bind=engine) app = FastAPI() # 依赖项,获取数据库会话对象 def get_db(): db = SessionLocal() try: yield db finally: db.close() # 创建用户 @app.post("/users", response_model=User) async def create_user(user: UserCreate, db: Session = Depends(get_db)): # 1、先查询用户是否有存在 db_user = curd.get_user_by_email(db, user.email) if db_user: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="user 已存在") res_user = curd.create_user(db, user) return res_user # 根据 user_id 获取用户 @app.get("/user_id/{user_id}", response_model=User) async def get_user(user_id: int = Path(...), db: Session = Depends(get_db)): return curd.get_user(db, user_id) # 根据 email 获取用户 @app.get("/user_email/{email}", response_model=User) async def get_user_by_email(email: str = Path(...), db: Session = Depends(get_db)): return curd.get_user_by_email(db, email) # 获取所有用户 @app.get("/users_all/", response_model=List[User]) async def get_users(skip: int = Query(0), limit: int = Query(100), db: Session = Depends(get_db)): return curd.get_users(db, skip, limit) # 创建 item @app.post("/users/{user_id}/items", response_model=Item) async def get_user_item(user_id: int = Path(...), item: ItemCreate = Body(...), db: Session = Depends(get_db)): return curd.create_user_item(db, item, user_id) # 获取所有 item @app.get("/items/", response_model=List[Item]) async def get_items(skip: int = Query(0), limit: int = Query(100), db: Session = Depends(get_db)): return curd.get_items(db, skip, limit) if __name__ == "__main__": uvicorn.run(app="main:app", host="127.0.0.1", port=8080, reload=True, debug=True)
依赖项
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
- 每个请求都有一个独立的数据库会话(SessionLocal)
- 在请求完成后会自动关闭它
- 然后下一个请求来的时候,会创建一个新会话
声明依赖项
async def create_user(user: UserCreate, db: Session = Depends(get_db))
- SessionLocal 是 sessionmaker() 创建的,是 SQLAlchemy Session 的代理
- 通过声明 db: Session ,IDE 就可以提供智能代码提示啦
使用中间件 middleware 代替依赖项声明数据库会话
# 中间件 @app.middleware("http") async def db_session_middleware(request: Request, call_next): # 默认响应 response = Response("Internal server error", status_code=500) try: request.state.db = SessionLocal() response = await call_next(request) finally: # 关闭数据库会话 request.state.db.close() return response # 依赖项,获取数据库会话对象 def get_db(request: Request): return request.state.db
request.state
- request.state 是每个 Request 对象的一个属性
- 它用于存储附加到请求本身的任意对象,例如本例中的数据库会话 db
- 也就是说,我不叫 db,叫 sqlite_db 也可以,只是一个属性名
使用中间件 middleware 和使用 yield 的依赖项的区别
- 中间件需要更多的代码,而且稍微有点复杂
- 中间件必须是一个 async 函数,而且需要有 await 的代码,可能会阻塞程序并稍稍降低性能
- 每个请求运行的时候都会先运行中间件,所以会为每个请求都创建一个数据库连接,即使某个请求的路径操作函数并不需要和数据库交互
建议
- 创建数据库连接对象最好还是用带有 yield 的依赖项来完成
- 在其他使用场景也是,能满足需求的前提下,最好用带有 yield 的依赖项来完成