59 lines
1.5 KiB
Python
59 lines
1.5 KiB
Python
import datetime
|
||
|
||
import jwt
|
||
from app.common.redis_cli import redis_conn
|
||
from fastapi import Request
|
||
|
||
# 过期时间,单位S
|
||
exp = 6000
|
||
|
||
# 加密秘钥
|
||
secret_key = 'syg15684712291'
|
||
|
||
|
||
def generate_token(user_id: int, username: str):
|
||
"""
|
||
根据用户id和用户名生成一个token
|
||
:param user_id: 用户id
|
||
:param username: 用户名
|
||
:return: token
|
||
"""
|
||
payload = {
|
||
'user_id': user_id,
|
||
'username': username,
|
||
'exp': datetime.datetime.utcnow() + datetime.timedelta(exp)
|
||
}
|
||
|
||
# 生成token
|
||
token = jwt.encode(payload, secret_key, algorithm='HS256')
|
||
|
||
return token
|
||
|
||
|
||
def check_token(token: str):
|
||
"""
|
||
验证token
|
||
:param token: token
|
||
:return: True or False
|
||
"""
|
||
try:
|
||
decoded_payload = jwt.decode(token, secret_key, algorithms=['HS256'])
|
||
user_id = decoded_payload['user_id']
|
||
token_redis = redis_conn.get(user_id)
|
||
if token_redis is None:
|
||
raise jwt.ExpiredSignatureError("Expired Token")
|
||
if token_redis != token:
|
||
raise jwt.ExpiredSignatureError("Invalid Token")
|
||
return decoded_payload
|
||
except jwt.ExpiredSignatureError:
|
||
raise jwt.ExpiredSignatureError("Expired Token")
|
||
except jwt.InvalidTokenError:
|
||
raise jwt.InvalidTokenError("Invalid Token")
|
||
|
||
|
||
def get_user_id(request: Request):
|
||
"""根据Request请求获取token"""
|
||
token = request.headers.get("Authorization")
|
||
decoded_payload = check_token(token)
|
||
user_id = decoded_payload['user_id']
|
||
return user_id |