Source code for iceprod.core.gridftp

"""
gridftp interface
"""

from __future__ import absolute_import, division, print_function

import os
import logging
from collections import namedtuple
from datetime import datetime
import tempfile
import shutil
import subprocess

logger = logging.getLogger('gridftp')


def _cmd(cmd, timeout=1200):
    subprocess.run(cmd, timeout=timeout, check=True)


def _cmd_output(cmd, timeout=1200):
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        output = p.communicate(timeout=timeout)[0].decode('utf-8')
        return (p.returncode, output)
    except subprocess.TimeoutExpired:
        p.kill()
        raise Exception('Request timed out')


def listify(lines, details=False, dotfiles=False):
    """Turn ls output into a list of file names or NamedTuples.

    Args:
        lines (str): raw listing text, one entry per line
        details (bool): if True, parse each line as a long-format entry and
            return ``File`` namedtuples with the fields ``directory``,
            ``perms``, ``subfiles``, ``owner``, ``group``, ``size``,
            ``date`` and ``name``; otherwise return the last
            whitespace-separated token of each line
        dotfiles (bool): include entries whose name starts with ``'.'``

    Returns:
        list: names (str) or ``File`` namedtuples, in listing order
    """
    # Function-scope import: the module itself only imports ``datetime``.
    from datetime import timedelta

    out = []
    if not details:
        for line in lines.split('\n'):
            if not line.strip():
                continue
            name = line.split()[-1]
            if dotfiles or not name.startswith('.'):
                out.append(name)
        return out

    File = namedtuple('File', ['directory', 'perms', 'subfiles',
                               'owner', 'group', 'size', 'date',
                               'name'])
    months = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
              'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12}
    now = datetime.now()
    # One day of slack so clock skew / timezone offsets between client and
    # server do not push a genuinely recent file back a whole year.
    latest_plausible = now + timedelta(days=1)

    for line in lines.split('\n'):
        pieces = line.split()
        if not pieces:
            continue
        # An ``ls -l`` style "total N" header has no file fields to parse.
        if len(pieces) == 2 and pieces[0] == 'total' and pieces[1].isdigit():
            continue
        name = os.path.basename(pieces[-1])
        if name.startswith('.') and not dotfiles:
            continue
        # Use the permission field rather than the raw first character so
        # leading whitespace cannot hide the directory flag.
        is_dir = pieces[0].startswith('d')
        perms = pieces[0][1:]
        if pieces[1].isdigit():
            # A link/subfile count column is present; drop it so the
            # remaining columns line up as owner, group, size, date...
            subfiles = int(pieces[1])
            pieces = pieces[1:]
        else:
            subfiles = int(pieces[3])
        month = months[pieces[4].lower()]
        day = int(pieces[5])
        if ':' in pieces[6]:
            # Long listings show HH:MM instead of the year for recent
            # files, so the year has to be inferred.
            hour, minute = pieces[6].split(':')
            dt = datetime(now.year, month, day, int(hour), int(minute))
            if dt > latest_plausible:
                # A date in the future means the entry is from last year
                # (e.g. a December file listed in January).
                try:
                    dt = dt.replace(year=now.year - 1)
                except ValueError:
                    # Feb 29 with no such day last year: keep as parsed.
                    pass
        else:
            dt = datetime(int(pieces[6]), month, day)
        out.append(File(is_dir, perms, subfiles, pieces[1], pieces[2],
                        int(pieces[3]), dt, name))
    return out
class GridFTP(object):
    """GridFTP interface to command line client.

    All operations shell out to ``globus-url-copy`` or ``uberftp``.
    Every method is a classmethod, so no instance is needed.

    Example:

        GridFTP.get('gsiftp://data.icecube.wisc.edu/file',
                    filename='/path/to/file')
    """
    _timeout = 3600  # 1 hour default timeout

    @classmethod
    def _get_timeout(cls, request_timeout):
        """Return ``request_timeout``, or the class default when it is None."""
        return cls._timeout if request_timeout is None else request_timeout

    @classmethod
    def _require_supported(cls, address, label='address'):
        """Raise Exception unless ``address`` uses a supported scheme."""
        if not cls.supported_address(address):
            raise Exception('address type not supported for %s %s'
                            % (label, str(address)))

    @classmethod
    def _parent_address(cls, address):
        """Return the url of the parent directory, or None at the root."""
        server, path = cls.address_split(address)
        parent = path.rstrip('/').rpartition('/')[0]
        return server + parent if parent else None

    @classmethod
    def _remove(cls, flags, address, request_timeout):
        """Run an uberftp removal command, tolerating a missing target."""
        cls._require_supported(address)
        cmd = ['uberftp', '-retry', '5'] + flags + [address]
        code, output = _cmd_output(
            cmd, timeout=cls._get_timeout(request_timeout))
        # "No match for" means the target is already gone: not an error.
        if code and 'No match for' not in output:
            logger.warning('%s', output)
            raise Exception('Error removing dir')

    @classmethod
    def supported_address(cls, address):
        """Return False for address types that are not supported"""
        if '://' not in address:
            return False
        return address.split(':', 1)[0] in ('gsiftp', 'ftp')

    @classmethod
    def address_split(cls, address):
        """Split an address into server/path parts

        Args:
            address (str): url containing ``'://'``

        Returns:
            tuple: ``(scheme://server, /path)``; path is ``'/'`` if absent
        """
        scheme, rest = address.split('://', 1)
        if '/' in rest:
            server, path = rest.split('/', 1)
            return (scheme + '://' + server, '/' + path)
        return (address, '/')

    @classmethod
    def get(cls, address, filename=None, request_timeout=None):
        """
        Do a GridFTP get request.

        Either data is returned directly or filename must be defined.

        Args:
            address (str): url to get from
            filename (str): filename to write data to
            request_timeout (float): timeout in seconds

        Returns:
            str: data, if filename is not defined

        Raises:
            Exception for failure
        """
        cls._require_supported(address)
        timeout = cls._get_timeout(request_timeout)
        tmpdir = None
        try:
            if filename is None:
                # No destination given: download to a scratch file and
                # hand back its contents.
                tmpdir = tempfile.mkdtemp(dir=os.getcwd())
                local_path = os.path.join(tmpdir, 'get_tmp_file')
            else:
                local_path = filename
            _cmd(['globus-url-copy', address, 'file:' + local_path],
                 timeout=timeout)
            if filename is None:
                with open(local_path) as f:
                    return f.read()
        finally:
            if tmpdir:
                shutil.rmtree(tmpdir, ignore_errors=True)

    @classmethod
    def put(cls, address, data=None, filename=None, request_timeout=None):
        """
        Do a GridFTP put request.

        Either data or filename must be defined.

        Args:
            address (str): url to put to
            data (str): the data to put
            filename (str): filename for data to put
            request_timeout (float): timeout in seconds

        Raises:
            Exception for failure
        """
        cls._require_supported(address)
        if data is None and filename is None:
            raise Exception('Neither data or filename is defined')
        timeout = cls._get_timeout(request_timeout)
        tmpdir = None
        try:
            if data is not None:
                # Stage the data in a scratch file. This happens inside
                # the try block so a failed write still cleans up tmpdir.
                tmpdir = tempfile.mkdtemp(dir=os.getcwd())
                local_path = os.path.join(tmpdir, 'put_tmp_file')
                mode = 'w' if isinstance(data, str) else 'wb'
                with open(local_path, mode) as f:
                    f.write(data)
            else:
                local_path = filename
            _cmd(['globus-url-copy', '-cd', 'file:' + local_path, address],
                 timeout=timeout)
        finally:
            if tmpdir:
                shutil.rmtree(tmpdir, ignore_errors=True)

    @classmethod
    def list(cls, address, request_timeout=None, details=False,
             dotfiles=False):
        """
        Do a GridFTP list request.

        Args:
            address (str): url to list
            request_timeout (float): timeout in seconds
            details (bool): result is a list of NamedTuples
            dotfiles (bool): result includes '.', '..', and other '.' files

        Returns:
            list: a list of files

        Raises:
            Exception on error
        """
        cls._require_supported(address)
        cmd = ['uberftp', '-retry', '5', '-ls', address]
        code, output = _cmd_output(
            cmd, timeout=cls._get_timeout(request_timeout))
        if code:
            if (not dotfiles) and 'No match for' in output:
                return []
            logger.warning('%s', output)
            raise Exception('Error getting listing')
        return listify(output, details=details, dotfiles=dotfiles)

    @classmethod
    def mkdir(cls, address, request_timeout=None, parents=False):
        """
        Make a directory on the ftp server.

        Args:
            address (str): url to directory
            request_timeout (float): timeout in seconds
            parents (bool): make parent directories as needed

        Raises:
            Exception on error
        """
        cls._require_supported(address)
        if parents:
            # Recursively make the parent directory first. The recursion
            # must use the parent url; the last path component alone is
            # not a valid address.
            parent = cls._parent_address(address)
            if parent is not None:
                try:
                    cls.mkdir(parent, request_timeout=request_timeout,
                              parents=True)
                except Exception:
                    # Best effort: the parent may already exist. A real
                    # problem surfaces from the final mkdir below.
                    logger.debug('mkdir of parent %s failed', parent,
                                 exc_info=True)
        cmd = ['uberftp', '-retry', '5', '-mkdir', address]
        _cmd(cmd, timeout=cls._get_timeout(request_timeout))

    @classmethod
    def rmdir(cls, address, request_timeout=None):
        """
        Remove a directory on the ftp server.

        This fails if the directory is not empty. Use :py:func:`rmtree`
        for recursive removal.

        Args:
            address (str): url to directory
            request_timeout (float): timeout in seconds

        Raises:
            Exception on error
        """
        cls._remove(['-rmdir'], address, request_timeout)

    @classmethod
    def delete(cls, address, request_timeout=None):
        """
        Delete a file on the ftp server.

        Args:
            address (str): url to file
            request_timeout (float): timeout in seconds

        Raises:
            Exception on error
        """
        cls._remove(['-rm'], address, request_timeout)

    @classmethod
    def rmtree(cls, address, request_timeout=None):
        """
        Delete a file or directory on the ftp server.

        This is recursive, like `rm -rf`.

        Args:
            address (str): url to file or directory
            request_timeout (float): timeout in seconds

        Raises:
            Exception on error
        """
        cls._remove(['-rm', '-r'], address, request_timeout)

    @classmethod
    def move(cls, src, dest, request_timeout=None):
        """
        Move a file on the ftp server.

        Args:
            src (str): url to source file
            dest (str): url to destination file
            request_timeout (float): timeout in seconds

        Raises:
            Exception on error
        """
        cls._require_supported(src, 'src')
        cls._require_supported(dest, 'dest')
        # The rename target is passed as a server-side path, not a url.
        cmd = ['uberftp', '-retry', '5', '-rename', src,
               cls.address_split(dest)[-1]]
        _cmd(cmd, timeout=cls._get_timeout(request_timeout))

    @classmethod
    def exists(cls, address, request_timeout=None):
        """
        Check if a file exists on the ftp server.

        Existence is inferred from whether ``uberftp -size`` succeeds.

        Args:
            address (str): url to file
            request_timeout (float): timeout in seconds

        Returns:
            bool: True if the size query exited with status 0

        Raises:
            Exception on error
        """
        cls._require_supported(address)
        cmd = ['uberftp', '-retry', '5', '-size', address]
        code, _ = _cmd_output(cmd, timeout=cls._get_timeout(request_timeout))
        return (not code)

    @classmethod
    def chmod(cls, address, mode, request_timeout=None):
        """
        Chmod a file on the ftp server.

        Args:
            address (str): url to file
            mode (str): mode of file
            request_timeout (float): timeout in seconds

        Raises:
            Exception on error
        """
        cls._require_supported(address)
        cmd = ['uberftp', '-retry', '5', '-chmod', mode, address]
        _cmd(cmd, timeout=cls._get_timeout(request_timeout))

    @classmethod
    def size(cls, address, request_timeout=None):
        """
        Get the size of a file on the ftp server.

        Args:
            address (str): url to file
            request_timeout (float): timeout in seconds

        Returns:
            int: size of file in bytes

        Raises:
            Exception on error
        """
        cls._require_supported(address)
        cmd = ['uberftp', '-retry', '5', '-size', address]
        code, output = _cmd_output(
            cmd, timeout=cls._get_timeout(request_timeout))
        if code:
            raise Exception('failed to get size')
        return int(output)

    @classmethod
    def _chksum(cls, type, address, request_timeout=None):
        """Chksum is faked by redownloading the file and checksumming that"""
        from iceprod.core.functions import cksm
        cls._require_supported(address)
        # Accept both 'md5sum' and 'md5' style names.
        if type.endswith('sum'):
            type = type[:-3]
        timeout = cls._get_timeout(request_timeout)
        tmpdir = tempfile.mkdtemp(dir=os.getcwd())
        try:
            local_path = os.path.join(tmpdir, 'dest')
            _cmd(['globus-url-copy', address, 'file:' + local_path],
                 timeout=timeout)
            if not os.path.exists(local_path):
                raise Exception('failed to redownload')
            return cksm(local_path, type)
        finally:
            shutil.rmtree(tmpdir, ignore_errors=True)

    # Some helper functions for different checksum types #

    @classmethod
    def md5sum(cls, address, request_timeout=None):
        """
        Get the md5sum of a file on an ftp server.

        Args:
            address (str): url to file
            request_timeout (float): timeout in seconds

        Returns:
            str: the md5sum

        Raises:
            Exception on error
        """
        return cls._chksum('md5sum', address,
                           request_timeout=request_timeout)

    @classmethod
    def sha1sum(cls, address, request_timeout=None):
        """
        Get the sha1sum of a file on an ftp server.

        Args:
            address (str): url to file
            request_timeout (float): timeout in seconds

        Returns:
            str: the sha1sum

        Raises:
            Exception on error
        """
        return cls._chksum('sha1sum', address,
                           request_timeout=request_timeout)

    @classmethod
    def sha256sum(cls, address, request_timeout=None):
        """
        Get the sha256sum of a file on an ftp server.

        Args:
            address (str): url to file
            request_timeout (float): timeout in seconds

        Returns:
            str: the sha256sum

        Raises:
            Exception on error
        """
        return cls._chksum('sha256sum', address,
                           request_timeout=request_timeout)

    @classmethod
    def sha512sum(cls, address, request_timeout=None):
        """
        Get the sha512sum of a file on an ftp server.

        Args:
            address (str): url to file
            request_timeout (float): timeout in seconds

        Returns:
            str: the sha512sum

        Raises:
            Exception on error
        """
        return cls._chksum('sha512sum', address,
                           request_timeout=request_timeout)