XinYi Song
c5481ead05
2、整合JPSS,葵花8,GF3,哨兵1,哨兵2,哨兵3,资源2号,环境1号,SNPP等遥感数据解析算法。 3、flask中添加扫描各个卫星扫描任务,定时扫描,数据入库
1555 lines
77 KiB
Python
1555 lines
77 KiB
Python
from xml.dom import minidom
|
||
from osgeo import gdal
|
||
from osgeo import ogr
|
||
from osgeo import gdalconst
|
||
import h5py
|
||
from PIL import Image
|
||
import numpy as np
|
||
import tarfile
|
||
import zipfile
|
||
import re
|
||
import os
|
||
import io
|
||
import sys
|
||
|
||
|
||
def exe_path():
    """
    Locate the directory the program is running from.

    Returns:
        str: directory of ``sys.executable`` when ``sys`` carries a ``frozen``
        attribute (e.g. a PyInstaller bundle), otherwise the directory that
        holds this source file, with symlinks resolved.
    """
    # A frozen bundle is anchored on the executable; a plain script on its own file.
    anchor = sys.executable if hasattr(sys, 'frozen') else os.path.realpath(__file__)
    return os.path.dirname(anchor)
|
||
|
||
|
||
# Import-time side effect: set the PROJ_LIB environment variable to the "PROJ"
# folder located next to the program directory returned by exe_path().
os.environ['PROJ_LIB'] = exe_path() + "/PROJ"
|
||
|
||
|
||
def uint16to8(bands, lower_percent=0.001, higher_percent=99.999):
    """
    Percentile-stretch every band of a raster stack down to 8 bits.

    For each band the values found at ``lower_percent`` and ``higher_percent``
    are mapped linearly onto 0 and 255. Values outside that range are clipped
    and the fractional part is dropped when the result is stored as ``uint8``.

    :param bands: array (or array-like) indexed as ``bands[i, :, :]``, one 2-D band per index
    :param lower_percent: percentile (0-100) that is mapped to 0
    :param higher_percent: percentile (0-100) that is mapped to 255
    :return: ``numpy.uint8`` array with the same shape as ``bands``
    """
    bands = np.asarray(bands)
    out_min, out_max = 0, 255
    out = np.zeros_like(bands, dtype=np.uint8)
    for i in range(bands.shape[0]):
        band = bands[i, :, :]
        low = np.percentile(band, lower_percent)
        high = np.percentile(band, higher_percent)
        span = high - low
        if span == 0 or not np.isfinite(span):
            # Flat (or non-finite) band: the stretch would divide by zero and
            # push NaN into the uint8 output, so leave this band at 0.
            continue
        stretched = out_min + (band - low) * (out_max - out_min) / span
        # Assigning into the uint8 array truncates the clipped floats.
        out[i, :, :] = np.clip(stretched, out_min, out_max)
    return out
|
||
|
||
|
||
def createXML(metadata, xlm_file):
    """
    Write a flat metadata mapping to an XML file.

    The document has a single ``ProductMetaData`` root and one child element
    per mapping entry, whose text is ``str(value)``.

    Keys are used as element names. Characters that are not word characters,
    ``.`` or ``-`` are replaced with ``_`` and a leading ``_`` is added when the
    name would not start with a letter or underscore, so that keys such as
    ``"NC_GLOBAL#title"`` or ``"1st band"`` still yield a well-formed file.
    Keys that are already plain identifiers are written unchanged.

    :param metadata: mapping of metadata names to values
    :param xlm_file: path of the XML file to create (overwritten if present)
    :return: None
    """
    document = minidom.Document()
    root = document.createElement('ProductMetaData')
    document.appendChild(root)

    for key in metadata:
        # minidom does not validate element names, so sanitise them here.
        tag = re.sub(r'[^\w.\-]', '_', str(key))
        if not tag or not (tag[0].isalpha() or tag[0] == '_'):
            tag = '_' + tag
        element = document.createElement(tag)
        element.appendChild(document.createTextNode(str(metadata[key])))
        root.appendChild(element)

    # The context manager closes the file; no explicit close() is needed.
    with open(xlm_file, 'w', encoding='utf-8') as f:
        document.writexml(f, indent='\t', newl='\n', addindent='\t', encoding='utf-8')
|
||
|
||
|
||
def _gfpms_tag_text(dom, tag):
    """Return the text of the first ``tag`` element found in ``dom``."""
    return dom.getElementsByTagName(tag)[0].firstChild.data


def _gfpms_sensor_metadata(tar_file, xml_member, thumb_member, xml_path, thumbnail_path):
    """Extract one sensor's XML/thumbnail pair and build its metadata dict."""
    # NOTE(review): member names come from the archive itself; tarfile.extract()
    # trusts them, so only feed archives from a trusted source.
    tar_file.extract(thumb_member, thumbnail_path)
    tar_file.extract(xml_member, xml_path)

    xml_bytes = tar_file.extractfile(xml_member).read()
    dom = minidom.parse(io.StringIO(xml_bytes.decode("utf-8")))

    ProduceTime = _gfpms_tag_text(dom, 'ProduceTime')
    StartTime = _gfpms_tag_text(dom, 'StartTime')
    EndTime = _gfpms_tag_text(dom, 'EndTime')
    ImageGSD = _gfpms_tag_text(dom, 'ImageGSD')  # resolution
    Bands = _gfpms_tag_text(dom, 'Bands')
    CloudPercent = _gfpms_tag_text(dom, 'CloudPercent')

    # Footprint ring: top-left, top-right, bottom-right, bottom-left, closed
    # by repeating the first vertex; every vertex is "longitude latitude".
    ring = []
    for corner in ('TopLeft', 'TopRight', 'BottomRight', 'BottomLeft'):
        longitude = _gfpms_tag_text(dom, corner + 'Longitude')
        latitude = _gfpms_tag_text(dom, corner + 'Latitude')
        ring.append(f'{longitude} {latitude}')
    ring.append(ring[0])
    boundaryGeomStr = 'POLYGON((' + ','.join(ring) + '))'

    return {"ProduceTime": ProduceTime,
            "StartTime": StartTime,
            "EndTime": EndTime,
            "CloudPercent": CloudPercent,
            "boundaryGeomStr": boundaryGeomStr,
            "bands": Bands,
            "ImageGSD": ImageGSD,
            "ProjectedCoordinates": "",
            "CollectionCode": "",
            "ThumbnailPath": thumbnail_path + "/" + thumb_member,
            "ThumbnailName": thumb_member,
            "xmlPath": xml_path + "/" + xml_member,
            "xmlFileName": xml_member,
            "DirectoryDepth": "month"}


def GetGFPMSData(in_file, xml_path, thumbnail_ath):
    """
    Read the metadata of a Gaofen PMS archive (multispectral + panchromatic).

    The tar archive must contain members ending in ``MSS2.xml``,
    ``MSS2_thumb.jpg``, ``PAN2.xml`` and ``PAN2_thumb.jpg``. They are located
    by suffix, independent of their order inside the archive. The XML files
    are extracted to ``xml_path`` and the thumbnails to ``thumbnail_ath``;
    both reported thumbnail paths point into ``thumbnail_ath``.

    :param in_file: path of the tar archive
    :param xml_path: directory that receives the extracted XML files
    :param thumbnail_ath: directory that receives the extracted thumbnails
    :return: ``(mss_dict, pan_dict)`` on success, otherwise
             ``{"code": -1, "msg": <reason>}``
    """
    try:
        with tarfile.open(in_file, mode='r') as tar_file:
            names = tar_file.getnames()
            members = {}
            for suffix in ('MSS2_thumb.jpg', 'PAN2_thumb.jpg', 'MSS2.xml', 'PAN2.xml'):
                found = next((name for name in names if name.endswith(suffix)), None)
                if found is None:
                    # Check all four up front so nothing is half-extracted.
                    return {"code": -1, "msg": "找不到指定文件..."}
                members[suffix] = found

            gf_mss_dict = _gfpms_sensor_metadata(
                tar_file, members['MSS2.xml'], members['MSS2_thumb.jpg'],
                xml_path, thumbnail_ath)
            gf_pan_dict = _gfpms_sensor_metadata(
                tar_file, members['PAN2.xml'], members['PAN2_thumb.jpg'],
                xml_path, thumbnail_ath)
        return gf_mss_dict, gf_pan_dict
    except Exception as e:
        # Any failure (unreadable archive, missing tag, ...) is reported to the
        # caller as an error dict rather than raised.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def _gf3_nested_text(dom, parent, child):
    """Return the text of the first ``child`` element inside the first ``parent`` element."""
    return dom.getElementsByTagName(parent)[0].getElementsByTagName(child)[0].firstChild.data


def GetGF3MDJData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Gaofen-3 MDJ (GF-3 MDJ) archive.

    The tar archive must contain a member ending in ``.thumb.jpg`` and one
    ending in ``meta.xml``. They are located by suffix, independent of their
    order inside the archive, then extracted to ``thumbnail_path`` and
    ``xml_path`` respectively.

    :param in_file: path of the tar archive
    :param xml_path: directory that receives the extracted XML file
    :param thumbnail_path: directory that receives the extracted thumbnail
    :return: metadata dict on success, otherwise ``{"code": -1, "msg": <reason>}``
    """
    try:
        with tarfile.open(in_file, mode='r') as tar_file:
            names = tar_file.getnames()
            thumb_member = next((n for n in names if n.endswith('.thumb.jpg')), None)
            xml_member = next((n for n in names if n.endswith('meta.xml')), None)
            if thumb_member is None or xml_member is None:
                return {"code": -1, "msg": "找不到指定文件..."}

            # NOTE(review): member names come from the archive itself;
            # tarfile.extract() trusts them, so only feed trusted archives.
            tar_file.extract(thumb_member, thumbnail_path)
            tar_file.extract(xml_member, xml_path)
            meta_content = tar_file.extractfile(xml_member).read()

        dom = minidom.parse(io.StringIO(meta_content.decode("utf-8")))

        # Product dates
        ProduceTime = dom.getElementsByTagName('productGentime')[0].firstChild.data
        StartTime = _gf3_nested_text(dom, "imagingTime", "start")
        EndTime = _gf3_nested_text(dom, "imagingTime", "end")

        # Other information
        ImageGSD = dom.getElementsByTagName('NominalResolution')[0].firstChild.data
        ProjectedCoordinates = dom.getElementsByTagName('ProjectModel')[0].firstChild.data

        # Footprint ring: top-left, top-right, bottom-right, bottom-left, closed
        # by repeating the first vertex; every vertex is "longitude latitude".
        ring = []
        for corner in ("topLeft", "topRight", "bottomRight", "bottomLeft"):
            longitude = _gf3_nested_text(dom, corner, "longitude")
            latitude = _gf3_nested_text(dom, corner, "latitude")
            ring.append(f'{longitude} {latitude}')
        ring.append(ring[0])
        boundaryGeomStr = 'POLYGON((' + ','.join(ring) + '))'

        return {"ProduceTime": ProduceTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "Bands": "1,2",
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": ProjectedCoordinates,
                'CollectionCode': "GF3_MDJ",
                "ThumbnailPath": thumbnail_path + "/" + thumb_member,
                "ThumbnailName": thumb_member,
                "xmlPath": xml_path + "/" + xml_member,
                "xmlFileName": xml_member,
                "DirectoryDepth": "month"}
    except Exception as e:
        # Any failure is reported to the caller as an error dict.
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def _gf4_tag_text(dom, tag):
    """Return the text of the first ``tag`` element found in ``dom``."""
    return dom.getElementsByTagName(tag)[0].firstChild.data


def _gf4_find_members(names, sensor):
    """Return ``(thumbnail_member, xml_member)`` for ``sensor``; an entry is None when absent."""
    prefix = 'GF4_' + sensor + '_'
    thumb = next((n for n in names if n.startswith(prefix) and n.endswith('_thumb.jpg')), None)
    xml = next((n for n in names if n.startswith(prefix) and n.endswith('.xml')), None)
    return thumb, xml


def _gf4_sensor_metadata(tar_file, sensor, thumb_member, xml_member, xml_path, thumbnail_path):
    """Extract one sensor's XML/thumbnail pair and build its metadata dict."""
    # NOTE(review): member names come from the archive itself; tarfile.extract()
    # trusts them, so only feed archives from a trusted source.
    tar_file.extract(thumb_member, thumbnail_path)
    tar_file.extract(xml_member, xml_path)

    xml_bytes = tar_file.extractfile(xml_member).read()
    dom = minidom.parse(io.StringIO(xml_bytes.decode("utf-8")))

    ProduceTime = _gf4_tag_text(dom, 'ProduceTime')
    StartTime = _gf4_tag_text(dom, 'StartTime')
    EndTime = _gf4_tag_text(dom, 'EndTime')
    ImageGSD = _gf4_tag_text(dom, 'ImageGSD')  # resolution
    Bands = _gf4_tag_text(dom, 'Bands')
    CloudPercent = _gf4_tag_text(dom, 'CloudPercent')

    # Footprint ring: top-left, top-right, bottom-right, bottom-left, closed
    # by repeating the first vertex; every vertex is "longitude latitude".
    ring = []
    for corner in ('TopLeft', 'TopRight', 'BottomRight', 'BottomLeft'):
        longitude = _gf4_tag_text(dom, corner + 'Longitude')
        latitude = _gf4_tag_text(dom, corner + 'Latitude')
        ring.append(f'{longitude} {latitude}')
    ring.append(ring[0])
    boundaryGeomStr = 'POLYGON((' + ','.join(ring) + '))'

    return {"ProduceTime": ProduceTime,
            "StartTime": StartTime,
            "EndTime": EndTime,
            "CloudPercent": CloudPercent,
            "boundaryGeomStr": boundaryGeomStr,
            "Bands": Bands,
            "ImageGSD": ImageGSD,
            "ProjectedCoordinates": "",  # projection is not read from the XML
            'CollectionCode': "GF4_" + sensor,
            "ThumbnailName": thumb_member,
            "ThumbnailPath": thumbnail_path + "/" + thumb_member,
            "xmlFileName": xml_member,
            "xmlPath": xml_path + "/" + xml_member,
            "DirectoryDepth": "month"}


def GetGF4PMIData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Gaofen-4 PMI (GF-4 PMI) archive.

    PMS (visible / near-infrared, 5 bands, 50 m) and IRS (mid-wave infrared,
    1 band, 400 m) are described by separate XML + thumbnail pairs. Members
    are located by name (``GF4_PMS_*`` / ``GF4_IRS_*`` ending in
    ``_thumb.jpg`` or ``.xml``), independent of their order inside the
    archive, and extracted to ``thumbnail_path`` / ``xml_path``.

    :param in_file: path of the tar archive
    :param xml_path: directory that receives the extracted XML files
    :param thumbnail_path: directory that receives the extracted thumbnails
    :return: ``(pms_dict, irs_dict)`` on success, otherwise
             ``{"code": -1, "msg": <reason>}``
    """
    try:
        with tarfile.open(in_file, mode='r') as tar_file:
            names = tar_file.getnames()
            pms_thumb, pms_xml = _gf4_find_members(names, 'PMS')
            irs_thumb, irs_xml = _gf4_find_members(names, 'IRS')
            if None in (pms_thumb, pms_xml, irs_thumb, irs_xml):
                # Check everything up front so nothing is half-extracted.
                return {"code": -1, "msg": "找不到指定文件..."}

            gf4_pms_dict = _gf4_sensor_metadata(
                tar_file, 'PMS', pms_thumb, pms_xml, xml_path, thumbnail_path)
            gf4_irs_dict = _gf4_sensor_metadata(
                tar_file, 'IRS', irs_thumb, irs_xml, xml_path, thumbnail_path)
        return gf4_pms_dict, gf4_irs_dict
    except Exception as e:
        # Any failure is reported to the caller as an error dict.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def GetH08Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Himawari-8 product and render a quick-look image.

    Side effects: writes ``<stem>_thumb.jpg`` (JPEG, longest side at most
    512 px) into ``thumbnail_path`` and ``<stem>.xml`` (a dump of the GDAL
    metadata) into ``xml_path``, where ``<stem>`` is the input file name
    without its extension.

    :param in_file: path of the product file (opened with both h5py and GDAL)
    :param xml_path: directory that receives the generated XML file
    :param thumbnail_path: directory that receives the generated thumbnail
    :return: metadata dict on success, otherwise ``{"code": -1, "msg": <reason>}``
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        # Band list comes from the file's "band_id" dataset.
        with h5py.File(in_file, mode='r') as f:
            bands = ','.join(str(i) for i in f['band_id'][:])
        ImageGSD = '1000, 500, 2000'

        # Build the thumbnail from three sub-datasets.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        meta_data = in_datasets.GetMetadata()
        datasets = in_datasets.GetSubDatasets()
        # NOTE(review): indices 7/6/5 are taken to be red/green/blue from the
        # original variable names -- confirm against the product layout.
        red_data = gdal.Open(datasets[7][0]).ReadAsArray()
        gre_data = gdal.Open(datasets[6][0]).ReadAsArray()
        blu_data = gdal.Open(datasets[5][0]).ReadAsArray()
        img_data = uint16to8(np.array([red_data, gre_data, blu_data]))

        # (band, row, col) -> (row, col, band), then reverse the band axis.
        # NOTE(review): the reversal hands PIL the stack in the opposite order
        # to the one it was built in -- confirm this is the intended colour order.
        img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
        # thumbnail() keeps the aspect ratio and never enlarges, so a square
        # box bounds the longest side to 512 px for any orientation.
        img.thumbnail((512, 512))
        # The file is named *.jpg, so write real JPEG data.
        img.save(ThumbnailPath, "JPEG")

        # Release the GDAL handle and the large arrays.
        del in_datasets
        del img_data
        del img

        # Dump the GDAL metadata to XML.
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        createXML(meta_data, xmlPath)

        date_created = meta_data['date_created']

        # Footprint: north/west edges come from the metadata, the east and
        # south edges are fixed at 200 and -60; longitudes are shifted by -180.
        # NOTE(review): confirm the -180 shift matches what consumers expect.
        north = meta_data['upper_left_latitude']
        # int(float(...)) also accepts values written as "80.0".
        west = int(float(meta_data['upper_left_longitude'])) - 180
        east = 200 - 180
        south = -60
        boundaryGeomStr = f'POLYGON(({west} {north},' \
                          f'{east} {north},' \
                          f'{east} {south},' \
                          f'{west} {south},' \
                          f'{west} {north}))'

        return {"ProduceTime": date_created,
                "StartTime": "",
                "EndTime": "",
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": bands,
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": "",
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "day"}
    except Exception as e:
        # Any failure is reported to the caller as an error dict.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def GetJPSSData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a JPSS-1 / NOAA-20 (Joint Polar Satellite System)
    product and render a quick-look image.

    Side effects: writes ``<stem>_thumb.jpg`` (JPEG, longest side at most
    512 px) into ``thumbnail_path`` and ``<stem>.xml`` (a dump of the GDAL
    metadata) into ``xml_path``, where ``<stem>`` is the input file name
    without its extension.

    :param in_file: path of the product file (opened with GDAL)
    :param xml_path: directory that receives the generated XML file
    :param thumbnail_path: directory that receives the generated thumbnail
    :return: metadata dict on success, otherwise ``{"code": -1, "msg": <reason>}``
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        # Build the thumbnail from three sub-datasets.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        meta_data = in_datasets.GetMetadata()
        datasets = in_datasets.GetSubDatasets()
        # NOTE(review): indices 0/3/9 are taken to be red/NIR/SWIR from the
        # original variable names -- confirm against the product layout.
        red_data = gdal.Open(datasets[0][0]).ReadAsArray()
        nir_data = gdal.Open(datasets[3][0]).ReadAsArray()
        swir_data = gdal.Open(datasets[9][0]).ReadAsArray()
        img_data = uint16to8(np.array([red_data, nir_data, swir_data]))

        # (band, row, col) -> (row, col, band), then reverse the band axis.
        # NOTE(review): the reversal hands PIL the stack in the opposite order
        # to the one it was built in -- confirm this is the intended colour order.
        img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
        # thumbnail() keeps the aspect ratio and never enlarges, so a square
        # box bounds the longest side to 512 px for any orientation.
        img.thumbnail((512, 512))
        # The file is named *.jpg, so write real JPEG data.
        img.save(ThumbnailPath, "JPEG")

        # Release the GDAL handle and the large arrays.
        del in_datasets
        del img_data
        del img

        # Dump the GDAL metadata to XML.
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        createXML(meta_data, xmlPath)

        # Product dates
        ProductionTime = meta_data['ProductionTime']
        StartTime = meta_data['StartTime']
        EndTime = meta_data['EndTime']

        # Resolution is the last word of "LongName"; bands the second word of "title".
        ImageGSD = str(meta_data['LongName']).split(" ")[-1]
        Bands = str(meta_data['title']).split(" ")[1]

        # Footprint is the bounding box given by the four bounding coordinates.
        north = meta_data['NorthBoundingCoordinate']
        west = meta_data['WestBoundingCoordinate']
        east = meta_data['EastBoundingCoordinate']
        south = meta_data['SouthBoundingCoordinate']
        boundaryGeomStr = f'POLYGON(({west} {north},' \
                          f'{east} {north},' \
                          f'{east} {south},' \
                          f'{west} {south},' \
                          f'{west} {north}))'

        return {"ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": Bands,
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": "",
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "day"}
    except Exception as e:
        # Any failure is reported to the caller as an error dict.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def GetSNPPData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Suomi National Polar-orbiting Partnership (SNPP)
    product and render a quick-look image.

    Side effects: writes ``<stem>_thumb.jpg`` (JPEG, longest side at most
    512 px) into ``thumbnail_path`` and ``<stem>.xml`` (a dump of the GDAL
    metadata) into ``xml_path``, where ``<stem>`` is the input file name
    without its extension.

    The production time is returned under ``"ProduceTime"`` (the key the
    other parsers in this module use) and, for existing callers, also under
    ``"ProductionTime"``.

    :param in_file: path of the product file (opened with GDAL)
    :param xml_path: directory that receives the generated XML file
    :param thumbnail_path: directory that receives the generated thumbnail
    :return: metadata dict on success, otherwise ``{"code": -1, "msg": <reason>}``
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        # Build the thumbnail from three sub-datasets.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        meta_data = in_datasets.GetMetadata()
        datasets = in_datasets.GetSubDatasets()
        # NOTE(review): indices 0/3/9 are taken to be red/green/blue from the
        # original variable names -- confirm against the product layout.
        red_data = gdal.Open(datasets[0][0]).ReadAsArray()
        gre_data = gdal.Open(datasets[3][0]).ReadAsArray()
        blu_data = gdal.Open(datasets[9][0]).ReadAsArray()
        img_data = uint16to8(np.array([red_data, gre_data, blu_data]))

        # (band, row, col) -> (row, col, band), then reverse the band axis.
        # NOTE(review): the reversal hands PIL the stack in the opposite order
        # to the one it was built in -- confirm this is the intended colour order.
        img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
        # thumbnail() keeps the aspect ratio and never enlarges, so a square
        # box bounds the longest side to 512 px for any orientation.
        img.thumbnail((512, 512))
        # The file is named *.jpg, so write real JPEG data.
        img.save(ThumbnailPath, "JPEG")

        # Release the GDAL handle and the large arrays.
        del in_datasets
        del img_data
        del img

        # Dump the GDAL metadata to XML.
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        createXML(meta_data, xmlPath)

        # Product dates
        ProductionTime = meta_data['ProductionTime']
        StartTime = meta_data['StartTime']
        EndTime = meta_data['EndTime']

        # Resolution is the last word of "LongName" minus its final character;
        # the whole "title" string is reported as the band description.
        ImageGSD = str(meta_data['LongName']).split(" ")[-1][:-1]
        Bands = str(meta_data['title'])

        # Footprint is the bounding box given by the four bounding coordinates.
        north = meta_data['NorthBoundingCoordinate']
        west = meta_data['WestBoundingCoordinate']
        east = meta_data['EastBoundingCoordinate']
        south = meta_data['SouthBoundingCoordinate']
        boundaryGeomStr = f'POLYGON(({west} {north},' \
                          f'{east} {north},' \
                          f'{east} {south},' \
                          f'{west} {south},' \
                          f'{west} {north}))'

        return {"ProductionTime": ProductionTime,
                "ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": Bands,
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": "",
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "day"}
    except Exception as e:
        # Any failure is reported to the caller as an error dict.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def GetSentinel1Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a zipped Sentinel-1 IW GRD product.

    Three archive members are used: the VV annotation XML (acquisition
    times), ``preview/map-overlay.kml`` (footprint corners) and
    ``preview/quick-look.png`` (thumbnail).  The annotation XML is copied to
    ``xml_path`` and the quick-look to ``thumbnail_path``; both copies are
    named after the archive file.

    NOTE: the quick-look is PNG data written under a ``_thumb.jpg`` name; the
    name is kept because it is part of the returned dictionary.

    :param in_file: path of the Sentinel-1 ``.zip`` archive
    :param xml_path: directory that receives the copy of the annotation XML
    :param thumbnail_path: directory that receives the quick-look image
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` when the
        archive cannot be read or a required member is missing
    """
    # The satellite unit letter (s1a, s1b, ...) is left open so products of
    # every Sentinel-1 unit are recognised.  Patterns are anchored at the
    # start of the member name only (re.match semantics).
    annotation_re = re.compile(
        r'[0-9a-zA-Z_]+\.SAFE/annotation/s1[a-z]-iw-grd-vv[0-9a-z\-]+\.xml')
    overlay_re = re.compile(r'[0-9a-zA-Z_]+\.SAFE/preview/map-overlay\.kml')
    quicklook_re = re.compile(r'[0-9a-zA-Z_]+\.SAFE/preview/quick-look\.png')
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        acquisition = None      # (ProduceTime, StartTime, StopTime)
        boundaryGeomStr = None
        with zipfile.ZipFile(in_file, mode='r') as zip_file:
            for member in zip_file.namelist():
                if annotation_re.match(member):
                    # Keep a copy of the annotation XML next to the catalogue.
                    meta_data = zip_file.read(member)
                    with open(xmlPath, "wb") as fout:
                        fout.write(meta_data)

                    # Parse the bytes already in memory instead of opening
                    # the member a second time (that handle was never closed).
                    dom = minidom.parseString(meta_data)
                    header = dom.getElementsByTagName('adsHeader')[0]
                    quality = dom.getElementsByTagName('qualityInformation')[0]
                    quality = quality.getElementsByTagName('qualityDataList')[0]
                    quality = quality.getElementsByTagName('qualityData')[0]
                    acquisition = (
                        quality.getElementsByTagName('azimuthTime')[0].firstChild.data,
                        header.getElementsByTagName('startTime')[0].firstChild.data,
                        header.getElementsByTagName('stopTime')[0].firstChild.data,
                    )
                elif overlay_re.match(member):
                    dom = minidom.parseString(zip_file.read(member))
                    coordinates = dom.getElementsByTagName('coordinates')[0].firstChild.data
                    # str.split() tolerates leading/trailing and repeated
                    # whitespace, which would otherwise yield empty tokens.
                    corners = coordinates.split()
                    if len(corners) < 4:
                        raise ValueError(
                            "map-overlay.kml holds %d corner(s), 4 expected" % len(corners))
                    # Each token is "lon,lat[,alt]"; WKT wants "lon lat".
                    ring = [tuple(corner.split(',')[:2]) for corner in corners[:4]]
                    ring.append(ring[0])    # close the polygon ring
                    boundaryGeomStr = 'POLYGON((' + ','.join(
                        f'{lon} {lat}' for lon, lat in ring) + '))'
                elif quicklook_re.match(member):
                    with open(ThumbnailPath, "wb") as fout:
                        fout.write(zip_file.read(member))

        # Report absent members explicitly instead of failing later on an
        # unbound variable.
        missing = [label for label, value in (("annotation XML", acquisition),
                                              ("map-overlay.kml", boundaryGeomStr))
                   if value is None]
        if missing:
            raise ValueError("Sentinel-1 archive %s lacks: %s" % (in_file, ", ".join(missing)))

        ProduceTime, StartTime, StopTime = acquisition
        return {"ProduceTime": ProduceTime,
                "StartTime": StartTime,
                "StopTime": StopTime,
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": "Amplitude_VH,Intensity_VH,Amplitude_VV,Intensity_VV",
                "ImageGSD": "10",
                "ProjectedCoordinates": '',
                "CollectionCode": '',
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        # Callers expect an error dictionary rather than an exception.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def GetSentinel2Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a zipped Sentinel-2 L2A product.

    A true-colour thumbnail is rendered from the 60 m B04/B03/B02 bands
    (red, green, blue) and written to ``thumbnail_path``; the product
    metadata file ``MTD_MSIL2A.xml`` is copied to ``xml_path``.  Both files
    are named after the archive.

    NOTE: the thumbnail is encoded as PNG but stored under a ``_thumb.jpg``
    name; the name is kept because it is part of the returned dictionary.

    :param in_file: path of the Sentinel-2 ``.zip`` archive
    :param xml_path: directory that receives the copy of ``MTD_MSIL2A.xml``
    :param thumbnail_path: directory that receives the thumbnail
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)

        with zipfile.ZipFile(in_file, 'r') as zip_file:
            members = sorted(zip_file.namelist())

            def find_member(suffix):
                # Select each file by its own suffix rather than by its
                # position in a sorted list, so a missing band or metadata
                # file is reported instead of shifting the other entries.
                for name in members:
                    if name.endswith(suffix):
                        return name
                raise ValueError("no member ending with '%s' in %s" % (suffix, in_file))

            # Red, green, blue - the channel order PIL expects.
            band_members = [find_member(suffix)
                            for suffix in ('_B04_60m.jp2', '_B03_60m.jp2', '_B02_60m.jp2')]
            meta_data = zip_file.read(find_member('.SAFE/MTD_MSIL2A.xml'))

        # ---- thumbnail -----------------------------------------------------
        rgb_list = []
        for band_member in band_members:
            sub_dataset = gdal.Open('/vsizip/%s/%s' % (in_file, band_member))
            if sub_dataset is None:
                raise IOError("GDAL cannot open %s inside %s" % (band_member, in_file))
            rgb_list.append(sub_dataset.ReadAsArray())
            sub_dataset = None      # release the GDAL dataset handle
        img_data = uint16to8(np.array(rgb_list))       # (3, rows, cols)
        rows, cols = img_data.shape[1], img_data.shape[2]

        # Bands are already stacked as R, G, B, so no channel flip is applied
        # (an extra flip would hand PIL a BGR array and swap red and blue).
        img = Image.fromarray(np.ascontiguousarray(np.transpose(img_data, (1, 2, 0))))

        # Bound the shorter side to 512 px and scale the other side by the
        # real aspect ratio; Image.thumbnail never enlarges.
        if rows > cols:
            width = 512
            height = int(width / cols * rows)
        else:
            height = 512
            width = int(height / rows * cols)
        img.thumbnail((width, height))
        img.save(ThumbnailPath, "PNG")
        del rgb_list, img_data, img

        # ---- metadata ------------------------------------------------------
        with open(xmlPath, "wb") as fout:
            fout.write(meta_data)
        dom = minidom.parseString(meta_data)

        def text_of(*tags):
            # Follow a chain of first-matching descendants and return the
            # text of the last element.
            node = dom
            for tag in tags:
                node = node.getElementsByTagName(tag)[0]
            return node.firstChild.data

        cloud_percent = text_of('n1:Quality_Indicators_Info', 'Cloud_Coverage_Assessment')
        ImageGSD = '10, 20, 60'
        ProjectedCoordinates = text_of('n1:Geometric_Info', 'Coordinate_Reference_System',
                                       'GEO_TABLES')
        ProduceTime = text_of('n1:General_Info', 'Product_Info', 'GENERATION_TIME')
        StartTime = text_of('n1:General_Info', 'Product_Info', 'PRODUCT_START_TIME')
        StopTime = text_of('n1:General_Info', 'Product_Info', 'PRODUCT_STOP_TIME')

        # EXT_POS_LIST is read as a whitespace separated "lat lon lat lon ..."
        # list; the first four points form the footprint ring.
        values = text_of('n1:Geometric_Info', 'Product_Footprint', 'Product_Footprint',
                         'Global_Footprint', 'EXT_POS_LIST').split()
        if len(values) < 8:
            raise ValueError("EXT_POS_LIST holds %d value(s), at least 8 expected" % len(values))
        ring = [(values[i + 1], values[i]) for i in range(0, 8, 2)]    # (lon, lat)
        ring.append(ring[0])        # close the polygon ring
        boundaryGeomStr = 'POLYGON((' + ','.join(
            f'{lon} {lat}' for lon, lat in ring) + '))'

        return {"ProduceTime": ProduceTime,
                "StartTime": StartTime,
                "StopTime": StopTime,
                "CloudPercent": cloud_percent,
                "boundaryGeomStr": boundaryGeomStr,
                "bands": "1,2,3,4,5,6,7,8,9,10,11,12",
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": ProjectedCoordinates,
                "CollectionCode": '',
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        # Callers expect an error dictionary rather than an exception.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def GetSentinel3OLData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a zipped Sentinel-3 OLCI level-1 product
    (still marked as work in progress by the original author).

    A thumbnail is rendered from the Oa08/Oa05/Oa03 radiance bands (used as
    red, green, blue) and written to ``thumbnail_path``; ``xfdumanifest.xml``
    is copied to ``xml_path``.  Both files are named after the archive.

    ``boundaryGeomStr`` is returned empty: no footprint is extracted for
    this product type yet.

    NOTE: the thumbnail is encoded as PNG but stored under a ``_thumb.jpg``
    name; the name is kept because it is part of the returned dictionary.

    :param in_file: path of the Sentinel-3 ``.zip`` archive
    :param xml_path: directory that receives the copy of the manifest
    :param thumbnail_path: directory that receives the thumbnail
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)

        with zipfile.ZipFile(in_file, 'r') as zip_file:
            members = sorted(zip_file.namelist())

            def find_member(suffix):
                # Members are chosen by suffix, never by their position in
                # the archive listing, whose order is not guaranteed.
                for name in members:
                    if name.endswith(suffix):
                        return name
                raise ValueError("no member ending with '%s' in %s" % (suffix, in_file))

            # Red, green, blue - the channel order PIL expects.
            band_members = [find_member(suffix)
                            for suffix in ('Oa08_radiance.nc', 'Oa05_radiance.nc',
                                           'Oa03_radiance.nc')]
            meta_data = zip_file.read(find_member('xfdumanifest.xml'))

        # ---- thumbnail -----------------------------------------------------
        rgb_list = []
        for band_member in band_members:
            sub_dataset = gdal.Open('/vsizip/%s/%s' % (in_file, band_member))
            if sub_dataset is None:
                raise IOError("GDAL cannot open %s inside %s" % (band_member, in_file))
            rgb_list.append(sub_dataset.ReadAsArray())
            sub_dataset = None      # release the GDAL dataset handle
        img_data = uint16to8(np.array(rgb_list))       # (3, rows, cols)
        rows, cols = img_data.shape[1], img_data.shape[2]

        # Bands are already stacked as R, G, B, so no channel flip is applied
        # (an extra flip would hand PIL a BGR array and swap red and blue).
        img = Image.fromarray(np.ascontiguousarray(np.transpose(img_data, (1, 2, 0))))

        # Bound the shorter side to 512 px and scale the other side by the
        # real aspect ratio; Image.thumbnail never enlarges.
        if rows > cols:
            width = 512
            height = int(width / cols * rows)
        else:
            height = 512
            width = int(height / rows * cols)
        img.thumbnail((width, height))
        img.save(ThumbnailPath, "PNG")
        # Free the full-resolution arrays before parsing the manifest.
        del rgb_list, img_data, img

        # ---- metadata ------------------------------------------------------
        with open(xmlPath, "wb") as fout:
            fout.write(meta_data)
        dom = minidom.parseString(meta_data)

        def text_of(tag):
            return dom.getElementsByTagName(tag)[0].firstChild.data

        CollectionCode = "Sentinel3_OLCI_L1"
        ProjectedCoordinates = ""
        CloudPercent = ""
        ProduceTime = text_of('sentinel3:creationTime')
        StartTime = text_of('sentinel3:receivingStartTime')
        StopTime = text_of('sentinel3:receivingStopTime')

        return {"ProduceTime": ProduceTime,
                "StartTime": StartTime,
                "StopTime": StopTime,
                "CloudPercent": CloudPercent,
                "boundaryGeomStr": "",
                "bands": "Oa01,Oa02,Oa03,Oa04,Oa05,Oa06,Oa07,Oa08,Oa09,Oa10,Oa11,Oa12,Oa13,Oa14,Oa15,Oa16,"
                         "Oa17,Oa18,Oa19,Oa20,Oa21",
                "ImageGSD": "270,294",
                "ProjectedCoordinates": ProjectedCoordinates,
                "CollectionCode": CollectionCode,
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        # Callers expect an error dictionary rather than an exception.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def GetHJ1Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of an HJ-1 (Huanjing-1) product packed as a tar archive.

    The member ending in ``THUMB.JPG`` is extracted below ``thumbnail_path``
    and the member ending in ``.XML`` below ``xml_path`` (archive-internal
    directories are preserved); the XML is then parsed for dates, resolution,
    projection, bands and the four footprint corners.  When several members
    match, the last one wins.

    NOTE(review): ``TarFile.extract`` trusts the member paths stored in the
    archive; archives are presumed to come from a trusted source - confirm.

    :param in_file: path of the HJ-1 ``.tar.gz`` archive
    :param xml_path: directory below which the metadata XML is extracted
    :param thumbnail_path: directory below which the thumbnail is extracted
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        thumb_member = None
        meta_member = None
        meta_content = None
        with tarfile.open(in_file, mode='r') as tar_file:
            for member in tar_file.getnames():
                if member.endswith("THUMB.JPG"):
                    tar_file.extract(member, thumbnail_path)
                    thumb_member = member
                elif member.endswith(".XML"):
                    tar_file.extract(member, xml_path)
                    meta_member = member
                    with tar_file.extractfile(member) as meta_file:
                        meta_content = meta_file.read()

        # Report absent members explicitly instead of failing later on an
        # unbound variable.
        if thumb_member is None:
            raise ValueError("no *THUMB.JPG member found in %s" % in_file)
        if meta_member is None:
            raise ValueError("no *.XML member found in %s" % in_file)

        # Returned paths keep the plain "/" concatenation so the strings stay
        # what existing consumers already receive.  The file name is the last
        # path component, whatever the nesting depth inside the archive.
        ThumbnailPath = thumbnail_path + "/" + thumb_member
        ThumbnailName = os.path.basename(thumb_member)
        xmlPath = xml_path + "/" + meta_member
        xmlFileName = os.path.basename(meta_member)

        dom = minidom.parseString(meta_content.decode("utf-8"))

        def text_of(tag):
            return dom.getElementsByTagName(tag)[0].firstChild.data

        # Footprint ring: upper-left, upper-right, lower-right, lower-left,
        # then back to the start to close the polygon.
        ring = [(text_of('product' + corner + 'Long'), text_of('product' + corner + 'Lat'))
                for corner in ('UpperLeft', 'UpperRight', 'LowerRight', 'LowerLeft')]
        ring.append(ring[0])
        boundaryGeomStr = 'POLYGON((' + ','.join(
            f'{lon} {lat}' for lon, lat in ring) + '))'

        return {"ProductTime": text_of('productDate'),
                "StartTime": text_of('imagingStartTime'),
                "EndTime": text_of('imagingStopTime'),
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": text_of('bands'),
                "ImageGSD": text_of('pixelSpacing'),
                "ProjectedCoordinates": text_of('mapProjection'),
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        # Callers expect an error dictionary rather than an exception.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
def GetZY02CData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a ZY-02C product packed as a tar archive.

    The archive carries one multispectral (MUX) and one panchromatic (PAN)
    scene.  For each, the ``*_thumb.jpg`` member is extracted below
    ``thumbnail_path`` and the ``*.xml`` member below ``xml_path``.  All
    members are collected first and the dictionaries are built afterwards,
    so the result does not depend on the order of members in the archive.

    NOTE(review): ``TarFile.extract`` trusts the member paths stored in the
    archive; archives are presumed to come from a trusted source - confirm.

    :param in_file: path of the ZY-02C ``.tar.gz`` archive
    :param xml_path: directory below which the metadata XML files are extracted
    :param thumbnail_path: directory below which the thumbnails are extracted
    :return: ``(mux_dict, pan_dict)`` on success; a single
        ``{"code": -1, "msg": ...}`` dict on failure
    """
    try:
        thumb_members = {}      # sensor -> thumbnail member name
        meta_members = {}       # sensor -> (xml member name, xml bytes)
        with tarfile.open(in_file, mode='r') as tar_file:
            for member in tar_file.getnames():
                for sensor in ("MUX", "PAN"):
                    if member.endswith(sensor + "_thumb.jpg"):
                        tar_file.extract(member, thumbnail_path)
                        thumb_members[sensor] = member
                    elif member.endswith(sensor + ".xml"):
                        tar_file.extract(member, xml_path)
                        with tar_file.extractfile(member) as meta_file:
                            meta_members[sensor] = (member, meta_file.read())

        # Both scenes are required, as before.
        if "MUX" not in meta_members or "PAN" not in meta_members:
            return {"code": -1, "msg": "没有满足条件的数据字典..."}

        scene_dicts = []
        for sensor in ("MUX", "PAN"):
            if sensor not in thumb_members:
                raise ValueError("no *%s_thumb.jpg member found in %s" % (sensor, in_file))
            xml_member, xml_bytes = meta_members[sensor]
            scene_dicts.append(_zy02c_scene_dict(xml_bytes, xml_member, thumb_members[sensor],
                                                 xml_path, thumbnail_path))
        zy2_mux_dict, zy2_pan_dict = scene_dicts
        return zy2_mux_dict, zy2_pan_dict
    except Exception as e:
        # Callers expect an error dictionary rather than an exception.
        print(str(e))
        return {"code": -1, "msg": str(e)}


def _zy02c_scene_dict(xml_bytes, xml_member, thumb_member, xml_path, thumbnail_path):
    """Build the metadata dict of one ZY-02C scene (MUX or PAN) from its XML bytes."""
    dom = minidom.parseString(xml_bytes.decode("utf-8"))

    def text_of(tag):
        return dom.getElementsByTagName(tag)[0].firstChild.data

    # Footprint ring: top-left, top-right, bottom-right, bottom-left, then
    # back to the start to close the polygon.
    ring = [(text_of(corner + 'Longitude'), text_of(corner + 'Latitude'))
            for corner in ('TopLeft', 'TopRight', 'BottomRight', 'BottomLeft')]
    ring.append(ring[0])
    boundaryGeomStr = 'POLYGON((' + ','.join(
        f'{lon} {lat}' for lon, lat in ring) + '))'

    return {"ProduceTime": text_of('ProduceTime'),
            "StartTime": text_of('StartTime'),
            "EndTime": text_of('EndTime'),
            "CloudPercent": text_of('CloudPercent'),
            "boundaryGeomStr": boundaryGeomStr,
            "bands": text_of('Bands'),
            "ImageGSD": text_of('ImageGSD'),
            "ProjectedCoordinates": text_of('MapProjection'),
            'CollectionCode': "",
            # Paths keep the plain "/" concatenation and names keep the full
            # member name, matching what existing consumers receive.
            "ThumbnailPath": thumbnail_path + "/" + thumb_member,
            "ThumbnailName": thumb_member,
            "xmlPath": xml_path + "/" + xml_member,
            "xmlFileName": xml_member,
            "DirectoryDepth": "day"}
|
||
|
||
|
||
def GetZY3Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a ZY-3 product packed as a tar archive.

    The member ending in ``thumb.jpg`` is extracted below ``thumbnail_path``
    and the metadata XML (any ``*.xml`` member except ``*order.xml``) below
    ``xml_path``.  Members are collected first and the dictionary is built
    afterwards, so the result does not depend on the order of members in the
    archive.  When several members match, the last one wins.

    NOTE(review): ``TarFile.extract`` trusts the member paths stored in the
    archive; archives are presumed to come from a trusted source - confirm.

    :param in_file: path of the ZY-3 ``.tar.gz`` archive
    :param xml_path: directory below which the metadata XML is extracted
    :param thumbnail_path: directory below which the thumbnail is extracted
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        thumb_member = None
        meta_member = None
        meta_content = None
        with tarfile.open(in_file, mode='r') as tar_file:
            for member in tar_file.getnames():
                if member.endswith("thumb.jpg"):
                    tar_file.extract(member, thumbnail_path)
                    thumb_member = member
                elif member.endswith('.xml') and not member.endswith('order.xml'):
                    tar_file.extract(member, xml_path)
                    meta_member = member
                    with tar_file.extractfile(member) as meta_file:
                        meta_content = meta_file.read()

        # No metadata XML at all: same answer as before.
        if meta_member is None:
            return {"code": -1, "msg": "没有满足条件的数据字典..."}
        if thumb_member is None:
            raise ValueError("no *thumb.jpg member found in %s" % in_file)

        dom = minidom.parseString(meta_content.decode("utf-8"))

        def text_of(tag):
            return dom.getElementsByTagName(tag)[0].firstChild.data

        # Footprint ring: top-left, top-right, bottom-right, bottom-left,
        # then back to the start to close the polygon.
        ring = [(text_of(corner + 'Longitude'), text_of(corner + 'Latitude'))
                for corner in ('TopLeft', 'TopRight', 'BottomRight', 'BottomLeft')]
        ring.append(ring[0])
        boundaryGeomStr = 'POLYGON((' + ','.join(
            f'{lon} {lat}' for lon, lat in ring) + '))'

        zy3_dict = {"ProduceTime": text_of('ProduceTime'),
                    "StartTime": text_of('StartTime'),
                    "EndTime": text_of('EndTime'),
                    "CloudPercent": text_of('CloudPercent'),
                    "boundaryGeomStr": boundaryGeomStr,
                    "bands": text_of('Bands'),
                    "ImageGSD": text_of('ImageGSD'),
                    "ProjectedCoordinates": text_of('MapProjection'),
                    "CollectionCode": "",
                    # Paths keep the plain "/" concatenation and names keep
                    # the full member name, matching what existing consumers
                    # receive.
                    "ThumbnailPath": thumbnail_path + "/" + thumb_member,
                    "ThumbnailName": thumb_member,
                    "xmlPath": xml_path + "/" + meta_member,
                    "xmlFileName": meta_member,
                    "DirectoryDepth": "month"}
        return zy3_dict
    except Exception as e:
        # Callers expect an error dictionary rather than an exception.
        print(str(e))
        return {"code": -1, "msg": str(e)}
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual smoke run: parse one sample archive per sensor and print the
    # resulting metadata dictionaries.
    HJ1FilePath = r"Y:\不同传感器数据\HJ-1\HJ1A-CCD2-450-80-20090501-L20000106616.tar.gz"
    JPSSFilePath = r"Y:\不同传感器数据\JPSS\VJ102IMG.A2021159.0542.002.2021159094907.nc"
    ZY2FilePath = r"Y:\不同传感器数据\ZY-2\ZY02C_PMS_E115.9_N36.2_20120422_L2C0000391981.tar.gz"
    ZY3FilePath = r"Y:\不同传感器数据\ZY-3\ZY3_MUX_E83.3_N43.3_20120405_L2A0000301226.tar.gz"

    S1FilePath = r'Y:\不同传感器数据\SENTINEL-1\S1A_IW_GRDH_1SDV_20210407T095634_20210407T095659_037343_046675_8E66.zip'
    S2FilePath = r'Y:\不同传感器数据\SENTINEL-2\S2B_MSIL2A_20210804T024549_N0301_R132_T50SQF_20210804T053331.zip'
    GF1PMSPath = r'Y:\不同传感器数据\GF-1\GF1_PMS2_E104.1_N36.6_20210308_L1A0005524847.tar.gz'
    H08FilePath = r"Y:\不同传感器数据\葵花8\NC_H08_20210802_2010_R21_FLDK.06001_06001.nc"
    SNPPFilePath = r"Y:\不同传感器数据\VIIRS\VNP02IMG.A2021182.0418.001.2021182100800.nc"

    GF3MDJPath = r'Y:\不同传感器数据\GF-3\GF3_MDJ_SS_024986_E120.8_N35.6_20210509_L1A_VHVV_L10005638033.tar.gz'
    GF4PMIPath = r'Y:\不同传感器数据\GF-4\GF4_PMI_E119.8_N35.3_20210908_L1A0000417337.tar.gz'
    S3OLFilePath = (r'Y:\不同传感器数据\SENTINEL-3'
                    r'\S3B_OL_1_EFR____20210910T022645_20210910T022945_20210911T064342_0179_056_374_2340_LN1_O_NT_002.zip')
    S3SLFilePath = (r'Y:\不同传感器数据\SENTINEL-3'
                    r'\S3A_SL_1_RBT____20210916T020956_20210916T021256_20210917T120953_0179_076_217_2340_LN2_O_NT_004.zip')

    # Every reader needs a directory for the extracted XML and one for the
    # thumbnail; keep both next to the script.
    XmlOutPath = os.path.join(exe_path(), "xml")
    ThumbOutPath = os.path.join(exe_path(), "thumbnail")
    os.makedirs(XmlOutPath, exist_ok=True)
    os.makedirs(ThumbOutPath, exist_ok=True)

    # NOTE(review): GetJPSSData, GetGFPMSData, GetH08Data, GetSNPPData,
    # GetGF3MDJData and GetGF4PMIData are assumed to take the same
    # (in_file, xml_path, thumbnail_path) arguments as the other readers -
    # confirm before running.
    sample_runs = [
        (GetHJ1Data, HJ1FilePath),          # HJ-1
        (GetJPSSData, JPSSFilePath),        # JPSS
        (GetZY02CData, ZY2FilePath),        # ZY-2 (MUX + PAN)
        (GetZY3Data, ZY3FilePath),          # ZY-3
        (GetGFPMSData, GF1PMSPath),         # GF PMS (MSS + PAN)
        (GetH08Data, H08FilePath),          # Himawari-8
        (GetSentinel2Data, S2FilePath),     # Sentinel-2
        (GetSentinel1Data, S1FilePath),     # Sentinel-1
        (GetSNPPData, SNPPFilePath),        # SNPP
        (GetGF3MDJData, GF3MDJPath),        # GF-3
        (GetGF4PMIData, GF4PMIPath),        # GF-4 (PMS + IRS)
        (GetSentinel3OLData, S3OLFilePath), # Sentinel-3 OLCI
        # (GetSentinel3SLData, S3SLFilePath),  # Sentinel-3 SLSTR (disabled)
    ]
    for reader, sample_file in sample_runs:
        result = reader(sample_file, XmlOutPath, ThumbOutPath)
        # Two-scene readers return a tuple of dicts on success but a single
        # error dict on failure, so do not unpack blindly.
        if isinstance(result, tuple):
            for scene_dict in result:
                print(scene_dict)
        else:
            print(result)
|