# -*- coding:utf-8 -*-
'''
@Project : lb_toolkits
@File : downloadFY.py
@Modify Time : 2022/8/11 15:34
@Author : Lee
@Version : 1.0
@Description :
'''
import glob
import os
import time
import datetime
import tempfile
import logging
logger = logging.getLogger(__name__)
from lb_toolkits.utils import ftppro
from lb_toolkits.utils import spiderdownload
from .config import FY_FTP_URL
# Product identifiers known for each satellite / instrument pair.
# Layout: {satellite id: {instrument id: [product id, ...]}}.
FYProdInfo = dict(
    FY4A=dict(
        AGRI=[
            'ACI', 'AMV',
            'CFR', 'CIX', 'CLM', 'CLP', 'CLT', 'CTH', 'CTP', 'CTT',
            'DLR', 'DSD', 'FHS', 'FOG', 'LPW', 'LSE', 'LST', 'OLR',
            'QPE', 'RSR', 'SSI', 'SST', 'TBB', 'TFP', 'ULR',
        ],
        GIIRS=['AVP'],
    ),
    FY4B=dict(
        AGRI=['CTH', 'CTP', 'CTT', 'QPE'],
    ),
    FY3D=dict(
        MERSI=['CLA', 'NVI', 'PWS', 'PWV'],
    ),
)
# FY3 10-degree block codes -> coordinate value (X direction).
# Codes "00".."H0" map to 0.0 .. 170.0 and "I0".."Z0" map to -10.0 .. -180.0,
# in steps of 10. Key order is significant: it is passed on as the grid
# index list when the 10-degree fishnet is built.
FY3Block10CoefX = {
    **{
        code + "0": 10.0 * idx
        for idx, code in enumerate("0123456789ABCDEFGH")
    },
    **{
        code + "0": -10.0 * (idx + 1)
        for idx, code in enumerate("IJKLMNOPQRSTUVWXYZ")
    },
}
# FY3 10-degree block codes -> coordinate value (Y direction).
# Codes run "80" (90.0) down to "00" (10.0), then "90" (0.0) and
# "A0".."H0" (-10.0 .. -80.0), i.e. 90.0 minus 10 per position.
# Key order is significant (it is used as the grid index list).
FY3Block10CoefY = {
    code + "0": 90.0 - 10.0 * idx
    for idx, code in enumerate("8765432109ABCDEFGH")
}
class downloadFY:
    """Search for and download FengYun (FY3D / FY4A / FY4B) L1 and L2 products.

    File lists come either from an order placed on the FengYun web site
    (order id or order file, see ``searchfile``) or from browsing the FTP
    server by time range (``searchL1File`` / ``download_fy_l2``).
    """

    def __init__(self, username=None, password=None):
        """Create the downloader and, when credentials are given, log in.

        Parameters
        ----------
        username: str, optional
            FTP user name.
        password: str, optional
            FTP password.

        Notes
        -----
        ``self.ftp`` is ``None`` when either credential is missing. Methods
        that browse the FTP server need a logged-in client; order based
        downloads (full ``ftp://`` / ``http://`` URLs) do not.
        """
        # Always define the attributes so later code never hits a missing
        # attribute when the object was built without credentials.
        self.ftp = None
        self.dstfilelist = []
        if username is not None and password is not None:
            self.ftp = ftppro(FY_FTP_URL, username, password)
            self.connect()

    def connect(self):
        """Open the FTP connection; a failure is logged, not raised."""
        try:
            self.ftp.connect()
        except Exception:
            # Exception (not BaseException) so Ctrl-C / SystemExit still
            # propagate to the caller.
            logger.error('登录失败,请连接并进行FTP账号注册。http://fy4.nsmc.org.cn/data/en/data/realtime.html')

    def searchfile(
            self,
            startDate,
            endDate=None,
            orderfile=None, orderID=None,
            satid='FY3D', instid='MERSI',
            resolution=0.01, geoflag=True,
            **kwargs
    ):
        """Build the list of files to download.

        When ``startDate`` is given the FTP server is searched by time via
        ``searchL1File``. Otherwise the list is read from an order: either
        the order file fetched for ``orderID`` or a local ``orderfile``.

        Parameters
        ----------
        startDate: datetime or None
            Start of the time range; pass ``None`` to use an order instead.
        endDate: datetime, optional
            End of the time range; defaults to ``startDate``.
        orderfile: str, optional
            Path of a local order file (one entry per line).
        orderID: str, optional
            Order id; its file list is downloaded to the temp directory,
            read, and then removed again.
        satid, instid, resolution, geoflag
            Forwarded to ``searchL1File``.

        Returns
        -------
        list
            Remote file names or order entries; ``[]`` when no order file
            is available.
        """
        if endDate is None and startDate is not None:
            endDate = startDate

        if startDate is not None and endDate is not None:
            return self.searchL1File(startDate, endDate,
                                     satid=satid, instid=instid,
                                     resolution=resolution,
                                     geoflag=geoflag)

        if orderID is not None:
            orderurl = 'http://file.nsmc.org.cn/ORDERFILELIST/{orderid}.txt'.format(orderid=orderID)
            print(orderurl)
            tempdir = tempfile.gettempdir()
            spider = spiderdownload()
            orderfile = spider.download(tempdir, orderurl)

        if orderfile is None or not os.path.isfile(orderfile):
            logger.error('订单文件不存在【%s】' %(orderfile))
            return []

        orderlist = self.readorderfile(orderfile)

        if orderID is not None:
            # The order file was only a temporary download: best-effort
            # cleanup, a failure to delete must not lose the parsed list.
            try:
                os.remove(orderfile)
            except OSError:
                pass

        return orderlist

    def download(
            self,
            outdir,
            url,
            tries=3,
            timeout=5*60,
            skip_download=False,
            cover=False,
            continuing=True,
            **kwargs
    ):
        """Download one or several files into ``outdir``.

        Each entry is dispatched on its form: ``http(s)://`` URLs go through
        ``spiderdownload``; ``ftp://user:passwd@host/path`` URLs go through
        an FTP client created from the embedded credentials (reused while
        host and account stay the same); anything else is treated as a
        path on the logged-in ``self.ftp`` connection.

        Parameters
        ----------
        outdir: str
            Local output directory.
        url: str or iterable of str
            A single entry or a collection of entries.
        tries, continuing
            Accepted for interface compatibility; not used here.
        timeout: int
            Passed to the HTTP downloader.
        skip_download, cover: bool
            Passed through to the underlying downloaders.
        """
        if isinstance(url, str):
            urls = [url]
        elif url is None:
            urls = []
        else:
            # Accept any iterable (list, tuple, generator, ...).
            urls = list(url)

        session = None  # (host, user, passwd) the current client was built for
        client = None
        required = ('user', 'passwd', 'host', 'filepath')

        for position, item in enumerate(urls):
            if 'http://' in item or 'https://' in item:
                spider = spiderdownload()
                spider.download(outdir, item, timeout=timeout,
                                skip_download=skip_download, cover=cover)
            elif 'ftp://' in item:
                info = self._spliturl(item)
                if any(key not in info for key in required):
                    # Only the position is logged: the URL embeds a password.
                    logger.warning('第【%d】个FTP下载地址解析失败,已跳过', position)
                    continue

                target = (info['host'], info['user'], info['passwd'])
                if target != session:
                    if session is not None:
                        print('将切换账号:【%s】-->【%s】' %(session[1], info['user']))
                    client = ftppro(info['host'], user=info['user'], password=info['passwd'])
                    session = target

                client.downloadFile(info['filepath'], outdir,
                                    skip_download=skip_download, cover=cover)
            else:
                self.ftp.downloadFile(item, outdir,
                                      skip_download=skip_download, cover=cover)

    def searchL1File(self, starttime, endtime=None,
                     satid='FY3D', instid='MERSI',
                     resolution=0.01, geoflag=True,
                     pattern=None):
        """List L1 files on the FTP server for a time range.

        Parameters
        ----------
        starttime: datetime
            Start of the time range (documented as UTC by the original).
        endtime: datetime, optional
            End of the time range; defaults to ``starttime``.
        satid: str
            Satellite id; only ``'FY3D'`` is supported here.
        instid: str
            Instrument id, ``'MERSI'``.
        resolution: float
            Resolution in degrees (0.01 -> ``1000M``).
        geoflag: bool
            When True the matching GEO files are listed as well.
        pattern: str, optional
            Forwarded to the path builder.

        Returns
        -------
        list
            Remote file names inside the time range.

        Raises
        ------
        Exception
            For any satellite other than FY3D.
        """
        if endtime is None:
            endtime = starttime

        if satid not in ['FY3D']:
            raise Exception('暂不支持【%s / %s】L1数据下载' %(satid, instid))

        return self._PathForFY3DMERSIL1(starttime, endtime,
                                        satid=satid, instid=instid,
                                        resolution=resolution,
                                        pattern=pattern, geoflag=geoflag)

    def download_fy_l2(self, dstpath, starttime, endtime=None,
                       satid='FY4A', instid='AGRI', prodid='CLM',
                       regionid='DISK', resolution=0.04,
                       FY3Block10Flag=False, extent=None, shpname=None,
                       pattern=None, skip=False, cover=False):
        """List and (unless ``skip``) download L2 products.

        Parameters
        ----------
        dstpath: str
            Local output directory.
        starttime: datetime
            Start of the time range (documented as UTC by the original).
        endtime: datetime, optional
            End of the time range; defaults to ``starttime``.
        satid: str
            Satellite id: FY3D / FY4A / FY4B.
        instid: str
            Instrument id, e.g. MERSI / AGRI / GIIRS.
        prodid: str
            Product id; must be listed in ``FYProdInfo``.
        regionid: str
            Region component of the remote path, e.g. DISK / REGC.
        resolution: float, optional
            Resolution in degrees; only used for FY3D.
        FY3Block10Flag, extent, shpname
            Forwarded to the FY3D path builder.
        pattern: str, optional
            Forwarded to the path builders.
        skip: bool
            When True nothing is downloaded, the list is only returned.
        cover: bool
            Passed to ``download``.

        Returns
        -------
        list
            Remote file names that matched.

        Raises
        ------
        Exception
            For an unsupported satellite.
        """
        if endtime is None:
            endtime = starttime

        if satid in ['FY4A']:
            L2FileList = self._PathForFY4AL2(starttime, endtime,
                                             satid=satid, instid=instid,
                                             prodid=prodid, regionid=regionid, pattern=pattern)
        elif satid in ['FY4B']:
            L2FileList = self._PathForFY4BL2(starttime, endtime,
                                             satid=satid, instid=instid,
                                             prodid=prodid, regionid=regionid, pattern=pattern)
        elif satid in ['FY3D']:
            L2FileList = self._PathForFY3DMERSIL2(starttime, endtime,
                                                  satid=satid, instid=instid, prodid=prodid,
                                                  regionid=regionid, resolution=resolution,
                                                  FY3Block10=FY3Block10Flag, extent=extent, shpname=shpname,
                                                  pattern=pattern)
        else:
            raise Exception('暂不支持【%s %s】产品【%s】下载' %(satid, instid, prodid))

        if not skip:
            self.download(dstpath, L2FileList, cover=cover)

        return L2FileList

    @staticmethod
    def _resolution_label(resolution):
        """Turn a resolution in degrees into a directory label (0.01 -> '1000M').

        The value is rounded, not truncated, so floating point error
        (a product landing just below a whole number) cannot produce a
        label that is one metre short.
        """
        return '%dM' % int(round(resolution * 100 * 1000))

    @staticmethod
    def _iter_steps(starttime, endtime, daily=False):
        """Yield the start of every hour (or day) touched by the range.

        The first step is floored to the hour (or to midnight when
        ``daily``); stepping from the raw ``starttime`` would skip the last
        hour/day whenever ``starttime`` has a larger minute/hour part than
        ``endtime``. Nothing is yielded when ``endtime < starttime``.
        """
        if endtime < starttime:
            return
        current = starttime.replace(minute=0, second=0, microsecond=0)
        step = datetime.timedelta(hours=1)
        if daily:
            current = current.replace(hour=0)
            step = datetime.timedelta(days=1)
        while current <= endtime:
            yield current
            current += step

    def _PathForFY4AL1(self, starttime, endtime,
                       satid='FY4A', instid='AGRI',
                       regionid='DISK', resolution=0.04,
                       pattern=None, geoflag=False):
        """List FY4A AGRI / GIIRS L1 files hour by hour.

        Returns the remote names whose time stamp lies in
        ``[starttime, endtime]``; with ``geoflag`` the AGRI GEO directory
        is listed as well (GIIRS has no GEO directory here).
        """
        strRes = self._resolution_label(resolution)
        if strRes not in ['500M', '1000M', '2000M', '4000M']:
            raise Exception("请确认输入的分辨率是否正确,只支持下载【'500M', '1000M', '2000M', '4000M'】")

        matchfiles = []
        for nowdate in self._iter_steps(starttime, endtime):
            year = nowdate.strftime("%Y")
            day = nowdate.strftime("%Y%m%d")
            if instid in ['AGRI']:
                L1Path = os.path.join('/FY4A/AGRI/L1/FDI', regionid, strRes, year, day)
                GeoPath = os.path.join('/FY4A/AGRI/L1/FDI', regionid, 'GEO', year, day)
            elif instid in ['GIIRS']:
                L1Path = os.path.join('/FY4A/GIIRS/L1/IRD/REGX/', year, day)
                GeoPath = None
            else:
                raise Exception('只支持下载FY4A AGRI和GIIRS L1近实时数据')

            hourly = '*%s*%s*' %(nowdate.strftime('%Y%m%d%H'), strRes)
            files = self.getFileList(L1Path, pattern=hourly)
            matchfiles = self._checktime(matchfiles, starttime, endtime, files, satid)

            # GeoPath is None for GIIRS: there is nothing to list then.
            if geoflag and GeoPath is not None:
                files = self.getFileList(GeoPath, pattern=hourly)
                matchfiles = self._checktime(matchfiles, starttime, endtime, files, satid)

        return matchfiles

    def _list_fy4_l2(self, starttime, endtime, satid, instid, prodid,
                     regionid, unsupported_msg):
        """Shared hour-by-hour L2 listing for the FY4 satellites."""
        self._checkprodid(satid, instid, prodid)

        matchfiles = []
        for nowdate in self._iter_steps(starttime, endtime):
            year = nowdate.strftime("%Y")
            day = nowdate.strftime("%Y%m%d")
            if instid in ['AGRI']:
                L2Path = os.path.join('/', satid, instid, 'L2/', prodid, regionid, 'NOM',
                                      year, day)
            elif instid in ['GIIRS']:
                L2Path = os.path.join('/', satid, instid, 'L2/', prodid, 'DWELL',
                                      year, day)
            else:
                raise Exception(unsupported_msg)

            files = self.getFileList(L2Path,
                                     pattern='*%s*' %(nowdate.strftime('%Y%m%d%H')))
            matchfiles = self._checktime(matchfiles, starttime, endtime, files, satid)

        return matchfiles

    def _PathForFY4AL2(self, starttime, endtime,
                       satid='FY4A', instid='AGRI', prodid=None,
                       regionid='DISK', resolution=0.04,
                       pattern=None):
        """List FY4A L2 files of ``prodid`` inside the time range.

        ``resolution`` and ``pattern`` are accepted but not used.
        """
        return self._list_fy4_l2(starttime, endtime, satid, instid, prodid, regionid,
                                 '只支持下载FY4A AGRI和GIIRS L2近实时数据')

    def _PathForFY4BL2(self, starttime, endtime,
                       satid='FY4B', instid='AGRI', prodid=None,
                       regionid='DISK', resolution=0.04,
                       pattern=None):
        """List FY4B L2 files of ``prodid`` inside the time range.

        ``resolution`` and ``pattern`` are accepted but not used.
        """
        return self._list_fy4_l2(starttime, endtime, satid, instid, prodid, regionid,
                                 '只支持下载FY4B AGRI L2近实时数据')

    def _PathForFY3DMERSIL1(self, starttime, endtime,
                            satid='FY3D', instid='MERSI',
                            regionid='GBAL', resolution=0.01,
                            pattern=None, geoflag=False):
        """List FY3D MERSI L1 files (and optionally GEO files) hour by hour.

        ``regionid`` and ``pattern`` are accepted but not used.
        """
        strRes = self._resolution_label(resolution)
        if strRes not in ['250M', '1000M']:
            raise Exception("请确认输入的分辨率是否正确,只支持下载【'250M', '1000M'】")

        # GEO files carry their own resolution tag in the file name.
        geoRes = 'GEOQK' if strRes == '250M' else 'GEO1K'

        matchfiles = []
        for nowdate in self._iter_steps(starttime, endtime):
            if instid not in ['MERSI']:
                raise Exception('只支持下载FY3D MERSI L1 近实时数据')

            # L1 and GEO files share the same daily directory.
            L1Path = os.path.join('/L1/', nowdate.strftime("%Y%m%d"))
            hourtag = nowdate.strftime('%Y%m%d_%H')

            files = self.getFileList(L1Path, pattern='*%s*%s*' %(hourtag, strRes))
            matchfiles = self._checktime(matchfiles, starttime, endtime, files, satid)

            if geoflag:
                files = self.getFileList(L1Path, pattern='*%s*%s*' %(hourtag, geoRes))
                matchfiles = self._checktime(matchfiles, starttime, endtime, files, satid)

        return matchfiles

    def _PathForFY3DMERSIL2(self, starttime, endtime,
                            satid='FY3D', instid='MERSI',
                            regionid='GBAL', resolution=0.01,
                            FY3Block10=False, extent=None, shpname=None,
                            pattern=None, prodid=None):
        """List FY3D MERSI L2 files day by day.

        ``regionid``, ``FY3Block10``, ``extent``, ``shpname`` and
        ``pattern`` are accepted but not used: no 10-degree block filter
        is applied to the listing.
        """
        self._checkprodid(satid, instid, prodid)

        strRes = self._resolution_label(resolution)
        if strRes not in ['250M', '1000M', '5000M']:
            raise Exception("请确认输入的分辨率是否正确,只支持下载【'250M', '1000M', '5000M'】")

        matchfiles = []
        for nowdate in self._iter_steps(starttime, endtime, daily=True):
            if instid not in ['MERSI']:
                raise Exception('只支持下载FY3D MERSI L2 近实时数据')

            L2Path = os.path.join('/L2L3/', prodid, '10DAY', strRes,
                                  nowdate.strftime("%Y"), nowdate.strftime("%Y%m%d"))
            files = self.getFileList(L2Path)
            matchfiles = self._checktime(matchfiles, starttime, endtime, files, satid)

        return matchfiles

    def maskBlock10(self, extent=None, shpname=None):
        """Return the 10-degree block codes that intersect a shapefile.

        A global 10x10 degree fishnet is written to a temporary shapefile,
        intersected with ``shpname``, and the ``yindex`` + ``xindex`` field
        values of the result are returned.

        Parameters
        ----------
        extent: optional
            Not supported yet (see Raises).
        shpname: str, optional
            Shapefile used to clip the fishnet.

        Returns
        -------
        list
            Block codes (``yindex`` followed by ``xindex``).

        Raises
        ------
        NotImplementedError
            When ``shpname`` is not given: selecting blocks by ``extent``
            only was never completed.
        """
        # NOTE(review): VectorPro is not imported in the visible part of
        # this file - confirm where it is expected to come from.
        mpro = VectorPro()

        # NamedTemporaryFile is only used to obtain a unique base name.
        tmp_file = tempfile.NamedTemporaryFile(prefix="tmp_lb_toolkits_fishgrid_", delete=True)
        gridshp = tmp_file.name + '.shp'
        clipshp = None
        try:
            mpro.fishgrid(gridshp, extent=[-180, -90, 180, 90], xRes=10, yRes=10,
                          xindex=list(FY3Block10CoefX.keys()),
                          yindex=list(FY3Block10CoefY.keys()))

            if shpname is None:
                raise NotImplementedError('暂不支持仅通过extent筛选10度块,请传入shpname')

            tmp_file = tempfile.NamedTemporaryFile(prefix="tmp_lb_toolkits_clip_", delete=True)
            clipshp = tmp_file.name + '.shp'
            mpro.intersect(clipshp, srcfile=gridshp, clipfile=shpname)

            fieldvaluex = mpro.getField(clipshp, 'xindex')
            fieldvaluey = mpro.getField(clipshp, 'yindex')
            return [yitem + xitem for xitem, yitem in zip(fieldvaluex, fieldvaluey)]
        finally:
            # Remove the temporary shapefiles on success and on failure.
            self._deltempshp(gridshp)
            if clipshp is not None:
                self._deltempshp(clipshp)

    def readorderfile(self, filename):
        """Read the entries of an order file.

        The file is read as GBK text. Lines of at most 10 characters
        (line break included) are skipped; the line break is removed
        from the others.

        Returns
        -------
        list
            One string per kept line.

        Raises
        ------
        Exception
            When ``filename`` is not an existing file.
        """
        if not os.path.isfile(filename):
            raise Exception('订单信息文件不存在【%s】' %(filename))

        with open(filename, 'r', encoding='gbk') as fp:
            order_list = [line.replace('\n', '') for line in fp if len(line) > 10]

        print('共获取到【%d】个文件下载ID' %(len(order_list)))
        return order_list

    def _spliturl(self, url):
        """Split ``ftp://user:passwd@host/path`` into its parts.

        Returns a dict with the keys ``user``, ``passwd``, ``host`` and
        ``filepath``, or ``{}`` when ``url`` is not an FTP URL or carries
        no ``user:passwd@host`` part. The host ends at the first ``/``
        after the ``@``, so it does not have to end in ``.cn``.
        """
        url = url.replace('\n', '')
        if 'ftp://' not in url:
            return {}

        body = url.replace('ftp://', '')
        colon = body.find(':')
        if colon < 0:
            return {}
        # Search for '@' after the ':' so a user name such as an e-mail
        # address does not cut the credentials short.
        at = body.find('@', colon + 1)
        if at < 0:
            return {}
        slash = body.find('/', at + 1)
        if slash < 0:
            slash = len(body)

        host = body[at + 1:slash]
        if not host:
            return {}

        return {
            'user': body[:colon],
            'passwd': body[colon + 1:at],
            'host': host,
            'filepath': body[slash:],
        }

    def listDir(self, path, pattern=None):
        """List the files or folders below a remote path.

        Parameters
        ----------
        path: str
            Remote path.
        pattern: str, optional
            Match pattern passed to the FTP client.

        Returns
        -------
        list
            Sorted entries as returned by the FTP client.
        """
        files = self.ftp.listdir(path, pattern)
        files.sort()
        return files

    def getFileList(self, srcpath, pattern=None):
        """List a remote directory and return full remote file names.

        Parameters
        ----------
        srcpath: str
            Remote directory; back slashes are converted to ``/``.
        pattern: str, optional
            Match pattern passed to the FTP client.

        Returns
        -------
        list
            ``srcpath`` joined with every listed entry, sorted by entry.
        """
        srcpath = srcpath.replace('\\', '/')
        files = self.ftp.listdir(srcpath, pattern)
        files.sort()
        # os.path.join uses '\\' on Windows; remote paths always use '/'.
        return [os.path.join(srcpath, name).replace('\\', '/') for name in files]

    @staticmethod
    def _fy4_timestamps(tokens):
        """Yield the datetimes encoded in FY4 file name tokens.

        14-digit tokens are read as ``%Y%m%d%H%M%S`` and 8-digit tokens
        as ``%Y%m%d``; tokens that do not parse are ignored.
        """
        for token in tokens:
            if not token.isdigit():
                continue
            if len(token) == 14:
                fmt = '%Y%m%d%H%M%S'
            elif len(token) == 8:
                fmt = '%Y%m%d'
            else:
                continue
            try:
                stamp = datetime.datetime.strptime(token, fmt)
            except ValueError:
                continue
            yield stamp

    @staticmethod
    def _fy3_timestamps(tokens):
        """Yield the datetimes encoded in FY3 file name tokens.

        An 8-digit date token followed by a 4-digit token is read as
        ``%Y%m%d_%H%M``. A date token without such a follower (last token,
        or followed by something that is not ``HHMM``) is read as a plain
        date; tokens that do not parse are ignored.
        """
        for idx, token in enumerate(tokens):
            if len(token) != 8 or not token.isdigit():
                continue
            follower = tokens[idx + 1] if idx + 1 < len(tokens) else ''
            if len(follower) == 4 and follower.isdigit():
                text, fmt = '%s_%s' %(token, follower), '%Y%m%d_%H%M'
            else:
                text, fmt = token, '%Y%m%d'
            try:
                stamp = datetime.datetime.strptime(text, fmt)
            except ValueError:
                continue
            yield stamp

    def _checktime(self, matchfiles, starttime, endtime, files, satid, pattern=None):
        """Append the files whose time stamp lies in ``[starttime, endtime]``.

        Parameters
        ----------
        matchfiles: list
            Accumulator; extended in place and returned.
        starttime, endtime: datetime
            Inclusive bounds.
        files: iterable of str
            Candidate remote file names; only the base name is inspected,
            split on ``_``.
        satid: str
            Selects the naming scheme (contains ``FY4`` or ``FY3``); other
            values match nothing.
        pattern
            Accepted but not used.

        Returns
        -------
        list
            ``matchfiles``. A file is added at most once, when any time
            stamp in its name is inside the range.
        """
        if 'FY4' in satid:
            stamps_of = self._fy4_timestamps
        elif 'FY3' in satid:
            stamps_of = self._fy3_timestamps
        else:
            return matchfiles

        for file in files:
            # Overlapping listings can return the same file more than once.
            if file in matchfiles:
                continue
            tokens = os.path.basename(file).split('_')
            if any(starttime <= stamp <= endtime for stamp in stamps_of(tokens)):
                matchfiles.append(file)

        return matchfiles

    def _checkprodid(self, satid, instid, prodid):
        """Check that ``prodid`` is listed in ``FYProdInfo`` for the sensor.

        Returns
        -------
        bool
            Always True; every failure is reported by an exception.

        Raises
        ------
        Exception
            When the satellite, the instrument or the product is unknown.
        """
        if satid not in FYProdInfo:
            raise Exception('暂不支持【%s】卫星下载' %(satid), '当前支持', list(FYProdInfo.keys()))

        SatInfo = FYProdInfo[satid]
        if instid not in SatInfo:
            raise Exception('暂不支持【%s/%s】下载' %(satid, instid), '当前支持', list(SatInfo.keys()))

        if prodid not in SatInfo[instid]:
            raise Exception('产品【%s】不在官方发布【%s/%s】的产品范围内,请参考风云官网发布产品详情!'
                            %(prodid, satid, instid), SatInfo[instid])

        return True

    def _deltempshp(self, shpname):
        """Delete a shapefile together with its side-car files.

        Every existing file matching ``shpname`` with ``.shp`` replaced
        by ``.*`` is removed.
        """
        for filename in glob.glob(shpname.replace('.shp', '.*')):
            if os.path.isfile(filename):
                os.remove(filename)