"""JWT bearer-token helpers and a Flask-HTTPAuth subclass with JSON errors."""
import datetime
import logging
from functools import wraps

import jwt
from flask import g, jsonify, request
from flask_httpauth import HTTPTokenAuth, HTTPAuth

from app.configs.default import SECRET_KEY
from app.exts import db

logger = logging.getLogger(__name__)


def generate_auth_token(user_name, expiration=3600):
    """Create a signed HS256 JWT for *user_name*.

    Args:
        user_name: Value stored under the ``"user_name"`` claim.
        expiration: Lifetime in seconds, counted from the current UTC time
            (default 3600, i.e. 60*60 seconds).

    Returns:
        The encoded token as produced by ``jwt.encode``.
    """
    expires_at = (
        datetime.datetime.now(tz=datetime.timezone.utc)
        + datetime.timedelta(seconds=expiration)
    )
    payload = {"user_name": user_name, "exp": expires_at}
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")


class HTTPTokenAuthReReturn(HTTPTokenAuth):
    """``HTTPTokenAuth`` variant that reports auth failures as JSON.

    ``verify_token`` callbacks used with this class may return the sentinel
    strings ``'BadSignature'`` or ``'SignatureExpired'`` instead of ``False``
    so that the 401 response can carry a specific message.
    """

    def __init__(self, scheme=None, realm=None, header=None):
        super().__init__(scheme, realm, header)

        # Replacement for the library's default error handler. It takes an
        # extra ``message`` argument, which ``login_required`` below supplies.
        # (A separately defined function could be registered here instead.)
        def default_auth_error(status, message):
            return jsonify(data={}, message=message, code=status), status

        super().error_handler(default_auth_error)

    def _send_auth_error(self, status, message):
        """Invoke the registered error handler with the richest signature it accepts.

        Tries ``(status, message)``, then ``(status)``, then no arguments, so
        handlers registered through ``error_handler`` with any of these
        signatures keep working.

        NOTE(review): as in the original code, a ``TypeError`` raised *inside*
        a handler is indistinguishable from a signature mismatch and triggers
        the next fallback.
        """
        callback = self.auth_error_callback
        try:
            return callback(status, message)
        except TypeError:
            pass
        try:
            return callback(status)
        except TypeError:
            return callback()

    def login_required(self, f=None, role=None, optional=None):
        """Decorator enforcing authentication (and optionally a role) on a view.

        Usable as ``@auth.login_required`` or
        ``@auth.login_required(role=..., optional=...)``.

        Args:
            f: The view function when used without parentheses.
            role: Role requirement forwarded to ``self.authorize``.
            optional: When truthy, failed authentication does not short-circuit
                the request; the view is still called.

        Raises:
            ValueError: If ``f`` is given together with ``role`` or ``optional``.
        """
        if f is not None and \
                (role is not None or optional is not None):  # pragma: no cover
            raise ValueError(
                'role and optional are the only supported arguments')

        def login_required_internal(f):
            @wraps(f)
            def decorated(*args, **kwargs):
                auth = self.get_auth()

                # OPTIONS requests skip authentication entirely.
                if request.method != 'OPTIONS':
                    password = self.get_auth_password(auth)

                    status = None
                    # Human-readable reason sent along with the status code.
                    message = None
                    user = self.authenticate(auth, password)
                    # The verify_token callback may return a failure sentinel
                    # string in addition to False/None.
                    if user in (False, None,
                                'BadSignature', 'SignatureExpired'):
                        status = 401
                        if user == 'BadSignature':
                            message = "Bad Signature"
                        elif user == 'SignatureExpired':
                            message = "Signature Expired"
                        if user:
                            # Only the sentinel strings are truthy here. Do
                            # not let them become a truthy "current user"
                            # when ``optional`` lets the request continue.
                            user = None
                    elif not self.authorize(role, user, auth):
                        status = 403
                        message = "Forbidden"

                    if not optional and status:
                        # Clear TCP receive buffer of any pending data
                        request.data
                        return self._send_auth_error(status, message)

                    g.flask_httpauth_user = user if user is not True \
                        else auth.username if auth else None
                return f(*args, **kwargs)
            return decorated

        if f:
            return login_required_internal(f)
        return login_required_internal


auth = HTTPTokenAuthReReturn(scheme="Bearer")


def _token_from_auth(user):
    """Return the bearer token carried by *user*, or ``None`` if absent.

    NOTE(review): depending on the installed Werkzeug/Flask-HTTPAuth versions
    the authorization object exposes the token either as a ``token`` attribute
    or under the ``"token"`` key; both are tried - confirm against the pinned
    versions.
    """
    token = getattr(user, "token", None)
    if token is None:
        try:
            token = user["token"]
        except (KeyError, TypeError):
            token = None
    return token


# Role lookup used by ``login_required(role=...)``.
@auth.get_user_roles
def get_user_roles(user):
    """Return the role key for the token held by *user*.

    Returns ``None`` (no roles) when no token is available or when the role
    lookup yields an object without a ``role_key`` attribute, instead of
    raising.
    """
    token = _token_from_auth(user)
    if not token:
        return None
    the_role = get_role_token(token)
    return getattr(the_role, "role_key", None)


# Token verification callback.
@auth.verify_token
def verify_token(token):
    """Validate *token*'s HS256 signature and expiry.

    Returns:
        ``True`` when the token decodes successfully, ``"SignatureExpired"``
        when it has expired, ``"BadSignature"`` for any other PyJWT error.
        Sentinel strings are used instead of ``False`` so the caller can
        distinguish the failure reason.
    """
    # The raw token and its decoded claims are deliberately not logged:
    # they are credentials.
    try:
        jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        logger.info("token过期")
        return "SignatureExpired"
    except jwt.PyJWTError:
        logger.warning("token错误")
        return "BadSignature"
    return True


def token_parse(token):
    """Decode the claims of *token* WITHOUT verifying its signature.

    Any ``'Bearer '`` substring is stripped from the stringified token first.

    NOTE(review): because the signature is not checked, the result must only
    be trusted for tokens that were already accepted by ``verify_token``.
    """
    the_token = str(token).replace('Bearer ', '')
    decoded = jwt.decode(the_token, options={"verify_signature": False})
    return decoded


def get_role_username(username: str):
    """Look up the role for *username*.

    Currently a placeholder that always returns an empty list.
    """
    return []


def get_role_token(token: str):
    """Look up the role for the user named in *token*'s ``user_name`` claim."""
    user_info = token_parse(token)
    username = user_info["user_name"]
    the_role = get_role_username(username)
    return the_role