lb_toolkits.utils.ftppro 源代码

# -*- coding:utf-8 -*-
'''
@Project  : lb_toolkits

@File     : ftppro.py

@Modify Time : 2022/8/11 15:34

@Author : Lee

@Version : 1.0

@Description :
通过ftplib进行ftp文件上传和下载
'''
import errno
import ftplib
import os
import datetime
import re
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


[文档] class ftppro(): def __init__(self, ip, user=None, password=None, TLSFlag=False): logger.info('正在连接【%s】' %(ip)) self.host = ip self.user = user self.pwd = str(password) self.ftp = None self.TLSFlag = TLSFlag self.connect()
[文档] def connect(self, timeout=5*60): connflag = False try: self.ftp.voidcmd("NOOP") connflag = True except BaseException as e: connflag = False try: if connflag : return self.ftp else: if self.TLSFlag : ftp = ftplib.FTP_TLS(self.host, timeout=timeout) else: ftp = ftplib.FTP(self.host, timeout=timeout) ftp.encoding = 'utf' if self.user is None or self.pwd is None : ftp.login('anonymous', '') else: ftp.login(self.user, self.pwd) if self.TLSFlag : ftp.prot_p() # 启用数据通道加密 self.ftp = ftp return ftp except BaseException as e: logger.error('连接失败【%s】' % self.host) # ftp.quit() return None
[文档] def downloadFile(self, remoteFile, localPath, blocksize=1*1024, skip_download=False, cover=False): ''' 通过ftp下载文件 Parameters ---------- remoteFile: str localPath : str blocksize skip_download : bool 是否跳过下载文件,TRUE则不下载文件,直接返回文件名,否则,下载该文件 cover : bool 文件存在, 如果cover为TRUE,则删除原文件后重新下载, 如果cover为FALSE,就跳过该文件下载 Returns ------- ''' localFile = os.path.join(localPath, os.path.basename(remoteFile)) # 文件存在,如果skip为TRUE,就跳过该文件下载, # 如果skip为FALSE,则删除原文件后重新下载 if skip_download : return localFile # 文件存在,是否需要覆盖下载 if os.path.isfile(localFile): if cover : os.remove(localFile) logger.warning('文件已存在,删除该文件后重新下载【%s】' %(localFile)) else: logger.info('文件已存在,跳过下载该文件【%s】' %(localFile)) return localFile tempfile = localFile + '.download' self.__makedir(localPath) try: ftp = self.connect() if ftp is None : return None self._download(ftp, tempfile, remoteFile, blocksize=blocksize) # 把之前下载的文件删除后进行重命名 if os.path.isfile(localFile): os.remove(localFile) if os.path.isfile(tempfile) : os.rename(tempfile, localFile) return localFile except BaseException as e: logger.warning(e) logger.warning('下载文件失败,再尝试下载【%s】' %(localFile)) try: ftp = self.connect() if ftp is None : return None self._download(ftp, tempfile, remoteFile, blocksize=blocksize) if os.path.isfile(tempfile) : os.rename(tempfile, localFile) return localFile except BaseException as e: logger.error(e) logger.warning('再次下载文件失败【%s】' %(localFile)) if os.path.isfile(tempfile) : os.remove(tempfile) return None
[文档] def uploadFile(self, localfile, remotePath, cover=True, blocksize = 1 * 1024): ''' 上传文件 【注意】:本功能未经过测试验证可行性 Parameters ---------- localfile: str 本地存储路径 remotePath: str 远程上传路径 remoteFile: str 远程文件 block_size: int 上传块大小,单位:byte Returns ------- ''' ftp = self.connect() if ftp is None : return False ftp.voidcmd('TYPE I') basename = os.path.basename(localfile) remoteFile = os.path.join(remotePath, basename) remoteFile = remoteFile.replace('\\','/') remotePath = remotePath.replace('\\','/') self.__makeRemoteDir(remotePath) self._upload(ftp, localfile, remoteFile, cover=cover, blocksize=blocksize)
def _upload(self, ftp, localfile, remotefile, cover=True, blocksize=1024): basename = os.path.basename(localfile) localsize = os.path.getsize(localfile) if self._judge_remotefile_exist(ftp, remotefile): ftp.voidcmd('TYPE I') remotesize = ftp.size(remotefile) else: remotesize = 0 if localsize == remotesize : if cover : remotesize = 0 else: logger.info('远程文件已存在,跳过上传【%s】' %(remotefile)) return remotefile try: with open(localfile, 'rb') as fp: from tqdm import tqdm with tqdm( total=localsize, unit="B", unit_scale=True, desc=f"正在上传【{basename}】", unit_divisor=blocksize, initial=remotesize ) as pbar: ftp.voidcmd('TYPE I') datasock, esize = ftp.ntransfercmd("STOR %s" % remotefile, remotesize) fp.seek(remotesize) def callback(block): datasock.sendall(block) pbar.update(len(block)) while True: data = fp.read(blocksize) if not data: break callback(data) remotesize += len(data) if remotesize != localsize : logger.error('上传文件失败【%s】' %(remotefile)) try: import ssl except ImportError: _SSLSocket = None else: _SSLSocket = ssl.SSLSocket datasock.close() # shutdown ssl layer # if _SSLSocket is not None and isinstance(conn, _SSLSocket): # conn.unwrap() # with open(localfile, "rb") as fp: # fp.seek(remotesize) # ftp.storbinary("STOR %s" % remotePath, fp, block_size) except BaseException as e: logger.error(e) error_code = str(e).split()[0] if error_code == '451': logger.warning('重新上传文件【%s】' %(localfile)) ftp.voidcmd('TYPE I') with open(localfile, "rb") as fp: ftp.storbinary(f"STOR {remotefile}", fp) elif error_code == '550': logger.error(f"错误 ({error_code}): 操作不允许 - {str(e)}") # 检查是否有读取权限 if self.check_read_permission(ftp, remotefile): logger.warning("有读取权限,尝试删除并重新上传...") try: ftp.delete(remotefile) logger.info(f"已删除现有文件: {remotefile}") except ftplib.error_perm: logger.warning("无法删除文件,可能没有权限") else: logger.warning("没有读取权限,请检查用户权限") return None else: logger.error(f"其他FTP权限错误 ({error_code}): {str(e)}") return None ftp.voidresp() return remotefile def __makedir(self,dirname): if not os.path.isdir(dirname): os.makedirs(dirname)
[文档] def check_read_permission(self, ftp, dir_path): """检查是否有读取目录的权限""" try: ftp.cwd(dir_path) ftp.retrlines('LIST') return True except ftplib.error_perm: return False
[文档] def list_dir_regex(self, remotePath, regexStr): ftp = self.connect() try: ftp.cwd(remotePath) fileList = ftp.nlst() p = re.compile(regexStr) matchedFiles = [] for file in fileList: match = p.search(file) if match: matchedFiles.append(match.string) return matchedFiles except BaseException as e: logger.warning(e)
[文档] def listdir(self, dirname, pattern=None): ftp = self.connect() try: ftp.cwd(dirname) if pattern is None : fileList = ftp.nlst() else: fileList = ftp.nlst(pattern) return fileList except BaseException as e: return []
def __makeRemoteDir(self, dirPath): ftp = self.connect() current_dir = ftp.pwd() dirs = dirPath.split('/') for dirName in dirs: if dirName: try: if dirName == 'home': ftp.cwd( '/' + dirName) else: ftp.cwd(dirName) except Exception as e: try: ftp.mkd(dirName) ftp.cwd(dirName) except ftplib.error_perm: raise PermissionError(f"无法创建目录: {dirName}") # 切换至根目录 ftp.cwd(current_dir) def _download(self, ftp, tempfile, remoteFile, blocksize=1024, continuing=False): ''' 下载文件 ''' ftp.voidcmd('TYPE I') basename = os.path.basename(remoteFile) def callback(block): fp.write(block) pbar.update(len(block)) if continuing and os.path.isfile(tempfile): already_downloaded_bytes = os.path.getsize(tempfile) continuing = True else: continuing = False already_downloaded_bytes = 0 remotesize = ftp.size(remoteFile) mode = "ab" if continuing else "wb" with open(tempfile, mode) as fp: from tqdm import tqdm with tqdm( total=remotesize, unit="B", unit_scale=True, desc=f"正在下载【{basename}】", unit_divisor=blocksize, initial=already_downloaded_bytes ) as pbar: with ftp.transfercmd('RETR %s' % (remoteFile), already_downloaded_bytes) as conn: while 1: data = conn.recv(blocksize) if not data: break callback(data) try: import ssl except ImportError: _SSLSocket = None else: _SSLSocket = ssl.SSLSocket # shutdown ssl layer if _SSLSocket is not None and isinstance(conn, _SSLSocket): conn.unwrap() return ftp.voidresp() def _judge_remotefile_exist(self, ftp, remotefile): try: if remotefile in ftp.nlst(os.path.dirname(remotefile)): return True else: return False except BaseException as e: return False
[文档] def close(self, ftp): if ftp is not None: ftp.quit() ftp = None
def __del__(self): if self.ftp is not None: self.ftp.quit() self.ftp = None