169 lines
5.1 KiB
Python
169 lines
5.1 KiB
Python
import os
|
||
import shutil
|
||
import zipfile
|
||
import tempfile
|
||
from PIL import Image
|
||
from fastapi import UploadFile
|
||
|
||
img_extensions = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'webp'}
|
||
|
||
video_extensions = {'avi', 'wmv', 'rmvb', 'mp4', 'm4v', 'avi'}
|
||
|
||
|
||
def file_path(*path):
|
||
"""
|
||
拼接返回文件路径
|
||
:param path:
|
||
:return:
|
||
"""
|
||
return_path = os.path.join(*path)
|
||
return return_path
|
||
|
||
|
||
def create_folder(*path):
|
||
"""根据路径创建文件夹"""
|
||
folder_path = os.path.join(*path)
|
||
try:
|
||
os.makedirs(folder_path, exist_ok=True)
|
||
except Exception as e:
|
||
print(f"创建文件夹时错误: {e}")
|
||
return folder_path
|
||
|
||
|
||
def save_images(*path, file: UploadFile):
|
||
"""
|
||
保存上传的图片
|
||
:param path: 路径
|
||
:param file: 文件
|
||
:return:
|
||
"""
|
||
save_path = os.path.join(*path, file.filename)
|
||
|
||
os.makedirs(os.path.dirname(save_path), exist_ok=True)
|
||
with open(save_path, "wb") as f:
|
||
for line in file.file:
|
||
f.write(line)
|
||
return save_path
|
||
|
||
|
||
def create_thumbnail(input_image_path, out_image_path, size=(116, 70)):
|
||
"""
|
||
给图片生成缩略图
|
||
:param input_image_path:
|
||
:param out_image_path:
|
||
:param size: 缩略的尺寸
|
||
:return:
|
||
"""
|
||
with Image.open(input_image_path) as image:
|
||
# 使用thumbnail方法生成缩略图,参数size指定缩略图的最大尺寸
|
||
# 注意:thumbnail方法会保持图片的宽高比不变
|
||
image.thumbnail(size)
|
||
os.makedirs(os.path.dirname(out_image_path), exist_ok=True)
|
||
# 保存生成的缩略图
|
||
image.save(out_image_path, 'JPEG')
|
||
|
||
|
||
def copy_and_rename_file(src_file_path, dst_dir, new_name):
|
||
"""
|
||
复制文件到指定目录并重命名,保持文件的后缀不变。
|
||
:param src_file_path: 源文件路径
|
||
:param dst_dir: 目标目录
|
||
:param new_name: 新文件名(不含扩展名)
|
||
"""
|
||
# 获取文件的完整文件名(包括扩展名)
|
||
base_name = os.path.basename(src_file_path)
|
||
# 分离文件名和扩展名
|
||
file_name, file_extension = os.path.splitext(base_name)
|
||
|
||
# 构建新的文件名和目标路径
|
||
new_file_name = f"{new_name}{file_extension}"
|
||
dst_file_path = os.path.join(dst_dir, new_file_name)
|
||
|
||
# 确保目标目录存在
|
||
os.makedirs(dst_dir, exist_ok=True)
|
||
|
||
# 复制文件到目标位置并重命名
|
||
shutil.copy(src_file_path, dst_file_path)
|
||
|
||
|
||
def delete_file_if_exists(*file_paths: str):
|
||
"""
|
||
删除文件
|
||
:param file_paths:
|
||
:return:
|
||
"""
|
||
for path in file_paths:
|
||
if os.path.exists(path): # 检查文件是否存在
|
||
os.remove(path) # 删除文件
|
||
|
||
|
||
def delete_paths(paths):
|
||
"""
|
||
删除给定路径数组中的每个路径及其包含的所有内容。
|
||
:param paths: 文件或目录路径的列表
|
||
"""
|
||
for path in paths:
|
||
if os.path.exists(path):
|
||
try:
|
||
if os.path.isfile(path) or os.path.islink(path):
|
||
# 如果是文件或符号链接,则删除
|
||
os.remove(path)
|
||
print(f"Deleted file: {path}")
|
||
elif os.path.isdir(path):
|
||
# 如果是目录,则递归删除
|
||
shutil.rmtree(path)
|
||
except Exception as e:
|
||
print(f"路径删除失败 {path}: {e}")
|
||
else:
|
||
print(f"路径不存在: {path}")
|
||
|
||
|
||
def is_extensions(extension_type: str, file_name: str):
|
||
"""
|
||
校验文件名
|
||
"""
|
||
if extension_type == 'img':
|
||
file_extensions = img_extensions
|
||
elif extension_type == 'video':
|
||
file_extensions = video_extensions
|
||
else:
|
||
file_extensions = []
|
||
return '.' in file_name and file_name.rsplit('.', 1)[1].lower() in file_extensions
|
||
|
||
|
||
def zip_folder(folder_path: str, zip_filename: str) -> str:
|
||
"""
|
||
将指定文件夹打包成 ZIP 文件,并返回 ZIP 文件的路径。
|
||
|
||
:param folder_path: 要打包的文件夹路径
|
||
:param zip_filename: 生成的 ZIP 文件名(不带扩展名)
|
||
:return: 生成的 ZIP 文件的完整路径
|
||
"""
|
||
# 检查文件夹是否存在
|
||
if not os.path.isdir(folder_path):
|
||
raise ValueError(f"文件夹路径不存在: {folder_path}")
|
||
|
||
# 确保 ZIP 文件名以 .zip 结尾
|
||
if not zip_filename.endswith(".zip"):
|
||
zip_filename += ".zip"
|
||
|
||
# 创建临时目录用于存储 ZIP 文件
|
||
temp_dir = tempfile.mkdtemp()
|
||
zip_file_path = os.path.join(temp_dir, zip_filename)
|
||
|
||
try:
|
||
# 打包文件夹为 ZIP 文件
|
||
with zipfile.ZipFile(zip_file_path, "w", zipfile.ZIP_DEFLATED, allowZip64=True) as zip_f:
|
||
for root, dirs, files in os.walk(folder_path):
|
||
for file in files:
|
||
file_path = os.path.join(root, file)
|
||
arc_name = os.path.relpath(file_path, folder_path) # 保持相对路径
|
||
zip_f.write(file_path, arc_name)
|
||
|
||
# 返回生成的 ZIP 文件路径
|
||
return zip_file_path
|
||
except Exception as e:
|
||
# 清理临时文件夹并重新抛出异常
|
||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||
raise RuntimeError(f"打包失败: {e}")
|