lb_toolkits.utils.wget 源代码

# -*- coding:utf-8 -*-
'''
@Project     : lb_toolkits

@File        : wget.py

@Modify Time :  2023/10/30 14:19   

@Author      : Lee    

@Version     : 1.0   

@Description :

'''
import os
import sys
import time

import numpy as np
import datetime
import shutil
import platform

from lb_toolkits import parm
import logging
logger = logging.getLogger(__name__)

exedir = os.path.abspath(list(parm.__path__)[0])


[文档] def get_wget(path): ''' 获取wget的路径 Parameters ---------- path : str wget路径 Returns ------- wgetpath : str ''' WGET = _find_path(path) if WGET is None : from lb_toolkits.download import downloadGithub if platform.system().lower() == 'windows' : WGET = os.path.join(exedir, 'bin', 'windows', 'wget.exe') if not os.path.isfile(WGET) : down = downloadGithub() down.download(os.path.dirname(WGET), 'https://github.com/libin033/lb_toolkits/tree/master/lb_toolkits/parm/bin/windows/wget.exe') if not os.path.isfile(WGET) : raise Exception('wget工具不可用') else: WGET = os.path.join(exedir, 'bin', 'linux', 'wget') if not os.path.isfile(WGET) : down = downloadGithub() down.download(os.path.dirname(WGET), 'https://github.com/libin033/lb_toolkits/tree/master/lb_toolkits/parm/bin/linux/wget') if not os.path.isfile(WGET) : raise Exception('wget工具不可用') return WGET
def _find_path(path=None): """Finds the path of the wget executable. Arguments: * ``path`` -- (Optional) The path to the wget executable Finds the wget executable on the system, either using the given path or by searching the system PATH variable and the current directory """ if path is not None: return path else: return ( _which("wget.exe") or _which("wget") ) def _which(program): def is_exe(fpath): return os.path.exists(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None
[文档] def wget(outdir, url, username=None, password=None, token=None, tries=3, skip_download=False, cover=False, timeout=5 * 60, continuing=True, wgetpath=None, options=None): ''' 通过wget方式进行数据下载 ''' local_filename = os.path.basename(url) local_filename = os.path.join(outdir, local_filename) if skip_download : return local_filename if os.path.isfile(local_filename) : if cover : logger.warning('文件已存在,将执行删除文件【%s】' %(local_filename)) os.remove(local_filename) else: logger.info('文件已存在,跳过下载【%s】' %(local_filename)) return local_filename tempfile = local_filename + '.download' WGET = get_wget(wgetpath) cmd = [f'{WGET}'] if username is not None and password is not None : cmd.append(f'--http-user={username} --http-passwd={password}') if tries is not None : cmd.append(f'--tries={tries}') if timeout is not None : cmd.append(f'--timeout={timeout}') if continuing : cmd.append('-c ') cmd.append('--wait=3 --no-check-certificate -e robots=off -np -R .html,.tmp -nH --cut-dirs=10') if options is not None : if isinstance(options, str) : cmd.append(options) elif isinstance(options, list) : cmd.extend(options) cmd.append(f'{url}') if token is not None : cmd.append(f'--header "Authorization: Bearer {token}"') cmd.append(f'-O {tempfile}') command = ' '.join(cmd) logger.info('执行下载命令: 【%s】' %(command)) os.system(command) if os.path.isfile(local_filename) : os.remove(local_filename) if os.path.isfile(tempfile) : # shutil.move(tempfile, local_filename) try: os.rename(tempfile, local_filename) except BaseException : time.sleep(3) try: os.rename(tempfile, local_filename) except BaseException : logger.error('处理失败【%s】' %(tempfile)) if os.path.isfile(local_filename) and os.stat(local_filename).st_size == 0 : os.remove(local_filename) return local_filename