之前我们的都是创建在一个文件中,但是我们在实际中,肯定不能这么设计,那么我们去创建一个目录,叫models。大致如下。
主要目录是
- __init__.py 是一个空文件,但是说明models是一个package
- crud.py 数据库操作相关
- database.py 数据库配置相关
- models.py 数据库模型表
- schemas.py 模型验证
- main.py 主文件
那么我们在crud.py目录修改如下
from sqlalchemy.orm import Session from models.models import * from models.schemas import * # 通过id查询用户 def get_user(db: Session, user_id: int): return db.query(User).filter(User.id == user_id).first() def get_user_emai(db:Session,email:str): return db.query(User).filter(User.email==email).first() # 新建用户 def db_create_user(db: Session, user: UserCreate): fake_hashed_password = user.password + "notreallyhashed" db_user = User(email=user.email, hashed_password=fake_hashed_password) db.add(db_user) db.commit() # 提交保存到数据库中 db.refresh(db_user) # 刷新 return db_user def get_item(db: Session, skip: int = 0, limit: int = 100): return db.query(Item).offset(skip).limit(limit).all() def get_user_item(db:Session,userid:int): user=db.query(User).filter(User.id==userid).first() return db.query(Item).filter(Item.owner==user).offset(1).limit(1).all() # 新建用户的item def create_user_item(db: Session, item: ItemCreate, user_id: int): db_item = Item(**item.dict(), owner_id=user_id) db.add(db_item) db.commit() db.refresh(db_item) return db_item
database.py代码如下,
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" engine = create_engine( SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base()
models.py代码如下
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String from sqlalchemy.orm import relationship from models.database import Base class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) email = Column(String, unique=True, index=True) hashed_password = Column(String) is_active = Column(Boolean, default=True) items = relationship("Item", back_populates="owner") class Item(Base): __tablename__ = "items" id = Column(Integer, primary_key=True, index=True) title = Column(String, index=True) description = Column(String, index=True) owner_id = Column(Integer, ForeignKey("users.id")) owner = relationship("User", back_populates="items")
schemas.py代码如下,定义请求参数模型验证与响应模型验证的Pydantic模型。
from pydantic import BaseModel from typing import List, Optional class ItemBase(BaseModel): title: str description: Optional[str] = None class ItemCreate(ItemBase): pass class Items(ItemBase): id: int owner_id: int class Config: orm_mode = True class UserBase(BaseModel): email: str class UserCreate(UserBase): """ 请求模型验证: email: password: """ password: str class Users(UserBase): """ 响应模型: id:email: is_active并且设置orm_mode与之兼容 """ id: int is_active: bool items: List[Items] = [] class Config: orm_mode = True
我们去改造下main.py
from fastapi import FastAPI, Depends, HTTPException from models.crud import * from models.database import * app = FastAPI() # Dependency def get_db(): """ 每一个请求处理完毕后会关闭当前连接,不同的请求使用不同的连接 :return: """ db = SessionLocal() try: yield db finally: db.close() # 新建用户 @app.post("/users/", response_model=Users) def create_user(user: UserCreate, db: Session = Depends(get_db)): db_crest = get_user_emai(db, user.email) if not db_crest: return db_create_user(db=db, user=user) raise HTTPException(status_code=200, detail="账号不能重复") @app.post("/user/item/{user_id}", response_model=List[Items]) def get_user_items(user_id: int, db: Session = Depends(get_db)): return get_user_item(db=db, userid=user_id) # 通过id查询用户 @app.get("/user/{user_id}", response_model=Users) def read_user(user_id: int, db: Session = Depends(get_db)): db_user = get_user(db, user_id=user_id) if not db_user: raise HTTPException(status_code=404, detail="用户查找不到") return db_user # 所有item @app.get("/items/", response_model=List[Items]) def read_items(skip: int = 0, limit: int = 0, db: Session = Depends(get_db)): items = get_item(db=db, skip=skip, limit=limit) return items # 创建用户的item @app.post("/users/{user_id}/items", response_model=Items) def create_item_user(user_id: int, item: ItemCreate, db: Session = Depends(get_db)): return create_user_item(db=db, item=item, user_id=user_id)
这样我们的目录调整了完毕。整体结构如下
我们目前是这么改造的。后续还会持续改造的。目前我们没有对API接口main文件进行改造,下面的分享我们会对api接口做改造。