1、添加了断点续传服务端,用socket建立连接通道

2、修改了动态生成存储路径的代码模块
3、修改了扫描路径
This commit is contained in:
XinYi Song 2021-12-03 17:33:02 +08:00
parent c5481ead05
commit 05df024ddc
13 changed files with 157 additions and 48 deletions

Binary file not shown.

View File

@ -33,49 +33,49 @@ def create_app():
"id": "job3", # 任务ID
"func": "util:scan_file_util.scan_GF3MDJ_dir", # 任务位置
"trigger": "interval", # 触发器
"seconds": 3 * 60 # 时间间隔
"seconds": 5 * 60 # 时间间隔
},
{
"id": "job4", # 任务ID
"func": "util:scan_file_util.scan_H08_dir", # 任务位置
"trigger": "interval", # 触发器
"seconds": 3 * 60 # 时间间隔
"seconds": 7 * 60 # 时间间隔
},
{
"id": "job5", # 任务ID
"func": "util:scan_file_util.scan_Sentinel1_dir", # 任务位置
"trigger": "interval", # 触发器
"seconds": 3 * 60 # 时间间隔
"seconds": 11 * 60 # 时间间隔
},
{
"id": "job6", # 任务ID
"func": "util:scan_file_util.scan_Sentinel2_dir", # 任务位置
"trigger": "interval", # 触发器
"seconds": 3 * 60 # 时间间隔
},
{
"id": "job7", # 任务ID
"func": "util:scan_file_util.scan_Sentinel3OL_dir", # 任务位置
"trigger": "interval", # 触发器
"seconds": 3 * 60 # 时间间隔
"seconds": 8 * 60 # 时间间隔
},
# {
# "id": "job7", # 任务ID
# "func": "util:scan_file_util.scan_Sentinel3OL_dir", # 任务位置
# "trigger": "interval", # 触发器
# "seconds": 5 * 60 # 时间间隔
# },
{
"id": "job8", # 任务ID
"func": "util:scan_file_util.scan_HJ1_dir", # 任务位置
"trigger": "interval", # 触发器
"seconds": 3 * 60 # 时间间隔
"seconds": 5 * 60 # 时间间隔
},
{
"id": "job9", # 任务ID
"func": "util:scan_file_util.scan_ZY3_dir", # 任务位置
"trigger": "interval", # 触发器
"seconds": 3 * 60 # 时间间隔
"seconds": 5 * 60 # 时间间隔
},
{
"id": "job10", # 任务ID
"func": "util:scan_file_util.scan_SNPP_dir", # 任务位置
"trigger": "interval", # 触发器
"seconds": 3 * 60 # 时间间隔
"seconds": 5 * 60 # 时间间隔
}
]
}

5
upload/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""
Author : XinYi Song
Time : 2021/12/3 11:17
Desc:
"""

Binary file not shown.

Binary file not shown.

80
upload/upload_client.py Normal file
View File

@ -0,0 +1,80 @@
"""
Author : XinYi Song
Time : 2021/12/1 15:19
Desc:
"""
import socket
import sys
import re
import os
from hashlib import md5
from util.file_store_path import file_store_path_year, file_store_path_month, file_store_path_day
# Directory that contains this module file.
# NOTE(review): not referenced anywhere else in the visible part of this module.
FILE_DIR = os.path.dirname(__file__)
# Disabled earlier variant that started from the parent of this module's directory.
# BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# home = os.path.join(BASE_DIR, "/unstructured_data/remote_sensing_data")
# Root directory passed to the file_store_path_* helpers when building the
# destination path that upload_file_client reports for an uploaded file.
home = "/unstructured_data/remote_sensing_data"
def bar(num=1, sum=100):
    """Write a single-line percentage progress indicator to stdout.

    :param num: amount completed so far.
    :param sum: total amount. The name shadows the builtin but is kept because
        it is part of the signature callers may use by keyword.
    :return: None; the only effect is the text written to ``sys.stdout``.
    """
    total = float(sum)
    # A zero total (for example an empty file) has nothing left to transfer,
    # so report it as complete instead of raising ZeroDivisionError.
    rate = float(num) / total if total else 1.0
    rate_num = int(rate * 100)
    # '\r' moves the cursor back to the start of the line so that every call
    # overwrites the previous percentage.
    sys.stdout.write('\r%d %%' % rate_num)
    # No newline is written, so flush explicitly to make the update visible
    # on line-buffered terminals.
    sys.stdout.flush()
def md5_file(name):
    """Return the hexadecimal MD5 digest of the file at ``name``.

    The file is opened in binary mode and hashed in fixed-size chunks so that
    very large files are never held in memory all at once. The ``with`` block
    closes the handle even if a read fails.

    :param name: path of the file to hash.
    :return: the digest as a 32-character lowercase hex string.
    :raises OSError: if the file cannot be opened or read.
    """
    digest = md5()
    chunk_size = 1024 * 1024
    with open(name, 'rb') as a_file:
        # iter() with a sentinel keeps calling read() until it returns b'' (EOF).
        for chunk in iter(lambda: a_file.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
def upload_file_client(file_path, depth, dateTime, collectionCode,
                       host='192.168.2.9', port=9002):
    """Upload a local file to the upload server over TCP, resuming if possible.

    Exchange performed on the connection, in order:

    1. read the server's greeting and print it;
    2. send ``post|<name>|<size>|<collectionCode>|<dateTime>|<depth>``;
    3. read the reply; ``"2003"`` means the file already exists on the server,
       in which case ``"2004"`` (resume) is sent and the server answers with
       the number of bytes it already holds;
    4. stream the remaining bytes of the file.

    :param file_path: path of the local file to send.
    :param depth: directory depth of the destination path, one of ``'year'``,
        ``'month'`` or ``'day'``.
    :param dateTime: timestamp string handed to the ``file_store_path_*``
        helper (presumably ``'%Y-%m-%d %H:%M:%S'`` -- confirm against
        ``util.file_store_path``).
    :param collectionCode: collection code sent to the server and used as a
        component of the destination path.
    :param host: upload server address; defaults to the previously hard-coded
        value.
    :param port: upload server port; defaults to the previously hard-coded
        value.
    :return: dict with keys ``file_size``, ``fileName``, ``md5``,
        ``file_path`` and ``type``. ``file_path`` is computed locally from
        ``home``; NOTE(review): presumably the server derives the same path
        from the fields in the ``post`` message -- confirm.
    :raises ValueError: if ``depth`` is not a supported value.
    :raises OSError: on socket or file errors, or if the local file ends
        before the announced number of bytes has been sent.
    """
    path_builders = {
        'year': file_store_path_year,
        'month': file_store_path_month,
        'day': file_store_path_day,
    }
    # Validate before any network traffic. Previously an unsupported depth
    # only surfaced as an UnboundLocalError after the whole file was sent.
    if depth not in path_builders:
        raise ValueError("unsupported directory depth: %r" % (depth,))

    file_byte_size = os.stat(file_path).st_size  # size announced to the server
    file_name = os.path.basename(file_path)
    file_md5 = md5_file(file_path)
    file_paths = path_builders[depth](dateTime, home, collectionCode)

    chunk_size = 1024
    # Context managers guarantee the socket and the file are closed on every
    # exit path, including exceptions.
    with socket.socket() as ck:
        ck.connect((host, port))
        print(str(ck.recv(1024), encoding='utf-8'))  # server greeting

        post_info = "post|%s|%s|%s|%s|%s" % (
            file_name, file_byte_size, collectionCode, dateTime, depth)
        ck.sendall(bytes(post_info, encoding='utf-8'))

        result_exist = str(ck.recv(1024), encoding='utf-8')
        has_sent = 0
        if result_exist == "2003":
            # The file already exists on the server: always ask to resume
            # ("2004") rather than prompting the user, since this runs
            # unattended.
            ck.sendall(bytes("2004", encoding='utf-8'))
            # The server replies with how many bytes it has already received.
            result_continue_pos = str(ck.recv(1024), encoding='utf-8')
            print(result_continue_pos)
            has_sent = int(result_continue_pos)

        with open(file_path, 'rb') as file_obj:
            file_obj.seek(has_sent)  # skip what the server already holds
            while has_sent < file_byte_size:
                data = file_obj.read(chunk_size)
                if not data:
                    # The file shrank after it was stat'ed; without this
                    # check the loop would never terminate.
                    raise OSError(
                        "%s ended after %d of %d bytes"
                        % (file_path, has_sent, file_byte_size))
                ck.sendall(data)
                has_sent += len(data)
                bar(has_sent, file_byte_size)  # progress indicator

    print("文件上传成功!")
    return {
        'file_size': file_byte_size,
        'fileName': file_name,
        'md5': file_md5,
        'file_path': file_paths + '/' + file_name,
        'type': 'ok',
    }

View File

@ -18,7 +18,7 @@ def file_store_path(time_stamp):
return os.path.join('E:/data/upload', str(t[0]), str(t[1]), str(t[2]))
def file_store_path_year(data_str_time, upload_path):
def file_store_path_year(data_str_time, upload_path, conllection_code):
"""
目录到年
:param upload_path:
@ -26,10 +26,10 @@ def file_store_path_year(data_str_time, upload_path):
:return:
"""
t = time.strptime(data_str_time, '%Y-%m-%d %H:%M:%S')
return os.path.join(upload_path, str(t[0]))
return os.path.join(upload_path, conllection_code, str(t[0]))
def file_store_path_month(data_str_time, upload_path):
def file_store_path_month(data_str_time, upload_path, conllection_code):
"""
目录到月
:param upload_path:
@ -37,10 +37,10 @@ def file_store_path_month(data_str_time, upload_path):
:return:
"""
t = time.strptime(data_str_time, '%Y-%m-%d %H:%M:%S')
return os.path.join(upload_path, str(t[0]), str(t[1]))
return os.path.join(upload_path, conllection_code, str(t[0]), str(t[1]))
def file_store_path_day(data_str_time, upload_path):
def file_store_path_day(data_str_time, upload_path, conllection_code):
"""
目录到日
:param upload_path:
@ -48,7 +48,7 @@ def file_store_path_day(data_str_time, upload_path):
:return:
"""
t = time.strptime(data_str_time, '%Y-%m-%d %H:%M:%S')
return os.path.join(upload_path, str(t[0]), str(t[1]), str(t[2]))
return os.path.join(upload_path, conllection_code, str(t[0]), str(t[1]), str(t[2]))
if __name__ == '__main__':

View File

@ -34,7 +34,7 @@ def md5_file(name):
return m.hexdigest()
def upload_client(local_path, depth, dateTime):
def upload_client(local_path, depth, dateTime, conllection_code):
global file_path
while True:
file_byte_size = os.stat(local_path).st_size # 获取文件的大小
@ -45,15 +45,15 @@ def upload_client(local_path, depth, dateTime):
file_obj = open(local_path, 'rb') # 对文件进行读操作
file_obj.seek(has_sent) # 调整指针
if depth == 'year':
file_path = file_store_path_year(dateTime, home)
file_path = file_store_path_year(dateTime, home, conllection_code)
if not os.path.exists(file_path):
os.makedirs(file_path)
if depth == 'month':
file_path = file_store_path_month(dateTime, home)
file_path = file_store_path_month(dateTime, home, conllection_code)
if not os.path.exists(file_path):
os.makedirs(file_path)
if depth == 'day':
file_path = file_store_path_day(dateTime, home)
file_path = file_store_path_day(dateTime, home, conllection_code)
if not os.path.exists(file_path):
os.makedirs(file_path)
path = os.path.join(file_path, file_name)

View File

@ -13,6 +13,7 @@ from common.tools.dms import dms_login, dms_task_record, dms_sensing_data
from scan_data.GetMetaInfo import GetGFPMSData, GetGF3MDJData, GetH08Data, GetSentinel1Data, GetSentinel2Data, \
GetSentinel3OLData, GetHJ1Data, GetZY3Data, GetSNPPData
from scan_data.example import GetJPSSData
from upload.upload_client import upload_file_client
from util.http_file_upload import upload_client
from util.http_util import httpUtil
from util.remote_sensing_util import gf4_pmi_001
@ -122,7 +123,9 @@ def scan_VJ102_dir():
解析JPSS-VJ102元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\JPSS'
dir_list = os.listdir(file_dir)
print('开始扫描VJ102IMG数据集')
collectionCode = 'VJ102IMG'
# 用户登录
@ -172,7 +175,8 @@ def scan_VJ102_dir():
CollectionCode = JPSSData_dict['CollectionCode']
DirectoryDepth = JPSSData_dict['DirectoryDepth']
StartTime = JPSSData_dict['StartTime']
uc = upload_client(path, DirectoryDepth, StartTime[0:19])
# uc = upload_client(path, DirectoryDepth, StartTime[0:19])
uc = upload_file_client(path, DirectoryDepth, StartTime[0:19], collectionCode)
StartTime = time.mktime(time.strptime(JPSSData_dict['StartTime'][0:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(JPSSData_dict['EndTime'][0:19], '%Y-%m-%d %H:%M:%S'))
@ -220,7 +224,8 @@ def scan_VJ103_dir():
:param file_dir:
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\JPSS'
print('开始扫描VJ103IMG数据集')
collectionCode = 'VJ103IMG'
# 用户登录
@ -269,7 +274,8 @@ def scan_VJ103_dir():
CollectionCode = JPSSData_dict['CollectionCode']
DirectoryDepth = JPSSData_dict['DirectoryDepth']
StartTime = JPSSData_dict['StartTime']
uc = upload_client(path, DirectoryDepth, StartTime[0:19])
# uc = upload_client(path, DirectoryDepth, StartTime[0:19])
uc = upload_file_client(path, DirectoryDepth, StartTime[0:19], collectionCode)
StartTime = time.mktime(time.strptime(JPSSData_dict['StartTime'][0:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(JPSSData_dict['EndTime'][0:19], '%Y-%m-%d %H:%M:%S'))
@ -316,7 +322,8 @@ def scan_GF1_PMS2_dir():
获取高分 PMS卫星元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\GF-1'
print('开始扫描GF1_PMS2_001数据集')
collectionCode = 'GF1_PMS2_001'
# 用户登录
@ -366,7 +373,8 @@ def scan_GF1_PMS2_dir():
CollectionCode = GFPMS_dict['CollectionCode']
DirectoryDepth = GFPMS_dict['DirectoryDepth']
StartTime = GFPMS_dict['StartTime']
uc = upload_client(path, DirectoryDepth, StartTime[0:19])
# uc = upload_client(path, DirectoryDepth, StartTime[0:19])
uc = upload_file_client(path, DirectoryDepth, StartTime[0:19], collectionCode)
StartTime = time.mktime(time.strptime(GFPMS_dict['StartTime'][0:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(GFPMS_dict['EndTime'][0:19], '%Y-%m-%d %H:%M:%S'))
@ -413,7 +421,8 @@ def scan_GF3MDJ_dir():
获取高分3号MDJGF-3 MDJ卫星元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\GF-3'
print('开始扫描GF3_MDJ_SS数据集')
collectionCode = 'GF3_MDJ_SS'
# 用户登录
@ -462,7 +471,8 @@ def scan_GF3MDJ_dir():
CollectionCode = GF3_MDJ_SS_dict['CollectionCode']
DirectoryDepth = GF3_MDJ_SS_dict['DirectoryDepth']
StartTime = GF3_MDJ_SS_dict['StartTime']
uc = upload_client(path, DirectoryDepth, StartTime[0:19])
# uc = upload_client(path, DirectoryDepth, StartTime[0:19])
uc = upload_file_client(path, DirectoryDepth, StartTime[0:19], collectionCode)
StartTime = time.mktime(time.strptime(GF3_MDJ_SS_dict['StartTime'][0:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(GF3_MDJ_SS_dict['EndTime'][0:19], '%Y-%m-%d %H:%M:%S'))
@ -506,10 +516,11 @@ def scan_GF3MDJ_dir():
def scan_H08_dir():
"""
获取高分3号MDJGF-3 MDJ卫星元数据
获取葵花8卫星元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\葵花8'
print('开始扫描H08数据集')
collectionCode = 'NC_H08'
# 用户登录
@ -558,7 +569,8 @@ def scan_H08_dir():
CollectionCode = GetH08_dict['CollectionCode']
DirectoryDepth = GetH08_dict['DirectoryDepth']
StartTime = GetH08_dict['ProduceTime'][0:10] + ' ' + GetH08_dict['ProduceTime'][11:19]
uc = upload_client(path, DirectoryDepth, StartTime)
# uc = upload_client(path, DirectoryDepth, StartTime)
uc = upload_file_client(path, DirectoryDepth, StartTime, collectionCode)
StartTime = time.mktime(time.strptime(GetH08_dict['ProduceTime'][0:10] + ' ' + GetH08_dict['ProduceTime'][11:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(GetH08_dict['ProduceTime'][0:10] + ' '+ GetH08_dict['ProduceTime'][11:19], '%Y-%m-%d %H:%M:%S'))
@ -605,7 +617,8 @@ def scan_Sentinel1_dir():
获取哨兵1号卫星元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\SENTINEL-1'
print('开始扫描Sentinel1数据集')
collectionCode = 'S1A_IW_GRDH'
# 用户登录
@ -654,7 +667,8 @@ def scan_Sentinel1_dir():
CollectionCode = Sentinel1_dict['CollectionCode']
DirectoryDepth = Sentinel1_dict['DirectoryDepth']
StartTime = Sentinel1_dict['ProduceTime'][0:10] + ' ' + Sentinel1_dict['ProduceTime'][11:19]
uc = upload_client(path, DirectoryDepth, StartTime)
# uc = upload_client(path, DirectoryDepth, StartTime)
uc = upload_file_client(path, DirectoryDepth, StartTime, collectionCode)
StartTime = time.mktime(time.strptime(Sentinel1_dict['StartTime'][0:10] + ' ' + Sentinel1_dict['StartTime'][11:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(Sentinel1_dict['StopTime'][0:10] + ' ' + Sentinel1_dict['StopTime'][11:19], '%Y-%m-%d %H:%M:%S'))
@ -701,7 +715,8 @@ def scan_Sentinel2_dir():
获取哨兵2号卫星元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\SENTINEL-2'
print('开始扫描Sentinel2数据集')
collectionCode = 'S2B'
# 用户登录
@ -750,7 +765,8 @@ def scan_Sentinel2_dir():
CollectionCode = Sentinel2_dict['CollectionCode']
DirectoryDepth = Sentinel2_dict['DirectoryDepth']
StartTime = Sentinel2_dict['ProduceTime'][0:10] + ' ' + Sentinel2_dict['ProduceTime'][11:19]
uc = upload_client(path, DirectoryDepth, StartTime)
# uc = upload_client(path, DirectoryDepth, StartTime)
uc = upload_file_client(path, DirectoryDepth, StartTime, collectionCode)
StartTime = time.mktime(time.strptime(Sentinel2_dict['StartTime'][0:10] + ' ' + Sentinel2_dict['StartTime'][11:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(Sentinel2_dict['StopTime'][0:10] + ' ' + Sentinel2_dict['StopTime'][11:19], '%Y-%m-%d %H:%M:%S'))
@ -797,7 +813,8 @@ def scan_Sentinel3OL_dir():
获取哨兵3号卫星元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\SENTINEL-3'
print('开始扫描Sentinel3数据集')
collectionCode = 'Sentinel3_OLCI'
# 用户登录
@ -847,7 +864,8 @@ def scan_Sentinel3OL_dir():
CollectionCode = Sentinel3OL_dict['CollectionCode']
DirectoryDepth = Sentinel3OL_dict['DirectoryDepth']
StartTime = Sentinel3OL_dict['StartTime'][0:10] + ' ' + Sentinel3OL_dict['StartTime'][11:19]
uc = upload_client(path, DirectoryDepth, StartTime)
# uc = upload_client(path, DirectoryDepth, StartTime)
uc = upload_file_client(path, DirectoryDepth, StartTime, collectionCode)
StartTime = time.mktime(time.strptime(Sentinel3OL_dict['StartTime'][0:10] + ' ' + Sentinel3OL_dict['StartTime'][11:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(Sentinel3OL_dict['StopTime'][0:10] + ' ' + Sentinel3OL_dict['StopTime'][11:19], '%Y-%m-%d %H:%M:%S'))
@ -894,7 +912,8 @@ def scan_HJ1_dir():
获取环境1号卫星元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\HJ-1'
print('开始扫描环境1号数据集')
collectionCode = 'HJ-1'
# 用户登录
@ -943,7 +962,8 @@ def scan_HJ1_dir():
CollectionCode = GetHJ1Data_dict['CollectionCode']
DirectoryDepth = GetHJ1Data_dict['DirectoryDepth']
StartTime = GetHJ1Data_dict['ProductTime'][0:10] + ' ' + GetHJ1Data_dict['ProductTime'][11:19]
uc = upload_client(path, DirectoryDepth, StartTime)
# uc = upload_client(path, DirectoryDepth, StartTime)
uc = upload_file_client(path, DirectoryDepth, StartTime, collectionCode)
StartTime = time.mktime(time.strptime(GetHJ1Data_dict['StartTime'][0:10] + ' ' + GetHJ1Data_dict['StartTime'][11:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(GetHJ1Data_dict['EndTime'][0:10] + ' ' + GetHJ1Data_dict['EndTime'][11:19], '%Y-%m-%d %H:%M:%S'))
@ -990,7 +1010,8 @@ def scan_ZY3_dir():
获取资源3号卫星元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\ZY-3'
print('开始扫描资源3号数据集')
collectionCode = 'ZY-3'
# 用户登录
@ -1038,7 +1059,7 @@ def scan_ZY3_dir():
CollectionCode = GetZY3Data_dict['CollectionCode']
DirectoryDepth = GetZY3Data_dict['DirectoryDepth']
StartTime = GetZY3Data_dict['ProduceTime'][0:10] + ' ' + GetZY3Data_dict['ProduceTime'][11:19]
uc = upload_client(path, DirectoryDepth, StartTime)
uc = upload_file_client(path, DirectoryDepth, StartTime, collectionCode)
StartTime = time.mktime(time.strptime(GetZY3Data_dict['StartTime'][0:10] + ' ' + GetZY3Data_dict['StartTime'][11:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(GetZY3Data_dict['EndTime'][0:10] + ' ' + GetZY3Data_dict['EndTime'][11:19], '%Y-%m-%d %H:%M:%S'))
@ -1085,7 +1106,8 @@ def scan_SNPP_dir():
获取资源3号卫星元数据
:return:
"""
file_dir = 'E:/数管'
# file_dir = 'E:/数管'
file_dir = '\\\\192.168.2.85\\数据\\不同传感器数据\\VIIRS'
print('开始扫描VNP02IMG数据集')
collectionCode = 'VNP02IMG'
# 用户登录
@ -1117,6 +1139,7 @@ def scan_SNPP_dir():
if os.path.isfile(path): # 判断是否是文件还是目录需要用绝对路径
# 解析遥感数据文件demo
GetSNPPData_dict = GetSNPPData(path, xmlPath, ThumbnailPath)
print(GetSNPPData_dict)
# 配置文件服务器参数
url = Config.DFS_UPLOAD_URL
files = {'file': open(GetSNPPData_dict['xmlPath'], 'rb')}
@ -1133,7 +1156,8 @@ def scan_SNPP_dir():
CollectionCode = GetSNPPData_dict['CollectionCode']
DirectoryDepth = GetSNPPData_dict['DirectoryDepth']
StartTime = GetSNPPData_dict['ProductionTime'][0:10] + ' ' + GetSNPPData_dict['ProductionTime'][11:19]
uc = upload_client(path, DirectoryDepth, StartTime)
# uc = upload_client(path, DirectoryDepth, StartTime, collectionCode)
uc = upload_file_client(path, DirectoryDepth, StartTime, collectionCode)
StartTime = time.mktime(time.strptime(GetSNPPData_dict['StartTime'][0:10] + ' ' + GetSNPPData_dict['StartTime'][11:19], '%Y-%m-%d %H:%M:%S'))
EndTime = time.mktime(time.strptime(GetSNPPData_dict['EndTime'][0:10] + ' ' + GetSNPPData_dict['EndTime'][11:19], '%Y-%m-%d %H:%M:%S'))
@ -1180,7 +1204,7 @@ if __name__ == '__main__':
# list_dir(file_dir)
# file_dir = 'E:/数管'
#
# scan_VJ102_dir()
scan_VJ102_dir()
#
# scan_VJ103_dir()
@ -1192,4 +1216,4 @@ if __name__ == '__main__':
# scan_Sentinel3OL_dir()
# scan_HJ1_dir()
# scan_ZY3_dir()
scan_SNPP_dir()
# scan_SNPP_dir()