# Source code for iceprod.core.exe

"""
The core execution functions for running on a node.
These are all called from :any:`iceprod.core.i3exec`.

The fundamental design of the core is to run a task composed of trays and
modules.  The general heirarchy looks like::

    task
    |
    |- tray1
       |
       |- module1
       |
       |- module2
    |
    |- tray2
       |
       |- module3
       |
       |- module4

Parameters can be defined at every level, and each level is treated as a
scope (such that inner scopes inherit from outer scopes).  This is
accomplished via an internal environment for each scope.
"""

from __future__ import absolute_import, division, print_function

import sys
import os
import stat
import time
import glob
import copy
from datetime import datetime
import asyncio

try:
    import cPickle as pickle
except Exception:
    import pickle

import logging


from iceprod.core import constants
from iceprod.core import dataclasses
from iceprod.core import util
from iceprod.core import functions
from iceprod.core.resources import Resources
import iceprod.core.parser
from iceprod.core.jsonUtil import json_encode,json_decode


class Config:
    """Contain the configuration and related methods"""
    def __init__(self, config=None, parser=None, rpc=None, logger=None):
        self.config = config if config else dataclasses.Job()
        self.parser = parser if parser else iceprod.core.parser.ExpParser()
        self.rpc = rpc
        self.logger = logger if logger else logging

    def parseValue(self, value, env=None):
        """
        Parse a value from the available env and global config.

        Uses the :class:`Meta Parser <iceprod.core.parser>` on any string value.
        Pass-through for any other object.

        :param value: The value to parse
        :param env: The environment to use, optional
        :returns: The parsed value
        """
        # avoid a shared mutable default argument
        if env is None:
            env = {}
        if isinstance(value, dataclasses.String):
            self.logger.debug('parse before:%r| env=%r', value, env)
            value = self.parser.parse(value, self.config, env)
            if isinstance(value, dataclasses.String):
                # expand $VAR-style references after meta-parsing
                value = os.path.expandvars(value)
            self.logger.debug('parse after:%r', value)
        return value

    def parseObject(self, obj, env):
        """Recursively parse a dict or list"""
        if isinstance(obj, dataclasses.String):
            return self.parseValue(obj, env)
        elif isinstance(obj, (list, tuple)):
            return [self.parseObject(v, env) for v in obj]
        elif isinstance(obj, dict):
            # shallow copy in case it's a subclass of dict, like dataclasses
            ret = copy.copy(obj)
            for k in obj:
                ret[k] = self.parseObject(obj[k], env)
            return ret
        else:
            # pass-through for ints, floats, None, etc.
            return obj
class SetupEnv:
    """
    Async context manager that builds the internal environment (env) for a
    scope (task / tray / module) and tears it down on exit.

    The env is a dictionary composed of several objects:

    parameters
        Parameters are defined directly as an object, or as a string pointing
        to another object.  They can use the IceProd meta-language to be
        defined in relation to other parameters specified in inherited
        scopes, or as eval or sprintf functions.

    resources / data
        Resources and data are similar in that they handle extra files that
        modules may create or use.  The difference is that resources are only
        for reading, such as pre-built lookup tables, while data can be input
        and/or output.  Compression can be automatically handled by IceProd.
        Both resources and data are defined in the environment as strings to
        their file location.

    classes
        This is where external software gets added.  The software can be an
        already downloaded resource or just a url to download.  All python
        files get added to the python path and binary libraries get symlinked
        into a directory on the LD_LIBRARY_PATH.  Note that if there is more
        than one copy of the same shared library file, only the most recent
        one is in scope.  Classes are defined in the environment as strings
        to their file location.

    deletions
        These are files that should be deleted when the scope ends.

    uploads
        These are files that should be uploaded when the scope ends.  Mostly
        Data objects that are used as output.

    shell environment
        An environment to reset to when exiting the context manager.

    To keep the scope correct a new dictionary is created for every level,
    then the inheritable objects are shallow copied (to 1 level) into the new
    env.  The deletions are not inheritable (start empty for each scope), and
    the shell environment is set at whatever the previous scope currently has.

    Args:
        cfg (:py:class:`Config`): Config object
        obj (dict): A dict-like object from :py:mod:`iceprod.core.dataclasses`
                    such as :py:class:`iceprod.core.dataclasses.Steering`.
        oldenv (dict): (optional) env that we are running inside
    """
    def __init__(self, cfg, obj, oldenv=None, logger=None):
        self.cfg = cfg
        self.obj = obj
        # avoid a shared mutable default argument
        self.oldenv = oldenv if oldenv is not None else {}
        self.env = {}
        self.logger = logger if logger else logging

        # validation of input
        if not self.obj:
            raise util.NoncriticalError('object to load environment from is empty')
        if isinstance(self.obj, dataclasses.Steering) and not self.obj.valid():
            raise Exception('object is not valid Steering')

    async def __aenter__(self):
        """Build and return the new env for this scope."""
        try:
            # attempt to do depth=2 copying
            for key in self.oldenv:
                if key not in ('deletions','uploads','environment','pythonpath','stats'):
                    self.env[key] = copy.copy(self.oldenv[key])

            # make sure things for this env are clear (don't inherit)
            self.env['deletions'] = []
            self.env['uploads'] = []

            # snapshot shell environment so __aexit__ can restore it
            self.env['environment'] = os.environ.copy()
            self.env['pythonpath'] = copy.copy(sys.path)

            # inherit statistics (shared reference, so inner scopes report up)
            if 'stats' in self.oldenv:
                self.env['stats'] = self.oldenv['stats']
            else:
                self.env['stats'] = {'upload':[], 'download':[], 'tasks':[]}

            # copy parameters
            if 'parameters' not in self.env:
                self.env['parameters'] = {}
            if 'parameters' in self.obj:
                # copy new parameters to env first so local referrals work
                self.env['parameters'].update(self.obj['parameters'])
                # parse parameter values and update if necessary
                for p in self.obj['parameters']:
                    newval = self.cfg.parseValue(self.obj['parameters'][p], self.env)
                    if newval != self.obj['parameters'][p]:
                        self.env['parameters'][p] = newval

            if 'resources' not in self.env:
                self.env['resources'] = {}
            if 'resources' in self.obj:
                # download resources
                for resource in self.obj['resources']:
                    await downloadResource(self.env, self.cfg.parseObject(resource, self.env),
                                           logger=self.logger)

            if 'data' not in self.env:
                self.env['data'] = {}
            input_files = self.cfg.config['options']['input'].split() if 'input' in self.cfg.config['options'] else []
            output_files = self.cfg.config['options']['output'].split() if 'output' in self.cfg.config['options'] else []
            if 'data' in self.obj:
                # download input data now; queue output data for upload on exit
                for data in self.obj['data']:
                    d = self.cfg.parseObject(data, self.env)
                    if d['movement'] in ('input','both'):
                        await downloadData(self.env, d, logger=self.logger)
                        if 'local' in d and d['local']:
                            input_files.append(d['local'])
                        elif 'remote' in d and d['remote']:
                            input_files.append(os.path.basename(d['remote']))
                    if d['movement'] in ('output','both'):
                        self.env['uploads'].append(d)
                        if 'local' in d and d['local']:
                            output_files.append(d['local'])
                        elif 'remote' in d and d['remote']:
                            output_files.append(os.path.basename(d['remote']))

            # add input and output to parseable options
            self.cfg.config['options']['input'] = ' '.join(input_files)
            self.cfg.config['options']['output'] = ' '.join(output_files)
            # use the instance logger, consistent with the rest of the class
            self.logger.info('input: %r', self.cfg.config['options']['input'])
            self.logger.info('output: %r', self.cfg.config['options']['output'])

            if 'classes' not in self.env:
                self.env['classes'] = {}
            if 'classes' in self.obj:
                # set up classes
                for c in self.obj['classes']:
                    await setupClass(self.env, self.cfg.parseObject(c, self.env),
                                     logger=self.logger)
        except util.NoncriticalError:
            # best-effort: log and continue with a partially built env
            self.logger.warning('Noncritical error when setting up environment', exc_info=True)
        except Exception:
            self.logger.critical('Serious error when setting up environment', exc_info=True)
            raise
        return self.env

    async def __aexit__(self, exc_type, exc, tb):
        """Upload queued data (on clean exit), delete temp files, restore os.environ."""
        try:
            if not exc_type:
                # upload data if there was no exception
                if 'uploads' in self.env and (
                        'offline' not in self.cfg.config['options']
                        or (not self.cfg.config['options']['offline'])
                        or (self.cfg.config['options']['offline']
                            and 'offline_transfer' in self.cfg.config['options']
                            and self.cfg.config['options']['offline_transfer'])):
                    for d in self.env['uploads']:
                        await uploadData(self.env, d, logger=self.logger)
        finally:
            # delete any files registered for this scope, newest first
            if 'deletions' in self.env and len(self.env['deletions']) > 0:
                for f in reversed(self.env['deletions']):
                    try:
                        os.remove(f)
                    except OSError as e:
                        # pass the two format args separately (a single tuple
                        # argument does not fill two %s placeholders)
                        self.logger.error('failed to delete file %s - %s', str(f), str(e))
                        if ('options' in self.env and 'debug' in self.env['options']
                                and self.env['options']['debug']):
                            raise
            # reset shell environment to the snapshot taken on entry
            if 'environment' in self.env:
                for e in list(os.environ.keys()):
                    if e not in self.env['environment']:
                        del os.environ[e]
                for e in self.env['environment'].keys():
                    os.environ[e] = self.env['environment'][e]
async def downloadResource(env, resource, remote_base=None, local_base=None, checksum=None, logger=None):
    """
    Download a resource and put its location in the env.

    :param env: the scope environment dict
    :param resource: a dict-like Resource/Data object with 'remote', 'local',
                     'compression' keys and a ``do_transfer()`` method
    :param remote_base: base url for relative remote paths
                        (default: ``env['options']['resource_url']``)
    :param local_base: local download directory (default: subprocess dir or cwd)
    :param checksum: optional sha512 checksum to validate the download
    :param logger: optional logger (default: root logging module)
    :raises Exception: on download or checksum failure
    """
    if not logger:
        logger = logging
    if not remote_base:
        remote_base = env['options']['resource_url']
    if not resource['remote'] and not resource['local']:
        raise Exception('need to specify either local or remote')
    # build the full remote url
    if not resource['remote']:
        url = os.path.join(remote_base, resource['local'])
    elif functions.isurl(resource['remote']):
        url = resource['remote']
    else:
        url = os.path.join(remote_base, resource['remote'])
    execute = resource.do_transfer()
    if execute is False:
        logger.info('not transferring file %s', url)
        return
    if not local_base:
        if 'subprocess_dir' in env['options']:
            local_base = env['options']['subprocess_dir']
        else:
            local_base = os.getcwd()
    if not resource['local']:
        resource['local'] = os.path.basename(resource['remote'])
    local = os.path.join(local_base, resource['local'])
    if 'files' not in env:
        env['files'] = {}
    if not os.path.exists(os.path.dirname(local)):
        os.makedirs(os.path.dirname(local))
    # get resource
    if resource['local'] in env['files']:
        logger.info('resource %s already exists in env, so skip download and compression', resource['local'])
        return
    elif os.path.exists(local):
        logger.info('resource %s already exists as file, so skip download', resource['local'])
    else:
        # download resource
        download_options = {}
        if 'options' in env and 'username' in env['options']:
            download_options['username'] = env['options']['username']
        if 'options' in env and 'password' in env['options']:
            download_options['password'] = env['options']['password']
        if 'options' in env and 'ssl' in env['options'] and env['options']['ssl']:
            download_options.update(env['options']['ssl'])
        if 'options' in env and 'credentials' in env['options']:
            # pick the credential whose base url matches the remote
            for base_url in env['options']['credentials']:
                if resource['remote'].startswith(base_url):
                    logger.info('using credential for %s', base_url)
                    cred_file = os.path.join(env['options']['credentials_dir'],
                                             env['options']['credentials'][base_url])
                    try:
                        with open(cred_file) as f:
                            token = f.read()
                    except Exception:
                        logger.critical('failed to load credential at %s', cred_file)
                        raise Exception('failed to download {} to {}'.format(url, local))
                    download_options['token'] = token
                    break
        failed = False
        try:
            start_time = time.time()
            await functions.download(url, local, options=download_options)
            if not os.path.exists(local):
                raise Exception('file does not exist')
            if checksum:
                # check the checksum
                cksm = functions.sha512sum(local)
                if cksm != checksum:
                    raise Exception('checksum validation failed')
        except Exception:
            if execute == 'maybe':
                # optional transfer: failure is not an error
                logger.info('not transferring file %s', url)
                return
            failed = True
            logger.critical('failed to download %s to %s', url, local, exc_info=True)
            raise Exception('failed to download {} to {}'.format(url, local))
        finally:
            stats = {
                'name': url,
                'error': failed,
                'now': datetime.utcnow().isoformat(),
                'duration': time.time()-start_time,
            }
            if (not failed) and os.path.exists(local):
                stats['size'] = os.path.getsize(local)
                stats['rate_MBps'] = stats['size']/1000/1000/stats['duration']
            if 'stats' in env and (execute is True or (execute == 'maybe' and not failed)):
                if 'download' not in env['stats']:
                    env['stats']['download'] = []
                env['stats']['download'].append(stats)
            # guard on 'size': a failed 'maybe' transfer returns through this
            # finally without a file, so stats['size'] may be unset
            if (not failed) and 'size' in stats and 'data_movement_stats' in env['options'] and env['options']['data_movement_stats']:
                print(f'{stats["now"]} Data movement stats: input {stats["duration"]:.3f} {stats["size"]:.0f} {stats["name"]}')
    # check compression
    if resource['compression'] and (functions.iscompressed(url) or functions.istarred(url)):
        # uncompress file
        files = functions.uncompress(local)
        # add uncompressed file(s) to env
        env['files'][resource['local']] = files
    else:
        # add file to env
        env['files'][resource['local']] = local
    logger.warning('resource %s added to env', resource['local'])
async def downloadData(env, data, logger=None):
    """Download data and put location in the env"""
    log = logger if logger else logging

    # where the data lives remotely, and where it should land locally
    remote_base = data.storage_location(env)
    has_subdir = 'options' in env and 'subprocess_dir' in env['options']
    local_base = env['options']['subprocess_dir'] if has_subdir else os.getcwd()

    checksum = None
    if data.do_transfer() is not False:
        # ask the file catalog for a known checksum, if one is registered
        try:
            path, checksum = data.filecatalog(env).get(data['local'])
        except Exception:
            pass  # no filecatalog available

    await downloadResource(env, data, remote_base, local_base,
                           checksum=checksum, logger=log)
async def uploadData(env, data, logger=None):
    """
    Upload data to its storage location.

    :param env: the scope environment dict
    :param data: a dict-like Data object with 'remote', 'local', 'compression'
                 keys and ``storage_location()`` / ``do_transfer()`` /
                 ``filecatalog()`` methods
    :param logger: optional logger (default: root logging module)
    :raises Exception: if the local file is missing or the upload fails
    """
    if not logger:
        logger = logging
    remote_base = data.storage_location(env)
    if 'options' in env and 'subprocess_dir' in env['options']:
        local_base = env['options']['subprocess_dir']
    else:
        local_base = os.getcwd()
    if (not data['remote']) and not data['local']:
        raise Exception('need either remote or local defined')
    # build the full destination url
    if not data['remote']:
        url = os.path.join(remote_base, data['local'])
    elif not functions.isurl(data['remote']):
        url = os.path.join(remote_base, data['remote'])
    else:
        url = data['remote']
    if not data['local']:
        data['local'] = os.path.basename(data['remote'])
    local = os.path.join(local_base, data['local'])
    execute = data.do_transfer()
    exists = os.path.exists(local)
    if execute is False or (execute == 'maybe' and not exists):
        logger.info('not transferring file %s', local)
        return
    elif not exists:
        raise Exception('file {} does not exist'.format(local))
    # check compression
    if data['compression']:
        # get compression type, if specified
        if ((functions.iscompressed(url) or functions.istarred(url))
                and not (functions.iscompressed(local) or functions.istarred(local))):
            # url has compression on it, so use that
            if '.tar.' in url:
                c = '.'.join(url.rsplit('.', 2)[-2:])
            else:
                c = url.rsplit('.', 1)[-1]
            try:
                local = functions.compress(local, c)
            except Exception:
                logger.warning('cannot compress file %s to %s', local, c)
                raise
    # upload file
    upload_options = {}
    if 'options' in env and 'username' in env['options']:
        upload_options['username'] = env['options']['username']
    if 'options' in env and 'password' in env['options']:
        upload_options['password'] = env['options']['password']
    if 'options' in env and 'ssl' in env['options'] and env['options']['ssl']:
        upload_options.update(env['options']['ssl'])
    if 'options' in env and 'credentials' in env['options']:
        # pick the credential whose base url matches the remote
        for base_url in env['options']['credentials']:
            if data['remote'].startswith(base_url):
                logger.info('using credential for %s', base_url)
                cred_file = os.path.join(env['options']['credentials_dir'],
                                         env['options']['credentials'][base_url])
                try:
                    with open(cred_file) as f:
                        token = f.read()
                except Exception:
                    logger.critical('failed to load credential at %s', cred_file)
                    raise Exception('failed to upload {} to {}'.format(url, local))
                upload_options['token'] = token
                break
    do_checksum = True
    if 'options' in env and 'upload_checksum' in env['options']:
        do_checksum = env['options']['upload_checksum']
    failed = False
    try:
        start_time = time.time()
        await functions.upload(local, url, checksum=do_checksum, options=upload_options)
    except Exception:
        failed = True
        logger.critical('failed to upload %s to %s', local, url, exc_info=True)
        raise Exception('failed to upload {} to {}'.format(local, url))
    finally:
        stats = {
            'name': url,
            'error': failed,
            'now': datetime.utcnow().isoformat(),
            'duration': time.time()-start_time,
        }
        if not failed:
            stats['size'] = os.path.getsize(local)
            stats['rate_MBps'] = stats['size']/1000/1000/stats['duration']
        if 'stats' in env:
            env['stats']['upload'].append(stats)
        if (not failed) and 'data_movement_stats' in env['options'] and env['options']['data_movement_stats']:
            print(f'{stats["now"]} Data movement stats: output {stats["duration"]:.3f} {stats["size"]:.0f} {stats["name"]}')
    # if successful, add to filecatalog
    try:
        filecatalog = data.filecatalog(env)
    except Exception:
        pass  # no filecatalog available
    else:
        try:
            cksm = functions.sha512sum(local)
            metadata = {
                'file_size': stats['size'],
                'create_date': stats['now'],
                'modify_date': stats['now'],
                'data_type': 'simulation',
                'transfer_duration': stats['duration'],
                'transfer_MBps': stats['rate_MBps'],
            }
            options = ('dataset', 'dataset_id', 'task_id', 'task', 'job', 'debug')
            # bug fix: was a *set* comprehension ({env['options'][k] ...}),
            # which dict.update() cannot consume; build key->value pairs
            metadata.update({k: env['options'][k] for k in options if k in env['options']})
            filecatalog.add(data['local'], url, cksm, metadata)
        except Exception:
            logger.warning('failed to add %r to filecatalog', url, exc_info=True)
async def setupClass(env, class_obj, logger=None):
    """
    Set up a class for use in modules, and put it in the env.

    Resolution order: already-loaded class in ``env['classes']``, then a
    previously downloaded resource (``resource_name``), then download from
    the ``src`` url (falling back to the resource server and then svn).
    Once located, python files are added to PYTHONPATH, binary libraries to
    LD_LIBRARY_PATH, and any ``env_vars`` are applied.

    :param env: the scope environment dict (mutated: 'classes', os.environ)
    :param class_obj: a dict-like Class object ('name', 'src',
                      'resource_name', 'libs', 'env_vars')
    :param logger: optional logger (default: root logging module)
    :raises Exception: if the class cannot be found or downloaded
    """
    if not logger:
        logger = logging
    if 'classes' not in env:
        env['classes'] = {}
    if not class_obj:
        raise Exception('Class is not defined')
    loaded = False
    if class_obj['name'] in env['classes']:
        # class already loaded, so leave it alone
        logger.info('class %s already loaded', class_obj['name'])
    elif class_obj['resource_name']:
        # class is downloaded as a resource
        if 'files' not in env or class_obj['resource_name'] not in env['files']:
            logger.error('resource %s for class %s does not exist',
                         class_obj['resource_name'], class_obj['name'])
        else:
            local = env['files'][class_obj['resource_name']]
            if not isinstance(local, dataclasses.String):
                # uncompressed resources are lists of files; take the first
                local = local[0]
            if class_obj['src'] and os.path.exists(os.path.join(local, class_obj['src'])):
                # treat src as a path inside the resource
                local = os.path.join(local, class_obj['src'])
            loaded = True
    else:
        # get url of class; i counts fallback attempts (10 = no retries left)
        i = 0
        while True:
            url = class_obj['src']
            if url and functions.isurl(url):
                i = 10  # skip repeat download attempts
            else:
                if i == 0:
                    # first, look in resources
                    if 'options' in env and 'resource_url' in env['options']:
                        url = os.path.join(env['options']['resource_url'], class_obj['src'])
                    else:
                        url = os.path.join('http://prod-exe.icecube.wisc.edu/', class_obj['src'])
                elif i == 1:
                    # then, look in regular svn
                    if 'options' in env and 'svn_repository' in env['options']:
                        url = os.path.join(env['options']['svn_repository'], class_obj['src'])
                    else:
                        url = os.path.join('http://code.icecube.wisc.edu/svn/projects/', class_obj['src'])
                else:
                    raise util.NoncriticalError('Cannot find class %s because of bad src url' % class_obj['name'])
            # download destination directory (cached in env options)
            if 'options' in env and 'local_temp' in env['options']:
                local_temp = env['options']['local_temp']
            else:
                local_temp = os.path.join(os.getcwd(), 'classes')
                env['options']['local_temp'] = local_temp
            if not os.path.exists(local_temp):
                os.makedirs(local_temp)
            # make the class directory importable by subprocesses
            if 'PYTHONPATH' in os.environ and local_temp not in os.environ['PYTHONPATH']:
                os.environ['PYTHONPATH'] += ':'+local_temp
            elif 'PYTHONPATH' not in os.environ:
                os.environ['PYTHONPATH'] = local_temp
            local = os.path.join(local_temp, class_obj['name'].replace(' ', '_'))
            download_options = {}
            if 'options' in env and 'username' in env['options']:
                download_options['username'] = env['options']['username']
            if 'options' in env and 'password' in env['options']:
                download_options['password'] = env['options']['password']
            if 'options' in env and 'ssl' in env['options'] and env['options']['ssl']:
                download_options.update(env['options']['ssl'])
            # download class
            logger.warning('attempting to download class %s to %s', url, local_temp)
            try:
                download_local = await functions.download(url, local_temp, options=download_options)
            except Exception:
                logger.info('download failed, {} attempts left'.format(i), exc_info=True)
                if i < 10:
                    i += 1
                    continue  # retry with different url
                raise
            if not os.path.exists(download_local):
                raise Exception('failed to download {} to {}'.format(url, local))
            if functions.iscompressed(download_local) or functions.istarred(download_local):
                files = functions.uncompress(download_local, out_dir=local_temp)
                # check if we extracted a tarfile
                if isinstance(files, dataclasses.String):
                    local = files
                elif isinstance(files, list):
                    # use the common top-level directory of the extracted files
                    dirname = os.path.commonprefix(files)
                    if dirname:
                        dirname = os.path.join(local_temp, dirname.split(os.path.sep)[0])
                    else:
                        dirname = local_temp
                    logger.info('looking up tarball at %r', dirname)
                    if os.path.isdir(dirname):
                        logger.info('rename %r to %r', local, dirname)
                        local = dirname
                else:
                    logger.warning('files is strange datatype: %r', type(files))
            elif local != download_local:
                logger.info('rename %r to %r', download_local, local)
                os.rename(download_local, local)
            loaded = True
            break
    if loaded:
        # add to env
        env['classes'][class_obj['name']] = local
        logger.warning('class %s loaded at %r', class_obj['name'], local)

        # add binary libraries to the LD_LIBRARY_PATH
        def ldpath(root, f=None):
            root = os.path.abspath(root)
            def islib(f):
                # .so / .so.N shared objects and .a static archives
                return f[-3:] == '.so' or '.so.' in f or f[-2:] == '.a' or '.a.' in f
            if (f and islib(f)) or any(islib(f) for f in os.listdir(root)):
                logger.info('adding to LD_LIBRARY_PATH: %s', root)
                if 'LD_LIBRARY_PATH' in os.environ:
                    if root in os.environ['LD_LIBRARY_PATH'].split(':'):
                        return  # already present
                    os.environ['LD_LIBRARY_PATH'] = root+':'+os.environ['LD_LIBRARY_PATH']
                else:
                    os.environ['LD_LIBRARY_PATH'] = root
            else:
                logger.debug('no libs in %s', root)

        # add directories with python files to the PYTHONPATH
        def addToPythonPath(root):
            if glob.glob(os.path.join(root, '*.py')):
                logger.info('adding to PYTHONPATH: %s', root)
                if 'PYTHONPATH' in os.environ:
                    if root in os.environ['PYTHONPATH'].split(':'):
                        return  # already present
                    os.environ['PYTHONPATH'] = root+':'+os.environ['PYTHONPATH']
                else:
                    os.environ['PYTHONPATH'] = root
            else:
                logger.debug('no python files: %s', root)

        if os.path.isdir(local):
            # build search list of likely lib/package locations
            search_list = [local]
            search_list.extend(glob.glob(os.path.join(local, 'lib*')))
            search_list.extend(glob.glob(os.path.join(local, 'lib*/python*/*-packages')))
            if class_obj['libs'] is not None:
                search_list.extend(os.path.join(local, x) for x in class_obj['libs'].split(':'))
            for s in search_list:
                if not os.path.isdir(s):
                    continue
                addToPythonPath(s)
                ldpath(s)
        elif os.path.exists(local):
            root, f = os.path.split(local)
            if f.endswith('.py'):
                if root not in sys.path:
                    addToPythonPath(root)
            else:
                # check for binary library
                ldpath(root, f)

        # modify environment variables (';'-separated NAME=VALUE pairs;
        # $CLASS in a value expands to the class location)
        logger.info('env_vars = %s', class_obj['env_vars'])
        if class_obj['env_vars']:
            for e in class_obj['env_vars'].split(';'):
                try:
                    k, v = e.split('=')
                except ValueError as e:
                    logger.warning('bad env variable: %s', e)
                    continue
                v = v.replace('$CLASS', local)
                logger.info('setting envvar: %s = %s', k, v)
                if k in os.environ:
                    # prepend, preserving any existing value
                    os.environ[k] = v+':'+os.environ[k]
                else:
                    os.environ[k] = v
# Run Functions #
async def runtask(cfg, globalenv, task, logger=None):
    """
    Run the specified task.

    An async generator: yields each module subprocess back to the caller
    (pilot or driver) so it can be monitored or killed.

    :param cfg: a :py:class:`Config` object
    :param globalenv: the enclosing env dict (mutated: 'task_temp', 'stats')
    :param task: the task dict to run
    :param logger: optional logger (default: root logging module)
    :raises Exception: if no task is provided
    """
    if not task:
        raise Exception('No task provided')
    if not logger:
        logger = logging

    # set up task_temp
    if not os.path.exists('task_temp'):
        os.mkdir('task_temp')
    globalenv['task_temp'] = os.path.join(os.getcwd(), 'task_temp')

    # set up stats
    stats = {}

    # check if we have any files in the task_files API
    if task['task_files'] and ((not cfg.config['options']['offline']) or cfg.config['options']['offline_transfer']):
        files = await cfg.rpc.task_files(cfg.config['options']['dataset_id'],
                                         cfg.config['options']['task_id'])
        task['data'].extend(files)

    try:
        # set up local env
        async with SetupEnv(cfg, task, globalenv, logger=logger) as env:
            # run trays
            for tray in task['trays']:
                tmpstat = {}
                async for proc in runtray(cfg, env, tray, stats=tmpstat, logger=logger):
                    yield proc
                # collapse single-iteration stats to the bare dict
                if len(tmpstat) > 1:
                    stats[tray['name']] = tmpstat
                elif len(tmpstat) == 1:
                    stats[tray['name']] = tmpstat[list(tmpstat.keys())[0]]
    finally:
        # destroy task temp
        try:
            functions.removedirs('task_temp')
        except Exception as e:
            logger.warning('error removing task_temp directory: %r', e, exc_info=True)
        # record this task's stats in the shared stats object
        globalenv['stats']['tasks'].append(stats)
async def runtray(cfg, globalenv, tray, stats=None, logger=None):
    """
    Run the specified tray.

    An async generator: yields each module subprocess back to the caller.

    :param cfg: a :py:class:`Config` object
    :param globalenv: the enclosing env dict (mutated: 'tray_temp')
    :param tray: the tray dict to run
    :param stats: optional dict to fill with per-iteration stats
    :param logger: optional logger (default: root logging module)
    :raises Exception: if no tray is provided
    """
    if not tray:
        raise Exception('No tray provided')
    # avoid a shared mutable default argument; callers that pass a dict
    # still see it mutated in place
    if stats is None:
        stats = {}
    if not logger:
        logger = logging

    # set up tray_temp
    if not os.path.exists('tray_temp'):
        os.mkdir('tray_temp')
    globalenv['tray_temp'] = os.path.join(os.getcwd(), 'tray_temp')

    # run iterations
    try:
        tmpenv = globalenv.copy()
        for i in range(tray['iterations']):
            # set up local env for this iteration
            cfg.config['options']['iter'] = i
            tmpstat = {}
            async with SetupEnv(cfg, tray, tmpenv, logger=logger) as env:
                # run modules
                for module in tray['modules']:
                    async for proc in runmodule(cfg, env, module, stats=tmpstat, logger=logger):
                        yield proc
            stats[i] = tmpstat
    finally:
        # destroy tray temp
        try:
            functions.removedirs('tray_temp')
        except Exception as e:
            logger.warning('error removing tray_temp directory: %s', str(e), exc_info=True)
async def runmodule(cfg, globalenv, module, stats=None, logger=None):
    """
    Run the specified module.

    An async generator: yields the module subprocess back to the pilot or
    driver, so it can be killed.

    :param cfg: a :py:class:`Config` object
    :param globalenv: the enclosing env dict
    :param module: the module dict to run
    :param stats: optional dict to fill with module stats
    :param logger: optional logger (default: root logging module)
    :raises Exception: if no module is provided
    """
    if not module:
        raise Exception('No module provided')
    # avoid a shared mutable default argument; callers that pass a dict
    # still see it mutated in place
    if stats is None:
        stats = {}
    if not logger:
        logger = logging

    # work on a copy so parsed values don't leak back into the config
    module = module.copy()
    async with SetupEnv(cfg, module, globalenv, logger=logger) as env:
        if module['running_class']:
            module['running_class'] = cfg.parseValue(module['running_class'], env)
        if module['args']:
            module['args'] = cfg.parseObject(module['args'], env)
        if module['src']:
            module['src'] = cfg.parseValue(module['src'], env)
        if module['env_shell']:
            module['env_shell'] = cfg.parseValue(module['env_shell'], env)
        if module['configs']:
            # parse twice to make sure it's parsed, even if it starts as a string
            module['configs'] = cfg.parseObject(module['configs'], env)
            module['configs'] = cfg.parseObject(module['configs'], env)

        # make subprocess to run the module
        async with ForkModule(cfg, env, module, logger=logger, stats=stats) as process:
            # yield process back to pilot or driver, so it can be killed
            yield process
class ForkModule:
    """
    Modules are run in a forked process to prevent segfaults from killing
    IceProd.  Their stdout and stderr is dumped into the log file with
    prefixes on each line to designate its source.  Any error or the return
    value is returned to the main process via a Queue.

    If a module defines a src, that is assumed to be a Class which should be
    added to the env.  The running_class is where the exact script or binary
    is chosen.  It can match several things:

    * A fully defined python module.class import (also takes module.function)
    * A python class defined in the src provided
    * A regular python script
    * An executable of some type (this is run in a subprocess with
      shell execution disabled)
    """
    def __init__(self, cfg, env, module, logger=None, stats=None):
        """
        :param cfg: a :py:class:`Config` object
        :param env: the scope environment dict
        :param module: the (already parsed) module dict to run
        :param logger: optional logger (default: root logging module)
        :param stats: optional dict to fill with module stats
        """
        self.cfg = cfg
        self.env = env
        self.module = module
        if not logger:
            logger = logging
        self.logger = logger
        self.stats = stats if stats else {}
        # handle to the asyncio subprocess, set in __aenter__
        self.proc = None
        # files the child uses to report an exception / stats back to us
        self.error_filename = constants['task_exception']
        self.stats_filename = constants['stats']
        if 'subprocess_dir' in cfg.config['options'] and cfg.config['options']['subprocess_dir']:
            subdir = cfg.config['options']['subprocess_dir']
            self.error_filename = os.path.join(subdir, self.error_filename)
            self.stats_filename = os.path.join(subdir, self.stats_filename)
        # clear any stale exception report from a previous run
        if os.path.exists(self.error_filename):
            os.remove(self.error_filename)

    async def __aenter__(self):
        """Resolve src/env_shell/args, build the command line, and start the subprocess."""
        module_src = None
        if self.module['src']:
            if not functions.isurl(self.module['src']):
                module_src = self.module['src']
            else:
                # get script to run: download src as a class
                c = dataclasses.Class()
                c['src'] = self.module['src']
                c['name'] = os.path.basename(c['src'])
                # strip query string / fragment from the derived name
                if '?' in c['name']:
                    c['name'] = c['name'][:c['name'].find('?')]
                elif '#' in c['name']:
                    c['name'] = c['name'][:c['name'].find('#')]
                await setupClass(self.env, c, logger=self.logger)
                if c['name'] not in self.env['classes']:
                    raise Exception('Failed to install class %s'%c['name'])
                module_src = self.env['classes'][c['name']]

        # set up env_shell (a wrapper executable that sets up the environment)
        env_shell = None
        if self.module['env_shell']:
            env_shell = self.module['env_shell'].split()
            self.logger.info('searching for env_shell at %r', env_shell[0])
            if not os.path.exists(env_shell[0]):
                # not a direct path; try resolving the first path segment as a class
                env_class = env_shell[0].split('/')[0]
                self.logger.info('searching for env_shell as %r class', env_class)
                if env_class in self.env['classes']:
                    env_tmp = env_shell[0].split('/')
                    env_tmp[0] = self.env['classes'][env_class]
                    env_shell[0] = '/'.join(env_tmp)
                else:
                    self.logger.info('attempting to download env_shell')
                    c = dataclasses.Class()
                    c['src'] = env_shell[0]
                    c['name'] = os.path.basename(c['src'])
                    await setupClass(self.env, c, logger=self.logger)
                    if c['name'] not in self.env['classes']:
                        raise Exception('Failed to install class %s'%c['name'])
                    env_shell[0] = self.env['classes'][c['name']]

        if module_src:
            self.logger.warning('running module \'%s\' with src %s',
                                self.module['name'], module_src)
        else:
            self.logger.warning('running module \'%s\' with class %s',
                                self.module['name'], self.module['running_class'])

        # set up the args: normalize to {'args': [...], 'kwargs': {...}}
        args = self.module['args']
        if args:
            self.logger.warning('args=%s', args)
            if args and isinstance(args, dataclasses.String) and args[0] in ('{', '['):
                # JSON-encoded args
                args = json_decode(args)
            if args and isinstance(args, dict) and set(args) == {'args', 'kwargs'}:
                args = self.cfg.parseObject(args, self.env)
            elif isinstance(args, dataclasses.String):
                args = {"args": [self.cfg.parseValue(x, self.env) for x in args.split()], "kwargs": {}}
            elif isinstance(args, list):
                args = {"args": [self.cfg.parseValue(x, self.env) for x in args], "kwargs": {}}
            elif isinstance(args, dict):
                args = {"args": [], "kwargs": self.cfg.parseObject(args, self.env)}
            else:
                raise Exception('args is unknown type')

        # set up the environment
        cmd = []
        if env_shell:
            cmd.extend(env_shell)
        kwargs = {'close_fds': True}
        if 'subprocess_dir' in self.cfg.config['options'] and self.cfg.config['options']['subprocess_dir']:
            subdir = self.cfg.config['options']['subprocess_dir']
            if not os.path.exists(subdir):
                os.makedirs(subdir)
            kwargs['cwd'] = subdir
        else:
            kwargs['cwd'] = os.getcwd()
        # child stdout/stderr are appended to log files; closed in __aexit__
        self.stdout = open(os.path.join(kwargs['cwd'], constants['stdout']), 'ab')
        self.stderr = open(os.path.join(kwargs['cwd'], constants['stderr']), 'ab')
        kwargs['stdout'] = self.stdout
        kwargs['stderr'] = self.stderr

        # set up configs: write each config as a JSON file in the cwd
        if self.module['configs']:
            for filename in self.module['configs']:
                self.logger.info('creating config %r', filename)
                with open(os.path.join(kwargs['cwd'], filename), 'w') as f:
                    f.write(json_encode(self.module['configs'][filename]))

        # run the module
        if self.module['running_class']:
            self.logger.info('run as a class using the helper script')
            exe_helper = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'exe_helper.py')
            cmd.extend(['python', exe_helper, '--classname', self.module['running_class']])
            if self.env['options']['debug']:
                cmd.append('--debug')
            if module_src:
                cmd.extend(['--filename', module_src])
            if args:
                # pass args via a JSON file rather than the command line
                args_filename = constants['args']
                if 'cwd' in kwargs:
                    args_filename = os.path.join(kwargs['cwd'], args_filename)
                with open(args_filename, 'w') as f:
                    f.write(json_encode(args))
                cmd.append('--args')
        elif module_src:
            self.logger.info('run as a script directly')
            if args:
                # convert kwargs to -k=v / --key=value command-line options
                def splitter(a, b):
                    ret = ('-%s' if len(str(a)) <= 1 else '--%s') % str(a)
                    if b is None:
                        return ret
                    else:
                        return ret+'='+str(b)
                args = args['args']+[splitter(a, args['kwargs'][a]) for a in args['kwargs']]
                # force args to string
                def toStr(a):
                    if isinstance(a, (bytes, str)):
                        return a
                    else:
                        return str(a)
                args = [toStr(a) for a in args]
            else:
                args = []
            # detect a shebang line; if present, run the file directly
            shebang = False
            if os.path.exists(module_src):
                try:
                    with open(module_src) as f:
                        if f.read(10).startswith('#!'):
                            # shebang found; make sure the file is executable
                            mode = os.stat(module_src).st_mode
                            if not (mode & stat.S_IXUSR):
                                os.chmod(module_src, mode | stat.S_IXUSR)
                            shebang = True
                except Exception:
                    self.logger.warning('cannot get shebang for %s', module_src, exc_info=True)
            if (not shebang) and module_src[-3:] == '.py':
                # call as python script
                cmd.extend(['python', module_src]+args)
            elif (not shebang) and module_src[-3:] == '.sh':
                # call as shell script
                cmd.extend(['/bin/sh', module_src]+args)
            else:
                # call as regular executable
                cmd.extend([module_src]+args)
        else:
            self.logger.error('module is missing class and src')
            raise Exception('error running module')

        self.logger.warning('subprocess cmd=%r', cmd)

        if self.module['env_clear']:
            # must be on cvmfs-like environ for this to apply: build a
            # minimal child environment with IceProd/SROOT paths filtered out
            env = {'PYTHONNOUSERSITE': '1'}
            if 'SROOT' in os.environ:
                prefix = os.environ['SROOT']
            elif 'ICEPRODROOT' in os.environ:
                prefix = os.environ['ICEPRODROOT']
            else:
                prefix = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
            for k in os.environ:
                if k in ('OPENCL_VENDOR_PATH', 'http_proxy', 'TMP', 'TMPDIR', '_CONDOR_SCRATCH_DIR'):
                    # pass through unchanged
                    env[k] = os.environ[k]
                elif ('sroot' in k.lower() or 'iceprod' in k.lower()
                      or k in ('CUDA_VISIBLE_DEVICES', 'COMPUTE', 'GPU_DEVICE_ORDINAL')):
                    # don't pass these at all
                    pass
                else:
                    # filter SROOT out of environ
                    ret = [x for x in os.environ[k].split(':')
                           if x.strip() and (not x.startswith(prefix)) and 'iceprod' not in x.lower()]
                    if ret:
                        env[k] = ':'.join(ret)
            # handle resource environment
            if 'resources' in self.cfg.config['options']:
                Resources.set_env(self.cfg.config['options']['resources'], env)
            self.logger.warning('env = %r', env)
            kwargs['env'] = env

        self.proc = await asyncio.create_subprocess_exec(*cmd, **kwargs)
        return self.proc

    async def __aexit__(self, exc_type, exc, tb):
        """Close log files, re-raise a child-reported exception, and collect stats."""
        try:
            self.stdout.close()
            self.stderr.close()
        except Exception:
            pass
        if not exc_type:
            # now clean up after process
            if self.proc and self.proc.returncode:
                self.logger.warning('return code: {}'.format(self.proc.returncode))
                # NOTE(review): this assumes the caller already awaited the
                # process so returncode is set — confirm against callers
                try:
                    with open(self.error_filename, 'rb') as f:
                        # the child wrote its exception here via pickle
                        e = pickle.load(f)
                except Exception:
                    self.logger.warning('cannot load exception info from failed module', )
                    raise Exception('module failed')
                else:
                    if isinstance(e, Exception):
                        raise e
                    else:
                        raise Exception(str(e))
            # get stats, if available
            if os.path.exists(self.stats_filename):
                try:
                    new_stats = pickle.load(open(self.stats_filename, 'rb'))
                    if self.module['name']:
                        # keyed by module name when one is set
                        self.stats[self.module['name']] = new_stats
                    else:
                        self.stats.update(new_stats)
                except Exception:
                    self.logger.warning('cannot load stats info from module')