之前我们把登录认证相关的一些配置直接写在了代码中,现在统一放到config文件中。
import os

# Authentication settings shared by the routers.

# Key used to sign and verify JWTs. It can be overridden through the
# SECRET_KEY environment variable so a deployment does not have to rely on
# the value committed to the source tree; the default is unchanged.
SECRET_KEY = os.environ.get(
    "SECRET_KEY",
    "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7",
)
# JWT signing algorithm.
ALGORITHM = "HS256"
# Token lifetime in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
我们需要修改user.py
from config import *


# Token helpers; the key and algorithm come from config.
def create_access_token(data: dict):
    """Encode ``data`` as a JWT signed with the configured key.

    Args:
        data: Claims to place in the token. A shallow copy is encoded, so
            the caller's dict is not modified.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    to_encode = data.copy()
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_cure_user(request: Request, token: Optional[str] = Header(...)) -> UserBase:
    """Resolve the current user from the ``token`` request header.

    The token is decoded with the configured key, then compared with the
    token stored in Redis under the user name taken from the ``sub`` claim.

    Args:
        request: Incoming request; its ``app.state.redis`` is queried.
        token: JWT read from the ``token`` header (required).

    Returns:
        A ``UserBase`` whose ``email`` is the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or carries no
            ``sub`` claim; 403 when Redis holds no token for the user or
            holds a different one.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="验证失败"
    )
    credentials_FOR_exception = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="用户未登录或者登陆token已经失效"
    )
    # Only the decode step is expected to raise JWTError, so keep the try small.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    username: str = payload.get("sub")
    if username is None:
        raise credentials_exception
    useris = await request.app.state.redis.get(username)
    # Reject when nothing is stored OR when the stored token differs. The
    # previous `and` only rejected the "nothing stored" case, so an old token
    # kept working while a different token was stored for the user.
    if not useris or useris != token:
        raise credentials_FOR_exception
    return UserBase(email=username)
在存储redis的地方也需要修改
# Store the token under the user's email. The entry's lifetime is the
# configured token lifetime converted from minutes to seconds.
# NOTE(review): the value returned by set() is discarded here - confirm
# whether this Redis client expects the call to be awaited.
token_ttl_seconds = ACCESS_TOKEN_EXPIRE_MINUTES * 60
redis_client = request.app.state.redis
redis_client.set(user.email, token, expire=token_ttl_seconds)
我们把main.py的redis相关的也放在配置里
# config.py
# Redis connection settings. All three are kept as strings because they are
# concatenated into the connection URL.
redishost = '127.0.0.1'
redisport = '6379'
redisdb = '0'
main.py修改
async def get_redis_pool() -> Redis:
    """Create a Redis connection pool from the settings in config.

    Returns:
        The pool returned by ``create_redis_pool``, with ``encoding=utf-8``
        requested in the URL.
    """
    # The path segment of the URL selects the database, so it must be
    # redisdb; the previous code repeated redisport there.
    url = f"redis://:@{redishost}:{redisport}/{redisdb}?encoding=utf-8"
    return await create_redis_pool(url)
调整之前的websocket和file相关的代码。统一放在routers文件下,创建websoocket.py。
from fastapi import APIRouter, WebSocket, Depends, WebSocketDisconnect
from typing import List, Dict
from routers.user import get_cure_user

socketRouter = APIRouter()


class ConnectionManager:
    """Track accepted WebSocket connections and send messages to them."""

    def __init__(self):
        # Active connections, each stored as {"user": <name>, "ws": <socket>}.
        self.active_connections: List[Dict[str, WebSocket]] = []

    async def connect(self, user: str, ws: WebSocket):
        """Accept ``ws`` and register it under ``user``."""
        await ws.accept()
        self.active_connections.append({"user": user, "ws": ws})

    def disconnect(self, user: str, ws: WebSocket):
        """Remove the entry for ``user``/``ws``; do nothing if it is absent."""
        entry = {"user": user, "ws": ws}
        # list.remove raises ValueError for a missing item, so check first.
        if entry in self.active_connections:
            self.active_connections.remove(entry)

    async def send_other_message_json(self, message: dict, user: str):
        """Send ``message`` as JSON to every connection registered for ``user``."""
        for connection in self.active_connections:
            if connection["user"] == user:
                await connection['ws'].send_json(message)

    async def broadcast_json(self, data: dict):
        """Send ``data`` as JSON to every active connection."""
        for connection in self.active_connections:
            await connection['ws'].send_json(data)

    @staticmethod
    async def send_personal_message(message: str, ws: WebSocket):
        """Send the text ``message`` to the single socket ``ws``."""
        await ws.send_text(message)

    async def send_other_message(self, message: str, user: str):
        """Send the text ``message`` to every connection registered for ``user``."""
        for connection in self.active_connections:
            if connection["user"] == user:
                await connection['ws'].send_text(message)

    async def broadcast(self, data: str):
        """Send the text ``data`` to every active connection."""
        for connection in self.active_connections:
            await connection['ws'].send_text(data)


manager = ConnectionManager()


@socketRouter.websocket("/ws/{user}/")
async def websocket_many_point(
        websocket: WebSocket,
        user: str,
        cookie_or_token: str = Depends(get_cure_user),
):
    """Relay JSON messages between connected users.

    Each received message is forwarded to the user named in its
    ``username`` field; when that field is missing or empty the message is
    broadcast to every active connection.

    NOTE(review): ``get_cure_user`` declares a ``Request`` parameter -
    confirm that this dependency resolves for WebSocket connections.
    """
    await manager.connect(user, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            # A payload without "username" (or a non-object payload) is a
            # broadcast, not an error.
            senduser = data.get('username') if isinstance(data, dict) else None
            if senduser:
                await manager.send_other_message_json(data, senduser)
            else:
                await manager.broadcast_json(data)
    except WebSocketDisconnect:
        # The client closed the connection; cleanup happens below.
        pass
    finally:
        # Always unregister, so a failed handler does not leave a dead socket
        # in the list for later broadcasts.
        manager.disconnect(user, websocket)
对于file相关的代码,也统一放在routers文件夹下,创建file.py。
from fastapi import APIRouter, File, UploadFile
from typing import List

fileRouter = APIRouter()


@fileRouter.post("/uploadfile/")
async def upload_file(fileS: List[UploadFile] = File(...)):
    """Return the filenames of the uploaded files.

    Args:
        fileS: Uploaded files (required).

    Returns:
        A dict with a single key ``"filenames"`` holding each upload's
        ``filename``, in the order received.
    """
    names = []
    for upload in fileS:
        names.append(upload.filename)
    return {"filenames": names}
然后我们在main.py优化
from routers.websoocket import socketRouter
from routers.file import fileRouter

# Register the routers last: each one is mounted under "/<name>" and tagged
# with the same name.
for _router, _name in ((socketRouter, "socket"), (fileRouter, "file")):
    app.include_router(_router, prefix=f"/{_name}", tags=[_name])
调整完成后,我们的接口更加清晰了。我们启动一下,看看现有的接口。
代码存储 https://gitee.com/liwanlei/fastapistuday