FastAPI(59)- 详解使用 OAuth2PasswordBearer + JWT 认证(下)

简介: FastAPI(59)- 详解使用 OAuth2PasswordBearer + JWT 认证(下)

修改获取 token 的路径操作函数


# OAuth2 获取 token 的请求路径
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 1、获取客户端传过来的用户名、密码
    username = form_data.username
    password = form_data.password
    # 2、验证用户
    user = authenticate_user(fake_users_db, username, password)
    if not user:
        # 3、验证失败,返回错误码
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 4、生成 token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    # 5、返回 JSON 响应
    return {"access_token": access_token, "token_type": "bearer"}


sub 的是什么?


  • JWT 规范中有一个 sub key,子健
  • 它是可选的,这里的作用是通过用户名设置用户标识
  • 子健应该在整个应用程序中具有唯一的标识符,并且它应该是一个字符串

 

完整的代码


#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
# author: 小菠萝测试笔记
# blog:  https://www.cnblogs.com/poloyy/
# time: 2021/10/6 12:05 下午
# file: 49_bearer.py
"""
from typing import Optional
import uvicorn
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta
# 导入 CryptContext
from passlib.context import CryptContext
# 导入 JWT 相关库
from jose import JWTError, jwt
# 常量池
# 通过 openssl rand -hex 32 生成的随机密钥
SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"
# 加密算法
ALGORITHM = "HS256"
# 过期时间,分钟
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 模拟数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}
# 返回给客户端的 User Model,不需要包含密码
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None
# 继承 User,用于密码验证,所以要包含密码
class UserInDB(User):
    hashed_password: str
# 获取 token 路径操作函数的响应模型
class Token(BaseModel):
    access_token: str
    token_type: str
class TokenData(BaseModel):
    username: Optional[str] = None
# 实例对象池
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto")
# 密码加密
def hash_password(password: str) -> str:
    return pwd_context.hash(password)
# 验证密码
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)
# 模拟从数据库中根据用户名查找用户
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
# 根据用户名、密码来验证用户
def authenticate_user(db, username: str, password: str):
    # 1、通过用户名模拟去数据库查找用户
    user = get_user(db, username)
    if not user:
        # 2、用户不存在
        return False
    if not verify_password(password, user.hashed_password):
        # 3、密码验证失败
        return False
    # 4、验证通过,返回用户信息
    return user
# 用户名、密码验证成功后,生成 token
def create_access_token(
        data: dict,
        expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # 加密
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
# OAuth2 获取 token 的请求路径
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 1、获取客户端传过来的用户名、密码
    username = form_data.username
    password = form_data.password
    # 2、验证用户
    user = authenticate_user(fake_users_db, username, password)
    if not user:
        # 3、验证失败,返回错误码
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 4、生成 token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    # 5、返回 JSON 响应
    return {"access_token": access_token, "token_type": "bearer"}
# 根据当前用户的 token 获取用户,token 已失效则返回错误码
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 1、解码收到的 token
        payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
        # 2、拿到 username
        username: str = payload.get("sub")
        if not username:
            # 3、若 token 失效,则返回错误码
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    # 4、获取用户
    user = get_user(fake_users_db, username=token_data.username)
    if not user:
        raise credentials_exception
    # 5、返回用户
    return user
# 判断用户是否活跃,活跃则返回,不活跃则返回错误码
async def get_current_active_user(user: User = Depends(get_current_user)):
    if user.disabled:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid User")
    return user
# 获取当前用户信息
@app.get("/user/me")
async def read_user(user: User = Depends(get_current_active_user)):
    return user
# 正常的请求
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}
if __name__ == '__main__':
    uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)


请求结果

image.png

image.png

相关文章
|
4天前
|
安全 Java Spring
Spring Security+jwt实现认证
Spring Security+jwt实现认证
|
4天前
|
JSON JavaScript 数据格式
jwt-auth插件实现了基于JWT(JSON Web Tokens)进行认证鉴权的功能。
jwt-auth插件实现了基于JWT(JSON Web Tokens)进行认证鉴权的功能。
49 1
|
4天前
|
安全 数据安全/隐私保护
Springboot+Spring security +jwt认证+动态授权
Springboot+Spring security +jwt认证+动态授权
|
4天前
|
JSON SpringCloudAlibaba Cloud Native
SpringCloudAlibaba:4.3云原生网关higress的JWT 认证
SpringCloudAlibaba:4.3云原生网关higress的JWT 认证
15 1
|
4天前
|
运维 Serverless API
Serverless 应用引擎产品使用之阿里函数计算中要关掉http触发器的jwt认证才可以进行性能探测如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
16 1
|
4天前
|
JSON 安全 API
【专栏】四种REST API身份验证方法:基本认证、OAuth、JSON Web Token(JWT)和API密钥
【4月更文挑战第28天】本文探讨了四种REST API身份验证方法:基本认证、OAuth、JSON Web Token(JWT)和API密钥。基本认证简单但不安全;OAuth适用于授权第三方应用;JWT提供安全的身份验证信息传递;API密钥适合内部使用。选择方法时需平衡安全性、用户体验和开发复杂性。
|
4天前
|
负载均衡 Cloud Native 安全
云原生最佳实践系列 6:MSE 云原生网关使用 JWT 进行认证鉴权
本文档介绍了如何在 MSE(Microservices Engine)云原生网关中集成JWT进行全局认证鉴权。
|
4天前
|
JSON JavaScript 数据格式
jwt-auth插件实现了基于JWT(JSON Web Tokens)进行认证鉴权的功能
jwt-auth插件实现了基于JWT(JSON Web Tokens)进行认证鉴权的功能
40 1
|
4天前
|
JSON 安全 API
在Python Web开发过程中:请简要介绍OAuth和JWT两种认证方式,并说明何时使用它们。
OAuth是开放授权协议,用于第三方应用安全访问用户资源;JWT是JSON格式的安全令牌,用于传递身份和权限。OAuth适合第三方登录和API访问,JWT适用于单点登录和分布式系统中的身份验证。选择取决于应用场景,两者都需确保安全实施,如加密和签名验证。
|
4天前
|
安全 Java 数据库
SpringSecurity+JWT前后端分离架构登录认证
在SpringSecurity实现前后端分离登录token认证详解_springsecurity前后端分离登录认证-CSDN博客基础上进行重构,实现前后端分离架构登录认证,基本思想相同,借鉴开源Gitee代码进行改造,具有更好的代码规范。
186 1