上述代码的问题
还没有获取 token 的路径操作
完善 OAuth2
#!usr/bin/env python # -*- coding:utf-8 _*- """ # author: 小菠萝测试笔记 # blog: https://www.cnblogs.com/poloyy/ # time: 2021/10/6 12:05 下午 # file: 49_bearer.py """ from typing import Optional import uvicorn from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel # 模拟数据库 fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "fakehashedsecret", "disabled": False, }, "alice": { "username": "alice", "full_name": "Alice Wonderson", "email": "alice@example.com", "hashed_password": "fakehashedsecret2", "disabled": True, }, } app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # 模拟 hash 加密算法 def fake_hash_password(password: str) -> str: return "fakehashed" + password # 返回给客户端的 User Model,不需要包含密码 class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None # 继承 User,用于密码验证,所以要包含密码 class UserInDB(User): hashed_password: str # OAuth2 获取 token 的请求路径 @app.post("/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): # 1、获取客户端传过来的用户名、密码 username = form_data.username password = form_data.password # 2、模拟从数据库中根据用户名查找对应的用户 user_dict = fake_users_db.get(username) if not user_dict: # 3、若没有找到用户则返回错误码 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用户名或密码不正确") # 4、找到用户 user = UserInDB(**user_dict) # 5、将传进来的密码模拟 hash 加密 hashed_password = fake_hash_password(password) # 6、如果 hash 后的密码和数据库中存储的密码不相等,则返回错误码 if not hashed_password == user.hashed_password: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用户名或密码不正确") # 7、用户名、密码验证通过后,返回一个 JSON return {"access_token": user.username, "token_type": "bearer"} # 模拟从数据库中根据用户名查找用户 def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) # 模拟验证 token,验证通过则返回对应的用户信息 def fake_decode_token(token): user = get_user(fake_users_db, token) return user # 根据当前用户的 token 获取用户,token 已失效则返回错误码 async def get_current_user(token: str = Depends(oauth2_scheme)): user = fake_decode_token(token) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) return user # 判断用户是否活跃,活跃则返回,不活跃则返回错误码 async def get_current_active_user(user: User = Depends(get_current_user)): if user.disabled: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid User") return user # 获取当前用户信息 @app.get("/user/me") async def read_user(user: User = Depends(get_current_active_user)): return user # 正常的请求 @app.get("/items/") async def read_items(token: str = Depends(oauth2_scheme)): return {"token": token} if __name__ == '__main__': uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)
/token 路径操作函数的响应
# 7、用户名、密码验证通过后,返回一个 JSON
return {"access_token": user.username, "token_type": "bearer"}
- 获取 token 的接口的响应必须是一个 JSON 对象(返回一个 dict 即可)
- 它应该有一个 token_type,当使用 Bearer toklen 时,令牌类型应该是 bearer
- 它应该有一个 access_token,一个包含访问 token 的字符串
- 对于上面简单的例子,返回的 token 是用户名,这是不安全,只是作为栗子好理解一点
返回 401 的HTTPException
# 根据当前用户的 token 获取用户,token 已失效则返回错误码 async def get_current_user(token: str = Depends(oauth2_scheme)): user = fake_decode_token(token) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) return user
- 任何 HTTP(错误)状态码为 401 UNAUTHORIZED 都应该返回 WWW-Authenticate 的 Header
- 在此处返回的带有值 Bearer 的 WWW-Authenticate Header 也是 OAuth2 规范的一部分
- 在 Beaer token 的情况下,该值应该是 Bearer
- 当然,这并不是必须的,但建议符合规范
查看 Swagger API Authorize
验证通过
请求 /user/me 的结果
请求头带上了 'Authorization: Bearer johndoe'
logout 后再次请求,查看结果
logout 之后,请求头没有 'Authorization: Bearer johndoe' 所以验证就失败啦
验证一个不活跃的用户
authenticate 表单填入
- username:alice
- password:secret2
请求 /users/me
得到的响应
{
"detail": "Inactive user"
}
存在的问题
目前的 token 和验证方式并不安全,下一篇中将介绍 JWT token