FastAPI第四天
1. 多应用程序管理
当我们开发的时候,往往会涉及到大量的路由,如果将所有的路由都写在一个文件中,不利于我们对于某个路由以及其处理函数进行修改,更不利于整个项目后期的维护升级。而且一个文件中代码行数过大还会使得开发尤为不便,因此需要将路由进行分文件(模块化)处理。
相信讲到这里,之前有学习过flask框架的应该都有感觉,这不就是flask中的蓝图吗?没错,FastAPI中的APIRouter与蓝图非常相似,同样都是为了分文件编写路由,也都是需要最终到主文件中进行注册。话不多说,借助官网的例子来理解一下。
首先来看看项目的目录结构
main
是最终的主文件,也就是FastAPI实例化以及路由注册的地方routers
和internal
分别是两个路由组文件
为了简便化,像在routers
中有两个路由的情况,我们直接把APIRouter实例化在__init__
文件中,然后分别引入就好。
items.py
from ..routers import router from fastapi import HTTPException data={1:{"name":'aaa'},2:{"name":'bbb'}} @router.get("/items/",tags=['items']) async def read_data(): return data @router.get("/items/{item_id}",tags=["items"]) async def read_item(item_id:int): if item_id not in data: raise HTTPException(status_code=404,detail="Item not Found!") return {"item_id":item_id,"name":data[item_id]["name"]} @router.put("/items/{item_id}",tags=["custom"],responses={403:{"description":"Operation forbidden"}}) async def update_item(item_id:int): if item_id!=3: raise HTTPException( status_code=403, detail="You can only update the item:3" ) return {"item_id":item_id,"name":"ccc"} 复制代码
users.py
from ..routers import router @router.get("/users/",tags=["users"]) async def read_users(): return [ {"username":"aaa"}, {"username":"bbb"} ] @router.get("/users/{username}",tags=['users']) async def read_user(username:str): return {"username":username} 复制代码
admin.py
from fastapi import APIRouter router=APIRouter() @router.post("/") async def update_admin(): return {"message":"Admin change"} 复制代码
main.py
from fastapi import FastAPI from .routers import items,users from .internal import admin app=FastAPI() app.include_router(users.router) app.include_router(items.router) app.include_router( admin.router, prefix="/admin", tags=['admin'], ) @app.get("/") async def root(): return {"message":"hello"} 复制代码
这里稍微总结一下有关的知识
prefix
代表前缀,比如prefix=/index
,那么后面如果是router.get("aaa")
请求的实际上是/index/aaa
tags
是标签,也就是Swagger中的名字
- 编写好
router
后不要忘了使用.include_router()
将写好的进行注册
2. 数据库相关
我们除了将数据存储到文件,其实更多的就是将数据存储到数据库中,这样更利于对较大量数据进行管理(增删改查),所以下面就来看看与数据库相关的操作。
首先是数据库连接部分
import sqlalchemy from databases import Database DATABASE_URL="sqlite:///./test.db" database=Database(DATABASE_URL) sqlalchemy_engine=sqlalchemy.create_engine(DATABASE_URL) def get_database()->Database: return database 复制代码
然后再来定义数据模型
from datetime import datetime from typing import Optional import sqlalchemy from pydantic import BaseModel, Field class PostBase(BaseModel): title: str content: str publication_date: datetime = Field(default_factory=datetime.now) class PostPartialUpdate(BaseModel): title: Optional[str] = None content: Optional[str] = None class PostCreate(PostBase): pass class PostDB(PostBase): id: int metadata = sqlalchemy.MetaData() posts=sqlalchemy.Table( "posts", metadata, sqlalchemy.Column("id",sqlalchemy.Integer,primary_key=True,autoincrement=True), sqlalchemy.Column("publication_date",sqlalchemy.DateTime(),nullable=False), sqlalchemy.Column("title",sqlalchemy.String(length=255),nullable=False), sqlalchemy.Column("content",sqlalchemy.Text(),nullable=False), ) 复制代码
首先定义了数据库的Pydantic模型PostBase
,然后定义数据库的提交表格式
这里就类似于创建数据库表的时候定义字段一样,有类型、主键、自增、是否可以为空等等,这部分内容就更偏向于数据库基础,btw过段时间也准备重新学习一遍Mysql。
有了数据库连接以及定义之后,就可以创建增删改查的接口了
from typing import List, Tuple from databases import Database from fastapi import Depends, FastAPI, HTTPException, Query, status from database import get_database, sqlalchemy_engine from models import ( metadata, posts, PostDB, PostCreate, PostPartialUpdate, ) app = FastAPI() @app.on_event("startup") async def startup(): await get_database().connect() metadata.create_all(sqlalchemy_engine) @app.on_event("shutdown") async def shutdown(): await get_database().disconnect() async def pagination( skip: int = Query(0, ge=0), limit: int = Query(10, ge=0), ) -> Tuple[int, int]: capped_limit = min(100, limit) return (skip, capped_limit) async def get_post_or_404( id: int, database: Database = Depends(get_database) ) -> PostDB: select_query = posts.select().where(posts.c.id == id) raw_post = await database.fetch_one(select_query) if raw_post is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) return PostDB(**raw_post) @app.get("/posts") async def list_posts( pagination: Tuple[int, int] = Depends(pagination), database: Database = Depends(get_database), ) -> List[PostDB]: skip, limit = pagination select_query = posts.select().offset(skip).limit(limit) rows = await database.fetch_all(select_query) results = [PostDB(**row) for row in rows] return results @app.get("/posts/{id}", response_model=PostDB) async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB: return post @app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED) async def create_post( post: PostCreate, database: Database = Depends(get_database) ) -> PostDB: insert_query = posts.insert().values(post.dict()) post_id = await database.execute(insert_query) post_db = await get_post_or_404(post_id, database) return post_db @app.patch("/posts/{id}", response_model=PostDB) async def update_post( post_update: PostPartialUpdate, post: PostDB = Depends(get_post_or_404), database: Database = Depends(get_database), ) -> PostDB: update_query = ( posts.update() .where(posts.c.id == post.id) .values(post_update.dict(exclude_unset=True)) ) post_id = await database.execute(update_query) post_db = await get_post_or_404(post_id, database) return post_db @app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_post( post: PostDB = Depends(get_post_or_404), database: Database = Depends(get_database) ): delete_query = posts.delete().where(posts.c.id == post.id) await database.execute(delete_query) 复制代码
在开始,我们设置了两个事件
在我们需要的时候引入依赖函数,得到数据库的连接或者关闭数据库连接。然后在路由中设置增删改查的操作
insert_query = posts.insert().values(post.dict()) post_id = await database.execute(insert_query) post_db = await get_post_or_404(post_id, database) 复制代码
插入语句就是先设置好插入请求语句,然后执行这条语句。
其他的也是类似,这样我们就能实现基本的数据库增删改查操作。