
"""
The task runner.

Run it with `python -m iceprod.core.i3exec`.

optional arguments:
  -h, --help            show this help message and exit
  -f CFGFILE, --cfgfile CFGFILE
                        Specify config file
  -u URL, --url URL     URL of the iceprod server
  -p PASSKEY, --passkey PASSKEY
                        passkey for communication with iceprod server
  --pilot_id PILOTID    ID of the pilot (if this is a pilot)
  -d, --debug           Enable debug actions and logging
  --offline             Enable offline mode (don't talk with server)
  --offline_transfer True/False
                        Enable/disable file transfer during offline mode
  --logfile LOGFILE     Specify the logfile to use
  --dataset DATASET     Dataset number
  --job JOB             Index of the job to run
  --jobs_submitted JOBS_SUBMITTED
                        Total number of jobs in this dataset
  --task TASK           Name of the task to run
  --gzip-logs           gzip the iceprod logs
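
Example invocations (illustrative; the config file name, task name, and
server URL are hypothetical)::

    # run one named task from a local config, without talking to a server
    python -m iceprod.core.i3exec --offline -f config.json --task generate

    # run a single task against a live iceprod server
    python -m iceprod.core.i3exec -f config.json -u https://iceprod.example.org -p PASSKEY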
"""

from __future__ import absolute_import, division, print_function

import os
import logging
import logging.config
import time
import socket
from functools import partial
import shutil
import threading
import asyncio
import gzip

import iceprod
from iceprod.core import constants
import iceprod.core.dataclasses
import iceprod.core.serialization
from iceprod.core.defaults import add_default_options
import iceprod.core.exe
from iceprod.core.exe_json import ServerComms
import iceprod.core.pilot
import iceprod.core.resources
import iceprod.core.logger


def load_config(cfgfile):
    """Load a config from a file, serialized string, dictionary, etc."""
    logger = logging.getLogger('i3exec')
    config = None
    if isinstance(cfgfile, str):
        try:
            if os.path.exists(cfgfile):
                config = iceprod.core.serialization.serialize_json.load(cfgfile)
            else:
                config = iceprod.core.serialization.serialize_json.loads(cfgfile)
            if not config:
                raise Exception('Config not found')
        except Exception as e:
            logger.critical('Error loading configuration: %s', str(e))
            raise
    elif isinstance(cfgfile, iceprod.core.dataclasses.Job):
        config = cfgfile
    elif isinstance(cfgfile, dict):
        config = iceprod.core.serialization.dict_to_dataclasses(cfgfile)
    else:
        logger.warning('cfgfile: %r', cfgfile)
        raise Exception('cfgfile is not a str or a Job')
    return config
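
# A minimal sketch of the three input forms load_config() accepts; the
# file path and inline JSON below are hypothetical:
#
#   config = load_config('/path/to/dataset_config.json')   # path to a JSON file on disk
#   config = load_config('{"options": {}, "tasks": []}')   # serialized JSON string
#   config = load_config({'options': {}, 'tasks': []})     # plain dict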
def main(cfgfile=None, logfile=None, url=None, debug=False, passkey='',
         pilot_id=None, offline=False, offline_transfer=False, gzip_logs=False):
    """Main task runner for iceprod"""
    # set up logger
    if debug:
        logl = 'INFO'
    else:
        logl = 'WARNING'
    if logfile:
        logf = os.path.abspath(os.path.expandvars(logfile))
    else:
        logf = os.path.abspath(os.path.expandvars(constants['stdlog']))
    if gzip_logs:
        logf += '.gz'
    iceprod.core.logger.set_logger(loglevel=logl, logfile=logf,
                                   logsize=67108864, lognum=1)
    logging.warning('starting IceProd core')

    if cfgfile is None:
        logging.critical('There is no cfgfile')
        raise Exception('missing cfgfile')
    elif isinstance(cfgfile, str):
        config = load_config(cfgfile)
    else:
        config = cfgfile
    logging.info('config: %r', config)

    if not offline:
        # if we are not in offline mode, we need a url
        if not url:
            logging.critical('url missing')
            raise Exception('url missing')
        # set up server comms
        kwargs = {}
        if 'username' in config['options']:
            kwargs['username'] = config['options']['username']
        if 'password' in config['options']:
            kwargs['password'] = config['options']['password']
        if 'ssl' in config['options'] and config['options']['ssl']:
            kwargs.update(config['options']['ssl'])
        rpc = ServerComms(url, passkey, None, **kwargs)

    async def run():
        if offline:
            logging.info('offline mode')
            async for proc in runner(config, debug=debug, offline=offline,
                                     offline_transfer=offline_transfer):
                await proc.wait()
        elif 'tasks' in config and config['tasks']:
            logging.info('online mode - single task')
            # tell the server that we are processing this task
            if 'task_id' not in config['options']:
                raise Exception('config["options"]["task_id"] not specified')
            pilot_id2 = None
            try:
                pilot_id2 = await rpc.create_pilot(
                    queue_host='manual',
                    host=socket.getfqdn(),
                    grid_queue_id='1',
                    queue_version=iceprod.__version__,
                    version=iceprod.__version__,
                    tasks=[config['options']['task_id']],
                    resources={},
                )
                await rpc.processing(config['options']['task_id'])
            except Exception:
                logging.error('comms error', exc_info=True)
            # run the task
            try:
                async for proc in runner(config, rpc=rpc, debug=debug):
                    await proc.wait()
            finally:
                try:
                    if pilot_id2:
                        await rpc.delete_pilot(pilot_id2)
                except Exception:
                    logging.error('comms error', exc_info=True)
        else:
            logging.info('pilot mode - get many tasks from server')
            if 'gridspec' not in config['options']:
                logging.critical('gridspec missing')
                raise Exception('gridspec missing')
            if not pilot_id:
                logging.critical('pilot_id missing')
                raise Exception('pilot_id missing')
            pilot_kwargs = {}
            if 'run_timeout' in config['options']:
                pilot_kwargs['run_timeout'] = config['options']['run_timeout']
            if 'restrict_site' in config['options']:
                pilot_kwargs['restrict_site'] = config['options']['restrict_site']
            async with iceprod.core.pilot.Pilot(
                    config, rpc=rpc, debug=debug,
                    runner=partial(runner, rpc=rpc, debug=debug),
                    pilot_id=pilot_id, **pilot_kwargs) as p:
                await p.run()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

    logging.warning('finished running normally; exiting...')
    iceprod.core.logger.remove_handlers()
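
# Programmatic use (a sketch, assuming a file 'config.json' exists in
# the working directory): this mirrors the offline command-line path.
#
#   from iceprod.core.i3exec import main
#   main(cfgfile='config.json', offline=True, debug=True)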
async def runner(config, rpc=None, debug=False, offline=False,
                 offline_transfer=False, resources=None):
    """Run a config.

    #. Set some default options if not set in configuration.
    #. Set up global env based on the configuration.
    #. Run tasks:

       * If a task is specified in the configuration options (by name
         or number), run only that task. If there is a problem finding
         the task specified, raise a critical error.
       * Otherwise, run all tasks in the configuration in the order
         they were written.

    #. Destroy the global env, uploading and deleting files as needed.
    #. Upload the log, error, and output files if specified in options.

    Args:
        config (:py:class:`iceprod.core.dataclasses.Job`): Dataset configuration
        rpc (:py:class:`iceprod.core.exe_json.ServerComms`): (optional) RPC object
        debug (bool): (optional) turn on debug logging
        offline (bool): (optional) enable offline mode
        offline_transfer (bool): (optional) enable/disable offline data transfers
        resources (:py:class:`iceprod.core.resources.Resources`): (optional) Resources object
    """
    # set logging
    if offline:
        logger = logging.getLogger('task')
        if 'task_id' not in config['options']:
            config['options']['task_id'] = 'a'
    else:
        if 'task_id' not in config['options']:
            raise Exception('task_id not set in config options')
        logger = logging.getLogger(config['options']['task_id'])
    if 'debug' not in config['options']:
        config['options']['debug'] = debug
    if ('debug' in config['options'] and config['options']['debug']
            and 'loglevel' not in config['options']):
        config['options']['loglevel'] = 'INFO'
    if ('loglevel' in config['options']
            and config['options']['loglevel'].upper() in iceprod.core.logger.setlevel):
        try:
            iceprod.core.logger.set_log_level(config['options']['loglevel'])
        except Exception:
            logger.warning('failed to set a new log level', exc_info=True)

    # make sure some basic options are set
    add_default_options(config['options'])
    if 'offline' not in config['options']:
        config['options']['offline'] = offline
    if 'offline_transfer' not in config['options']:
        config['options']['offline_transfer'] = offline_transfer
    if 'subprocess_dir' not in config['options']:
        config['options']['subprocess_dir'] = os.getcwd()
    if 'task_temp' not in config['options']:
        config['options']['task_temp'] = 'file:'+os.path.join(config['options']['subprocess_dir'], 'task_temp')
    if 'tray_temp' not in config['options']:
        config['options']['tray_temp'] = 'file:'+os.path.join(config['options']['subprocess_dir'], 'tray_temp')
    if 'local_temp' not in config['options']:
        config['options']['local_temp'] = os.path.join(config['options']['subprocess_dir'], 'local_temp')
    if 'credentials_dir' not in config['options']:
        config['options']['credentials_dir'] = os.path.join(os.getcwd(), 'iceprod_credentials')
    if 'stillrunninginterval' not in config['options']:
        config['options']['stillrunninginterval'] = 60
    if 'upload' not in config['options']:
        config['options']['upload'] = 'logging'
    if not config['steering']:
        # make sure steering exists in the config
        config['steering'] = iceprod.core.dataclasses.Steering()

    resource_thread = None
    if not resources:
        try:
            import psutil
        except ImportError:
            resources = None
        else:
            # track resource usage in a separate thread
            resource_stop = False
            resources = iceprod.core.resources.Resources(debug=debug)
            config['options']['resources'] = resources.claim(config['options']['task_id'])
            resources.register_process(config['options']['task_id'],
                                       psutil.Process(), os.getcwd())
            def track():
                while not resource_stop:
                    resources.check_claims()
                    time.sleep(1)
            resource_thread = threading.Thread(target=track)
            resource_thread.start()

    # make exe Config
    cfg = iceprod.core.exe.Config(config=config, rpc=rpc, logger=logger)

    # set up global env, based on config['options'] and config['steering']
    env_opts = cfg.parseObject(config['options'], {})
    stats = {}
    try:
        try:
            # keep track of the start time
            start_time = time.time()
            stats = {}
            async with iceprod.core.exe.SetupEnv(cfg, config['steering'],
                                                 {'options': env_opts},
                                                 logger=logger) as env:
                logger.warning('config options: %r', config['options'])
                # find tasks to run
                if 'task' in config['options']:
                    logger.warning('task specified: %r', config['options']['task'])
                    # run only this task name or number
                    name = config['options']['task']
                    if isinstance(name, iceprod.core.dataclasses.String) and name.isdigit():
                        name = int(name)
                    if isinstance(name, iceprod.core.dataclasses.String):
                        # find task by name
                        for task in config['tasks']:
                            if task['name'] == name:
                                async for proc in iceprod.core.exe.runtask(cfg, env, task, logger=logger):
                                    yield proc
                                break
                        else:
                            logger.critical('cannot find task named %r', name)
                            raise Exception('cannot find specified task')
                    elif isinstance(name, int):
                        # find task by index
                        if name >= 0 and name < len(config['tasks']):
                            async for proc in iceprod.core.exe.runtask(cfg, env, config['tasks'][name], logger=logger):
                                yield proc
                        else:
                            logger.critical('cannot find task index %d', name)
                            raise Exception('cannot find specified task')
                    else:
                        logger.critical('task specified in options is %r, but no task found', name)
                        raise Exception('cannot find specified task')
                    stats = env['stats']
                elif offline:
                    # run all tasks in order
                    for task in config['tasks']:
                        async for proc in iceprod.core.exe.runtask(cfg, env, task):
                            yield proc
                else:
                    raise Exception('task to run not specified')

            if (not offline) and 'task' in config['options']:
                # finish task
                logger.warning('task finishing')
                await rpc.finish_task(
                    config['options']['task_id'],
                    dataset_id=config['options']['dataset_id'],
                    stats=stats,
                    start_time=start_time,
                    resources=resources.get_final(config['options']['task_id']) if resources else None,
                    site=resources.site if resources else None,
                )
        except Exception as e:
            logger.error('task failed, exiting without running completion steps.',
                         exc_info=True)
            # set task status on server
            if not offline:
                try:
                    await rpc.task_error(
                        config['options']['task_id'],
                        dataset_id=config['options']['dataset_id'],
                        stats=stats,
                        start_time=start_time,
                        reason=str(e),
                        resources=resources.get_final(config['options']['task_id']) if resources else None,
                        site=resources.site if resources else None,
                    )
                except Exception as e:
                    logger.error(e)
            # forcibly turn on logging, so we can see the error
            config['options']['upload'] = 'logging'
            raise
    finally:
        # check resources
        if resource_thread:
            resource_stop = True
            resource_thread.join()
            print('Resources:')
            r = resources.get_final(config['options']['task_id'])
            if not r:
                print('  None')
            else:
                for k in r:
                    print('  {}: {:.2f}'.format(k, r[k]))
        # upload log files to server
        try:
            if (not offline) and 'upload' in config['options']:
                logger.warning('uploading logfiles')
                if isinstance(config['options']['upload'], iceprod.core.dataclasses.String):
                    upload = config['options']['upload'].lower().split('|')
                elif isinstance(config['options']['upload'], (tuple, list)):
                    upload = [x.lower() for x in config['options']['upload']]
                else:
                    raise Exception('upload config is not a valid type')
                errfile = constants['stderr']
                outfile = constants['stdout']
                if 'subprocess_dir' in config['options'] and config['options']['subprocess_dir']:
                    subdir = config['options']['subprocess_dir']
                    errfile = os.path.join(subdir, errfile)
                    outfile = os.path.join(subdir, outfile)
                for up in upload:
                    if up.startswith('logging'):
                        # upload err, log, and out files
                        await rpc.uploadLog(task_id=config['options']['task_id'],
                                            dataset_id=config['options']['dataset_id'])
                        await rpc.uploadErr(filename=errfile,
                                            task_id=config['options']['task_id'],
                                            dataset_id=config['options']['dataset_id'])
                        await rpc.uploadOut(filename=outfile,
                                            task_id=config['options']['task_id'],
                                            dataset_id=config['options']['dataset_id'])
                        break
                    elif up.startswith('log'):
                        # upload log files
                        await rpc.uploadLog(task_id=config['options']['task_id'],
                                            dataset_id=config['options']['dataset_id'])
                    elif up.startswith('err'):
                        # upload err files
                        await rpc.uploadErr(filename=errfile,
                                            task_id=config['options']['task_id'],
                                            dataset_id=config['options']['dataset_id'])
                    elif up.startswith('out'):
                        # upload out files
                        await rpc.uploadOut(filename=outfile,
                                            task_id=config['options']['task_id'],
                                            dataset_id=config['options']['dataset_id'])
        except Exception:
            logger.error('failed when uploading logging info', exc_info=True)
    logger.warning('finished without error')
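
# Driving runner() directly (a sketch): it is an async generator that
# yields subprocess handles, so the caller awaits each one, just as
# main() does. 'config' here is a loaded iceprod.core.dataclasses.Job.
#
#   async def drive(config):
#       async for proc in runner(config, offline=True):
#           await proc.wait()
#
#   asyncio.get_event_loop().run_until_complete(drive(config))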
if __name__ == '__main__':
    # get arguments
    import argparse
    parser = argparse.ArgumentParser(description='IceProd Core')
    parser.add_argument('-f', '--cfgfile', type=str,
                        help='Specify config file')
    parser.add_argument('-u', '--url', type=str,
                        help='URL of the iceprod server')
    parser.add_argument('-p', '--passkey', type=str,
                        help='passkey for communication with iceprod server')
    parser.add_argument('--pilot_id', type=str, default=None,
                        help='ID of the pilot (if this is a pilot)')
    parser.add_argument('-d', '--debug', action='store_true', default=False,
                        help='Enable debug actions and logging')
    parser.add_argument('--offline', action='store_true', default=False,
                        help='Enable offline mode (don\'t talk with server)')
    # note: argparse's type=bool treats any non-empty string (including
    # 'False') as True, so parse the value explicitly
    parser.add_argument('--offline_transfer',
                        type=lambda x: str(x).lower() in ('true', '1', 'yes'),
                        default=False,
                        help='Enable/disable file transfer during offline mode')
    parser.add_argument('--logfile', type=str, default=None,
                        help='Specify the logfile to use')
    parser.add_argument('--dataset', type=int, default=None,
                        help='Dataset number')
    parser.add_argument('--job', type=int, default=None,
                        help='Index of the job to run')
    parser.add_argument('--jobs_submitted', type=int, default=None,
                        help='Total number of jobs in this dataset')
    parser.add_argument('--task', type=str, default=None,
                        help='Name of the task to run')
    parser.add_argument('--gzip-logs', dest='gzip_logs', action='store_true',
                        help='gzip the iceprod logs')
    args = vars(parser.parse_args())
    print(args)

    # check cfgfile
    if args['cfgfile'] is not None and not os.path.isfile(args['cfgfile']):
        if os.path.isfile(os.path.join(os.getcwd(), args['cfgfile'])):
            args['cfgfile'] = os.path.join(os.getcwd(), args['cfgfile'])
        else:
            args['cfgfile'] = None

    options = {k: args.pop(k) for k in ('dataset', 'job', 'jobs_submitted', 'task')}
    if not options['jobs_submitted'] and options['job']:
        options['jobs_submitted'] = options['job']+1
    options['debug'] = args['debug']
    if args['cfgfile']:
        cfgfile = load_config(args['cfgfile'])
        for k in options:
            if options[k] is not None and k not in cfgfile['options']:
                cfgfile['options'][k] = options[k]
        args['cfgfile'] = cfgfile

    # start iceprod
    try:
        main(**args)
    finally:
        if args['gzip_logs']:
            # compress stdout and stderr
            for filename in ('stdout', 'stderr'):
                if os.path.exists(constants[filename]):
                    with open(constants[filename], 'rb') as f_in:
                        with gzip.open(constants[filename]+'.gz', 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                else:
                    with gzip.open(constants[filename]+'.gz', 'wb') as f_out:
                        pass