RODY/app/services/TokenAuthService.py
552068321@qq.com 6f7de660aa first commit
2022-11-04 17:37:08 +08:00

135 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
from functools import wraps
from flask import g, jsonify, request
import jwt
from app.configs.default import SECRET_KEY
from flask_httpauth import HTTPTokenAuth, HTTPAuth
# Generate a token; the default lifetime is 60*60 seconds
from app.exts import db
def generate_auth_token(user_name, expiration=3600):
    """Build a signed JWT that identifies ``user_name``.

    Args:
        user_name: Value stored under the ``"user_name"`` claim.
        expiration: Token lifetime in seconds, counted from the current
            UTC time. Defaults to 3600.

    Returns:
        The result of ``jwt.encode`` for the claims, signed with
        ``SECRET_KEY`` using the HS256 algorithm.
    """
    lifetime = datetime.timedelta(seconds=expiration)
    expires_at = datetime.datetime.now(tz=datetime.timezone.utc) + lifetime
    claims = {"user_name": user_name, "exp": expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm="HS256")
# Customised HTTPTokenAuth
class HTTPTokenAuthReReturn(HTTPTokenAuth):
    """``HTTPTokenAuth`` variant whose error responses say why auth failed.

    The token verification callback may return the strings
    ``'BadSignature'`` or ``'SignatureExpired'`` (besides ``False``/``None``)
    to signal a rejected token.  ``login_required`` turns each of those into
    a 401 response carrying a matching message, and a failed role check into
    a 403 response.  Error handlers are invoked as
    ``handler(status, message)``.
    """

    def __init__(self, scheme=None, realm=None, header=None):
        """Initialise the base class and install the JSON error handler.

        NOTE(review): ``scheme`` defaults to ``None`` here; the only
        instantiation visible in this module passes ``scheme="Bearer"``
        explicitly - confirm other callers do the same.
        """
        super().__init__(scheme, realm, header)

        # Default handler: a JSON body that echoes the status code and the
        # reason message, returned together with the HTTP status.
        def default_auth_error(status, message):
            return jsonify(data={}, message=message, code=status), status

        # Register it through the base-class decorator.
        super().error_handler(default_auth_error)

    def _respond_auth_error(self, status, message):
        """Call the registered error handler with the widest signature it takes.

        Tries ``handler(status, message)`` first, then ``handler(status)``,
        and finally ``handler()``, so that handlers written without the extra
        ``message`` argument keep working.
        """
        for call_args in ((status, message), (status,)):
            try:
                return self.auth_error_callback(*call_args)
            except TypeError:
                continue
        return self.auth_error_callback()

    def login_required(self, f=None, role=None, optional=None):
        """Decorator that enforces token authentication on a view.

        Args:
            f: The view function when used as a bare ``@login_required``.
            role: Role(s) passed on to ``self.authorize``.
            optional: When truthy, the view runs even if authentication or
                authorization failed.

        Returns:
            The wrapped view, or a decorator when called with keyword
            arguments only.

        Raises:
            ValueError: If ``f`` is combined with ``role`` or ``optional``.
        """
        if f is not None and \
                (role is not None or optional is not None):  # pragma: no cover
            raise ValueError(
                'role and optional are the only supported arguments')

        def login_required_internal(f):
            @wraps(f)
            def decorated(*args, **kwargs):
                auth = self.get_auth()

                # OPTIONS requests skip authentication entirely.
                if request.method != 'OPTIONS':
                    password = self.get_auth_password(auth)

                    status = None
                    message = None
                    user = self.authenticate(auth, password)

                    # The verify callback signals failure with False/None or
                    # with one of the two sentinel strings.
                    if user in (False, None,
                                'BadSignature', 'SignatureExpired'):
                        status = 401
                        if user == 'BadSignature':
                            message = "Bad Signature"
                        elif user == 'SignatureExpired':
                            message = "Signature Expired"
                        else:
                            # Keep the body consistent with the other
                            # failures, which all carry a text message.
                            message = "Unauthorized"
                    elif not self.authorize(role, user, auth):
                        status = 403
                        message = "Forbidden"

                    if not optional and status:
                        # Clear TCP receive buffer of any pending data
                        request.data
                        return self._respond_auth_error(status, message)

                    if status == 401 and isinstance(user, str):
                        # Optional auth with a rejected token: never expose
                        # the (truthy) sentinel string as the current user.
                        g.flask_httpauth_user = None
                    elif user is not True:
                        g.flask_httpauth_user = user
                    else:
                        g.flask_httpauth_user = \
                            auth.username if auth else None
                return f(*args, **kwargs)
            return decorated

        if f:
            return login_required_internal(f)
        return login_required_internal
# Shared authenticator instance configured for the "Bearer" scheme; the
# callbacks defined below are registered on it.
auth = HTTPTokenAuthReReturn(scheme="Bearer")
# Look up the user's roles
@auth.get_user_roles
def get_user_roles(user):
    """Return the role key(s) of the authenticated user.

    Args:
        user: Object supplied by the authorization step.  It must expose the
            raw JWT either as ``user.token`` or as ``user["token"]``.

    Returns:
        The ``role_key`` of the object found by ``get_role_token``, or an
        empty list (no roles) when the lookup result has no ``role_key``.
    """
    # Prefer attribute access and fall back to item access.
    # NOTE(review): newer Werkzeug versions appear to keep the token on
    # ``Authorization.token`` rather than as a mapping item - confirm against
    # the deployed version.
    token = getattr(user, "token", None)
    if token is None:
        token = user["token"]

    the_role = get_role_token(token)
    if not hasattr(the_role, "role_key"):
        # The lookup can yield a plain (possibly empty) list; treat that as
        # "no roles" instead of failing with AttributeError.
        return []
    return the_role.role_key
# Verify the token
@auth.verify_token
def verify_token(token):
    """Validate a JWT's signature and expiry.

    Args:
        token: The encoded JWT taken from the request.

    Returns:
        ``True`` when the token decodes successfully with ``SECRET_KEY``
        (HS256); the string ``"SignatureExpired"`` when it has expired; the
        string ``"BadSignature"`` for any other PyJWT error.  Custom strings
        are used instead of ``False`` so the caller can tell the reasons
        apart.
    """
    try:
        # The raw token and its decoded claims are deliberately not printed:
        # they are credentials and must not end up in stdout or log files.
        jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        print("token过期")
        return "SignatureExpired"
    except jwt.PyJWTError:
        print("token错误")
        return "BadSignature"
    return True
# Parse the unencrypted (payload) part of the token
def token_parse(token):
    """Decode a JWT's claims WITHOUT verifying its signature.

    Every occurrence of ``'Bearer '`` is stripped from ``str(token)`` before
    decoding.  Because the signature is not checked, the returned claims
    must not be trusted unless the token was validated separately.

    Args:
        token: The encoded JWT, optionally prefixed with ``'Bearer '``.

    Returns:
        The decoded claims as returned by ``jwt.decode``.
    """
    bare_jwt = str(token).replace('Bearer ', '')
    return jwt.decode(bare_jwt, options={"verify_signature": False})
# Get the role for a username
def get_role_username(username: str):
    """Return the roles for ``username``.

    Currently a stub: ``username`` is ignored and a new empty list is
    returned on every call.
    """
    no_roles = []
    return no_roles
# Get the role for the user named in a token
def get_role_token(token: str):
    """Resolve the role of the user named in ``token``.

    The token is decoded with ``token_parse`` and its ``"user_name"`` claim
    is handed to ``get_role_username``.

    Args:
        token: Encoded JWT containing a ``"user_name"`` claim.

    Returns:
        Whatever ``get_role_username`` returns for that user name.
    """
    claims = token_parse(token)
    return get_role_username(claims["user_name"])