项目初次提交

This commit is contained in:
2025-04-11 08:54:28 +08:00
commit 9e14a3256f
220 changed files with 15673 additions and 0 deletions

0
utils/__init__.py Normal file
View File

53
utils/aes_crypto.py Normal file
View File

@ -0,0 +1,53 @@
import base64
from Crypto.Cipher import AES # 安装pip install pycryptodome
# 密钥key, 密斯偏移量iv CBC模式加密
# base64 详解https://cloud.tencent.com/developer/article/1099008
_key = '0CoJUm6Qywm6ts68' # 自己密钥
def aes_encrypt(data: str):
"""
加密
"""
vi = '0102030405060708'
pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16)
data = pad(data)
# 字符串补位
cipher = AES.new(_key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
encrypted_bytes = cipher.encrypt(data.encode('utf8'))
# 加密后得到的是bytes类型的数据
encode_strs = base64.urlsafe_b64encode(encrypted_bytes)
# 使用Base64进行编码,返回byte字符串
# 对byte字符串按utf-8进行解码
return encode_strs.decode('utf8')
def aes_decrypt(data):
"""
解密
"""
vi = '0102030405060708'
data = data.encode('utf8')
encode_bytes = base64.urlsafe_b64decode(data)
# 将加密数据转换位bytes类型数据
cipher = AES.new(_key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
text_decrypted = cipher.decrypt(encode_bytes)
unpad = lambda s: s[0:-s[-1]]
text_decrypted = unpad(text_decrypted)
# 补位
text_decrypted = text_decrypted.decode('utf8')
return text_decrypted
if __name__ == '__main__':
_data = '16658273438153332588-95YEUPJR' # 需要加密的内容
enctext = aes_encrypt(_data)
print(enctext)
# enctext = "Wzll1oiVs9UKAySY1-xSy_CbrZmelVwyqu8P0CZTrrc="
# _text_decrypted = aes_decrypt(_key, enctext)
# print(_text_decrypted)

89
utils/cache.py Normal file
View File

@ -0,0 +1,89 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/3/21 11:03
# @File : cache.py
# @IDE : PyCharm
# @desc : 缓存
from typing import List
from sqlalchemy import false
from sqlalchemy.future import select
from sqlalchemy.orm import joinedload
from core.logger import logger # 注意:报错就在这里,如果只写 core.logger 会写入日志报错,很难排查
from core.database import db_getter
from apps.vadmin.system.models import VadminSystemSettingsTab
import json
from redis.asyncio.client import Redis
from core.exception import CustomException
from utils import status
class Cache:
DEFAULT_TAB_NAMES = ["wx_server", "aliyun_sms", "aliyun_oss", "web_email"]
def __init__(self, rd: Redis):
self.rd = rd
async def __get_tab_name_values(self, tab_names: List[str]):
"""
获取系统配置标签下的标签信息
"""
async_session = db_getter()
session = await async_session.__anext__()
model = VadminSystemSettingsTab
v_options = [joinedload(model.settings)]
sql = select(model).where(
model.is_delete == false(),
model.tab_name.in_(tab_names),
model.disabled == false()
).options(*[load for load in v_options])
queryset = await session.execute(sql)
datas = queryset.scalars().unique().all()
return self.__generate_values(datas)
@classmethod
def __generate_values(cls, datas: List[VadminSystemSettingsTab]):
"""
生成字典值
"""
return {
tab.tab_name: {
item.config_key: item.config_value
for item in tab.settings
if not item.disabled
}
for tab in datas
}
async def cache_tab_names(self, tab_names: List[str] = None):
"""
缓存系统配置
如果手动修改了mysql数据库中的配置
那么需要在redis中将对应的tab_name删除
"""
if not tab_names:
tab_names = self.DEFAULT_TAB_NAMES
datas = await self.__get_tab_name_values(tab_names)
for k, v in datas.items():
await self.rd.client().set(k, json.dumps(v))
async def get_tab_name(self, tab_name: str, retry: int = 3):
"""
获取系统配置
:param tab_name: 配置表标签名称
:param retry: 重试次数
"""
result = await self.rd.get(tab_name)
if not result and retry > 0:
logger.error(f"未从Redis中获取到{tab_name}配置信息,正在重新更新配置信息,重试次数:{retry}")
await self.cache_tab_names([tab_name])
return await self.get_tab_name(tab_name, retry - 1)
elif not result and retry == 0:
raise CustomException(f"获取{tab_name}配置信息失败,请联系管理员!", code=status.HTTP_ERROR)
else:
return json.loads(result)

74
utils/compute.py Normal file
View File

@ -0,0 +1,74 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Creaet Time : 2022/5/12 17:09
# @File : compute.py
# @IDE : PyCharm
# @desc : 精准计算
from decimal import Decimal
from typing import Union
class Compute:
@staticmethod
def add(precision: int, *args: Union[float, Decimal]) -> float:
"""
相加
:param precision: 精度
"""
result = 0
for i in args:
if i is None:
i = 0
result += Decimal(str(i))
if precision == -1:
return float(result)
return round(float(result), precision)
@staticmethod
def subtract(precision: int, *args: Union[float, Decimal]) -> float:
"""
相减
:param precision: 精度
"""
if args[0] is None:
start = 0
else:
start = args[0]
result = Decimal(str(start))
for i in args[1:]:
if i is None:
i = 0
result -= Decimal(str(i))
if precision == -1:
return float(result)
return round(float(result), precision)
@staticmethod
def divide(precision: int, *args: Union[float, Decimal]) -> float:
"""
除法
:param precision: 精度
"""
result = Decimal(str(args[0]))
for i in args[1:]:
result = result / Decimal(str(i))
if precision == -1:
return float(result)
return round(float(result), precision)
@staticmethod
def multiply(precision: int, *args: Union[float, Decimal]) -> float:
"""
乘法
:param precision: 精度
"""
result = Decimal(str(1))
for i in args:
if i is None:
i = 1
result = result * Decimal(str(i))
if precision == -1:
return float(result)
return round(float(result), precision)

40
utils/count.py Normal file
View File

@ -0,0 +1,40 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/11/3 17:23
# @File : count.py
# @IDE : PyCharm
# @desc : 计数
from redis.asyncio.client import Redis
class Count:
"""
计数
"""
def __init__(self, rd: Redis, key):
self.rd = rd
self.key = key
async def add(self, ex: int = None) -> int:
await self.rd.set(self.key, await self.get_count() + 1, ex=ex)
return await self.get_count()
async def subtract(self, ex: int = None) -> int:
await self.rd.set(self.key, await self.get_count() - 1, ex=ex)
return await self.get_count()
async def get_count(self) -> int:
number = await self.rd.get(self.key)
if number:
return int(number)
return 0
async def reset(self) -> None:
await self.rd.set(self.key, 0)
async def delete(self) -> None:
await self.rd.delete(self.key)

21
utils/csv_utils.py Normal file
View File

@ -0,0 +1,21 @@
import csv
def read_csv(file_path):
"""
根据文件路径读取csv文件
:param file_path:
:return:
"""
with open(file_path, 'r', encoding='utf-8') as file:
# 创建 CSV 阅读器对象
csv_reader = csv.reader(file)
# 跳过标题行
next(csv_reader)
result_row = []
for row in csv_reader:
if row is None:
break
result_row.append(row)
return result_row
return None

66
utils/db_getter.py Normal file
View File

@ -0,0 +1,66 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2023/5/6 9:29
# @File : task.py
# @IDE : PyCharm
# @desc : 任务基础类
import re
import pymysql
from application.settings import SQLALCHEMY_DATABASE_URL
from core.logger import logger
class DBGetter:
def __init__(self):
self.mysql_cursor = None
self.mysql_conn = None
def conn_mysql(self) -> None:
"""
连接系统中配置的 mysql 数据库
"""
try:
connection_string = SQLALCHEMY_DATABASE_URL.split("//")[1]
pattern = r'^(?P<username>[^:]+):(?P<password>[^@]+)@(?P<host>[^:/]+):(?P<port>\d+)/(?P<database>[^/]+)$'
match = re.match(pattern, connection_string)
username = match.group('username')
password = match.group('password')
host = match.group('host')
port = int(match.group('port'))
database = match.group('database')
self.mysql_conn = pymysql.connect(
host=host,
port=port,
user=username,
password=password,
database=database
)
self.mysql_cursor = self.mysql_conn.cursor()
except pymysql.err.OperationalError as e:
logger.error(f"数据库连接失败,{e}")
raise ValueError("数据库连接失败!")
except AttributeError as e:
logger.error(f"数据库链接解析失败,{e}")
raise ValueError("数据库链接解析失败!")
def close_mysql(self) -> None:
"""
关闭 mysql 链接
"""
try:
self.mysql_cursor.close()
self.mysql_conn.close()
except AttributeError as e:
logger.error(f"未连接数据库,无需关闭!,{e}")
raise ValueError("未连接数据库,无需关闭!")
if __name__ == '__main__':
t = DBGetter()
t.conn_mysql()
t.close_mysql()

7
utils/excel/__init__.py Normal file
View File

@ -0,0 +1,7 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/11/14 9:56
# @File : __init__.py.py
# @IDE : PyCharm
# @desc : 简要说明

245
utils/excel/excel_manage.py Normal file
View File

@ -0,0 +1,245 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/5/6 17:25
# @File : excel_manage.py
# @IDE : PyCharm
# @desc : EXCEL 文件操作
import datetime
import os
import re
from pathlib import Path
from openpyxl.utils import get_column_letter
from openpyxl import load_workbook, Workbook
from application.settings import STATIC_ROOT, STATIC_URL
from openpyxl.styles import Alignment, Font, PatternFill, Border, Side
from utils.file.file_base import FileBase
from .excel_schema import AlignmentModel, FontModel, PatternFillModel
class ExcelManage:
"""
excel 文件序列化
"""
# 列名A-Z
EXCEL_COLUMNS = [chr(a) for a in range(ord('A'), ord('Z') + 1)]
def __init__(self):
self.sheet = None
self.wb = None
def open_workbook(self, file: str, read_only: bool = False, data_only: bool = False) -> None:
"""
初始化 excel 文件
:param file: 文件名称或者对象
:param read_only: 是否只读,优化读取速度
:param data_only: 是否加载文件对象
:return:
"""
# 加载excel文件获取表单
self.wb = load_workbook(file, read_only=read_only, data_only=data_only)
def open_sheet(
self,
sheet_name: str = None,
file: str = None,
read_only: bool = False,
data_only: bool = False
) -> None:
"""
初始化 excel 文件
:param sheet_name: 表单名称,为空则默认第一个
:param file:
:param read_only:
:param data_only:
:return:
"""
# 加载excel文件获取表单
if not self.wb:
self.open_workbook(file, read_only, data_only)
if sheet_name:
if sheet_name in self.get_sheets():
self.sheet = self.wb[sheet_name]
else:
self.sheet = self.wb.create_sheet(sheet_name)
else:
self.sheet = self.wb.active
def get_sheets(self) -> list:
"""
读取所有工作区名称
:return: 一维数组
"""
return self.wb.sheetnames
def create_excel(self, sheet_name: str = None) -> None:
"""
创建 excel 文件
:param sheet_name: 表单名称,为空则默认第一个
:return:
"""
# 加载excel文件获取表单
self.wb = Workbook()
self.sheet = self.wb.active
if sheet_name:
self.sheet.title = sheet_name
def readlines(self, min_row: int = 1, min_col: int = 1, max_row: int = None, max_col: int = None) -> list:
"""
读取指定表单所有数据
:param min_row: 最小行
:param min_col: 最小列
:param max_row: 最大行
:param max_col: 最大列
:return: 二维数组
"""
rows = self.sheet.iter_rows(min_row=min_row, min_col=min_col, max_row=max_row, max_col=max_col)
result = []
for row in rows:
_row = []
for cell in row:
_row.append(cell.value)
if any(_row):
result.append(_row)
return result
def get_header(self, row: int = 1, col: int = None, asterisk: bool = False) -> list:
"""
读取指定表单的表头(第一行数据)
:param row: 指定行
:param col: 最大列
:param asterisk: 是否去除 * 号
:return: 一维数组
"""
rows = self.sheet.iter_rows(min_row=row, max_col=col)
result = []
for row in rows:
for cell in row:
value = cell.value.replace("*", "").strip() if asterisk else cell.value.strip()
result.append(value)
break
return result
def write_list(self, rows: list, header: list = None) -> None:
"""
写入 excel文件
:param rows: 行数据集
:param header: 表头
:return:
"""
if header:
self.sheet.append(header)
pattern_fill_style = PatternFillModel(start_color='D9D9D9', end_color='D9D9D9', fill_type='solid')
font_style = FontModel(bold=True)
self.__set_row_style(1, len(header), pattern_fill_style=pattern_fill_style, font_style=font_style)
for index, data in enumerate(rows):
format_columns = {
"date_columns": []
}
for i in range(0, len(data)):
if isinstance(data[i], datetime.datetime):
data[i] = data[i].strftime("%Y-%m-%d %H:%M:%S")
format_columns["date_columns"].append(i + 1)
elif isinstance(data[i], bool):
data[i] = 1 if data[i] else 0
self.sheet.append(data)
self.__set_row_style(index + 2 if header else index + 1, len(data))
self.__set_row_format(index + 2 if header else index + 1, format_columns)
self.__auto_width()
self.__set_row_border()
def save_excel(self, path: str = "excel_manage"):
"""
保存 excel 文件到本地 static 目录
:param path: static 指定目录类别
:return:
"""
file_path = FileBase.generate_static_file_path(path=path, suffix="xlsx")
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
self.wb.save(file_path)
return {
"local_path": file_path,
"remote_path": file_path.replace(STATIC_ROOT, STATIC_URL)
}
def __set_row_style(
self,
row: int,
max_column: int,
alignment_style: AlignmentModel = AlignmentModel(),
font_style: FontModel = FontModel(),
pattern_fill_style: PatternFillModel = PatternFillModel()
):
"""
设置行样式
:param row: 行
:param max_column: 最大列
:param alignment_style: 单元格内容的对齐设置
:param font_style: 单元格内容的字体样式设置
:param pattern_fill_style: 单元格的填充模式设置
:return:
"""
for index in range(0, max_column):
alignment = Alignment(**alignment_style.model_dump())
font = Font(**font_style.model_dump())
pattern_fill = PatternFill(**pattern_fill_style.model_dump())
self.sheet.cell(row=row, column=index + 1).alignment = alignment
self.sheet.cell(row=row, column=index + 1).font = font
self.sheet.cell(row=row, column=index + 1).fill = pattern_fill
def __set_row_format(self, row: int, columns: dict):
"""
格式化行数据类型
:param row: 行
:param columns: 列数据
:return:
"""
for index in columns.get("date_columns", []):
self.sheet.cell(row=row, column=index).number_format = "yyyy-mm-dd h:mm:ss"
def __set_row_border(self):
"""
设置行边框
:return:
"""
# 创建 Border 对象并设置边框样式
border = Border(
left=Side(border_style="thin", color="000000"),
right=Side(border_style="thin", color="000000"),
top=Side(border_style="thin", color="000000"),
bottom=Side(border_style="thin", color="000000")
)
# 设置整个表格的边框
for row in self.sheet.iter_rows():
for cell in row:
cell.border = border
def __auto_width(self):
"""
设置自适应列宽
:return:
"""
# 设置一个字典用于保存列宽数据
dims = {}
# 遍历表格数据,获取自适应列宽数据
for row in self.sheet.rows:
for cell in row:
if cell.value:
# 遍历整个表格,把该列所有的单元格文本进行长度对比,找出最长的单元格
# 在对比单元格文本时需要将中文字符识别为1.7个长度英文字符识别为1个这里只需要将文本长度直接加上中文字符数量即可
# re.findall('([\u4e00-\u9fa5])', cell.value)能够识别大部分中文字符
cell_len = 0.7 * len(re.findall('([\u4e00-\u9fa5])', str(cell.value))) + len(str(cell.value))
dims[cell.column] = max((dims.get(cell.column, 0), cell_len))
for col, value in dims.items():
# 设置列宽get_column_letter用于获取数字列号对应的字母列号最后值+2是用来调整最终效果的
self.sheet.column_dimensions[get_column_letter(col)].width = value + 10
def close(self):
"""
关闭文件
:return:
"""
self.wb.close()

View File

@ -0,0 +1,65 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2023/08/24 22:19
# @File : excel_schema.py
# @IDE : PyCharm
# @desc :
from pydantic import BaseModel, Field
class AlignmentModel(BaseModel):
horizontal: str = Field('center', description="水平对齐方式。可选值:'left''center''right''justify''distributed'")
vertical: str = Field('center', description="垂直对齐方式。可选值:'top''center''bottom''justify''distributed'")
textRotation: int = Field(0, description="文本旋转角度(以度为单位)。默认为 0。")
wrapText: bool = Field(None, description="自动换行文本。设置为 True 时,如果文本内容超出单元格宽度,会自动换行显示。")
shrinkToFit: bool = Field(
None,
description="缩小字体以适应单元格。设置为 True 时,如果文本内容超出单元格宽度,会自动缩小字体大小以适应。"
)
indent: int = Field(0, description="文本缩进级别。默认为 0。")
relativeIndent: int = Field(0, description="相对缩进级别。默认为 0。")
justifyLastLine: bool = Field(
None,
description="对齐换行文本的末尾行。设置为 True 时,如果设置了文本换行,末尾的行也会被对齐。"
)
readingOrder: int = Field(0, description="阅读顺序。默认为 0。")
class Config:
title = "对齐设置模型"
description = "用于设置单元格内容的对齐样式。"
class FontModel(BaseModel):
name: str = Field(None, description="字体名称")
size: float = Field(None, description="字体大小(磅为单位)")
bold: bool = Field(None, description="是否加粗")
italic: bool = Field(None, description="是否斜体")
underline: str = Field(None, description="下划线样式。可选值:'single''double''singleAccounting''doubleAccounting'")
strikethrough: bool = Field(None, description="是否有删除线")
outline: bool = Field(None, description="是否轮廓字体")
shadow: bool = Field(None, description="是否阴影字体")
condense: bool = Field(None, description="是否压缩字体")
extend: bool = Field(None, description="是否扩展字体")
vertAlign: str = Field(None, description="垂直对齐方式。可选值:'superscript''subscript''baseline'")
color: dict = Field(None, description="字体颜色")
scheme: str = Field(None, description="字体方案。可选值:'major''minor'")
charset: int = Field(None, description="字符集编号")
family: int = Field(None, description="字体族编号")
class Config:
title = "字体设置模型"
description = "用于设置单元格内容的字体样式"
class PatternFillModel(BaseModel):
start_color: str = Field("FFFFFF", description="起始颜色RGB 值或颜色名称)")
end_color: str = Field("FFFFFF", description="结束颜色RGB 值或颜色名称)")
fill_type: str = Field("solid", description="填充类型('none''solid''darkDown' 等)")
class Config:
title = "填充模式设置模型"
description = "单元格的填充模式设置"

View File

@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2021/12/5 8:45
# @File : import_manage.py
# @IDE : PyCharm
# @desc : 数据导入管理
from typing import List
from fastapi import UploadFile
from core.exception import CustomException
from utils import status
from .excel_manage import ExcelManage
from utils.file.file_manage import FileManage
from .write_xlsx import WriteXlsx
from ..tools import list_dict_find
from enum import Enum
class FieldType(Enum):
list = "list"
str = "str"
class ImportManage(ExcelManage):
"""
数据导入管理
只支持 XLSX 类型文件application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
1. 判断文件类型
2. 保存文件为临时文件
3. 获取文件中的数据
4. 逐行检查数据,通过则创建数据
5. 不通过则添加到错误列表
6. 统计数量并返回
"""
file_type = ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"]
def __init__(self, file: UploadFile, headers: List[dict]):
super().__init__()
self.__table_data = None
self.__table_header = None
self.errors = []
self.success = []
self.success_number = 0
self.error_number = 0
self.check_file_type(file)
self.file = file
self.headers = headers
@classmethod
def check_file_type(cls, file: UploadFile) -> None:
"""
验证文件类型
:param file: 上传文件
:return:
"""
if file.content_type not in cls.file_type:
raise CustomException(msg="文件类型必须为xlsx类型", code=status.HTTP_ERROR)
async def get_table_data(
self,
file_path: str = None,
sheet_name: str = None,
header_row: int = 1,
data_row: int = 2
) -> None:
"""
获取表格数据与表头
:param file_path:
:param sheet_name:
:param header_row: 表头在第几行
:param data_row: 数据开始行
:return:
"""
if file_path:
__filename = file_path
else:
__filename = await FileManage.async_save_temp_file(self.file)
self.open_sheet(sheet_name=sheet_name, file=__filename, read_only=True)
self.__table_header = self.get_header(header_row, len(self.headers), asterisk=True)
self.__table_data = self.readlines(min_row=data_row, max_col=len(self.headers))
self.close()
def check_table_data(self) -> None:
"""
检查表格数据
:return:
"""
for row in self.__table_data:
result = self.__check_row(row)
if not result[0]:
row.append(result[1])
self.errors.append(row)
self.error_number += 1
else:
self.success_number += 1
self.success.append(result[1])
def __check_row(self, row: list) -> tuple:
"""
检查行数据
检查条件:
1. 检查是否为必填项
2. 检查是否为选项列表
3. 检查是否符合规则
:param row: 数据行
:return:
"""
data = {}
for index, cell in enumerate(row):
value = cell
field = self.headers[index]
label = self.__table_header[index]
if not cell and field.get("required", False):
return False, f"{label}不能为空!"
elif field.get("options", []) and cell:
item = list_dict_find(field.get("options", []), "label", cell)
if item:
value = item.get("value")
else:
return False, f"请选择正确的{label}"
elif field.get("rules", []) and cell:
rules = field.get("rules")
for validator in rules:
try:
validator(str(cell))
except ValueError as e:
return False, f"{label}{e.__str__()}"
if value:
field_type = field.get("type", FieldType.str)
if field_type == FieldType.list:
data[field.get("field")] = [value]
elif field_type == FieldType.str:
data[field.get("field")] = str(value)
else:
data[field.get("field")] = value
data["old_data_list"] = row
return True, data
def generate_error_url(self) -> str:
"""
成功错误数据的文件链接
:return:
"""
if self.error_number <= 0:
return ""
em = WriteXlsx()
em.create_excel(sheet_name="用户导入失败数据", save_static=True)
em.generate_template(self.headers, max_row=self.error_number)
em.write_list(self.errors)
em.close()
return em.get_file_url()
def add_error_data(self, row: dict) -> None:
"""
增加错误数据
:param row: 错误的数据行
:return:
"""
self.errors.append(row)
self.error_number += 1
self.success_number -= 1

129
utils/excel/write_xlsx.py Normal file
View File

@ -0,0 +1,129 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/11/11 12:01
# @File : write_xlsx.py
# @IDE : PyCharm
# @desc : 简要说明
"""
XlsxWriterhttps://github.com/jmcnamara/XlsxWriter
博客教程https://blog.csdn.net/lemonbit/article/details/113855768
"""
import os.path
import xlsxwriter
from typing import List
from application.settings import STATIC_ROOT, STATIC_URL
from utils.file.file_base import FileBase
from pathlib import Path
class WriteXlsx:
"""
写入xlsx文件
"""
def __init__(self):
self.file_path = None
self.sheet_name = None
self.wb = None
self.sheet = None
def create_excel(self, file_path: str = None, sheet_name: str = "sheet1", save_static: bool = False) -> None:
"""
创建 excel 文件
:param file_path: 文件绝对路径或相对路径
:param sheet_name: sheet 名称
:param save_static: 保存方式 static 静态资源或者临时文件
:return:
"""
if not file_path:
if save_static:
self.file_path = FileBase.generate_static_file_path(path="write_xlsx", suffix="xlsx")
else:
self.file_path = FileBase.generate_temp_file_path(suffix="xlsx")
elif not os.path.isabs(file_path):
if save_static:
self.file_path = FileBase.generate_static_file_path(path="write_xlsx", filename=file_path)
else:
self.file_path = FileBase.generate_temp_file_path(filename=file_path)
else:
self.file_path = file_path
Path(self.file_path).parent.mkdir(parents=True, exist_ok=True)
self.sheet_name = sheet_name
self.wb = xlsxwriter.Workbook(self.file_path)
self.sheet = self.wb.add_worksheet(sheet_name)
def generate_template(self, headers: List[dict] = None, max_row: int = 101) -> None:
"""
生成模板
:param headers: 表头
:param max_row: 设置下拉列表至最大行
:return: 文件链接地址
"""
max_row = max_row + 100
for index, field in enumerate(headers):
font_format = {
'bold': False, # 字体加粗
'align': 'center', # 水平位置设置:居中
'valign': 'vcenter', # 垂直位置设置,居中
'font_size': 11, # '字体大小设置'
}
if field.get("required", False):
# 设置单元格必填样式
field["label"] = "* " + field["label"]
font_format["font_color"] = "red"
if field.get("options", False):
# 添加数据验证,下拉列表
validate = {'validate': 'list', 'source': [item.get("label") for item in field.get("options", [])]}
self.sheet.data_validation(1, index, max_row, index, validate)
cell_format = self.wb.add_format(font_format)
self.sheet.write(0, index, field.get("label"), cell_format)
# 设置列宽
self.sheet.set_column(0, len(headers) - 1, 22)
# 设置行高
self.sheet.set_row(0, 25)
def write_list(self, rows: list, start_row: int = 1) -> None:
"""
写入 excel文件
:param rows: 行数据集
:param start_row: 开始行
"""
font_format = {
'bold': False, # 字体加粗
'align': 'center', # 水平位置设置:居中
'valign': 'vcenter', # 垂直位置设置,居中
'font_size': 11, # '字体大小设置'
}
cell_format = self.wb.add_format(font_format)
for index, row in enumerate(rows):
row_number = index+start_row
self.sheet.write_row(row_number, 0, row, cell_format)
# 设置列宽
self.sheet.set_column(0, len(rows[0]) - 1, 22)
# 设置行高
self.sheet.set_default_row(25)
def close(self) -> None:
"""
关闭文件
"""
self.wb.close()
def get_file_url(self) -> str:
"""
获取访问文件的 url
:return:
"""
if not self.file_path:
raise ValueError("还未创建文件,请先创建 excel 文件!")
assert isinstance(self.file_path, str)
if self.file_path.startswith(STATIC_ROOT):
return self.file_path.replace(STATIC_ROOT, STATIC_URL)
else:
print("write_xlsx 生成文件:", self.file_path)
raise ValueError("生成文件为临时文件,无法访问!")

7
utils/file/__init__.py Normal file
View File

@ -0,0 +1,7 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/12/12 14:30
# @File : __init__.py.py
# @IDE : PyCharm
# @desc : 简要说明

106
utils/file/aliyun_oss.py Normal file
View File

@ -0,0 +1,106 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/4/28 22:32
# @File : aliyun_oss.py
# @IDE : PyCharm
# @desc : 阿里云对象存储
import os.path
from fastapi import UploadFile
from pydantic import BaseModel
import oss2 # 安装依赖库pip install oss2
from oss2.models import PutObjectResult
from core.exception import CustomException
from core.logger import logger
from utils import status
from utils.file.file_manage import FileManage
from utils.file.file_base import FileBase
class BucketConf(BaseModel):
accessKeyId: str
accessKeySecret: str
endpoint: str
bucket: str
baseUrl: str
class AliyunOSS(FileBase):
"""
阿里云对象存储
常见报错https://help.aliyun.com/document_detail/185228.htm?spm=a2c4g.11186623.0.0.6de530e5pxNK76#concept-1957777
官方文档https://help.aliyun.com/document_detail/32026.html
使用Python SDK时大部分操作都是通过oss2.Service和oss2.Bucket两个类进行。
oss2.Service类用于列举存储空间。
oss2.Bucket类用于上传、下载、删除文件以及对存储空间进行各种配置。
"""
def __init__(self, bucket: BucketConf):
# 阿里云账号AccessKey拥有所有API的访问权限风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维请登录RAM控制台创建RAM用户。
auth = oss2.Auth(bucket.accessKeyId, bucket.accessKeySecret)
# yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1杭州为例Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
# 填写Bucket名称。
self.bucket = oss2.Bucket(auth, bucket.endpoint, bucket.bucket)
self.baseUrl = bucket.baseUrl
async def upload_image(self, path: str, file: UploadFile, max_size: int = 10) -> str:
"""
上传图片
:param path: path由包含文件后缀不包含Bucket名称组成的Object完整路径例如abc/efg/123.jpg。
:param file: 文件对象
:param max_size: 图片文件最大值,单位 MB默认 10MB
:return: 上传后的文件oss链接
"""
# 验证图片类型
await self.validate_file(file, max_size, self.IMAGE_ACCEPT)
# 生成文件路径
path = self.generate_relative_path(path, file.filename)
file_data = await file.read()
return await self.__upload_file_to_oss(path, file_data)
async def upload_video(self, path: str, file: UploadFile, max_size: int = 100) -> str:
"""
上传视频
:param path: path由包含文件后缀不包含Bucket名称组成的Object完整路径例如abc/efg/123.jpg。
:param file: 文件对象
:param max_size: 视频文件最大值,单位 MB默认 100MB
:return: 上传后的文件oss链接
"""
# 验证图片类型
await self.validate_file(file, max_size, self.VIDEO_ACCEPT)
# 生成文件路径
path = self.generate_relative_path(path, file.filename)
file_data = await file.read()
return await self.__upload_file_to_oss(path, file_data)
async def upload_file(self, path: str, file: UploadFile) -> str:
"""
上传文件
:param path: path由包含文件后缀不包含Bucket名称组成的Object完整路径例如abc/efg/123.jpg。
:param file: 文件对象
:return: 上传后的文件oss链接
"""
path = self.generate_relative_path(path, file.filename)
file_data = await file.read()
return await self.__upload_file_to_oss(path, file_data)
async def __upload_file_to_oss(self, path: str, file_data: bytes) -> str:
"""
上传文件到OSS
:param path: path由包含文件后缀不包含Bucket名称组成的Object完整路径例如abc/efg/123.jpg。
:param file_data: 文件数据
:return: 上传后的文件oss链接
"""
result = self.bucket.put_object(path, file_data)
assert isinstance(result, PutObjectResult)
if result.status != 200:
logger.error(f"文件上传到OSS失败状态码{result.status}")
raise CustomException("上传文件失败", code=status.HTTP_ERROR)
return self.baseUrl + path

148
utils/file/file_base.py Normal file
View File

@ -0,0 +1,148 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/12/12 14:31
# @File : file_base.py
# @IDE : PyCharm
# @desc : 简要说明
import datetime
import os
from pathlib import Path
from aiopathlib import AsyncPath
from fastapi import UploadFile
from application.settings import TEMP_DIR, STATIC_ROOT
from core.exception import CustomException
from utils import status
from utils.tools import generate_string
class FileBase:
IMAGE_ACCEPT = ["image/png", "image/jpeg", "image/gif", "image/x-icon"]
VIDEO_ACCEPT = ["video/mp4", "video/mpeg"]
AUDIO_ACCEPT = ["audio/wav", "audio/mp3", "audio/m4a", "audio/wma", "audio/ogg", "audio/mpeg", "audio/x-wav"]
ALL_ACCEPT = [*IMAGE_ACCEPT, *VIDEO_ACCEPT, *AUDIO_ACCEPT]
@classmethod
def get_random_filename(cls, suffix: str) -> str:
"""
生成随机文件名称,生成规则:当前时间戳 + 8位随机字符串拼接
:param suffix: 文件后缀
:return:
"""
if not suffix.startswith("."):
suffix = "." + suffix
return f"{str(int(datetime.datetime.now().timestamp())) + str(generate_string(8))}{suffix}"
@classmethod
def get_today_timestamp(cls) -> str:
"""
获取当天时间戳
:return:
"""
return str(int((datetime.datetime.now().replace(hour=0, minute=0, second=0)).timestamp()))
@classmethod
def generate_relative_path(cls, path: str, filename: str = None, suffix: str = None) -> str:
"""
生成相对路径,生成规则:自定义目录/当天日期时间戳/随机文件名称
1. filename 参数或者 suffix 参数必须填写一个
2. filename 参数和 suffix 参数都存在则优先取 suffix 参数为后缀
:param path: static 指定目录类别
:param filename: 文件名称,只用户获取后缀,不做真实文件名称,避免文件重复问题
:param suffix: 文件后缀
"""
if not filename and not suffix:
raise ValueError("filename 参数或者 suffix 参数必须填写一个")
elif not suffix and filename:
suffix = os.path.splitext(filename)[-1]
path = path.replace("\\", "/")
if path[0] == "/":
path = path[1:]
if path[-1] == "/":
path = path[:-1]
today = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d")
return f"{path}/{today}/{cls.get_random_filename(suffix)}"
@classmethod
def generate_static_file_path(cls, path: str, filename: str = None, suffix: str = None) -> str:
"""
生成 static 静态文件路径,生成规则:自定义目录/当天日期时间戳/随机文件名称
1. filename 参数或者 suffix 参数必须填写一个
2. filename 参数和 suffix 参数都存在则优先取 suffix 参数为后缀
:param path: static 指定目录类别
:param filename: 文件名称,只用户获取后缀,不做真实文件名称,避免文件重复问题
:param suffix: 文件后缀
:return:
"""
return f"{STATIC_ROOT}/{cls.generate_relative_path(path, filename, suffix)}"
@classmethod
def generate_temp_file_path(cls, filename: str = None, suffix: str = None) -> str:
"""
生成临时文件路径,生成规则:
1. filename 参数或者 suffix 参数必须填写一个
2. filename 参数和 suffix 参数都存在则优先取 suffix 参数为后缀
:param filename: 文件名称
:param suffix: 文件后缀
:return:
"""
if not filename and not suffix:
raise ValueError("filename 参数或者 suffix 参数必须填写一个")
elif not suffix and filename:
suffix = os.path.splitext(filename)[-1]
return f"{cls.generate_temp_dir_path()}/{cls.get_random_filename(suffix)}"
@classmethod
def generate_temp_dir_path(cls) -> str:
"""
生成临时目录路径
:return:
"""
date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d")
file_dir = Path(TEMP_DIR) / date
if not file_dir.exists():
file_dir.mkdir(parents=True, exist_ok=True)
return str(file_dir).replace("\\", "/")
@classmethod
async def async_generate_temp_file_path(cls, filename: str) -> str:
"""
生成临时文件路径
:param filename: 文件名称
:return:
"""
return f"{await cls.async_generate_temp_dir_path()}/{filename}"
@classmethod
async def async_generate_temp_dir_path(cls) -> str:
"""
生成临时目录路径
:return:
"""
date = datetime.datetime.strftime(datetime.datetime.now(), "%Y%m%d")
file_dir = AsyncPath(TEMP_DIR) / date
path = file_dir / (generate_string(4) + str(int(datetime.datetime.now().timestamp())))
if not await path.exists():
await path.mkdir(parents=True, exist_ok=True)
return str(path).replace("\\", "/")
@classmethod
async def validate_file(cls, file: UploadFile, max_size: int = None, mime_types: list = None) -> bool:
"""
验证文件是否符合格式
:param file: 文件
:param max_size: 文件最大值,单位 MB
:param mime_types: 支持的文件类型
"""
if max_size:
size = len(await file.read()) / 1024 / 1024
if size > max_size:
raise CustomException(f"上传文件过大,不能超过{max_size}MB", status.HTTP_ERROR)
await file.seek(0)
if mime_types:
if file.content_type not in mime_types:
raise CustomException(f"上传文件格式错误,只支持 {'/'.join(mime_types)} 格式!", status.HTTP_ERROR)
return True

141
utils/file/file_manage.py Normal file
View File

@ -0,0 +1,141 @@
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2021/12/5 8:45
# @File : file_manage.py
# @IDE : PyCharm
# @desc : 保存图片到本地
import asyncio
import io
import os
import zipfile
from application.settings import STATIC_ROOT, BASE_DIR, STATIC_URL
from fastapi import UploadFile
import sys
from core.exception import CustomException
from utils.file.file_base import FileBase
from aiopathlib import AsyncPath
import aioshutil
class FileManage(FileBase):
"""
上传文件管理
"""
def __init__(self, file: UploadFile, path: str):
self.path = self.generate_static_file_path(path, file.filename)
self.file = file
async def save_image_local(self, accept: list = None) -> dict:
"""
保存图片文件到本地
:param accept:
:return:
"""
if accept is None:
accept = self.IMAGE_ACCEPT
await self.validate_file(self.file, max_size=5, mime_types=accept)
return await self.async_save_local()
async def save_audio_local(self, accept: list = None) -> dict:
"""
保存音频文件到本地
:param accept:
:return:
"""
if accept is None:
accept = self.AUDIO_ACCEPT
await self.validate_file(self.file, max_size=50, mime_types=accept)
return await self.async_save_local()
async def save_video_local(self, accept: list = None) -> dict:
"""
保存视频文件到本地
:param accept:
:return:
"""
if accept is None:
accept = self.VIDEO_ACCEPT
await self.validate_file(self.file, max_size=100, mime_types=accept)
return await self.async_save_local()
async def async_save_local(self) -> dict:
"""
保存文件到本地
:return: 示例:
{
'local_path': 'D:\\business\\kinit_dev\\aicheckv2-api\\static\\system\\20240301\\1709303205HuYB3mrC.png',
'remote_path': '/media/system/20240301/1709303205HuYB3mrC.png'
}
"""
path = AsyncPath(self.path)
if sys.platform == "win32":
path = AsyncPath(self.path.replace("/", "\\"))
if not await path.parent.exists():
await path.parent.mkdir(parents=True, exist_ok=True)
await path.write_bytes(await self.file.read())
return {
"local_path": str(path),
"remote_path": STATIC_URL + str(path).replace(STATIC_ROOT, '').replace("\\", '/')
}
@classmethod
async def async_save_temp_file(cls, file: UploadFile) -> str:
"""
保存临时文件
:param file:
:return:
"""
temp_file_path = await cls.async_generate_temp_file_path(file.filename)
await AsyncPath(temp_file_path).write_bytes(await file.read())
return temp_file_path
@classmethod
async def unzip(cls, file: UploadFile, dir_path: str) -> str:
"""
解压 zip 压缩包
:param file:
:param dir_path: 解压路径
:return:
"""
if file.content_type != "application/x-zip-compressed":
raise CustomException("上传文件类型错误,必须是 zip 压缩包格式!")
# 读取上传的文件内容
contents = await file.read()
# 将文件内容转换为字节流
zip_stream = io.BytesIO(contents)
# 使用zipfile库解压字节流
with zipfile.ZipFile(zip_stream, "r") as zip_ref:
zip_ref.extractall(dir_path)
return dir_path
@staticmethod
async def async_copy_file(src: str, dst: str) -> None:
"""
异步复制文件
根目录为项目根目录,传过来的文件路径均为相对路径
:param src: 原始文件
:param dst: 目标路径。绝对路径
"""
if src[0] == "/":
src = src.lstrip("/")
src = AsyncPath(BASE_DIR) / src
if not await src.exists():
raise CustomException(f"{src} 源文件不存在!")
dst = AsyncPath(dst)
if not await dst.parent.exists():
await dst.parent.mkdir(parents=True, exist_ok=True)
await aioshutil.copyfile(src, dst)
@staticmethod
async def async_copy_dir(src: str, dst: str, dirs_exist_ok: bool = True) -> None:
"""
复制目录
:param src: 源目录
:param dst: 目标目录
:param dirs_exist_ok: 是否覆盖
"""
if not os.path.exists(dst):
raise CustomException("目标目录不存在!")
await aioshutil.copytree(src, dst, dirs_exist_ok=dirs_exist_ok)

71
utils/ip_manage.py Normal file
View File

@ -0,0 +1,71 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/11/3 17:23
# @File : ip_manage.py
# @IDE : PyCharm
# @desc : 获取IP地址归属地
"""
文档https://user.ip138.com/ip/doc
IP查询第三方服务有1000次的免费次数
JSONP请求示例IPv4
https://api.ip138.com/ip/?ip=58.16.180.3&datatype=jsonp&token=cc87f3c77747bccbaaee35006da1ebb65e0bad57
aiohttp 异步请求文档https://docs.aiohttp.org/en/stable/client_quickstart.html
"""
from aiohttp import TCPConnector
from application.settings import IP_PARSE_TOKEN, IP_PARSE_ENABLE
import aiohttp
from core.logger import logger
from pydantic import BaseModel
from typing import Optional
class IPLocationOut(BaseModel):
ip: str | None = None
address: str | None = None
country: str | None = None
province: str | None = None
city: str | None = None
county: str | None = None
operator: str | None = None
postal_code: str | None = None
area_code: str | None = None
class IPManage:
def __init__(self, ip: str):
self.ip = ip
self.url = f"https://api.ip138.com/ip/?ip={ip}&datatype=jsonp&token={IP_PARSE_TOKEN}"
async def parse(self):
"""
IP 数据解析
接口返回:{'ret': 'ok', 'ip': '114.222.121.253','data': ['中国', '江苏', '南京', '江宁区', '电信', '211100', '025']}
"""
out = IPLocationOut()
out.ip = self.ip
if not IP_PARSE_ENABLE:
logger.warning("未开启IP地址数据解析无法获取到IP所属地请在application/config/production.py:IP_PARSE_ENABLE中开启")
return out
async with aiohttp.ClientSession(connector=TCPConnector(ssl=False)) as session:
async with session.get(self.url) as resp:
body = await resp.json()
if body.get("ret") != 'ok':
logger.error(f"获取IP所属地失败{body}")
return out
data = body.get("data")
out.address = f"{''.join(data[i] for i in range(0, 4))} {data[4]}"
out.country = data[0]
out.province = data[1]
out.city = data[2]
out.county = data[3]
out.operator = data[4]
out.postal_code = data[5]
out.area_code = data[6]
return out

183
utils/love.py Normal file
View File

@ -0,0 +1,183 @@
# 晚上星月争辉,美梦陪你入睡
import random
from math import sin, cos, pi, log
from tkinter import *
CANVAS_WIDTH = 640 # 画布的宽
CANVAS_HEIGHT = 480 # 画布的高
CANVAS_CENTER_X = CANVAS_WIDTH / 2 # 画布中心的X轴坐标
CANVAS_CENTER_Y = CANVAS_HEIGHT / 2 # 画布中心的Y轴坐标
IMAGE_ENLARGE = 11 # 放大比例
HEART_COLOR = "#ff2121" # 心的颜色,这个是中国红
def heart_function(t, shrink_ratio: float = IMAGE_ENLARGE):
"""
“爱心函数生成器”
:param shrink_ratio: 放大比例
:param t: 参数
:return: 坐标
"""
# 基础函数
x = 16 * (sin(t) ** 3)
y = -(13 * cos(t) - 5 * cos(2 * t) - 2 * cos(3 * t) - cos(4 * t))
# 放大
x *= shrink_ratio
y *= shrink_ratio
# 移到画布中央
x += CANVAS_CENTER_X
y += CANVAS_CENTER_Y
return int(x), int(y)
def scatter_inside(x, y, beta=0.15):
"""
随机内部扩散
:param x: 原x
:param y: 原y
:param beta: 强度
:return: 新坐标
"""
ratio_x = - beta * log(random.random())
ratio_y = - beta * log(random.random())
dx = ratio_x * (x - CANVAS_CENTER_X)
dy = ratio_y * (y - CANVAS_CENTER_Y)
return x - dx, y - dy
def shrink(x, y, ratio):
"""
抖动
:param x: 原x
:param y: 原y
:param ratio: 比例
:return: 新坐标
"""
force = -1 / (((x - CANVAS_CENTER_X) ** 2 + (y - CANVAS_CENTER_Y) ** 2) ** 0.6) # 这个参数...
dx = ratio * force * (x - CANVAS_CENTER_X)
dy = ratio * force * (y - CANVAS_CENTER_Y)
return x - dx, y - dy
def curve(p):
"""
自定义曲线函数,调整跳动周期
:param p: 参数
:return: 正弦
"""
# 可以尝试换其他的动态函数,达到更有力量的效果(贝塞尔?)
return 2 * (2 * sin(4 * p)) / (2 * pi)
class Heart:
"""
爱心类
"""
def __init__(self, generate_frame=20):
self._points = set() # 原始爱心坐标集合
self._edge_diffusion_points = set() # 边缘扩散效果点坐标集合
self._center_diffusion_points = set() # 中心扩散效果点坐标集合
self.all_points = {} # 每帧动态点坐标
self.build(2000)
self.random_halo = 1000
self.generate_frame = generate_frame
for frame in range(generate_frame):
self.calc(frame)
def build(self, number):
# 爱心
for _ in range(number):
t = random.uniform(0, 2 * pi) # 随机不到的地方造成爱心有缺口
x, y = heart_function(t)
self._points.add((x, y))
# 爱心内扩散
for _x, _y in list(self._points):
for _ in range(3):
x, y = scatter_inside(_x, _y, 0.05)
self._edge_diffusion_points.add((x, y))
# 爱心内再次扩散
point_list = list(self._points)
for _ in range(4000):
x, y = random.choice(point_list)
x, y = scatter_inside(x, y, 0.17)
self._center_diffusion_points.add((x, y))
@staticmethod
def calc_position(x, y, ratio):
# 调整缩放比例
force = 1 / (((x - CANVAS_CENTER_X) ** 2 + (y - CANVAS_CENTER_Y) ** 2) ** 0.520) # 魔法参数
dx = ratio * force * (x - CANVAS_CENTER_X) + random.randint(-1, 1)
dy = ratio * force * (y - CANVAS_CENTER_Y) + random.randint(-1, 1)
return x - dx, y - dy
def calc(self, generate_frame):
ratio = 10 * curve(generate_frame / 10 * pi) # 圆滑的周期的缩放比例
halo_radius = int(4 + 6 * (1 + curve(generate_frame / 10 * pi)))
halo_number = int(3000 + 4000 * abs(curve(generate_frame / 10 * pi) ** 2))
all_points = []
# 光环
heart_halo_point = set() # 光环的点坐标集合
for _ in range(halo_number):
t = random.uniform(0, 2 * pi) # 随机不到的地方造成爱心有缺口
x, y = heart_function(t, shrink_ratio=11.6) # 魔法参数
x, y = shrink(x, y, halo_radius)
if (x, y) not in heart_halo_point:
# 处理新的点
heart_halo_point.add((x, y))
x += random.randint(-14, 14)
y += random.randint(-14, 14)
size = random.choice((1, 2, 2))
all_points.append((x, y, size))
# 轮廓
for x, y in self._points:
x, y = self.calc_position(x, y, ratio)
size = random.randint(1, 3)
all_points.append((x, y, size))
# 内容
for x, y in self._edge_diffusion_points:
x, y = self.calc_position(x, y, ratio)
size = random.randint(1, 2)
all_points.append((x, y, size))
for x, y in self._center_diffusion_points:
x, y = self.calc_position(x, y, ratio)
size = random.randint(1, 2)
all_points.append((x, y, size))
self.all_points[generate_frame] = all_points
def render(self, render_canvas, render_frame):
for x, y, size in self.all_points[render_frame % self.generate_frame]:
render_canvas.create_rectangle(x, y, x + size, y + size, width=0, fill=HEART_COLOR)
def draw(main: Tk, render_canvas: Canvas, render_heart: Heart, render_frame=0):
render_canvas.delete('all')
render_heart.render(render_canvas, render_frame)
main.after(160, draw, main, render_canvas, render_heart, render_frame + 1)
if __name__ == '__main__':
root = Tk() # 一个Tk
canvas = Canvas(root, bg='black', height=CANVAS_HEIGHT, width=CANVAS_WIDTH)
canvas.pack()
heart = Heart() # 心
draw(root, canvas, heart) # 开始画画~
root.mainloop()

112
utils/os_utils.py Normal file
View File

@ -0,0 +1,112 @@
import os
import shutil
from fastapi import UploadFile
from PIL import Image
def file_path(*path):
"""
拼接返回文件路径
:param path:
:return:
"""
return_path = os.path.join(*path)
return return_path
def create_folder(*path):
"""根据路径创建文件夹"""
folder_path = os.path.join(*path)
try:
os.makedirs(folder_path, exist_ok=True)
except Exception as e:
print(f"创建文件夹时错误: {e}")
return folder_path
def save_images(*path, file: UploadFile):
"""
保存上传的图片
:param path: 路径
:param file: 文件
:return:
"""
save_path = os.path.join(*path, file.filename)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
with open(save_path, "wb") as f:
for line in file.file:
f.write(line)
return save_path
def create_thumbnail(input_image_path, out_image_path, size=(116, 70)):
"""
给图片生成缩略图
:param input_image_path:
:param out_image_path:
:param size: 缩略的尺寸
:return:
"""
with Image.open(input_image_path) as image:
# 使用thumbnail方法生成缩略图参数size指定缩略图的最大尺寸
# 注意thumbnail方法会保持图片的宽高比不变
image.thumbnail(size)
os.makedirs(os.path.dirname(out_image_path), exist_ok=True)
# 保存生成的缩略图
image.save(out_image_path, 'JPEG')
def copy_and_rename_file(src_file_path, dst_dir, new_name):
"""
复制文件到指定目录并重命名,保持文件的后缀不变。
:param src_file_path: 源文件路径
:param dst_dir: 目标目录
:param new_name: 新文件名(不含扩展名)
"""
# 获取文件的完整文件名(包括扩展名)
base_name = os.path.basename(src_file_path)
# 分离文件名和扩展名
file_name, file_extension = os.path.splitext(base_name)
# 构建新的文件名和目标路径
new_file_name = f"{new_name}{file_extension}"
dst_file_path = os.path.join(dst_dir, new_file_name)
# 确保目标目录存在
os.makedirs(dst_dir, exist_ok=True)
# 复制文件到目标位置并重命名
shutil.copy(src_file_path, dst_file_path)
def delete_file_if_exists(*file_paths: str):
"""
删除文件
:param file_path:
:return:
"""
for path in file_paths:
if os.path.exists(path): # 检查文件是否存在
os.remove(path) # 删除文件
def delete_paths(paths):
"""
删除给定路径数组中的每个路径及其包含的所有内容。
:param paths: 文件或目录路径的列表
"""
for path in paths:
if os.path.exists(path):
try:
if os.path.isfile(path) or os.path.islink(path):
# 如果是文件或符号链接,则删除
os.remove(path)
print(f"Deleted file: {path}")
elif os.path.isdir(path):
# 如果是目录,则递归删除
shutil.rmtree(path)
except Exception as e:
print(f"路径删除失败 {path}: {e}")
else:
print(f"路径不存在: {path}")

40
utils/ppt_to_pdf.py Normal file
View File

@ -0,0 +1,40 @@
import os
from win32com.client import gencache
import comtypes.client
from core.logger import logger
def ppt_to_pdf_1(ppt_path: str, pdf_path: str):
"""
ppt 转 pdf会弹出 office 软件
:param ppt_path:
:param pdf_path:
:return:
"""
# 创建PDF
powerpoint = comtypes.client.CreateObject("Powerpoint.Application")
powerpoint.Visible = 1
slide = powerpoint.Presentations.Open(ppt_path)
# 保存PDF
slide.SaveAs(pdf_path, 32)
slide.Close()
# 退出 office 软件
powerpoint.Quit()
def ppt_to_pdf_2(ppt_path: str, pdf_path: str):
"""
完美办法PPT 转 PDF
:param ppt_path:
:param pdf_path:
:return:
"""
p = gencache.EnsureDispatch("PowerPoint.Application")
try:
ppt = p.Presentations.Open(ppt_path, False, False, False)
ppt.ExportAsFixedFormat(pdf_path, 2, PrintRange=None)
ppt.Close()
p.Quit()
except Exception as e:
print(os.path.split(ppt_path)[1], "转化失败,失败原因%s" % e)
logger.info(os.path.split(ppt_path)[1], "转化失败,失败原因%s" % e)

77
utils/ps_util.py Normal file
View File

@ -0,0 +1,77 @@
import psutil
import platform
import json
from datetime import datetime
def get_server_info():
# 获取服务器运行状态
info = {}
# 1. 系统基本信息
info["system"] = {
"os": platform.system(),
"os_version": platform.version(),
"node": platform.node(),
"machine": platform.machine(),
"processor": platform.processor(),# 处理器型号
"boot_time": datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S") # 启动时间
}
# 2. CPU 信息
cpu_usage = psutil.cpu_percent(interval=1)
info["cpu"] = {
"cpu_count": psutil.cpu_count(logical=False),
"cpu_count_logical": psutil.cpu_count(logical=True),
"cpu_usage": cpu_usage,
"cpu_percent": psutil.cpu_percent(interval=1, percpu=True)
}
# 3. 内存信息
mem = psutil.virtual_memory()
info["memory"] = {
"total": round(mem.total / (1024**3), 2),
"available": round(mem.available / (1024**3), 2),
"percent": mem.percent
}
# 4. 磁盘信息
disks = []
for partition in psutil.disk_partitions():
usage = psutil.disk_usage(partition.mountpoint)
disks.append({
"device": partition.device,
"mountpoint": partition.mountpoint,
"fstype": partition.fstype,
"total": round(usage.total / (1024**3), 2),
"used": round(usage.used / (1024**3), 2),
"percent": usage.percent
})
info["disks"] = disks
# 5. 网络信息
net = psutil.net_io_counters()
info["network"] = {
"bytes_sent": round(net.bytes_sent / (1024**2), 2),
"bytes_recv": round(net.bytes_recv / (1024**2), 2)
}
# 6. 进程信息示例前5个高CPU进程
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
if len(processes) >= 5:
break
if proc.info['cpu_percent'] > 0:
processes.append({
"PID": proc.info['pid'],
"name": proc.info['name'],
"cpu_percent": proc.info['cpu_percent']
})
info["top_processes"] = sorted(processes, key=lambda x: x["cpu_percent"], reverse=True)
return info
def get_server_json():
server_info = get_server_info()
return json.dumps(server_info, indent=2, ensure_ascii=False)

12
utils/random_utils.py Normal file
View File

@ -0,0 +1,12 @@
import random
import string
def random_str(length=10):
"""随机生成自定义长度的小写字母"""
letters = string.ascii_lowercase
# 使用 random.choices 从 letters 中随机选择 length 个字母,返回一个列表
random_letters = random.choices(letters, k=length)
# 将列表中的字母连接成一个字符串
return ''.join(random_letters)

32
utils/response.py Normal file
View File

@ -0,0 +1,32 @@
# 依赖安装pip install orjson
from fastapi.responses import ORJSONResponse as Response
from fastapi import status as http_status
from utils import status as http
class SuccessResponse(Response):
"""
成功响应
"""
def __init__(self, data=None, msg="success", code=http.HTTP_SUCCESS, status=http_status.HTTP_200_OK, **kwargs):
self.data = {
"code": code,
"message": msg,
"data": data
}
self.data.update(kwargs)
super().__init__(content=self.data, status_code=status)
class ErrorResponse(Response):
"""
失败响应
"""
def __init__(self, msg=None, code=http.HTTP_ERROR, status=http_status.HTTP_200_OK, **kwargs):
self.data = {
"code": code,
"message": msg,
"data": []
}
self.data.update(kwargs)
super().__init__(content=self.data, status_code=status)

93
utils/send_email.py Normal file
View File

@ -0,0 +1,93 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2023/3/27 9:48
# @File : send_email.py
# @IDE : PyCharm
# @desc : 发送邮件封装类
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from typing import List
from redis.asyncio import Redis
from core.exception import CustomException
from utils.cache import Cache
class EmailSender:
def __init__(self, rd: Redis):
self.email = None
self.password = None
self.smtp_server = None
self.smtp_port = None
self.server = None
self.rd = rd
async def __get_settings(self, retry: int = 3):
"""
获取配置信息
"""
web_email = await Cache(self.rd).get_tab_name("web_email", retry)
self.email = web_email.get("email_access")
self.password = web_email.get("email_password")
self.smtp_server = web_email.get("email_server")
self.smtp_port = int(web_email.get("email_port"))
self.server = smtplib.SMTP(self.smtp_server, self.smtp_port)
self.server.starttls()
try:
self.server.login(self.email, self.password)
except smtplib.SMTPAuthenticationError:
raise CustomException("邮件发送失败,邮箱服务器认证失败!")
except AttributeError:
raise CustomException("邮件发送失败,邮箱服务器认证失败!")
async def send_email(self, to_emails: List[str], subject: str, body: str, attachments: List[str] = None):
"""
发送邮件
:param to_emails: 收件人,一个或多个
:param subject: 主题
:param body: 内容
:param attachments: 附件
"""
await self.__get_settings()
message = MIMEMultipart()
message['From'] = self.email
message['To'] = ', '.join(to_emails)
message['Subject'] = subject
body = MIMEText(body)
message.attach(body)
if attachments:
for attachment in attachments:
with open(attachment, 'rb') as f:
file_data = f.read()
filename = attachment.split('/')[-1]
attachment = MIMEApplication(file_data, Name=filename)
attachment['Content-Disposition'] = f'attachment; filename="{filename}"'
message.attach(attachment)
try:
result = self.server.sendmail(self.email, to_emails, message.as_string())
self.server.quit()
print("邮件发送结果", result)
if result:
return False
else:
return True
except smtplib.SMTPException as e:
self.server.quit()
print('邮件发送失败!错误信息:', e)
return False
# if __name__ == '__main__':
# sender = EmailSender()
# to_emails = ['ktianc2001@163.com', '2445667550@qq.com']
# subject = 'Test email'
# body = 'This is a test email'
# sender.send_email(to_emails, subject, body)

7
utils/sms/__init__.py Normal file
View File

@ -0,0 +1,7 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2023/6/14 15:26
# @File : __init__.py
# @IDE : PyCharm
# @desc : 简要说明

245
utils/sms/aliyun.py Normal file
View File

@ -0,0 +1,245 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2023/6/14 15:26
# @File : aliyun.py
# @IDE : PyCharm
# @desc : 最新版阿里云短信服务发送程序Python版本2022-11-08
"""
短信 API 官方文档https://help.aliyun.com/document_detail/419298.html?spm=5176.25163407.help.dexternal.6ff2bb6ercg9x3
发送短信 官方文档https://help.aliyun.com/document_detail/419273.htm?spm=a2c4g.11186623.0.0.36855d7aRBSwtP
Python SDK 官方文档https://help.aliyun.com/document_detail/215764.html?spm=a2c4g.11186623.0.0.6a0c4198XsBJNW
环境要求
Python 3
安装 SDK 核心库 OpenAPI 使用pip安装包依赖:
pip install alibabacloud_tea_openapi
pip install alibabacloud_dysmsapi20170525
"""
import random
import re
from typing import List
from core.exception import CustomException
from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models
from alibabacloud_tea_util import models as util_models
from core.logger import logger
import datetime
from redis.asyncio.client import Redis
from utils.cache import Cache
from utils.db_getter import DBGetter
class AliyunSMS(DBGetter):
# 返回错误码对应:
doc = "https://help.aliyun.com/document_detail/101346.html"
def __init__(self, telephones: List[str], rd: Redis = None):
super().__init__()
self.check_telephones_format(telephones)
self.telephones = telephones
self.rd = rd
self.sign_conf = None # 数据库中 sms_sign_name_* 的配置
self.template_code_conf = None # 数据库中 sms_template_code_* 的配置
# 以上两个配置项的好处在于可以灵活配置短信信息,不需要改代码
async def main_async(self, **kwargs) -> List[bool]:
"""
主程序入口,异步方式
redis 对象必填
"""
result = []
await self._get_settings_async()
for telephone in self.telephones:
result.append(await self._send_async(telephone, **kwargs))
return result
async def _send_async(self, telephone: str, **kwargs) -> bool:
"""
发送短信
"""
client = self.create_client(self.access_key, self.access_key_secret)
send_sms_request = dysmsapi_20170525_models.SendSmsRequest(
phone_numbers=telephone,
sign_name=self.sign_name,
template_code=self.template_code,
template_param=self._get_template_param(**kwargs)
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印 API 的返回值
resp = await client.send_sms_with_options_async(send_sms_request, runtime)
return self._validation(telephone, resp)
except Exception as e:
print(e.__str__())
return False
async def _get_settings_async(self, retry: int = 3):
"""
获取配置信息
"""
if not self.rd:
raise ValueError("缺少 redis 对象参数!")
elif not self.sign_conf or not self.template_code_conf:
raise ValueError("缺少短信签名信息和短信模板ID")
aliyun_sms = await Cache(self.rd).get_tab_name("aliyun_sms", retry)
self.access_key = aliyun_sms.get("sms_access_key")
self.access_key_secret = aliyun_sms.get("sms_access_key_secret")
self.send_interval = int(aliyun_sms.get("sms_send_interval"))
self.valid_time = int(aliyun_sms.get("sms_valid_time"))
self.sign_name = aliyun_sms.get(self.sign_conf)
self.template_code = aliyun_sms.get(self.template_code_conf)
def main(self, **kwargs) -> List[bool]:
"""
主程序入口,同步方式
"""
result = []
self._get_settings()
for telephone in self.telephones:
result.append(self._send(telephone, **kwargs))
return result
def _send(self, telephone: str, **kwargs) -> bool:
"""
同步方式发送短信
"""
client = self.create_client(self.access_key, self.access_key_secret)
send_sms_request = dysmsapi_20170525_models.SendSmsRequest(
phone_numbers=telephone,
sign_name=self.sign_name,
template_code=self.template_code,
template_param=self._get_template_param(**kwargs)
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印 API 的返回值
resp = client.send_sms_with_options(send_sms_request, runtime)
return self._validation(telephone, resp)
except Exception as e:
print(e.__str__())
return False
def _get_settings(self):
"""
同步方式获取配置信息
"""
if not self.sign_conf or not self.template_code_conf:
raise ValueError("缺少短信签名信息和短信模板ID")
self.conn_mysql()
sql = f"""
SELECT
config_value
FROM
`vadmin_system_settings`
WHERE
config_key IN (
'sms_access_key',
'sms_access_key_secret',
'sms_send_interval',
'sms_valid_time',
'{self.sign_conf}',
'{self.template_code_conf}'
)
"""
self.mysql_cursor.execute(sql)
result = self.mysql_cursor.fetchall()
self.close_mysql()
self.access_key = result[0][0]
self.access_key_secret = result[1][0]
self.send_interval = result[2][0]
self.valid_time = result[3][0]
self.sign_name = result[4][0]
self.template_code = result[5][0]
def _get_template_param(self, **kwargs) -> str:
"""
获取模板参数
可以被子类继承的受保护的私有方法
"""
raise NotImplementedError("该方法应该被重写!")
def _validation(self, telephone: str, resp: dysmsapi_20170525_models.SendSmsResponse) -> bool:
"""
验证结果并返回
"""
send_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if resp.body.code == "OK":
logger.info(f'{send_time} {telephone} 短信发送成功返回code{resp.body.code}')
return True
else:
logger.error(f'{send_time} {telephone} 短信发送失败返回code{resp.body.code},请参考文档:{self.doc}')
return False
@staticmethod
def get_code(length: int = 6, blend: bool = False) -> str:
"""
随机获取短信验证码
短信验证码只支持数字,不支持字母及其他符号
:param length: 验证码长度
:param blend: 是否 字母+数字 混合
"""
code = "" # 创建字符串变量,存储生成的验证码
for i in range(length): # 通过for循环控制验证码位数
num = random.randint(0, 9) # 生成随机数字0-9
if blend: # 需要字母验证码,不用传参,如果不需要字母的,关键字alpha=False
upper_alpha = chr(random.randint(65, 90))
lower_alpha = chr(random.randint(97, 122))
# 随机选择其中一位
num = random.choice([num, upper_alpha, lower_alpha])
code = code + str(num)
return code
@classmethod
def check_telephones_format(cls, telephones: List[str]) -> bool:
"""
同时检查多个手机号格式是否合法
不合法就会抛出异常
"""
for telephone in telephones:
cls.check_telephone_format(telephone)
return True
@staticmethod
def check_telephone_format(telephone: str) -> bool:
"""
检查手机号格式是否合法
不合法就会抛出异常
"""
REGEX_TELEPHONE = r'^1(3\d|4[4-9]|5[0-35-9]|6[67]|7[013-8]|8[0-9]|9[0-9])\d{8}$'
if not telephone:
raise CustomException(msg=f"手机号码({telephone})不能为空", code=400)
elif not re.match(REGEX_TELEPHONE, telephone):
raise CustomException(msg=f"手机号码({telephone})格式不正确", code=400)
return True
@staticmethod
def create_client(
access_key_id: str,
access_key_secret: str,
) -> Dysmsapi20170525Client:
"""
使用AK&SK初始化账号Client
:param access_key_id:
:param access_key_secret:
:return: Client
:throws Exception
"""
config = open_api_models.Config(
# 您的 AccessKey ID,
access_key_id=access_key_id,
# 您的 AccessKey Secret,
access_key_secret=access_key_secret
)
# 访问的域名
config.endpoint = f'dysmsapi.aliyuncs.com'
return Dysmsapi20170525Client(config)

69
utils/sms/code.py Normal file
View File

@ -0,0 +1,69 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2023/6/14 15:55
# @File : code.py
# @IDE : PyCharm
# @desc : 发送验证码短信
import datetime
import warnings
from redis.asyncio import Redis
from .aliyun import AliyunSMS
from core.logger import logger
from core.exception import CustomException
class CodeSMS(AliyunSMS):
def __init__(self, telephone: str, rd: Redis):
super().__init__([telephone], rd)
self.telephone = telephone
self.sign_conf = "sms_sign_name_1"
self.template_code_conf = "sms_template_code_1"
async def main_async(self) -> bool:
"""
主程序入口,异步方式
redis 对象必填
"""
send_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if await self.rd.get(self.telephone + "_flag_"):
logger.error(f'{send_time} {self.telephone} 短信发送失败,短信发送过于频繁')
raise CustomException(msg="短信发送频繁", code=400)
await self._get_settings_async()
result = await self._send_async(self.telephone)
if result:
await self.rd.set(self.telephone, self.code, self.valid_time)
await self.rd.set(self.telephone + "_flag_", self.code, self.send_interval)
return result
async def main(self) -> None:
"""
主程序入口,同步方式
"""
warnings.warn("此方法已废弃,如需要请重写该方法", DeprecationWarning)
async def check_sms_code(self, code: str) -> bool:
"""
检查短信验证码是否正确
"""
if code and code == await self.rd.get(self.telephone):
await self.rd.delete(self.telephone)
await self.rd.delete(self.telephone + "_flag_")
return True
return False
def _get_template_param(self, **kwargs) -> str:
"""
获取模板参数
可以被子类继承的受保护的私有方法
"""
self.code = kwargs.get("code", self.get_code())
template_param = '{"code":"%s"}' % self.code
return template_param

47
utils/sms/reset_passwd.py Normal file
View File

@ -0,0 +1,47 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2023/6/14 16:58
# @File : reset_passwd.py
# @IDE : PyCharm
# @desc : 重置密码
from typing import List
from redis.asyncio import Redis
from .aliyun import AliyunSMS
class ResetPasswordSMS(AliyunSMS):
def __init__(self, telephones: List[str], rd: Redis = None):
super().__init__(telephones, rd)
self.sign_conf = "sms_sign_name_2"
self.template_code_conf = "sms_template_code_2"
async def main_async(self, password: str) -> List[bool]:
"""
主程序入口,异步方式
redis 对象必填
:param password: 新密码
"""
return await super().main_async(password=password)
def main(self, password: str) -> List[bool]:
"""
主程序入口,同步方式
:param password: 新密码
"""
return super().main(password=password)
def _get_template_param(self, **kwargs) -> str:
"""
获取模板参数
可以被子类继承的受保护的私有方法
"""
password = kwargs.get("password")
template_param = '{"password":"%s"}' % password
return template_param

84
utils/socket_client.py Normal file
View File

@ -0,0 +1,84 @@
import json
import socket
class SocketClient:
"""
socket 客户端操作
"""
def __init__(self, host: str = "127.0.0.1", port: int = 3636, send_type: str = "tcp"):
"""
:param host: socket server 地址
:param port: socket server 端口
:param send_type: 通信协议
"""
self.send_type = send_type
if self.send_type == "tcp":
socket_type = socket.SOCK_STREAM
elif self.send_type == "udp":
socket_type = socket.SOCK_DGRAM
else:
print("不支持通信协议")
raise ValueError("不支持的通信协议")
self.client_socket = socket.socket(socket.AF_INET, socket_type)
self.host = host
self.port = port
if self.send_type == "tcp":
self.tcp_connect()
def tcp_connect(self):
"""
TCP 连接服务端
:return:
"""
self.client_socket.connect((self.host, self.port))
print("tcp 连接成功")
def udp_send_message(self, message: str):
"""
UDP 发送消息
:param message:
:return:
"""
self.client_socket.sendto(message.encode('utf-8'), (self.host, self.port))
print("udp 消息发送成功:", message)
def tcp_send_message(self, message: str):
"""
TCP 发送消息
:param message:
:return:
"""
self.client_socket.sendall(message.encode('utf-8'))
print("tcp 消息发送成功:", message)
def send_message(self, message: str):
"""
TCP 发送消息
:param message:
:return:
"""
if self.send_type == "tcp":
self.tcp_send_message(message)
elif self.send_type == "udp":
self.udp_send_message(message)
else:
print("不支持协议")
raise ValueError("不支持的协议")
def close(self):
"""
关闭 socket 连接
:return:
"""
self.client_socket.close()
if __name__ == '__main__':
_host = "127.0.0.1"
_port = 3636
SC = SocketClient()
SC.tcp_send_message(json.dumps({"label": "ceshi", "value": 1}))
SC.close()

14
utils/status.py Normal file
View File

@ -0,0 +1,14 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/8/10 22:20
# @File : status.py
# @IDE : PyCharm
# @desc : 简要说明
HTTP_SUCCESS = 200
HTTP_ERROR = 400
HTTP_401_UNAUTHORIZED = 401
HTTP_403_FORBIDDEN = 403
HTTP_404_NOT_FOUND = 404

103
utils/tools.py Normal file
View File

@ -0,0 +1,103 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/10/9 17:09
# @File : tools.py
# @IDE : PyCharm
# @desc : 工具类
import datetime
import random
import re
import string
from typing import List, Union
import importlib
from core.logger import logger
def test_password(password: str) -> Union[str, bool]:
"""
检测密码强度
"""
if len(password) < 8 or len(password) > 16:
return '长度需为8-16个字符,请重新输入。'
else:
for i in password:
if 0x4e00 <= ord(i) <= 0x9fa5 or ord(i) == 0x20: # Ox4e00等十六进制数分别为中文字符和空格的Unicode编码
return '不能使用空格、中文,请重新输入。'
else:
key = 0
key += 1 if bool(re.search(r'\d', password)) else 0
key += 1 if bool(re.search(r'[A-Za-z]', password)) else 0
key += 1 if bool(re.search(r"\W", password)) else 0
if key >= 2:
return True
else:
return '至少含数字/字母/字符2种组合请重新输入。'
def list_dict_find(options: List[dict], key: str, value: any) -> Union[dict, None]:
"""
字典列表查找
"""
return next((item for item in options if item.get(key) == value), None)
def get_time_interval(start_time: str, end_time: str, interval: int, time_format: str = "%H:%M:%S") -> List:
"""
获取时间间隔
:param end_time: 结束时间
:param start_time: 开始时间
:param interval: 间隔时间(分)
:param time_format: 字符串格式化,默认:%H:%M:%S
"""
if start_time.count(":") == 1:
start_time = f"{start_time}:00"
if end_time.count(":") == 1:
end_time = f"{end_time}:00"
start_time = datetime.datetime.strptime(start_time, "%H:%M:%S")
end_time = datetime.datetime.strptime(end_time, "%H:%M:%S")
time_range = []
while end_time > start_time:
time_range.append(start_time.strftime(time_format))
start_time = start_time + datetime.timedelta(minutes=interval)
return time_range
def generate_string(length: int = 8) -> str:
"""
生成随机字符串
:param length: 字符串长度
"""
return ''.join(random.sample(string.ascii_letters + string.digits, length))
def import_modules(modules: list, desc: str, **kwargs):
for module in modules:
if not module:
continue
try:
# 动态导入模块
module_pag = importlib.import_module(module[0:module.rindex(".")])
getattr(module_pag, module[module.rindex(".") + 1:])(**kwargs)
except ModuleNotFoundError:
logger.error(f"AttributeError导入{desc}失败,未找到该模块:{module}")
except AttributeError:
logger.error(f"ModuleNotFoundError导入{desc}失败,未找到该模块下的方法:{module}")
async def import_modules_async(modules: list, desc: str, **kwargs):
for module in modules:
if not module:
continue
try:
# 动态导入模块
module_pag = importlib.import_module(module[0:module.rindex(".")])
await getattr(module_pag, module[module.rindex(".") + 1:])(**kwargs)
# except TimeoutError:
# logger.error(f"asyncio.exceptions.TimeoutError连接Mysql数据库超时")
# print(f"asyncio.exceptions.TimeoutError连接Mysql数据库超时")
except ModuleNotFoundError:
logger.error(f"AttributeError导入{desc}失败,未找到该模块:{module}")
except AttributeError:
logger.error(f"ModuleNotFoundError导入{desc}失败,未找到该模块下的方法:{module}")

7
utils/wx/__init__.py Normal file
View File

@ -0,0 +1,7 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/3/15 20:18
# @File : __init__.py
# @IDE : PyCharm
# @desc : 简要说明

136
utils/wx/oauth.py Normal file
View File

@ -0,0 +1,136 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2022/3/15 20:44
# @File : oauth.py
# @IDE : PyCharm
# @desc : 简要说明
import requests
from core.logger import logger
from utils.cache import Cache
from utils.wx.wx_access_token import WxAccessToken
from redis.asyncio import Redis
class WXOAuth:
def __init__(self, rd: Redis, index: int = 0):
"""
初始化微信认证
:param index: 选择小程序0微信服务端
"""
# 重试次数
self.retry_count = 5
self.appid = None
self.secret = None
self.rd = rd
self.tab_name = None
if index == 0:
self.tab_name = "wx_server"
async def __get_settings(self, retry: int = 3):
"""
获取配置信息
"""
if not self.tab_name:
logger.error(f"请选择认证的微信平台")
wx_config = await Cache(self.rd).get_tab_name(self.tab_name, retry)
self.appid = wx_config.get("wx_server_app_id")
self.secret = wx_config.get("wx_server_app_secret")
async def get_code2session(self, code: str) -> dict:
"""
通过微信用户临时登录凭证 code 进行校验获取用户openid与 session 信息
官方文档https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/login/auth.code2Session.html
:param code: 登录时获取的 code
:return: 正确:{'session_key': 'F8/5LZrdtINYLPEdUJgXXQ==', 'openid': 'okLlC5Kcv7DH2J99dz-Z2FwJeEeU'}
:return: 报错:{'errcode': 40029, 'errmsg': 'invalid code, rid: 62308e5d-0b0b697e-1db652eb'}
"""
if not self.appid or not self.secret:
await self.__get_settings()
api = "https://api.weixin.qq.com/sns/jscode2session"
params = {
"appid": self.appid,
"secret": self.secret,
"js_code": code,
"grant_type": "authorization_code"
}
response = requests.get(url=api, params=params)
result = response.json()
if "openid" not in result:
logger.error(f"微信校验失败:{result}, code{code}")
else:
logger.info(f"微信校验成功:{result}, code{code}")
return result
async def get_phone_number(self, code: str):
"""
获取微信用户手机号
官方文档https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/phonenumber/phonenumber.getPhoneNumber.html
:param code: 动态令牌。可通过动态令牌换取用户手机号。
:return: 成功:{'errcode': 0, 'errmsg': 'ok', 'phone_info': {'phoneNumber': '15093430559'
, 'purePhoneNumber': '15093430559', 'countryCode': '86', 'watermark': {'timestamp': 1647355468, 'appid': 'wx069c452f9a733df1'}}}
失败:{'errcode': 40001, 'errmsg': 'invalid credential, access_token is invalid or not latest rid: 62690257-2894b530-58c6fcf3'}
"""
if not self.appid or not self.secret:
await self.__get_settings()
api = "https://api.weixin.qq.com/wxa/business/getuserphonenumber"
at = WxAccessToken(self.appid, self.secret, self.rd)
access_token = await at.get()
if not access_token.get("status", False):
result = {'errcode': 40001, 'errmsg': '获取微信令牌失败'}
# print(result)
logger.error(f"获取微信用户手机号失败:{result}")
return result
params = {
"access_token": access_token.get("token"),
}
data = {
"code": code,
}
response = requests.post(url=api, params=params, json=data)
result = response.json()
if result.get("errcode", 0) == 0:
# print("获取微信用户手机号成功", result)
logger.info(f"获取微信用户手机号成功:{result}, code{code}")
else:
# print("获取微信用户手机号失败", result)
logger.error(f"获取微信用户手机号失败:{result}, code{code}")
if result.get("errcode", 0) == 40001:
await at.update()
if self.retry_count > 0:
logger.error(f"重试获取微信手机号,重试剩余次数, {self.retry_count}")
self.retry_count -= 1
return await self.get_phone_number(code)
return result
async def parsing_phone_number(self, code: str):
"""
解析微信用户手机号
:param code: 动态令牌。可通过动态令牌换取用户手机号。
:return:
"""
result = await self.get_phone_number(code)
if result.get("errcode") == 0:
phone_info = result["phone_info"]
assert isinstance(phone_info, dict)
return phone_info["phoneNumber"]
return None
async def parsing_openid(self, code: str):
"""
解析openid
:param code: 动态令牌。可通过动态令牌换取用户手机号。
:return: openid | None
"""
result = await self.get_code2session(code)
if "openid" in result:
return result["openid"]
return None
# if __name__ == '__main__':
# WXOAuth().get_code2session("063mPDFa1v16PC0yqhGa1uQ86t4mPDFV")

View File

@ -0,0 +1,61 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Create Time : 2021/11/27 18:37
# @File : wx_access_token.py
# @IDE : PyCharm
# @desc : 获取小程序全局唯一后台接口调用凭据
import requests
from redis.asyncio import Redis
from core.logger import logger
class WxAccessToken:
"""
获取到的access_token存储在redis数据库中
获取小程序全局唯一后台接口调用凭据access_token。调用绝大多数后台接口时都需使用 access_token开发者需要进行妥善保存。
官方文档https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/access-token/auth.getAccessToken.html
"""
def __init__(self, appid: str, secret: str, redis: Redis, grant_type: str = "client_credential", *args, **kwargs):
self.__url = "https://api.weixin.qq.com/cgi-bin/token"
self.__method = "get"
self.appidKey = f"{appid}_access_token"
self.redis = redis
self.params = {
"appid": appid,
"secret": secret,
"grant_type": grant_type
}
async def get(self) -> dict:
"""
获取小程序access_token
"""
token = await self.redis.get(self.appidKey)
if not token:
return await self.update()
return {"status": True, "token": token}
async def update(self) -> dict:
"""
更新小程序access_token
"""
print("开始更新 access_token")
method = getattr(requests, self.__method)
response = method(url=self.__url, params=self.params)
result = response.json()
if result.get("errcode", "0") != "0":
print("获取access_token失败", result)
logger.error(f"获取access_token失败{result}")
return {"status": False, "token": None}
print("成功获取到", result)
await self.redis.set(self.appidKey, result.get("access_token"), ex=2000)
logger.info(f"获取access_token成功{result}")
return {"status": True, "token": result.get("access_token")}