first commit
This commit is contained in:
134
app/services/TokenAuthService.py
Normal file
134
app/services/TokenAuthService.py
Normal file
@ -0,0 +1,134 @@
|
||||
import datetime
|
||||
from functools import wraps
|
||||
|
||||
from flask import g, jsonify, request
|
||||
|
||||
import jwt
|
||||
from app.configs.default import SECRET_KEY
|
||||
from flask_httpauth import HTTPTokenAuth, HTTPAuth
|
||||
|
||||
# 生成token,有效时间为 60*60 秒
|
||||
from app.exts import db
|
||||
|
||||
|
||||
def generate_auth_token(user_name, expiration=3600):
|
||||
reset_token = jwt.encode(
|
||||
{
|
||||
"user_name": user_name,
|
||||
"exp": datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(seconds=expiration)
|
||||
},
|
||||
SECRET_KEY,
|
||||
algorithm="HS256"
|
||||
)
|
||||
return reset_token
|
||||
|
||||
|
||||
# 重写 HTTPTokenAuth
|
||||
class HTTPTokenAuthReReturn(HTTPTokenAuth):
|
||||
def __init__(self, scheme=None, realm=None, header=None):
|
||||
super().__init__(scheme, realm, header)
|
||||
|
||||
# 重写default_auth_error, 或者也可以重新定义一个函数
|
||||
# def default_auth_error(status):
|
||||
# return jsonify(data={}, message="token 错误!", code=status), status
|
||||
|
||||
def default_auth_error(status, message):
|
||||
return jsonify(data={}, message=message, code=status), status
|
||||
|
||||
# 如果重新定义函数的话,这里就传入新定义的函数名
|
||||
super().error_handler(default_auth_error)
|
||||
# 重写 login_required
|
||||
|
||||
def login_required(self, f=None, role=None, optional=None):
|
||||
if f is not None and \
|
||||
(role is not None or optional is not None): # pragma: no cover
|
||||
raise ValueError(
|
||||
'role and optional are the only supported arguments')
|
||||
|
||||
def login_required_internal(f):
|
||||
@wraps(f)
|
||||
def decorated(*args, **kwargs):
|
||||
auth = self.get_auth()
|
||||
|
||||
if request.method != 'OPTIONS':
|
||||
password = self.get_auth_password(auth)
|
||||
|
||||
status = None # 添加状态信息
|
||||
message = None
|
||||
user = self.authenticate(auth, password)
|
||||
# 这里判断verify_token的返回值是否是这里的一员
|
||||
if user in (False, None, 'BadSignature', 'SignatureExpired'):
|
||||
status = 401
|
||||
if user == 'BadSignature':
|
||||
message = "Bad Signature"
|
||||
elif user == 'SignatureExpired':
|
||||
message = "Signature Expired"
|
||||
elif not self.authorize(role, user, auth):
|
||||
status = 403
|
||||
message = "Forbidden"
|
||||
if not optional and status:
|
||||
# Clear TCP receive buffer of any pending data
|
||||
request.data
|
||||
try:
|
||||
# 因为之前重写了default_auth_error所以多传入一个message
|
||||
return self.auth_error_callback(status, message)
|
||||
except TypeError:
|
||||
return self.auth_error_callback()
|
||||
|
||||
g.flask_httpauth_user = user if user is not True \
|
||||
else auth.username if auth else None
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return decorated
|
||||
|
||||
if f:
|
||||
return login_required_internal(f)
|
||||
return login_required_internal
|
||||
|
||||
|
||||
auth = HTTPTokenAuthReReturn(scheme="Bearer")
|
||||
|
||||
|
||||
# 获取用户权限
|
||||
@auth.get_user_roles
|
||||
def get_user_roles(user):
|
||||
the_role = get_role_token(user["token"])
|
||||
return the_role.role_key
|
||||
|
||||
|
||||
# 验证token
|
||||
@auth.verify_token
|
||||
def verify_token(token):
|
||||
try:
|
||||
print(token)
|
||||
data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
|
||||
print(data)
|
||||
except jwt.ExpiredSignatureError:
|
||||
print("token过期")
|
||||
# 这里不用False,而是用自定义字符串
|
||||
return "SignatureExpired"
|
||||
except jwt.PyJWTError:
|
||||
print("token错误")
|
||||
# 这里不用False,而是用自定义字符串
|
||||
return "BadSignature"
|
||||
return True
|
||||
|
||||
|
||||
# 解析token不加密部分
|
||||
def token_parse(token):
|
||||
the_token = str.replace(str(token), 'Bearer ', '')
|
||||
decoded = jwt.decode(the_token, options={"verify_signature": False})
|
||||
return decoded
|
||||
|
||||
|
||||
# 根据 username 获取角色
|
||||
def get_role_username(username: str):
|
||||
return []
|
||||
|
||||
|
||||
# 根据 username 获取角色
|
||||
def get_role_token(token: str):
|
||||
user_info = token_parse(token)
|
||||
username = user_info["user_name"]
|
||||
the_role = get_role_username(username)
|
||||
return the_role
|
Reference in New Issue
Block a user