import datetime
import ftplib
import functools
import inspect
import os
import re
import shutil
import socket
import warnings
from itertools import chain
from dateutil import parser
def from_env(**param_to_env):
def decorator(f):
spec = inspect.getfullargspec(f)
assert all(
param in chain(spec.args, spec.kwonlyargs)
for param in param_to_env
)
f.from_env = param_to_env
return f
return decorator
class DateTime(datetime.datetime):
def __new__(cls, s):
return parser.parse(s, ignoretz=True)
fromtimestamp = datetime.datetime.fromtimestamp
now = datetime.datetime.now
utcfromtimestamp = datetime.datetime.utcfromtimestamp
def reconnect_ftp(func):
@functools.wraps(func)
def wrapper(ftp, *args, **kwargs):
try:
return func(ftp, *args, **kwargs)
except ftplib.error_temp:
ftp.connect()
return func(ftp, *args, **kwargs)
return wrapper
class FtpError(Exception):
pass
def _handle_ftp_error(func):
@functools.wraps(func)
def wrapper(ftp, *args, **kwargs):
try:
return func(ftp, *args, **kwargs)
except (ftplib.Error, socket.error) as e:
raise FtpError(*e.args)
return wrapper
@functools.lru_cache(maxsize=None)
class Ftp:
ftp = None
def __init__(self, host, login, password, use_ftps, passive_mode=True,
port=21, timeout=10, **_):
self.host = host
self.login = login
self.password = password
self.passive_mode = passive_mode
self.use_ftps = use_ftps
self.port = port
self.timeout = timeout
@property
def _address(self):
return self.login + '@' + self.host + ':' + str(self.port)
def __str__(self):
protocol = 'ftps' if self.use_ftps else 'ftp'
return protocol + '://' + self._address
@_handle_ftp_error
def connect(self):
if self.use_ftps:
ftp = ftplib.FTP_TLS()
else:
ftp = ftplib.FTP()
ftp.connect(self.host, self.port, self.timeout)
ftp.login(self.login, self.password)
if self.use_ftps:
ftp.prot_p()
ftp.set_pasv(self.passive_mode)
self.ftp = ftp
@_handle_ftp_error
@reconnect_ftp
def listdir(self, *args):
return self.ftp.nlst(*args)
@_handle_ftp_error
@reconnect_ftp
def mlistdir(self, *args, **kwargs):
return self.ftp.mlsd(*args, **kwargs)
@_handle_ftp_error
@reconnect_ftp
def retrieve(self, path, destination):
"""
:raises FtpError:
:raises IsNotDirError:
:raises DirNotEmptyError:
"""
if not os.path.isabs(destination):
destination = os.path.abspath(destination)
destination = os.path.join(destination, self._address)
target_name = os.path.join(destination, path.lstrip(os.path.sep))
target_dir = os.path.dirname(target_name)
mkdir(target_dir)
free_space = shutil.disk_usage(target_dir).free
try:
target_size = self.size(path)
except FtpError:
# Skip this step if SIZE command is not supported on the server
pass
else:
if target_size > free_space:
raise FtpError('Disk is full', free_space, target_size)
with open(target_name, 'wb') as w:
self.ftp.retrbinary('RETR ' + path, w.write)
return target_name
@_handle_ftp_error
@reconnect_ftp
def size(self, dir_path):
""" Get the size of the file on the server """
try:
self.ftp.sendcmd('TYPE I')
return self.ftp.size(dir_path)
finally:
# Fallback to the default ASCII mode
self.ftp.sendcmd('TYPE A')
class DirNotEmptyError(FileExistsError):
def __init__(self, message=None):
message = message or 'destination directory exists and it is not empty'
super().__init__(message)
class IsNotDirError(FileExistsError):
def __init__(self, message=None):
message = message or 'destination exists but it is not a directory'
super().__init__(message)
class ActionError(Exception):
def __init__(self, message=None, code=1):
self.message = message
self.code = code
def mkdir(path, check_empty=False):
if os.path.isdir(path):
if check_empty and os.listdir(path):
raise DirNotEmptyError()
elif os.path.exists(path):
raise IsNotDirError()
else:
os.makedirs(path)
def read(fileobj, size=2 ** 20):
for chunk in iter(lambda: fileobj.read(size), b''):
yield chunk
optional_arg_match = re.compile(r'--(?P