FastAPI(59)- 详解使用 OAuth2PasswordBearer + JWT 认证(下)

简介: FastAPI(59)- 详解使用 OAuth2PasswordBearer + JWT 认证(下)

修改获取 token 的路径操作函数


# OAuth2 获取 token 的请求路径
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 1、获取客户端传过来的用户名、密码
    username = form_data.username
    password = form_data.password
    # 2、验证用户
    user = authenticate_user(fake_users_db, username, password)
    if not user:
        # 3、验证失败,返回错误码
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 4、生成 token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    # 5、返回 JSON 响应
    return {"access_token": access_token, "token_type": "bearer"}


sub 的是什么?


  • JWT 规范中有一个 sub key,子健
  • 它是可选的,这里的作用是通过用户名设置用户标识
  • 子健应该在整个应用程序中具有唯一的标识符,并且它应该是一个字符串

 

完整的代码


#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
# author: 小菠萝测试笔记
# blog:  https://www.cnblogs.com/poloyy/
# time: 2021/10/6 12:05 下午
# file: 49_bearer.py
"""
from typing import Optional
import uvicorn
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta
# 导入 CryptContext
from passlib.context import CryptContext
# 导入 JWT 相关库
from jose import JWTError, jwt
# 常量池
# 通过 openssl rand -hex 32 生成的随机密钥
SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"
# 加密算法
ALGORITHM = "HS256"
# 过期时间,分钟
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 模拟数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}
# 返回给客户端的 User Model,不需要包含密码
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None
# 继承 User,用于密码验证,所以要包含密码
class UserInDB(User):
    hashed_password: str
# 获取 token 路径操作函数的响应模型
class Token(BaseModel):
    access_token: str
    token_type: str
class TokenData(BaseModel):
    username: Optional[str] = None
# 实例对象池
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto")
# 密码加密
def hash_password(password: str) -> str:
    return pwd_context.hash(password)
# 验证密码
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)
# 模拟从数据库中根据用户名查找用户
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
# 根据用户名、密码来验证用户
def authenticate_user(db, username: str, password: str):
    # 1、通过用户名模拟去数据库查找用户
    user = get_user(db, username)
    if not user:
        # 2、用户不存在
        return False
    if not verify_password(password, user.hashed_password):
        # 3、密码验证失败
        return False
    # 4、验证通过,返回用户信息
    return user
# 用户名、密码验证成功后,生成 token
def create_access_token(
        data: dict,
        expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # 加密
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
# OAuth2 获取 token 的请求路径
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 1、获取客户端传过来的用户名、密码
    username = form_data.username
    password = form_data.password
    # 2、验证用户
    user = authenticate_user(fake_users_db, username, password)
    if not user:
        # 3、验证失败,返回错误码
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # 4、生成 token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    # 5、返回 JSON 响应
    return {"access_token": access_token, "token_type": "bearer"}
# 根据当前用户的 token 获取用户,token 已失效则返回错误码
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 1、解码收到的 token
        payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
        # 2、拿到 username
        username: str = payload.get("sub")
        if not username:
            # 3、若 token 失效,则返回错误码
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    # 4、获取用户
    user = get_user(fake_users_db, username=token_data.username)
    if not user:
        raise credentials_exception
    # 5、返回用户
    return user
# 判断用户是否活跃,活跃则返回,不活跃则返回错误码
async def get_current_active_user(user: User = Depends(get_current_user)):
    if user.disabled:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid User")
    return user
# 获取当前用户信息
@app.get("/user/me")
async def read_user(user: User = Depends(get_current_active_user)):
    return user
# 正常的请求
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}
if __name__ == '__main__':
    uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)


请求结果

image.png

image.png

相关文章
|
5月前
|
SQL Java 测试技术
在Spring boot中 使用JWT和过滤器实现登录认证
在Spring boot中 使用JWT和过滤器实现登录认证
293 0
|
3月前
|
JSON 安全 数据安全/隐私保护
Python认证新风尚:OAuth遇上JWT,安全界的时尚Icon👗
【10月更文挑战第2天】在当今互联网世界中,数据安全与隐私保护日益重要。Python 作为广泛应用于 Web 开发的语言,其认证机制也不断进化。OAuth 2.0 和 JSON Web Tokens (JWT) 成为当前最热门的安全认证方案,不仅保障数据安全传输,还简化了用户认证流程。本文将介绍 Python 如何结合 OAuth 2.0 和 JWT 打造安全高效的认证体系。
44 3
|
2月前
|
JSON 安全 算法
Spring Boot 应用如何实现 JWT 认证?
Spring Boot 应用如何实现 JWT 认证?
86 8
|
2月前
|
JSON 安全 数据安全/隐私保护
Python认证新风尚:OAuth遇上JWT,安全界的时尚Icon👗
在当今互联网世界中,数据安全和隐私保护至关重要。Python 作为 Web 开发的主流语言,其认证机制也在不断进步。OAuth 2.0 和 JSON Web Tokens (JWT) 是当前最热门的安全认证方案,不仅保障数据安全传输,还简化用户认证流程。本文介绍如何在 Python 中结合 OAuth 2.0 和 JWT,打造一套既安全又高效的认证体系。通过 Flask-HTTPAuth 和 PyJWT 等库,实现授权和验证功能,确保每次请求的安全性和便捷性。
49 3
|
2月前
|
JSON 算法 安全
JWT Bearer 认证在 .NET Core 中的应用
【10月更文挑战第30天】JWT(JSON Web Token)是一种开放标准,用于在各方之间安全传输信息。它由头部、载荷和签名三部分组成,用于在用户和服务器之间传递声明。JWT Bearer 认证是一种基于令牌的认证方式,客户端在请求头中包含 JWT 令牌,服务器验证令牌的有效性后授权用户访问资源。在 .NET Core 中,通过安装 `Microsoft.AspNetCore.Authentication.JwtBearer` 包并配置认证服务,可以实现 JWT Bearer 认证。具体步骤包括安装 NuGet 包、配置认证服务、启用认证中间件、生成 JWT 令牌以及在控制器中使用认证信息
132 2
|
4月前
|
安全 Java 数据安全/隐私保护
|
4月前
|
JSON 安全 数据安全/隐私保护
Python 安全性大揭秘:OAuth 与 JWT,不只是认证,更是信任的传递
【9月更文挑战第4天】在数字化时代,确保应用安全至关重要。Python 作为广泛使用的编程语言,提供了强大的安全认证工具,如 OAuth 和 JWT。OAuth 是一种授权框架,允许第三方应用在有限权限下访问用户资源;JWT 则是一种自包含的数据传输格式,用于安全地传递声明。通过合理配置和使用这些技术,可以有效提升应用安全性,保障用户数据安全。正确管理和定期更新密钥、严格测试 JWT 的生成与验证等最佳实践,对于构建安全可靠的应用至关重要。不断学习新威胁,是维护应用安全的永恒课题。
59 2
|
5月前
|
JSON 安全 数据安全/隐私保护
Python 安全性大揭秘:OAuth 与 JWT,不只是认证,更是信任的传递
【8月更文挑战第6天】在数字化时代,Python 通过 OAuth 和 JWT 筑牢应用安全防线。OAuth 是一种授权框架,允许第三方应用在用户授权下安全访问资源;JWT 则是一种自包含的声明传输格式,确保通信安全。两者结合使用,能有效进行身份验证及授权管理。然而,密钥管理和 JWT 有效期设置等仍是挑战,需谨慎处理以保障整体安全性。正确配置这些工具和技术,可为用户提供既安全又便捷的服务体验。
53 7
|
5月前
|
JSON 安全 数据安全/隐私保护
Python安全新篇章:OAuth与JWT携手,开启认证与授权的新时代
【8月更文挑战第6天】随着互联网应用的发展,安全认证与授权变得至关重要。本文介绍OAuth与JWT两种关键技术,并展示如何结合它们构建安全系统。OAuth允许用户授权第三方应用访问特定信息,无需分享登录凭证。JWT是一种自包含的信息传输格式,用于安全地传递信息。通过OAuth认证用户并获取JWT,可以验证用户身份并保护数据安全,为用户提供可靠的身份验证体验。
66 6
|
5月前
|
JSON 人工智能 算法
Golang 搭建 WebSocket 应用(四) - jwt 认证
Golang 搭建 WebSocket 应用(四) - jwt 认证
76 0

热门文章

最新文章