# dms-client/data_meta/GetMetaInfo.py
import io
from xml.dom import minidom
from osgeo import gdal
import h5py
from PIL import Image
import numpy as np
import re
import os
import sys
import time
import datetime
import tarfile
import zipfile
def exe_path():
    """
    Locate the directory the running program lives in.

    Returns:
        str: directory of ``sys.executable`` when ``sys`` carries a ``frozen``
        attribute (the PyInstaller case), otherwise the directory of this
        source file with symbolic links resolved.
    """
    # A frozen bundle is anchored on the executable, a plain run on this file.
    anchor = sys.executable if hasattr(sys, 'frozen') else os.path.realpath(__file__)
    return os.path.dirname(anchor)


# Tell PROJ to look for its data in the "PROJ" folder next to the program.
os.environ['PROJ_LIB'] = "{}/PROJ".format(exe_path())
def GetTimestamp(date):
    """
    Convert a ``YYYY-MM-DD HH:MM:SS`` string into a timestamp.

    :param date: date-time text laid out as ``%Y-%m-%d %H:%M:%S``
    :return: float seconds since the epoch, as produced by ``time.mktime``
             (which interprets the parsed fields as local time)
    """
    return time.mktime(time.strptime(date, "%Y-%m-%d %H:%M:%S"))
def uint16to8(bands, lower_percent=0.001, higher_percent=99.999):
    """
    Percentile-stretch every band of a raster stack into the 0-255 range.

    Each band is stretched independently: the value at ``lower_percent`` maps
    to 0, the value at ``higher_percent`` maps to 255, and everything outside
    that window is clamped. A band whose two percentiles coincide (for example
    a constant band) has nothing to stretch and is returned as all zeros
    instead of dividing by zero.

    :param bands: array indexed as ``[band, row, col]``
    :param lower_percent: percentile (0-100) mapped to 0
    :param higher_percent: percentile (0-100) mapped to 255
    :return: ``numpy.uint8`` array with the same shape as ``bands``
    """
    out = np.zeros_like(bands, dtype=np.uint8)
    out_min, out_max = 0, 255
    for i in range(bands.shape[0]):
        band = bands[i, :, :]
        in_min = np.percentile(band, lower_percent)
        in_max = np.percentile(band, higher_percent)
        if in_max <= in_min:
            # Flat band: the stretch factor would be x/0, keep the zeros.
            continue
        stretched = out_min + (band - in_min) * (out_max - out_min) / (in_max - in_min)
        # Assigning into the uint8 output truncates the clamped floats.
        out[i, :, :] = np.clip(stretched, out_min, out_max)
    return out
def createXML(metadata, xlm_file):
    """
    Write a flat metadata mapping to an XML file.

    The document has a single ``ProductMetaData`` root; every key of
    ``metadata`` becomes one child element whose text is ``str(value)``.

    :param metadata: mapping of element name to value
    :param xlm_file: path of the XML file to create (written as UTF-8)
    :return: None
    """
    doc = minidom.Document()
    root = doc.createElement('ProductMetaData')
    doc.appendChild(root)
    # One child element per metadata entry, in mapping order.
    for tag, value in metadata.items():
        element = doc.createElement(tag)
        element.appendChild(doc.createTextNode(str(value)))
        root.appendChild(element)
    # The with-block closes the file, no explicit close() is needed.
    with open(xlm_file, 'w', encoding='utf-8') as handle:
        doc.writexml(handle, indent='\t', newl='\n', addindent='\t', encoding='utf-8')
def _gfpms_scene_metadata(tar_file, xml_member, xml_path, thumbnail_file, thumbnail_name):
    """
    Extract one sensor's XML from the archive and build its metadata dictionary.

    :param tar_file: open ``tarfile.TarFile`` holding the scene
    :param xml_member: archive member name of the sensor's metadata XML
    :param xml_path: directory the XML member is extracted into
    :param thumbnail_file: value stored under ``ThumbnailPath``
    :param thumbnail_name: value stored under ``ThumbnailName``
    :return: metadata dictionary for that sensor
    """
    tar_file.extract(xml_member, xml_path)
    with tar_file.extractfile(xml_member) as meta_file:
        dom = minidom.parse(io.StringIO(meta_file.read().decode("utf-8")))

    def text(tag):
        # Text of the first element named *tag*.
        return dom.getElementsByTagName(tag)[0].firstChild.data

    # Closed ring: top-left, top-right, bottom-right, bottom-left, top-left,
    # each written as "longitude latitude".
    corners = ("TopLeft", "TopRight", "BottomRight", "BottomLeft", "TopLeft")
    ring = ",".join(f"{text(corner + 'Longitude')} {text(corner + 'Latitude')}" for corner in corners)
    return {"ProduceTime": text('ProduceTime'),
            "StartTime": text('StartTime'),
            "EndTime": text('EndTime'),
            "CloudPercent": text('CloudPercent'),
            "boundaryGeomStr": f"POLYGON(({ring}))",
            "bands": text('Bands'),
            "ImageGSD": text('ImageGSD'),
            "ProjectedCoordinates": "",
            "CollectionCode": "",
            "ThumbnailPath": thumbnail_file,
            "ThumbnailName": thumbnail_name,
            "xmlPath": xml_path + "/" + xml_member,
            "xmlFileName": xml_member,
            "DirectoryDepth": "month"}


def GetGFPMSData(in_file, xml_path, thumbnail_ath):
    """
    Read the metadata of a Gaofen PMS scene archive (multispectral + panchromatic).

    The four required members are located by file-name suffix, so their order
    inside the archive does not matter. Both thumbnails are extracted into
    ``thumbnail_ath`` and both XML files into ``xml_path``.

    :param in_file: path of the tar archive
    :param xml_path: directory receiving the extracted XML files
    :param thumbnail_ath: directory receiving the extracted thumbnails
    :return: ``(mss_dict, pan_dict)`` on success; ``{"code": -1, "msg": ...}``
             when a required member is missing or any error occurs
    """
    try:
        with tarfile.open(in_file, mode='r') as tar_file:
            names = tar_file.getnames()
            suffixes = ('MSS2_thumb.jpg', 'PAN2_thumb.jpg', 'MSS2.xml', 'PAN2.xml')
            # First member carrying each suffix, None when the archive lacks it.
            found = [next((name for name in names if name.endswith(suffix)), None)
                     for suffix in suffixes]
            if None in found:
                return {"code": -1, "msg": "找不到指定文件..."}
            mss_thumb, pan_thumb, mss_xml, pan_xml = found
            # NOTE(review): member names come straight from the archive and are
            # passed to extract() unchecked.
            tar_file.extract(mss_thumb, thumbnail_ath)
            tar_file.extract(pan_thumb, thumbnail_ath)
            # Both thumbnail paths point into thumbnail_ath, where the files
            # were actually extracted.
            gf_mss_dict = _gfpms_scene_metadata(
                tar_file, mss_xml, xml_path, thumbnail_ath + "/" + mss_thumb, mss_thumb)
            gf_pan_dict = _gfpms_scene_metadata(
                tar_file, pan_xml, xml_path, thumbnail_ath + "/" + pan_thumb, pan_thumb)
        return gf_mss_dict, gf_pan_dict
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def _gf3_text(node, *tags):
    """Descend through the first element matching each tag in turn and return the last one's text."""
    for tag in tags:
        node = node.getElementsByTagName(tag)[0]
    return node.firstChild.data


def GetGF3MDJData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Gaofen-3 (GF-3 MDJ) scene archive.

    The thumbnail (``*.thumb.jpg``) and metadata (``*meta.xml``) members are
    located by file-name suffix, independent of their order in the archive,
    then extracted into ``thumbnail_path`` and ``xml_path`` respectively.

    :param in_file: path of the tar archive
    :param xml_path: directory receiving the extracted XML file
    :param thumbnail_path: directory receiving the extracted thumbnail
    :return: metadata dictionary on success; ``{"code": -1, "msg": ...}`` when
             a required member is missing or any error occurs
    """
    try:
        with tarfile.open(in_file, mode='r') as tar_file:
            names = tar_file.getnames()
            thumb_member = next((name for name in names if name.endswith('.thumb.jpg')), None)
            xml_member = next((name for name in names if name.endswith('meta.xml')), None)
            if thumb_member is None or xml_member is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            # NOTE(review): member names come straight from the archive and are
            # passed to extract() unchecked.
            tar_file.extract(thumb_member, thumbnail_path)
            tar_file.extract(xml_member, xml_path)
            with tar_file.extractfile(xml_member) as meta_file:
                dom = minidom.parse(io.StringIO(meta_file.read().decode("utf-8")))

        # Closed ring: top-left, top-right, bottom-right, bottom-left, top-left,
        # each written as "longitude latitude".
        corners = ("topLeft", "topRight", "bottomRight", "bottomLeft", "topLeft")
        ring = ",".join(
            f"{_gf3_text(dom, corner, 'longitude')} {_gf3_text(dom, corner, 'latitude')}"
            for corner in corners)
        return {"ProduceTime": _gf3_text(dom, 'productGentime'),
                "StartTime": _gf3_text(dom, 'imagingTime', 'start'),
                "EndTime": _gf3_text(dom, 'imagingTime', 'end'),
                "CloudPercent": "",
                "boundaryGeomStr": f"POLYGON(({ring}))",
                "Bands": "1,2",
                "ImageGSD": _gf3_text(dom, 'NominalResolution'),
                "ProjectedCoordinates": _gf3_text(dom, 'ProjectModel'),
                'CollectionCode': "GF3_MDJ",
                "ThumbnailPath": thumbnail_path + "/" + thumb_member,
                "ThumbnailName": thumb_member,
                "xmlPath": xml_path + "/" + xml_member,
                "xmlFileName": xml_member,
                "DirectoryDepth": "month"}
    except Exception as e:
        return {"code": -1, "msg": str(e)}
def _gf4_scene_metadata(tar_file, xml_member, xml_path, thumbnail_member, thumbnail_path, collection_code):
    """
    Extract one GF-4 sensor's XML from the archive and build its metadata dictionary.

    :param tar_file: open ``tarfile.TarFile`` holding the scene
    :param xml_member: archive member name of the sensor's metadata XML
    :param xml_path: directory the XML member is extracted into
    :param thumbnail_member: archive member name of the sensor's thumbnail
    :param thumbnail_path: directory the thumbnail was extracted into
    :param collection_code: value stored under ``CollectionCode``
    :return: metadata dictionary for that sensor
    """
    tar_file.extract(xml_member, xml_path)
    with tar_file.extractfile(xml_member) as meta_file:
        dom = minidom.parse(io.StringIO(meta_file.read().decode("utf-8")))

    def text(tag):
        # Text of the first element named *tag*.
        return dom.getElementsByTagName(tag)[0].firstChild.data

    # Closed ring: top-left, top-right, bottom-right, bottom-left, top-left,
    # each written as "longitude latitude".
    corners = ("TopLeft", "TopRight", "BottomRight", "BottomLeft", "TopLeft")
    ring = ",".join(f"{text(corner + 'Longitude')} {text(corner + 'Latitude')}" for corner in corners)
    return {"ProduceTime": text('ProduceTime'),
            "StartTime": text('StartTime'),
            "EndTime": text('EndTime'),
            "CloudPercent": text('CloudPercent'),
            "boundaryGeomStr": f"POLYGON(({ring}))",
            "Bands": text('Bands'),
            "ImageGSD": text('ImageGSD'),
            # The projection is deliberately left empty (the MapProjection
            # lookup was disabled in the original code).
            "ProjectedCoordinates": "",
            'CollectionCode': collection_code,
            "ThumbnailName": thumbnail_member,
            "ThumbnailPath": thumbnail_path + "/" + thumbnail_member,
            "xmlFileName": xml_member,
            "xmlPath": xml_path + "/" + xml_member,
            "DirectoryDepth": "month"}


def GetGF4PMIData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Gaofen-4 PMI scene archive (PMS + IRS sensors).

    Per the original notes, PMS is the visible/near-infrared sensor (5 bands,
    50 m) and IRS the mid-wave infrared sensor (1 band, 400 m). The four
    required members are located by ``GF4_PMS_`` / ``GF4_IRS_`` prefix and
    ``_thumb.jpg`` / ``.xml`` suffix, independent of their archive order.

    :param in_file: path of the tar archive
    :param xml_path: directory receiving the extracted XML files
    :param thumbnail_path: directory receiving the extracted thumbnails
    :return: ``(pms_dict, irs_dict)`` on success; ``{"code": -1, "msg": ...}``
             when a required member is missing or any error occurs
    """
    try:
        with tarfile.open(in_file, mode='r') as tar_file:
            names = tar_file.getnames()

            def pick(prefix, suffix):
                # First member with the given prefix and suffix, or None.
                return next((name for name in names
                             if name.startswith(prefix) and name.endswith(suffix)), None)

            pms_thumb = pick('GF4_PMS_', '_thumb.jpg')
            irs_thumb = pick('GF4_IRS_', '_thumb.jpg')
            pms_xml = pick('GF4_PMS_', '.xml')
            irs_xml = pick('GF4_IRS_', '.xml')
            if None in (pms_thumb, irs_thumb, pms_xml, irs_xml):
                return {"code": -1, "msg": "找不到指定文件..."}
            # NOTE(review): member names come straight from the archive and are
            # passed to extract() unchecked.
            tar_file.extract(pms_thumb, thumbnail_path)
            tar_file.extract(irs_thumb, thumbnail_path)
            gf4_pms_dict = _gf4_scene_metadata(
                tar_file, pms_xml, xml_path, pms_thumb, thumbnail_path, "GF4_PMS")
            gf4_irs_dict = _gf4_scene_metadata(
                tar_file, irs_xml, xml_path, irs_thumb, thumbnail_path, "GF4_IRS")
        return gf4_pms_dict, gf4_irs_dict
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetH08Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Himawari-8 file and generate its thumbnail and XML.

    The band list is read with h5py; the global metadata and three
    sub-datasets (indices 7, 6 and 5, used as red/green/blue) are read with
    GDAL. The thumbnail is written to ``thumbnail_path`` and the GDAL metadata
    is dumped to an XML file in ``xml_path`` via ``createXML``.

    :param in_file: path of the input file
    :param xml_path: directory receiving the generated XML file
    :param thumbnail_path: directory receiving the generated thumbnail
    :return: metadata dictionary on success; ``{"code": -1, "msg": ...}`` on
             any error
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
        # Only the band ids are used from the HDF5 view of the file.
        with h5py.File(in_file, mode='r') as f:
            bands = ','.join(str(i) for i in f['band_id'][:])
        ImageGSD = '1000, 500, 2000'

        # Build the thumbnail from three sub-datasets.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        if in_datasets is None:
            # gdal.Open signals failure with None unless exceptions are enabled.
            raise RuntimeError("GDAL could not open " + str(in_file))
        meta_data = in_datasets.GetMetadata()
        datasets = in_datasets.GetSubDatasets()
        red_data = gdal.Open(datasets[7][0]).ReadAsArray()
        gre_data = gdal.Open(datasets[6][0]).ReadAsArray()
        blu_data = gdal.Open(datasets[5][0]).ReadAsArray()
        img_data = uint16to8(np.array([red_data, gre_data, blu_data]))
        # Bands-first -> rows x cols x bands, then reverse the band axis.
        # NOTE(review): the reversal hands PIL the bands in blue/green/red
        # order; confirm this is the intended colour composite.
        img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
        # Scale to 512 px high with a proportional width. The previous code
        # swapped the two dimensions when rows > cols, which produced an
        # undersized thumbnail for tall images.
        rows, cols = img_data.shape[1], img_data.shape[2]
        height = 512
        width = max(1, int(height / rows * cols))
        img.thumbnail((width, height))
        # NOTE(review): the file is named *_thumb.jpg but written as PNG.
        img.save(ThumbnailPath, "PNG")
        # Release the dataset and the large arrays.
        del in_datasets, img_data, img

        # Dump the GDAL metadata to XML.
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        createXML(meta_data, xmlPath)

        date_created = meta_data['date_created']
        # Footprint: the west/north edges come from the metadata (longitude
        # shifted by -180); the east edge (200 - 180) and the south edge (-60)
        # are fixed values.
        north = meta_data['upper_left_latitude']
        west = int(meta_data['upper_left_longitude']) - 180
        east = 200 - 180
        south = -60
        boundaryGeomStr = f'POLYGON(({west} {north},' \
                          f'{east} {north},' \
                          f'{east} {south},' \
                          f'{west} {south},' \
                          f'{west} {north}))'
        return {"ProduceTime": date_created,
                "StartTime": "",
                "EndTime": "",
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": bands,
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": "",
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "day"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetJPSSData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a JPSS-1 / NOAA-20 file and generate its thumbnail and XML.

    The global metadata and three sub-datasets (indices 0, 3 and 9, named
    red/NIR/SWIR in the code) are read with GDAL. The thumbnail is written to
    ``thumbnail_path`` and the GDAL metadata is dumped to an XML file in
    ``xml_path`` via ``createXML``. The footprint is the bounding box given by
    the North/South/East/West bounding coordinates.

    :param in_file: path of the input file
    :param xml_path: directory receiving the generated XML file
    :param thumbnail_path: directory receiving the generated thumbnail
    :return: metadata dictionary on success; ``{"code": -1, "msg": ...}`` on
             any error
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        # Build the thumbnail from three sub-datasets.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        if in_datasets is None:
            # gdal.Open signals failure with None unless exceptions are enabled.
            raise RuntimeError("GDAL could not open " + str(in_file))
        meta_data = in_datasets.GetMetadata()
        datasets = in_datasets.GetSubDatasets()
        red_data = gdal.Open(datasets[0][0]).ReadAsArray()
        nir_data = gdal.Open(datasets[3][0]).ReadAsArray()
        swir_data = gdal.Open(datasets[9][0]).ReadAsArray()
        img_data = uint16to8(np.array([red_data, nir_data, swir_data]))
        # Bands-first -> rows x cols x bands, then reverse the band axis so
        # PIL receives the bands in SWIR/NIR/red order.
        img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
        # Scale to 512 px high with a proportional width. The previous code
        # swapped the two dimensions when rows > cols, which produced an
        # undersized thumbnail for tall images.
        rows, cols = img_data.shape[1], img_data.shape[2]
        height = 512
        width = max(1, int(height / rows * cols))
        img.thumbnail((width, height))
        # NOTE(review): the file is named *_thumb.jpg but written as PNG.
        img.save(ThumbnailPath, "PNG")
        # Release the dataset and the large arrays.
        del in_datasets, img_data, img

        # Dump the GDAL metadata to XML.
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        createXML(meta_data, xmlPath)

        ProductionTime = meta_data['ProductionTime']
        StartTime = meta_data['StartTime']
        EndTime = meta_data['EndTime']
        # Resolution is the last word of LongName, the band label the second
        # word of title.
        ImageGSD = str(meta_data['LongName']).split(" ")[-1]
        Bands = str(meta_data['title']).split(" ")[1]
        north = meta_data['NorthBoundingCoordinate']
        south = meta_data['SouthBoundingCoordinate']
        west = meta_data['WestBoundingCoordinate']
        east = meta_data['EastBoundingCoordinate']
        # Closed ring: upper-left, upper-right, lower-right, lower-left,
        # upper-left, each written as "longitude latitude".
        boundaryGeomStr = f'POLYGON(({west} {north},' \
                          f'{east} {north},' \
                          f'{east} {south},' \
                          f'{west} {south},' \
                          f'{west} {north}))'
        jpss_dict = {"ProduceTime": ProductionTime,
                     "StartTime": StartTime,
                     "EndTime": EndTime,
                     "CloudPercent": "",
                     "boundaryGeomStr": boundaryGeomStr,
                     "bands": Bands,
                     "ImageGSD": ImageGSD,
                     "ProjectedCoordinates": "",
                     "CollectionCode": "",
                     "ThumbnailPath": ThumbnailPath,
                     "ThumbnailName": ThumbnailName,
                     "xmlPath": xmlPath,
                     "xmlFileName": xmlFileName,
                     "DirectoryDepth": "day"}
        # NOTE(review): this is the only extractor that prints its result;
        # kept in case a caller reads it from stdout - confirm and remove.
        print(jpss_dict)
        return jpss_dict
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetSNPPData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Suomi NPP (SNPP) file and generate its thumbnail and XML.

    The global metadata and three sub-datasets (indices 0, 3 and 9) are read
    with GDAL. The thumbnail is written to ``thumbnail_path`` and the GDAL
    metadata is dumped to an XML file in ``xml_path`` via ``createXML``. The
    footprint is the bounding box given by the North/South/East/West bounding
    coordinates.

    The production time is returned under both ``ProductionTime`` (the key
    this function has always used) and ``ProduceTime`` (the key every other
    extractor in this file uses).

    :param in_file: path of the input file
    :param xml_path: directory receiving the generated XML file
    :param thumbnail_path: directory receiving the generated thumbnail
    :return: metadata dictionary on success; ``{"code": -1, "msg": ...}`` on
             any error
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        # Build the thumbnail from three sub-datasets.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        if in_datasets is None:
            # gdal.Open signals failure with None unless exceptions are enabled.
            raise RuntimeError("GDAL could not open " + str(in_file))
        meta_data = in_datasets.GetMetadata()
        datasets = in_datasets.GetSubDatasets()
        red_data = gdal.Open(datasets[0][0]).ReadAsArray()
        gre_data = gdal.Open(datasets[3][0]).ReadAsArray()
        blu_data = gdal.Open(datasets[9][0]).ReadAsArray()
        img_data = uint16to8(np.array([red_data, gre_data, blu_data]))
        # Bands-first -> rows x cols x bands, then reverse the band axis.
        # NOTE(review): the reversal hands PIL the bands in reverse of the
        # order they were stacked; confirm the intended colour composite.
        img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
        # Scale to 512 px high with a proportional width. The previous code
        # swapped the two dimensions when rows > cols, which produced an
        # undersized thumbnail for tall images.
        rows, cols = img_data.shape[1], img_data.shape[2]
        height = 512
        width = max(1, int(height / rows * cols))
        img.thumbnail((width, height))
        # NOTE(review): the file is named *_thumb.jpg but written as PNG.
        img.save(ThumbnailPath, "PNG")
        # Release the dataset and the large arrays.
        del in_datasets, img_data, img

        # Dump the GDAL metadata to XML.
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        createXML(meta_data, xmlPath)

        ProductionTime = meta_data['ProductionTime']
        StartTime = meta_data['StartTime']
        EndTime = meta_data['EndTime']
        # Resolution is the last word of LongName minus its final character.
        ImageGSD = str(meta_data['LongName']).split(" ")[-1][:-1]
        Bands = str(meta_data['title'])
        north = meta_data['NorthBoundingCoordinate']
        south = meta_data['SouthBoundingCoordinate']
        west = meta_data['WestBoundingCoordinate']
        east = meta_data['EastBoundingCoordinate']
        # Closed ring: upper-left, upper-right, lower-right, lower-left,
        # upper-left, each written as "longitude latitude".
        boundaryGeomStr = f'POLYGON(({west} {north},' \
                          f'{east} {north},' \
                          f'{east} {south},' \
                          f'{west} {south},' \
                          f'{west} {north}))'
        return {"ProductionTime": ProductionTime,
                # Same value under the key the sibling extractors use.
                "ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": Bands,
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": "",
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "day"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def _s1_text(node, *tags):
    """Descend through the first element matching each tag in turn and return the last one's text."""
    for tag in tags:
        node = node.getElementsByTagName(tag)[0]
    return node.firstChild.data


def GetSentinel1Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Sentinel-1 product zip.

    Three members are looked up inside the ``*.SAFE`` folder:

    * the IW GRD VV annotation XML (any ``s1<letter>`` platform), which is
      copied to ``xml_path`` and supplies the times;
    * ``preview/map-overlay.kml``, whose first four ``lon,lat`` pairs form the
      footprint;
    * ``preview/quick-look.png``, which is copied to ``thumbnail_path``.

    The annotation and the overlay are required. A missing quick-look is
    tolerated, in which case the returned thumbnail path does not exist. The
    stop time is returned under both ``StopTime`` (the key this function has
    always used) and ``EndTime`` (the key used by the other extractors).

    :param in_file: path of the product zip
    :param xml_path: existing directory receiving the annotation XML copy
    :param thumbnail_path: existing directory receiving the quick-look copy
    :return: metadata dictionary on success; ``{"code": -1, "msg": ...}`` when
             a required member is missing or any error occurs
    """
    annotation_re = re.compile(
        r'[0-9a-zA-Z_]+\.SAFE/annotation/s1[a-z]-iw-grd-vv[0-9a-z\-]+\.xml')
    overlay_re = re.compile(r'[0-9a-zA-Z_]+\.SAFE/preview/map-overlay\.kml')
    quicklook_re = re.compile(r'[0-9a-zA-Z_]+\.SAFE/preview/quick-look\.png')
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
        times = None
        boundaryGeomStr = None
        with zipfile.ZipFile(in_file, mode='r') as zip_file:
            for member in zip_file.namelist():
                if annotation_re.match(member):
                    # Read once: the same bytes are saved and parsed.
                    meta_data = zip_file.read(member)
                    with open(xmlPath, "wb") as fout:
                        fout.write(meta_data)
                    dom = minidom.parseString(meta_data)
                    times = (
                        _s1_text(dom, 'qualityInformation', 'qualityDataList',
                                 'qualityData', 'azimuthTime'),
                        _s1_text(dom, 'adsHeader', 'startTime'),
                        _s1_text(dom, 'adsHeader', 'stopTime'),
                    )
                elif overlay_re.match(member):
                    dom = minidom.parseString(zip_file.read(member))
                    coordinates = _s1_text(dom, 'coordinates')
                    # split() tolerates padding and line breaks around the
                    # pairs. Points 0-3 are treated as top-left, top-right,
                    # bottom-right, bottom-left; each is "lon,lat[,...]".
                    points = [pair.split(',') for pair in coordinates.split()[:4]]
                    if len(points) < 4:
                        raise ValueError("map-overlay.kml holds fewer than 4 coordinate pairs")
                    ring = ",".join(f"{p[0]} {p[1]}" for p in points + points[:1])
                    boundaryGeomStr = f"POLYGON(({ring}))"
                elif quicklook_re.match(member):
                    with open(ThumbnailPath, "wb") as fout:
                        fout.write(zip_file.read(member))
        if times is None or boundaryGeomStr is None:
            return {"code": -1, "msg": "找不到指定文件..."}
        ProduceTime, StartTime, StopTime = times
        return {"ProduceTime": ProduceTime,
                "StartTime": StartTime,
                "StopTime": StopTime,
                # Same value under the key the sibling extractors use.
                "EndTime": StopTime,
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": "Amplitude_VH,Intensity_VH,Amplitude_VV,Intensity_VV",
                "ImageGSD": "10",
                "ProjectedCoordinates": '',
                "CollectionCode": '',
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def _s2_text(node, *tags):
    """Descend through the first element matching each tag in turn and return the last one's text."""
    for tag in tags:
        node = node.getElementsByTagName(tag)[0]
    return node.firstChild.data


def GetSentinel2Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a Sentinel-2 L1C product zip and generate its thumbnail.

    The B02/B03/B04 JPEG2000 members (first three in sorted order) are read
    through GDAL's ``/vsizip/`` path to build the thumbnail. The product
    metadata member (``*.SAFE/MTD_MSIL1C.xml``; the extension-less suffix the
    code matched before is still accepted) is copied to ``xml_path`` and
    parsed. Both are located by name, and the function returns a "not found"
    error before doing any raster work when either is missing.

    The stop time is returned under both ``StopTime`` (the key this function
    has always used) and ``EndTime`` (the key used by the other extractors).

    :param in_file: path of the product zip
    :param xml_path: existing directory receiving the metadata XML copy
    :param thumbnail_path: existing directory receiving the thumbnail
    :return: metadata dictionary on success; ``{"code": -1, "msg": ...}`` when
             a required member is missing or any error occurs
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        with zipfile.ZipFile(in_file, 'r') as zip_file:
            members = sorted(zip_file.namelist())
            band_files = [name for name in members
                          if name.endswith(('_B02.jp2', '_B03.jp2', '_B04.jp2'))][:3]
            meta_member = next(
                (name for name in members
                 if name.endswith(('.SAFE/MTD_MSIL1C', '.SAFE/MTD_MSIL1C.xml'))), None)
            if len(band_files) < 3 or meta_member is None:
                return {"code": -1, "msg": "找不到指定文件..."}

            # Build the thumbnail.
            ThumbnailName = stem + "_thumb.jpg"
            ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
            rgb_list = []
            for member in band_files:
                sub_dataset = gdal.Open('/vsizip/%s/%s' % (in_file, member))
                if sub_dataset is None:
                    # gdal.Open signals failure with None unless exceptions are enabled.
                    raise RuntimeError("GDAL could not open " + member)
                rgb_list.append(sub_dataset.ReadAsArray())
            # Sorted members are B02, B03, B04; stack them as B04, B03, B02.
            img_data = uint16to8(np.array([rgb_list[2], rgb_list[1], rgb_list[0]]))
            # Bands-first -> rows x cols x bands, then reverse the band axis.
            # NOTE(review): the reversal hands PIL the bands in B02/B03/B04
            # order, i.e. blue in the red channel; confirm this is intended.
            img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
            # Scale to 512 px high with a proportional width. The previous
            # code swapped the two dimensions when rows > cols, which produced
            # an undersized thumbnail for tall images.
            rows, cols = img_data.shape[1], img_data.shape[2]
            height = 512
            width = max(1, int(height / rows * cols))
            img.thumbnail((width, height))
            # NOTE(review): the file is named *_thumb.jpg but written as PNG.
            img.save(ThumbnailPath, "PNG")
            # Release the large arrays.
            del rgb_list, img_data, img

            # Copy and parse the product metadata; the bytes are read once.
            xmlFileName = stem + ".xml"
            xmlPath = os.path.join(xml_path, xmlFileName)
            meta_data = zip_file.read(meta_member)
        with open(xmlPath, "wb") as fout:
            fout.write(meta_data)
        dom = minidom.parseString(meta_data)

        cloud_percent = _s2_text(dom, 'n1:Quality_Indicators_Info', 'Cloud_Coverage_Assessment')
        ImageGSD = '10, 20, 60'
        ProjectedCoordinates = _s2_text(
            dom, 'n1:Geometric_Info', 'Coordinate_Reference_System', 'GEO_TABLES')
        ProduceTime = _s2_text(dom, 'n1:General_Info', 'Product_Info', 'GENERATION_TIME')
        StartTime = _s2_text(dom, 'n1:General_Info', 'Product_Info', 'PRODUCT_START_TIME')
        StopTime = _s2_text(dom, 'n1:General_Info', 'Product_Info', 'PRODUCT_STOP_TIME')
        # EXT_POS_LIST is read as alternating latitude/longitude values. The
        # first four pairs are treated as top-left, top-right, bottom-right,
        # bottom-left.
        values = _s2_text(dom, 'n1:Geometric_Info', 'Product_Footprint', 'Product_Footprint',
                          'Global_Footprint', 'EXT_POS_LIST').split()
        if len(values) < 8:
            raise ValueError("EXT_POS_LIST holds fewer than 4 coordinate pairs")
        corners = [(values[2 * i + 1], values[2 * i]) for i in range(4)]
        ring = ",".join(f"{lon} {lat}" for lon, lat in corners + corners[:1])
        return {"ProduceTime": ProduceTime,
                "StartTime": StartTime,
                "StopTime": StopTime,
                # Same value under the key the sibling extractors use.
                "EndTime": StopTime,
                "CloudPercent": cloud_percent,
                "boundaryGeomStr": f"POLYGON(({ring}))",
                "bands": "1,2,3,4,5,6,7,8,9,10,11,12",
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": ProjectedCoordinates,
                "CollectionCode": '',
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetSentinel3OLData(in_file, xml_path, thumbnail_path):
    """
    Read metadata and build a thumbnail for a Sentinel-3 OLCI product archive.

    The bands Oa03, Oa05 and Oa08 are read from the zip archive through GDAL's
    ``/vsizip/`` prefix and converted to 8 bit for the thumbnail. The
    ``xfdumanifest.xml`` member is copied into ``xml_path`` and parsed for the
    product times. The boundary geometry is not derived here and is returned as
    an empty string.

    :param in_file: path of the Sentinel-3 zip archive
    :param xml_path: directory that receives the copy of the manifest
    :param thumbnail_path: directory that receives the thumbnail
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    band_suffixes = ('Oa03_radiance.nc', 'Oa05_radiance.nc', 'Oa08_radiance.nc')
    try:
        basename = os.path.basename(in_file)
        stem = os.path.splitext(basename)[0]
        with zipfile.ZipFile(in_file, 'r', zipfile.ZIP_DEFLATED) as zip_file:
            names = sorted(zip_file.namelist())

            def find_member(suffix):
                # Select members by suffix rather than by list position, so the
                # result does not depend on the order inside the archive.
                for name in names:
                    if name.endswith(suffix):
                        return name
                raise ValueError("required member '*%s' not found in %s" % (suffix, basename))

            # Resolve everything up front so nothing is written to disk when the
            # archive is incomplete.
            band_members = [find_member(suffix) for suffix in band_suffixes]
            manifest_member = find_member('xfdumanifest.xml')

            # Build the thumbnail.
            ThumbnailName = stem + "_thumb.jpg"
            ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
            rgb_list = []
            for member in band_members:
                sub_dataset = gdal.Open('/vsizip/%s/%s' % (in_file, member))
                if sub_dataset is None:
                    raise ValueError("GDAL could not open %s" % member)
                rgb_list.append(sub_dataset.ReadAsArray())
                sub_dataset = None  # drop the reference to the GDAL dataset
            img_data = uint16to8(np.array([rgb_list[2], rgb_list[1], rgb_list[0]]))
            del rgb_list
            # Axis 0 holds the three bands: move it last, then reverse it.
            # NOTE(review): the band order is reversed twice (once when stacking,
            # once here) -- confirm the resulting channel order is intended.
            img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
            # Bounding box handed to Image.thumbnail().
            if img_data.shape[1] > img_data.shape[2]:
                width = 512
                height = int(width / img_data.shape[1] * img_data.shape[2])
            else:
                height = 512
                width = int(height / img_data.shape[1] * img_data.shape[2])
            img.thumbnail((width, height))
            # NOTE(review): the file name ends in .jpg but the data is written in
            # PNG format -- kept as is, confirm that consumers accept this.
            img.save(ThumbnailPath, "PNG")
            del img_data
            del img

            # Copy the manifest next to the other metadata files and parse the
            # same bytes (the member is read only once).
            xmlFileName = stem + ".xml"
            xmlPath = os.path.join(xml_path, xmlFileName)
            meta_data = zip_file.read(manifest_member)
        with open(xmlPath, "wb") as fout:
            fout.write(meta_data)
        dom = minidom.parseString(meta_data)

        def text_of(tag):
            return dom.getElementsByTagName(tag)[0].firstChild.data

        return {"ProduceTime": text_of('sentinel3:creationTime'),
                "StartTime": text_of('sentinel3:receivingStartTime'),
                "StopTime": text_of('sentinel3:receivingStopTime'),
                "CloudPercent": "",
                "boundaryGeomStr": "",
                "bands": "Oa01,Oa02,Oa03,Oa04,Oa05,Oa06,Oa07,Oa08,Oa09,Oa10,Oa11,Oa12,Oa13,Oa14,Oa15,Oa16,"
                         "Oa17,Oa18,Oa19,Oa20,Oa21",
                # "NumberBands": '21',
                "ImageGSD": "270,294",
                "ProjectedCoordinates": "",
                "CollectionCode": "Sentinel3_OLCI_L1",
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetHJ1Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of an HJ-1 product packed as a tar archive.

    The member ending in ``THUMB.JPG`` is extracted into ``thumbnail_path`` and
    the member ending in ``.XML`` into ``xml_path``; the XML is then parsed for
    times, resolution, projection, bands and the four corner coordinates.

    :param in_file: path of the HJ-1 tar archive
    :param xml_path: directory the XML member is extracted into
    :param thumbnail_path: directory the thumbnail member is extracted into
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        thumb_member = None
        xml_member = None
        meta_content = None
        with tarfile.open(in_file, mode='r') as tar_file:
            for member in tar_file.getnames():
                if member.endswith("THUMB.JPG"):
                    tar_file.extract(member, thumbnail_path)
                    thumb_member = member
                elif member.endswith(".XML"):
                    tar_file.extract(member, xml_path)
                    xml_member = member
                    with tar_file.extractfile(member) as meta_file:
                        meta_content = meta_file.read()
        # Both members are required; report that explicitly instead of failing
        # later on an unbound variable.
        if thumb_member is None or xml_member is None:
            return {"code": -1, "msg": "没有满足条件的数据字典..."}

        dom = minidom.parseString(meta_content.decode("utf-8"))

        def text_of(tag):
            return dom.getElementsByTagName(tag)[0].firstChild.data

        productDate = text_of('productDate')
        # Corner coordinates.
        TopLeftLatitude = text_of('productUpperLeftLat')
        TopLeftLongitude = text_of('productUpperLeftLong')
        TopRightLatitude = text_of('productUpperRightLat')
        TopRightLongitude = text_of('productUpperRightLong')
        BottomLeftLatitude = text_of('productLowerLeftLat')
        BottomLeftLongitude = text_of('productLowerLeftLong')
        BottomRightLatitude = text_of('productLowerRightLat')
        BottomRightLongitude = text_of('productLowerRightLong')
        # Closed ring: the first corner is repeated at the end.
        boundaryGeomStr = f'POLYGON(({TopLeftLongitude} {TopLeftLatitude},' \
                          f'{TopRightLongitude} {TopRightLatitude},' \
                          f'{BottomRightLongitude} {BottomRightLatitude},' \
                          f'{BottomLeftLongitude} {BottomLeftLatitude},' \
                          f'{TopLeftLongitude} {TopLeftLatitude}))'
        return {"ProductTime": productDate,
                # Same value under the key the other readers in this file use;
                # "ProductTime" is kept for existing callers.
                "ProduceTime": productDate,
                "StartTime": text_of('imagingStartTime'),
                "EndTime": text_of('imagingStopTime'),
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": text_of('bands'),
                "ImageGSD": text_of('pixelSpacing'),
                "ProjectedCoordinates": text_of('mapProjection'),
                "CollectionCode": "",
                "ThumbnailPath": thumbnail_path + "/" + thumb_member,
                # basename() also works for members without a directory part.
                "ThumbnailName": os.path.basename(thumb_member),
                "xmlPath": xml_path + "/" + xml_member,
                "xmlFileName": os.path.basename(xml_member),
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetZY02CData(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a ZY-02C product packed as a tar archive.

    The archive is expected to hold a multispectral (MUX) and a panchromatic
    (PAN) product, each with a ``*_thumb.jpg`` thumbnail and a ``*.xml``
    metadata file. Thumbnails are extracted into ``thumbnail_path`` and XML
    files into ``xml_path``.

    :param in_file: path of the ZY-02C tar archive
    :param xml_path: directory the XML members are extracted into
    :param thumbnail_path: directory the thumbnail members are extracted into
    :return: tuple ``(mux_dict, pan_dict)`` on success,
             ``{"code": -1, "msg": ...}`` on failure
    """
    sensors = ("MUX", "PAN")
    try:
        thumbs = {}    # sensor -> thumbnail member name
        xml_docs = {}  # sensor -> (XML member name, raw XML bytes)
        with tarfile.open(in_file, mode='r') as tar_file:
            for member in tar_file.getnames():
                for sensor in sensors:
                    if member.endswith(sensor + "_thumb.jpg"):
                        tar_file.extract(member, thumbnail_path)
                        thumbs[sensor] = member
                    elif member.endswith(sensor + ".xml"):
                        tar_file.extract(member, xml_path)
                        with tar_file.extractfile(member) as meta_file:
                            xml_docs[sensor] = (member, meta_file.read())

        # The dictionaries are built only after the whole archive has been
        # scanned, so the order of the members inside the archive is irrelevant.
        if any(sensor not in thumbs or sensor not in xml_docs for sensor in sensors):
            return {"code": -1, "msg": "没有满足条件的数据字典..."}

        def build_dict(sensor):
            # Turn one sensor's XML document into the metadata dictionary.
            xml_member, content = xml_docs[sensor]
            dom = minidom.parseString(content.decode("utf-8"))

            def text_of(tag):
                return dom.getElementsByTagName(tag)[0].firstChild.data

            TopLeftLatitude = text_of('TopLeftLatitude')
            TopLeftLongitude = text_of('TopLeftLongitude')
            TopRightLatitude = text_of('TopRightLatitude')
            TopRightLongitude = text_of('TopRightLongitude')
            BottomRightLatitude = text_of('BottomRightLatitude')
            BottomRightLongitude = text_of('BottomRightLongitude')
            BottomLeftLatitude = text_of('BottomLeftLatitude')
            BottomLeftLongitude = text_of('BottomLeftLongitude')
            # Closed ring: the first corner is repeated at the end.
            boundaryGeomStr = f'POLYGON(({TopLeftLongitude} {TopLeftLatitude},' \
                              f'{TopRightLongitude} {TopRightLatitude},' \
                              f'{BottomRightLongitude} {BottomRightLatitude},' \
                              f'{BottomLeftLongitude} {BottomLeftLatitude},' \
                              f'{TopLeftLongitude} {TopLeftLatitude}))'
            return {"ProduceTime": text_of('ProduceTime'),
                    "StartTime": text_of('StartTime'),
                    "EndTime": text_of('EndTime'),
                    "CloudPercent": text_of('CloudPercent'),
                    "boundaryGeomStr": boundaryGeomStr,
                    "bands": text_of('Bands'),
                    "ImageGSD": text_of('ImageGSD'),
                    "ProjectedCoordinates": text_of('MapProjection'),
                    'CollectionCode': "",
                    "ThumbnailPath": thumbnail_path + "/" + thumbs[sensor],
                    "ThumbnailName": thumbs[sensor],
                    "xmlPath": xml_path + "/" + xml_member,
                    "xmlFileName": xml_member,
                    "DirectoryDepth": "day"}

        return build_dict("MUX"), build_dict("PAN")
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetZY3Data(in_file, xml_path, thumbnail_path):
    """
    Read the metadata of a ZY-3 product packed as a tar archive.

    The member ending in ``thumb.jpg`` is extracted into ``thumbnail_path``.
    The ``.xml`` member that is not ``order.xml`` is extracted into
    ``xml_path`` and parsed for times, resolution, projection, bands, cloud
    cover and the four corner coordinates.

    :param in_file: path of the ZY-3 tar archive
    :param xml_path: directory the XML member is extracted into
    :param thumbnail_path: directory the thumbnail member is extracted into
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        thumb_member = None
        xml_member = None
        meta_content = None
        with tarfile.open(in_file, mode='r') as tar_file:
            for member in tar_file.getnames():
                if member.endswith("thumb.jpg"):
                    tar_file.extract(member, thumbnail_path)
                    thumb_member = member
                elif member.endswith('.xml') and not member.endswith('order.xml'):
                    tar_file.extract(member, xml_path)
                    xml_member = member
                    with tar_file.extractfile(member) as meta_file:
                        meta_content = meta_file.read()
        # The dictionary is built after the scan, so it does not matter whether
        # the thumbnail or the XML comes first inside the archive.
        if thumb_member is None or xml_member is None:
            return {"code": -1, "msg": "没有满足条件的数据字典..."}

        dom = minidom.parseString(meta_content.decode("utf-8"))

        def text_of(tag):
            return dom.getElementsByTagName(tag)[0].firstChild.data

        TopLeftLatitude = text_of('TopLeftLatitude')
        TopLeftLongitude = text_of('TopLeftLongitude')
        TopRightLatitude = text_of('TopRightLatitude')
        TopRightLongitude = text_of('TopRightLongitude')
        BottomRightLatitude = text_of('BottomRightLatitude')
        BottomRightLongitude = text_of('BottomRightLongitude')
        BottomLeftLatitude = text_of('BottomLeftLatitude')
        BottomLeftLongitude = text_of('BottomLeftLongitude')
        # Closed ring: the first corner is repeated at the end.
        boundaryGeomStr = f'POLYGON(({TopLeftLongitude} {TopLeftLatitude},' \
                          f'{TopRightLongitude} {TopRightLatitude},' \
                          f'{BottomRightLongitude} {BottomRightLatitude},' \
                          f'{BottomLeftLongitude} {BottomLeftLatitude},' \
                          f'{TopLeftLongitude} {TopLeftLatitude}))'
        return {"ProduceTime": text_of('ProduceTime'),
                "StartTime": text_of('StartTime'),
                "EndTime": text_of('EndTime'),
                "CloudPercent": text_of('CloudPercent'),
                "boundaryGeomStr": boundaryGeomStr,
                "bands": text_of('Bands'),
                "ImageGSD": text_of('ImageGSD'),
                "ProjectedCoordinates": text_of('MapProjection'),
                "CollectionCode": "",
                "ThumbnailPath": thumbnail_path + "/" + thumb_member,
                "ThumbnailName": thumb_member,
                "xmlPath": xml_path + "/" + xml_member,
                "xmlFileName": xml_member,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
2022-02-28 13:51:30 +08:00
def GetLandsatData(in_file, thumbnail_path, txt_path):
    """
    Read metadata and build a thumbnail for a Landsat product archive.

    The bands ``_B2``, ``_B3`` and ``_B4`` are read from the tar archive
    through GDAL's ``/vsitar/`` prefix and converted to 8 bit for the
    thumbnail. The ``_MTL.txt`` member is extracted into ``txt_path`` and its
    ``KEY = VALUE`` lines are parsed for cloud cover, file date and the four
    corner coordinates.

    :param in_file: path of the Landsat tar archive
    :param thumbnail_path: directory that receives the thumbnail
    :param txt_path: directory the ``_MTL.txt`` member is extracted into
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    band_suffixes = ('_B2.TIF', '_B3.TIF', '_B4.TIF')
    try:
        basename = os.path.basename(in_file)
        with tarfile.open(in_file, mode='r') as tar_file:
            names = sorted(tar_file.getnames())

            def find_member(suffix):
                # Select members by suffix rather than by list position.
                for name in names:
                    if name.endswith(suffix):
                        return name
                raise ValueError("required member '*%s' not found in %s" % (suffix, basename))

            # Resolve everything up front so nothing is written to disk when the
            # archive is incomplete.
            band_members = [find_member(suffix) for suffix in band_suffixes]
            mtl_member = find_member('_MTL.txt')

            # Build the thumbnail.
            ThumbnailName = os.path.splitext(basename)[0] + "_thumb.jpg"
            ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
            rgb_list = []
            for member in band_members:
                sub_dataset = gdal.Open('/vsitar/%s/%s' % (in_file, member))
                if sub_dataset is None:
                    raise ValueError("GDAL could not open %s" % member)
                rgb_list.append(sub_dataset.ReadAsArray())
                sub_dataset = None  # drop the reference to the GDAL dataset
            img_data = uint16to8(np.array([rgb_list[2], rgb_list[1], rgb_list[0]]))
            del rgb_list
            # Axis 0 holds the three bands: move it last, then reverse it.
            # NOTE(review): the band order is reversed twice (once when stacking,
            # once here) -- confirm the resulting channel order is intended.
            img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
            # Bounding box handed to Image.thumbnail().
            if img_data.shape[1] > img_data.shape[2]:
                width = 512
                height = int(width / img_data.shape[1] * img_data.shape[2])
            else:
                height = 512
                width = int(height / img_data.shape[1] * img_data.shape[2])
            img.thumbnail((width, height))
            # NOTE(review): the file name ends in .jpg but the data is written in
            # PNG format -- kept as is, confirm that consumers accept this.
            img.save(ThumbnailPath, "PNG")
            del img_data
            del img

            tar_file.extract(mtl_member, txt_path)

        # Parse the "KEY = VALUE" lines of the extracted MTL file.
        TxTPath = os.path.join(txt_path, mtl_member)
        dic = {}
        with open(TxTPath, 'r') as fr:
            for line in fr:
                key, sep, value = line.strip().partition(' = ')
                if sep:  # lines without " = " (e.g. "END") are skipped
                    dic[key] = value

        # The file date looks like "<date>T<time>Z"; GetTimestamp expects
        # "<date> <time>".
        FILE_DATE = dic['FILE_DATE'].split("Z")[0].replace("T", " ")
        TopLeftLatitude = dic['CORNER_UL_LAT_PRODUCT']
        TopLeftLongitude = dic['CORNER_UL_LON_PRODUCT']
        TopRightLatitude = dic['CORNER_UR_LAT_PRODUCT']
        TopRightLongitude = dic['CORNER_UR_LON_PRODUCT']
        BottomRightLatitude = dic['CORNER_LR_LAT_PRODUCT']
        BottomRightLongitude = dic['CORNER_LR_LON_PRODUCT']
        BottomLeftLatitude = dic['CORNER_LL_LAT_PRODUCT']
        BottomLeftLongitude = dic['CORNER_LL_LON_PRODUCT']
        # Closed ring: the first corner is repeated at the end.
        boundaryGeomStr = f'POLYGON(({TopLeftLongitude} {TopLeftLatitude},' \
                          f'{TopRightLongitude} {TopRightLatitude},' \
                          f'{BottomRightLongitude} {BottomRightLatitude},' \
                          f'{BottomLeftLongitude} {BottomLeftLatitude},' \
                          f'{TopLeftLongitude} {TopLeftLatitude}))'
        return {"ProduceTime": str(GetTimestamp(FILE_DATE)),
                "StartTime": "",
                "StopTime": "",
                "CloudPercent": dic['CLOUD_COVER'],
                "boundaryGeomStr": boundaryGeomStr,
                "ImageGSD": '30',
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": TxTPath,
                "xmlFileName": mtl_member,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetSentinel2Data(in_file, xml_path, thumbnail_path):
    """
    Read metadata and build a thumbnail for a Sentinel-2 product archive.

    The bands ``_B02``, ``_B03`` and ``_B04`` are read from the zip archive
    through GDAL's ``/vsizip/`` prefix and converted to 8 bit for the
    thumbnail. The ``MTD_MSIL1C.xml`` member is copied into ``xml_path`` and
    parsed for cloud cover, product times and the footprint.

    :param in_file: path of the Sentinel-2 zip archive
    :param xml_path: directory that receives the copy of the metadata XML
    :param thumbnail_path: directory that receives the thumbnail
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    band_suffixes = ('_B02.jp2', '_B03.jp2', '_B04.jp2')
    try:
        basename = os.path.basename(in_file)
        stem = os.path.splitext(basename)[0]
        with zipfile.ZipFile(in_file, 'r', zipfile.ZIP_DEFLATED) as zip_file:
            names = sorted(zip_file.namelist())

            def find_member(suffix):
                # Select members by suffix rather than by list position.
                for name in names:
                    if name.endswith(suffix):
                        return name
                raise ValueError("required member '*%s' not found in %s" % (suffix, basename))

            # Resolve everything up front so nothing is written to disk when the
            # archive is incomplete.
            band_members = [find_member(suffix) for suffix in band_suffixes]
            metadata_member = find_member('MTD_MSIL1C.xml')

            # Build the thumbnail.
            ThumbnailName = stem + "_thumb.jpg"
            ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
            rgb_list = []
            for member in band_members:
                sub_dataset = gdal.Open('/vsizip/%s/%s' % (in_file, member))
                if sub_dataset is None:
                    raise ValueError("GDAL could not open %s" % member)
                rgb_list.append(sub_dataset.ReadAsArray())
                sub_dataset = None  # drop the reference to the GDAL dataset
            img_data = uint16to8(np.array([rgb_list[2], rgb_list[1], rgb_list[0]]))
            del rgb_list
            # Axis 0 holds the three bands: move it last, then reverse it.
            # NOTE(review): the band order is reversed twice (once when stacking,
            # once here) -- confirm the resulting channel order is intended.
            img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])
            # Bounding box handed to Image.thumbnail().
            if img_data.shape[1] > img_data.shape[2]:
                width = 512
                height = int(width / img_data.shape[1] * img_data.shape[2])
            else:
                height = 512
                width = int(height / img_data.shape[1] * img_data.shape[2])
            img.thumbnail((width, height))
            # NOTE(review): the file name ends in .jpg but the data is written in
            # PNG format -- kept as is, confirm that consumers accept this.
            img.save(ThumbnailPath, "PNG")
            del img_data
            del img

            # Copy the metadata XML and parse the same bytes (the member is read
            # only once).
            xmlFileName = stem + ".xml"
            xmlPath = os.path.join(xml_path, xmlFileName)
            meta_data = zip_file.read(metadata_member)
        with open(xmlPath, "wb") as fout:
            fout.write(meta_data)
        dom = minidom.parseString(meta_data)

        cloud_percent = dom.getElementsByTagName('n1:Quality_Indicators_Info')[
            0].getElementsByTagName('Cloud_Coverage_Assessment')[0].firstChild.data
        product_info = dom.getElementsByTagName('n1:General_Info')[
            0].getElementsByTagName('Product_Info')[0]

        def product_timestamp(tag):
            # Drop everything after the first "." and replace the "T" separator
            # to get the "<date> <time>" form GetTimestamp expects.
            raw = product_info.getElementsByTagName(tag)[0].firstChild.data
            return str(GetTimestamp(raw.split(".")[0].replace("T", " ")))

        # The footprint is a whitespace separated list of "lat lon" values; the
        # first four pairs are used as corners. split() without an argument
        # tolerates leading and repeated whitespace.
        lon_lat = dom.getElementsByTagName('n1:Geometric_Info')[0].getElementsByTagName('Product_Footprint')[
            0].getElementsByTagName('Product_Footprint')[0].getElementsByTagName('Global_Footprint')[
            0].getElementsByTagName('EXT_POS_LIST')[0].firstChild.data
        coords = lon_lat.split()
        TopLeftLatitude, TopLeftLongitude = coords[0], coords[1]
        TopRightLatitude, TopRightLongitude = coords[2], coords[3]
        BottomRightLatitude, BottomRightLongitude = coords[4], coords[5]
        BottomLeftLatitude, BottomLeftLongitude = coords[6], coords[7]
        # Closed ring: the first corner is repeated at the end.
        boundaryGeomStr = f'POLYGON(({TopLeftLongitude} {TopLeftLatitude},' \
                          f'{TopRightLongitude} {TopRightLatitude},' \
                          f'{BottomRightLongitude} {BottomRightLatitude},' \
                          f'{BottomLeftLongitude} {BottomLeftLatitude},' \
                          f'{TopLeftLongitude} {TopLeftLatitude}))'
        return {"ProduceTime": product_timestamp('GENERATION_TIME'),
                "StartTime": product_timestamp('PRODUCT_START_TIME'),
                "StopTime": product_timestamp('PRODUCT_STOP_TIME'),
                "CloudPercent": cloud_percent,
                "boundaryGeomStr": boundaryGeomStr,
                "ImageGSD": '10, 20, 60',
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                # Path of the XML copy written above (previously the member name
                # inside the zip was returned here).
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetModisData(in_file):
    """
    Read the metadata of a MODIS product.

    The file is opened with GDAL and the production time, the acquisition
    range and the four ring points are taken from the dataset metadata.

    :param in_file: path of the MODIS file
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        datasets = gdal.Open(in_file)
        if datasets is None:
            # Without this check the failure would surface as an AttributeError
            # on None, which says nothing about the file.
            raise ValueError("GDAL could not open %s" % in_file)
        Metadata = datasets.GetMetadata()

        def to_timestamp(text):
            # Cut at the first "." and replace the "T" separator to get the
            # "<date> <time>" form GetTimestamp expects. The range times are
            # treated like PRODUCTIONDATETIME so that a fractional-seconds
            # suffix, if present, is tolerated.
            return str(GetTimestamp(text.split(".")[0].replace("T", " ")))

        ProductionTime = to_timestamp(Metadata["PRODUCTIONDATETIME"])
        StartTime = to_timestamp(Metadata["RANGEBEGINNINGDATE"] + " " + Metadata["RANGEBEGINNINGTIME"])
        EndTime = to_timestamp(Metadata["RANGEENDINGDATE"] + " " + Metadata["RANGEENDINGTIME"])

        # Comma separated latitudes / longitudes of the four ring points.
        LatitudesList = [value.strip() for value in Metadata["GRINGPOINTLATITUDE.1"].split(",")]
        LongitudeList = [value.strip() for value in Metadata["GRINGPOINTLONGITUDE.1"].split(",")]
        TopLeftLatitude, TopLeftLongitude = LatitudesList[0], LongitudeList[0]
        TopRightLatitude, TopRightLongitude = LatitudesList[1], LongitudeList[1]
        BottomRightLatitude, BottomRightLongitude = LatitudesList[2], LongitudeList[2]
        BottomLeftLatitude, BottomLeftLongitude = LatitudesList[3], LongitudeList[3]
        # Closed ring: the first point is repeated at the end.
        boundaryGeomStr = f'POLYGON(({TopLeftLongitude} {TopLeftLatitude},' \
                          f'{TopRightLongitude} {TopRightLatitude},' \
                          f'{BottomRightLongitude} {BottomRightLatitude},' \
                          f'{BottomLeftLongitude} {BottomLeftLatitude},' \
                          f'{TopLeftLongitude} {TopLeftLatitude}))'
        # NOTE(review): the other readers in this file return "day" or "month"
        # here, this one returns the DAYNIGHTFLAG value -- confirm this is
        # intended.
        DirectoryDepth = Metadata["DAYNIGHTFLAG"]
        return {"ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "ImageGSD": "",
                "CloudPercent": "0",
                'boundaryGeomStr': boundaryGeomStr,
                "ThumbnailPath": "",
                "ThumbnailName": "",
                "xmlPath": "",
                "xmlFileName": "",
                "DirectoryDepth": DirectoryDepth}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetGOCIData(in_file):
    """
    Read the metadata of a GOCI product.

    The file is opened with GDAL and the scene times, the pixel spacing and
    the four corner coordinates are taken from the dataset metadata.

    :param in_file: path of the GOCI file
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        if in_datasets is None:
            # Without this check the failure would surface as an AttributeError
            # on None, which says nothing about the file.
            raise ValueError("GDAL could not open %s" % in_file)
        meta_data = in_datasets.GetMetadata()

        def to_timestamp(key):
            # The value is parsed as "<day>-<month abbreviation>-<year> <time>"
            # after cutting at the first "."; str() of the parsed datetime gives
            # the "<date> <time>" form GetTimestamp expects.
            raw = meta_data[key].split(".")[0]
            parsed = datetime.datetime.strptime(raw, "%d-%b-%Y %H:%M:%S")
            return str(GetTimestamp(str(parsed)))

        def corner(name):
            prefix = 'HDFEOS_POINTS_Scene_Header_Scene_'
            return meta_data[prefix + name + '_longitude'], meta_data[prefix + name + '_latitude']

        ProductionTime = to_timestamp('HDFEOS_POINTS_Ephemeris_Scene_center_time')
        StartTime = to_timestamp('HDFEOS_POINTS_Ephemeris_Scene_Start_time')
        EndTime = to_timestamp('HDFEOS_POINTS_Ephemeris_Scene_end_time')
        ImageGSD = meta_data['HDFEOS_POINTS_Scene_Header_pixel_spacing'].split(" ")[0]

        upper_left_longitude, upper_left_latitude = corner('upper-left')
        upper_right_longitude, upper_right_latitude = corner('upper-right')
        lower_right_longitude, lower_right_latitude = corner('lower-right')
        lower_left_longitude, lower_left_latitude = corner('lower-left')
        # Closed ring: the first corner is repeated at the end.
        boundaryGeomStr = f'POLYGON(({upper_left_longitude} {upper_left_latitude},' \
                          f'{upper_right_longitude} {upper_right_latitude},' \
                          f'{lower_right_longitude} {lower_right_latitude},' \
                          f'{lower_left_longitude} {lower_left_latitude},' \
                          f'{upper_left_longitude} {upper_left_latitude}))'
        return {"ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "0",
                "boundaryGeomStr": boundaryGeomStr,
                "ImageGSD": ImageGSD,
                "ThumbnailPath": "",
                "ThumbnailName": "",
                "xmlPath": "",
                "xmlFileName": "",
                "DirectoryDepth": "day"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def _format_gk2b_time(raw_time):
    """
    Reformat a GK2B time attribute as "YYYY-MM-DD HH:MM:SS".

    Underscores are removed and the first 14 remaining characters are read
    positionally as year, month, day, hour, minute and second.
    (Presumably the attribute looks like "20220208_011530" -- confirm against
    a real product.)

    :param raw_time: time text taken from the file's global attributes
    :return: the reformatted time string
    """
    digits = raw_time.replace("_", "")
    date_part = f"{digits[0:4]}-{digits[4:6]}-{digits[6:8]}"
    clock_part = f"{digits[8:10]}:{digits[10:12]}:{digits[12:14]}"
    return f"{date_part} {clock_part}"


def GetGK2BData(in_file):
    """
    Read the metadata of a GK2B GOCI product file.

    :param in_file: path of the product file, opened through gdal.Open
    :return: metadata dictionary on success; on any failure a dictionary
             {"code": -1, "msg": <error text>} (the error is also printed)
    """
    try:
        # Open the file and fetch its dataset-level metadata.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        if in_datasets is None:
            # Without this check a failed open only shows up as an opaque
            # "'NoneType' object has no attribute 'GetMetadata'" message.
            raise RuntimeError(f"GDAL could not open file: {in_file}")
        meta_data = in_datasets.GetMetadata()

        # Production / observation start / observation end, as timestamp strings.
        ProductionTime = str(GetTimestamp(
            _format_gk2b_time(meta_data['NC_GLOBAL#slot_acquisition_time'])))
        StartTime = str(GetTimestamp(
            _format_gk2b_time(meta_data['NC_GLOBAL#observation_start_time'])))
        EndTime = str(GetTimestamp(
            _format_gk2b_time(meta_data['NC_GLOBAL#observation_end_time'])))

        # Resolution: keep only the text before the first space.
        ImageGSD = meta_data['NC_GLOBAL#geospatial_lat_resolution'].split(" ")[0]

        # The file only provides the upper-left and lower-right corners, so
        # the footprint is the axis-aligned rectangle spanned by those two.
        north = meta_data['NC_GLOBAL#image_upperleft_latitude']
        west = meta_data['NC_GLOBAL#image_upperleft_longitude']
        east = meta_data['NC_GLOBAL#image_lowerright_longitude']
        south = meta_data['NC_GLOBAL#image_lowerright_latitude']

        # Closed WKT ring: upper-left, upper-right, lower-right, lower-left,
        # and back to upper-left.
        boundaryGeomStr = f'POLYGON(({west} {north},' \
                          f'{east} {north},' \
                          f'{east} {south},' \
                          f'{west} {south},' \
                          f'{west} {north}))'

        # The dict literal is never empty, so no emptiness check is needed.
        return {"ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "0",
                "boundaryGeomStr": boundaryGeomStr,
                "ImageGSD": ImageGSD,
                "ThumbnailPath": "",
                "ThumbnailName": "",
                "xmlPath": "",
                "xmlFileName": "",
                "DirectoryDepth": "day"}
    except Exception as e:
        # Best-effort reader: report every failure through the error dict.
        print(str(e))
        return {"code": -1, "msg": str(e)}
if __name__ == '__main__':
    # Manual smoke test: run several metadata readers against local sample
    # files and print whatever each one returns. The paths below are specific
    # to the original developer's machine and must be adjusted before running.

    # Sentinel-2
    S2FilePath = r"F:\test\Sentinel\S2A_MSIL1C_20220102T031131_N0301_R075_T50SLG_20220102T050158.SAFE.zip"
    xml_path = r"F:\test\Sentinel"
    thumbnail_path = r"F:\test\Sentinel"
    s2_dic = GetSentinel2Data(S2FilePath, xml_path, thumbnail_path)
    print(s2_dic)

    # Landsat 8 (the same directory is passed for both path arguments)
    LandsatFilePath = r"F:\test\USGS_data\landsat_8_c1\LC81220342021355LGN00.tar.gz"
    thumbnail_path1 = r"F:\test\USGS_data\landsat_8_c1"
    txt_path = thumbnail_path1
    Landsat_dic = GetLandsatData(LandsatFilePath, thumbnail_path1, txt_path)
    print(Landsat_dic)

    # MODIS (MOD11A1 sample)
    MODIS_path = r"F:\test\MODIS\MOD11A1\MOD11A1.A2022038.h26v05.006.2022039112456.hdf"
    MODIS = GetModisData(MODIS_path)
    print(MODIS)

    # GOCI
    GOCI_FilePath = r"F:\test\GOCI_Data\GOCI_L2\2020-01-01\COMS_GOCI_L2A_GA_20200101001642.CDOM.he5"
    GOCI = GetGOCIData(GOCI_FilePath)
    print(GOCI)

    # GK2B GOCI
    GK2B_FilePath = r"F:\test\GOCI_Data\GK2_GC2_L2\AC\GK2B_GOCI2_L2_20220208_011530_LA_S010_AC.nc"
    GK2B = GetGK2BData(GK2B_FilePath)
    print(GK2B)