接上一篇文章FastAPI(六十六)实战开发《在线课程学习系统》接口开发--用户注册接口开发。这次我们分享实际开发--用户登陆接口开发。
我们先来梳理下逻辑
1.查询用户是否存在 2.校验密码是否正确 3.密码校验失败记录失败次数 4.失败次数大于10次,当天不能登陆 5.密码校验通过产生对应的token返回
接着我们去设计pydantic,用于校验用户登陆
class UserLogin(UserBase): password: str
这里我们继承的是之前的UserBase。
对应操作数据库的curd我们用之前注册的时候使用的get_user_username即可。
我们把密码输入失败和token放在redis中,那么redis对应的配置。
config.py配置 redishost='127.0.0.1' redisport='6379' redisdb=0
我们在main.py增加配置
from fastapi import FastAPI from aioredis import create_redis_pool, Redis from routers.user import usersRouter from routers.websoocket import socketRouter from routers.file import fileRouter from config import * app = FastAPI() async def get_redis_pool() -> Redis: redis = await create_redis_pool(f"redis://:@"+redishost+":"+redisport+"/"+redisdb+"?encoding=utf-8") return redis @app.on_event("startup") async def startup_event(): app.state.redis = await get_redis_pool() @app.on_event("shutdown") async def shutdown_event(): app.state.redis.close() await app.state.redis.wait_closed()
我们把产生token的配置也一并配置进去
#config.py SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30
那么产生token的代码如何实现呢。
from jose import JWTError, jwt #routers/user.py def create_access_token(data: dict): to_encode = data.copy() encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
接下来我们就是去根据逻辑去实现最后的代码了。
@usersRouter.post("/login", response_model=UsersToken) async def login(request: Request, user: UserCreate, db: Session = Depends(get_db)): db_crest = get_user_username(db, user.username) if not db_crest: logger.info("login:"+user.username+"不存在") return reponse(code=100205,message='用户不存在',data="") verifypassowrd = verify_password(user.password, db_crest.password) if verifypassowrd: useris = await request.app.state.redis.get(user.username) if not useris: try: token = create_access_token(data={"sub": user.username}) except Exception as e: logger.exception(e) return reponse(code=100203,message='产生token失败',data='') request.app.state.redis.set(user.username, token, expire=ACCESS_TOKEN_EXPIRE_MINUTES * 60) return reponse(code=200,message='成功',data={"token":token}) return reponse(code=100202,message='重复登陆',data='') else: result=await request.app.state.redis.hgetall(user.username+"_password", encoding='utf8') if not result: times = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") request.app.state.redis.hmset_dict(user.username+"_password",num=0,time=times) else: errornum=int(result['num']) numtime=(datetime.now() - datetime.strptime(result['time'],'%Y-%m-%d %H:%M:%S')).seconds / 60 if errornum<10 and numtime<30: #更新错误次数 errornum += 1 request.app.state.redis.hmset_dict(user.username + "_password", num=errornum) return reponse(code=100206,data='',message='密码错误') elif errornum<10 and numtime>30: #次数置于1,时间设置现在时间 errornum=1 times = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") request.app.state.redis.hmset_dict(user.username + "_password", num=errornum,time=times) return reponse(code=100206,data='',message='密码错误') elif errornum>10 and numtime<30: #次数设置成最大,返回 errornum+=1 request.app.state.redis.hmset_dict(user.username + "_password", num=errornum) return reponse(code=100204,message='输入密码错误次数过多,账号暂时锁定,请30min再来登录',data='') else: errornum = 1 times = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S") request.app.state.redis.hmset_dict(user.username + "_password", num=errornum, time=times) return reponse(code=100206, data='', message='密码错误')
我们按照最后的代码逻辑实现去完成。
一个完整的登陆接口就实现完毕了。