FastAPI(58)- 使用 OAuth2PasswordBearer 的简单栗子 (下)

简介: FastAPI(58)- 使用 OAuth2PasswordBearer 的简单栗子 (下)

上述代码的问题

还没有获取 token 的路径操作

 

完善 OAuth2


#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
# author: 小菠萝测试笔记
# blog:  https://www.cnblogs.com/poloyy/
# time: 2021/10/6 12:05 下午
# file: 49_bearer.py
"""
from typing import Optional
import uvicorn
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
# 模拟数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 模拟 hash 加密算法
def fake_hash_password(password: str) -> str:
    return "fakehashed" + password
# 返回给客户端的 User Model,不需要包含密码
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None
# 继承 User,用于密码验证,所以要包含密码
class UserInDB(User):
    hashed_password: str
# OAuth2 获取 token 的请求路径
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 1、获取客户端传过来的用户名、密码
    username = form_data.username
    password = form_data.password
    # 2、模拟从数据库中根据用户名查找对应的用户
    user_dict = fake_users_db.get(username)
    if not user_dict:
        # 3、若没有找到用户则返回错误码
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用户名或密码不正确")
    # 4、找到用户
    user = UserInDB(**user_dict)
    # 5、将传进来的密码模拟 hash 加密
    hashed_password = fake_hash_password(password)
    # 6、如果 hash 后的密码和数据库中存储的密码不相等,则返回错误码
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用户名或密码不正确")
    # 7、用户名、密码验证通过后,返回一个 JSON
    return {"access_token": user.username, "token_type": "bearer"}
# 模拟从数据库中根据用户名查找用户
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
# 模拟验证 token,验证通过则返回对应的用户信息
def fake_decode_token(token):
    user = get_user(fake_users_db, token)
    return user
# 根据当前用户的 token 获取用户,token 已失效则返回错误码
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
# 判断用户是否活跃,活跃则返回,不活跃则返回错误码
async def get_current_active_user(user: User = Depends(get_current_user)):
    if user.disabled:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid User")
    return user
# 获取当前用户信息
@app.get("/user/me")
async def read_user(user: User = Depends(get_current_active_user)):
    return user
# 正常的请求
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}
if __name__ == '__main__':
    uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)


/token 路径操作函数的响应

# 7、用户名、密码验证通过后,返回一个 JSON

return {"access_token": user.username, "token_type": "bearer"}

  • 获取 token 的接口的响应必须是一个 JSON 对象(返回一个 dict 即可)
  • 它应该有一个 token_type,当使用 Bearer toklen 时,令牌类型应该是 bearer
  • 它应该有一个 access_token,一个包含访问 token 的字符串
  • 对于上面简单的例子,返回的 token 是用户名,这是不安全,只是作为栗子好理解一点

 

返回 401 的HTTPException

# 根据当前用户的 token 获取用户,token 已失效则返回错误码
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


  • 任何 HTTP(错误)状态码为 401 UNAUTHORIZED 都应该返回 WWW-Authenticate 的 Header
  • 在此处返回的带有值 Bearer 的 WWW-Authenticate Header 也是 OAuth2 规范的一部分
  • 在 Beaer token 的情况下,该值应该是 Bearer
  • 当然,这并不是必须的,但建议符合规范

 

查看 Swagger API Authorize

image.png

image.png


验证通过

 

请求 /user/me 的结果


image.png


请求头带上了 'Authorization: Bearer johndoe'

 

logout 后再次请求,查看结果

image.png


logout 之后,请求头没有 'Authorization: Bearer johndoe' 所以验证就失败啦

 

验证一个不活跃的用户

authenticate 表单填入

  • username:alice
  • password:secret2

请求 /users/me

得到的响应

{

 "detail": "Inactive user"

}

 

存在的问题

目前的 token 和验证方式并不安全,下一篇中将介绍 JWT token

相关文章
|
4天前
|
JSON 缓存 前端开发
验证码demo(简单实现)
验证码demo(简单实现)
16 0
|
4天前
|
缓存 NoSQL 前端开发
一文搞懂Go整合captcha实现验证码功能
一文搞懂Go整合captcha实现验证码功能
30 0
eggjs 怎么使用 egg-jwt 实现 token 解析?
eggjs 怎么使用 egg-jwt 实现 token 解析?
167 0
eggjs 怎么使用 egg-jwt 实现 token 解析?
|
存储 前端开发 API
FastAPI(58)- 使用 OAuth2PasswordBearer 的简单栗子 (上)
FastAPI(58)- 使用 OAuth2PasswordBearer 的简单栗子 (上)
335 0
FastAPI(58)- 使用 OAuth2PasswordBearer 的简单栗子 (上)
FastAPI(7)- 详解 Path(上)
FastAPI(7)- 详解 Path(上)
213 0
FastAPI(7)- 详解 Path(上)
|
JSON 数据库 数据格式
FastAPI(46)- JSONResponse
FastAPI(46)- JSONResponse
297 0
FastAPI(46)- JSONResponse
|
JSON JavaScript API
FastAPI(1)- 简单介绍
FastAPI(1)- 简单介绍
274 0
FastAPI(15)- 声明请求示例数据(下)
FastAPI(15)- 声明请求示例数据(下)
188 0
FastAPI(15)- 声明请求示例数据(下)
|
JSON API 数据格式
FastAPI(15)- 声明请求示例数据(上)
FastAPI(15)- 声明请求示例数据(上)
183 0
FastAPI(15)- 声明请求示例数据(上)
|
安全 Linux 测试技术
FastAPI(57)- 安全相关的概念
FastAPI(57)- 安全相关的概念
108 0