import datetime

import jwt
from fastapi import Request

from app.common.redis_cli import redis_conn

# Token lifetime, in seconds.
exp = 6000
# HMAC signing secret.
# NOTE(review): a hard-coded secret in source control is a security risk;
# consider loading it from configuration / the environment instead.
secret_key = 'syg15684712291'


def _as_text(value):
    """Return *value* as ``str``, decoding ``bytes`` as UTF-8.

    Undecodable bytes are replaced rather than raising, so a corrupt stored
    value simply fails the equality check performed by the caller.
    """
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).decode('utf-8', errors='replace')
    return value


def generate_token(user_id: int, username: str):
    """
    Generate a signed JWT for the given user.

    :param user_id: user id, stored in the ``user_id`` claim
    :param username: user name, stored in the ``username`` claim
    :return: the encoded token as returned by ``jwt.encode``
    """
    # ``exp`` is expressed in seconds, so it must be passed by keyword:
    # the first positional argument of ``timedelta`` is *days*.
    expires_at = (
        datetime.datetime.now(datetime.timezone.utc)
        + datetime.timedelta(seconds=exp)
    )
    payload = {
        'user_id': user_id,
        'username': username,
        'exp': expires_at,
    }
    return jwt.encode(payload, secret_key, algorithm='HS256')


def check_token(token: str):
    """
    Validate a token and return its decoded payload.

    The token must carry a valid HS256 signature, must not be expired, and
    must equal the value stored in Redis under the ``user_id`` key.

    :param token: encoded JWT
    :return: the decoded payload (dict)
    :raises jwt.ExpiredSignatureError: the token is expired, no token is
        stored in Redis for the user, or the stored token differs
    :raises jwt.InvalidTokenError: the token cannot be decoded / verified or
        has no ``user_id`` claim
    """
    try:
        decoded_payload = jwt.decode(token, secret_key, algorithms=['HS256'])
    except jwt.ExpiredSignatureError as err:
        # Must come first: ExpiredSignatureError subclasses InvalidTokenError.
        raise jwt.ExpiredSignatureError("Expired Token") from err
    except jwt.InvalidTokenError as err:
        raise jwt.InvalidTokenError("Invalid Token") from err

    user_id = decoded_payload.get('user_id')
    if user_id is None:
        raise jwt.InvalidTokenError("Invalid Token")

    token_redis = redis_conn.get(user_id)
    # Redis clients return bytes unless created with decode_responses=True,
    # so normalise both sides to text before comparing.
    # A missing or different stored token is reported as "expired", which is
    # what callers have always observed for these two cases.
    if token_redis is None or _as_text(token_redis) != _as_text(token):
        raise jwt.ExpiredSignatureError("Expired Token")

    return decoded_payload


def get_user_id(request: Request):
    """
    Return the user id carried by the token in the ``Authorization`` header.

    :param request: incoming request
    :return: value of the ``user_id`` claim
    :raises jwt.ExpiredSignatureError: see :func:`check_token`
    :raises jwt.InvalidTokenError: the header is missing/empty or the token
        is invalid
    """
    token = request.headers.get("Authorization")
    if not token:
        # Fail explicitly instead of relying on how jwt.decode treats None.
        raise jwt.InvalidTokenError("Invalid Token")
    return check_token(token)['user_id']