"""Data-access helpers for ``SysUser`` rows (query, create, status, password)."""
from typing import Optional

from app.model.sys_model import SysUser
from app.model.schemas.sys_user_schemas import SysUserPager, SysUserOut
from app.common.bcrypt_pw import hash_password
from app.db.page_util import get_pager
from sqlalchemy import and_, asc
from sqlalchemy.orm import Session


def user_pager(user: SysUserPager, session: Session):
    """Return one page of users ordered by ascending id.

    Args:
        user: Paging request. ``username`` (optional) is matched as a
            case-insensitive substring; ``pagerNum`` and ``pagerSize`` are
            forwarded unchanged to ``get_pager``.
        session: SQLAlchemy session used to build the query.

    Returns:
        The object produced by ``get_pager``, with its ``data`` attribute
        replaced by a list of ``SysUserOut`` dictionaries.
    """
    query = session.query(SysUser).order_by(asc(SysUser.id))

    filters = []
    if user.username is not None:
        # NOTE(review): "%" / "_" inside the search text are passed through
        # to ILIKE unescaped, so they act as wildcards - confirm this is OK.
        filters.append(SysUser.username.ilike(f"%{user.username}%"))
    if filters:
        query = query.filter(and_(*filters))

    pager = get_pager(query, user.pagerNum, user.pagerSize)
    # Convert ORM rows to plain dicts through the output schema.
    pager.data = [SysUserOut.from_orm(row).dict() for row in pager.data]
    return pager


def add_user(user: SysUser, session: Session) -> SysUser:
    """Hash the user's password, add the user to the session and commit.

    The ``password`` attribute of ``user`` is overwritten in place with the
    result of ``hash_password``.

    Returns:
        The same ``user`` object that was passed in.
    """
    user.password = hash_password(user.password)
    session.add(user)
    session.commit()
    return user


def get_user_by_id(user_id: int, session: Session) -> Optional[SysUser]:
    """Return the first user whose id equals ``user_id``, or ``None``."""
    return session.query(SysUser).filter(SysUser.id == user_id).first()


def stop_user(user: SysUser, session: Session) -> SysUser:
    """Set ``user.user_status`` to ``"1"`` and commit the session.

    Judging by the function name, ``"1"`` marks the account as stopped.
    NOTE(review): assumes ``user`` is already attached to ``session``;
    otherwise the commit will not persist the change - confirm with callers.

    Returns:
        The same ``user`` object that was passed in.
    """
    user.user_status = "1"
    session.commit()
    return user


def start_user(user: SysUser, session: Session) -> SysUser:
    """Set ``user.user_status`` to ``"0"`` and commit the session.

    Judging by the function name, ``"0"`` marks the account as active.
    NOTE(review): assumes ``user`` is already attached to ``session``;
    otherwise the commit will not persist the change - confirm with callers.

    Returns:
        The same ``user`` object that was passed in.
    """
    user.user_status = "0"
    session.commit()
    return user


def get_user_by_username(username: str, session: Session) -> Optional[SysUser]:
    """Return the first user whose username equals ``username``, or ``None``."""
    return session.query(SysUser).filter(SysUser.username == username).first()


def check_username(username: str, session: Session) -> bool:
    """Check username uniqueness.

    Returns:
        ``True`` when at least one user already has this username (i.e. the
        name is taken), ``False`` otherwise.
    """
    count = session.query(SysUser).filter(SysUser.username == username).count()
    return count > 0


def update_pw(user: SysUser, session: Session) -> None:
    """Change a user's password.

    Issues an UPDATE that sets the ``password`` column of the row whose id is
    ``user.id`` to ``user.password``, then commits.

    NOTE(review): unlike ``add_user``, the value is written as-is and is not
    passed through ``hash_password`` here; presumably the caller hashes it
    beforehand - confirm.
    """
    session.query(SysUser).filter_by(id=user.id).update({
        "password": user.password
    })
    session.commit()