"""
The Condor plugin. Allows submission to
`HTCondor <http://research.cs.wisc.edu/htcondor/>`_.
Note: Condor was renamed to HTCondor in 2012.
"""
import os
import logging
import getpass
import subprocess
from functools import partial
from tornado.concurrent import run_on_executor
from iceprod.server import grid
logger = logging.getLogger('condor')
def check_call_clean_env(*args, **kwargs):
    """Run subprocess.check_call with LD_LIBRARY_PATH stripped from the environment."""
    env = os.environ.copy()
    env.pop('LD_LIBRARY_PATH', None)  # avoid a KeyError when the variable is unset
    kwargs['env'] = env
    return subprocess.check_call(*args, **kwargs)
def check_output_clean_env(*args, **kwargs):
    """Run subprocess.check_output with LD_LIBRARY_PATH stripped from the environment."""
    env = os.environ.copy()
    env.pop('LD_LIBRARY_PATH', None)  # avoid a KeyError when the variable is unset
    kwargs['env'] = env
    return subprocess.check_output(*args, **kwargs)
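# Illustrative usage (a sketch, not part of the module): any HTCondor command
# can be run through the helpers above so the scheduler tools do not pick up
# IceProd's LD_LIBRARY_PATH, e.g.:
#
#     out = check_output_clean_env(['condor_q', '-totals'],
#                                  universal_newlines=True)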
def condor_os_reqs(os_arch):
    """Convert from an OS_ARCH string to an HTCondor OS requirements expression.

    Strips the architecture suffix (e.g. ``RHEL_7_x86_64`` becomes ``RHEL_7``),
    then matches on ``OSGVO_OS_STRING`` when it is defined, falling back to
    ``OpSysAndVer`` with both the CentOS and Scientific Linux spellings.
    """
    os_arch = os_arch.rsplit('_',2)[0].rsplit('.',1)[0]
    reqs = 'OpSysAndVer =?= "{}"'.format(os_arch.replace('RHEL','CentOS').replace('_',''))
    reqs += ' || OpSysAndVer =?= "{}"'.format(os_arch.replace('RHEL','SL').replace('_',''))
    reqs = 'isUndefined(OSGVO_OS_STRING) ? ('+reqs+') : OSGVO_OS_STRING =?= "{}"'.format(os_arch.replace('_',' '))
    return reqs
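# Illustrative example (hypothetical input): condor_os_reqs('RHEL_7_x86_64')
# reduces the input to 'RHEL_7' and returns the expression (shown wrapped here)
#
#     isUndefined(OSGVO_OS_STRING)
#         ? (OpSysAndVer =?= "CentOS7" || OpSysAndVer =?= "SL7")
#         : OSGVO_OS_STRING =?= "RHEL 7"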
class condor(grid.BaseGrid):
"""Plugin Overrides for HTCondor pilot submission"""
# let the basic plugin be dumb and implement as little as possible
@run_on_executor
def generate_submit_file(self, task, cfg=None, passkey=None,
filelist=None):
"""Generate queueing system submit file for task in dir."""
args = self.get_submit_args(task,cfg=cfg,passkey=passkey)
# get requirements and batchopts
requirements = []
batch_opts = {}
for b in self.queue_cfg['batchopts']:
if b.lower() == 'requirements':
requirements.append(self.queue_cfg['batchopts'][b])
else:
batch_opts[b] = self.queue_cfg['batchopts'][b]
if cfg:
if (cfg['steering'] and 'batchsys' in cfg['steering'] and
cfg['steering']['batchsys']):
for b in cfg['steering']['batchsys']:
if b.lower().startswith(self.__class__.__name__):
# these settings apply to this batchsys
for bb in cfg['steering']['batchsys'][b]:
value = cfg['steering']['batchsys'][b][bb]
if bb.lower() == 'requirements':
requirements.append(value)
else:
batch_opts[bb] = value
if task['task_id'] != 'pilot':
if 'task' in cfg['options']:
t = cfg['options']['task']
if t in cfg['tasks']:
alltasks = [cfg['tasks'][t]]
else:
alltasks = []
try:
for tt in cfg['tasks']:
if t == tt['name']:
alltasks.append(tt)
except Exception:
logger.warning('error finding specified task to run for %r',
task,exc_info=True)
else:
alltasks = cfg['tasks']
for t in alltasks:
if 'batchsys' in t and t['batchsys']:
for b in t['batchsys']:
if b.lower().startswith(self.__class__.__name__):
# these settings apply to this batchsys
for bb in t['batchsys'][b]:
value = t['batchsys'][b][bb]
if bb.lower() == 'requirements':
requirements.append(value)
else:
batch_opts[bb] = value
# write the submit file
submit_file = os.path.join(task['submit_dir'],'condor.submit')
with open(submit_file,'w') as f:
p = partial(print,sep='',file=f)
p('universe = vanilla')
p('executable = {}'.format(os.path.join(task['submit_dir'],'loader.sh')))
p('log = condor.log')
p('output = condor.out.$(Process)')
p('error = condor.err.$(Process)')
p('notification = never')
p('+IsIceProdJob = True') # mark as IceProd for monitoring
p('want_graceful_removal = True')
if filelist:
p('transfer_input_files = {}'.format(','.join(filelist)))
p('skip_filechecks = True')
p('should_transfer_files = always')
p('when_to_transfer_output = ON_EXIT_OR_EVICT')
p('+SpoolOnEvict = False')
p('transfer_output_files = iceprod_log, iceprod_out, iceprod_err')
if 'num' in task:
p('transfer_output_remaps = "iceprod_log=iceprod_log_$(Process)'
';iceprod_out=iceprod_out_$(Process)'
';iceprod_err=iceprod_err_$(Process)"')
# handle resources
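            # JobStatus 1 is Idle and 5 is Held, so JobIsRunning is true when
            # the job is neither idle nor held; the Job* attributes below use
            # it to prefer the machine-reported MATCH_EXP_JOB_GLIDEIN_* values
            # over the originally requested amounts.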
p('+JobIsRunning = (JobStatus =!= 1) && (JobStatus =!= 5)')
if 'reqs' in task:
if 'cpu' in task['reqs'] and task['reqs']['cpu']:
p('+OriginalCpus = {}'.format(task['reqs']['cpu']))
p('+RequestResizedCpus = ((Cpus < OriginalCpus) ? OriginalCpus : Cpus)')
p('+JOB_GLIDEIN_Cpus = "$$(Cpus:0)"')
p('+JobIsRunningCpus = (JobIsRunning && (!isUndefined(MATCH_EXP_JOB_GLIDEIN_Cpus)))')
p('+JobCpus = (JobIsRunningCpus ? int(MATCH_EXP_JOB_GLIDEIN_Cpus) : OriginalCpus)')
p('request_cpus = (!isUndefined(Cpus)) ? RequestResizedCpus : JobCpus')
p('Rank = Rank + (isUndefined(Cpus) ? 0 : Cpus)/8')
if 'gpu' in task['reqs'] and task['reqs']['gpu']:
p('+OriginalGpus = {}'.format(task['reqs']['gpu']))
p('+RequestResizedGpus = (Gpus < OriginalGpus) ? OriginalGpus : Gpus')
p('+JOB_GLIDEIN_Gpus = "$$(Gpus:0)"')
p('+JobIsRunningGpus = (JobIsRunning && (!isUndefined(MATCH_EXP_JOB_GLIDEIN_Gpus)))')
                    p('+JobGpus = (JobIsRunningGpus ? int(MATCH_EXP_JOB_GLIDEIN_Gpus) : OriginalGpus)')
p('request_gpus = !isUndefined(Gpus) ? RequestResizedGpus : JobGpus')
if 'memory' in task['reqs'] and task['reqs']['memory']:
# extra 100MB for pilot
p('+OriginalMemory = {}'.format(int(task['reqs']['memory']*1000+100)))
p('+RequestResizedMemory = (Memory < OriginalMemory) ? OriginalMemory : Memory')
p('+JOB_GLIDEIN_Memory = "$$(Memory:0)"')
p('+JobIsRunningMemory = (JobIsRunning && (!isUndefined(MATCH_EXP_JOB_GLIDEIN_Memory)))')
p('+JobMemory = (JobIsRunningMemory ? int(MATCH_EXP_JOB_GLIDEIN_Memory) : OriginalMemory)')
p('request_memory = !isUndefined(Memory) ? RequestResizedMemory : JobMemory')
else:
p('request_memory = 1000')
if 'disk' in task['reqs'] and task['reqs']['disk']:
p('+OriginalDisk = {}'.format(int(task['reqs']['disk']*1000000)))
p('+RequestResizedDisk = (Disk-10000 < OriginalDisk) ? OriginalDisk : Disk-10000')
p('+JOB_GLIDEIN_Disk = "$$(Disk:0)"')
p('+JobIsRunningDisk = (JobIsRunning && (!isUndefined(MATCH_EXP_JOB_GLIDEIN_Disk)))')
p('+JobDisk = (JobIsRunningDisk ? int(MATCH_EXP_JOB_GLIDEIN_Disk) : OriginalDisk)')
p('request_disk = !isUndefined(Disk) ? RequestResizedDisk : JobDisk')
if 'time' in task['reqs'] and task['reqs']['time']:
# extra 10 min for pilot
                    p('+OriginalTime = {}'.format(int(task['reqs']['time']*3600)+600))
p('+TargetTime = (!isUndefined(Target.PYGLIDEIN_TIME_TO_LIVE) ? Target.PYGLIDEIN_TIME_TO_LIVE : Target.TimeToLive)')
p('Rank = Rank + (TargetTime - OriginalTime)/86400')
requirements.append('TargetTime > OriginalTime')
if 'os' in task['reqs'] and task['reqs']['os']:
requirements.append(condor_os_reqs(task['reqs']['os']))
for b in batch_opts:
p(b+'='+batch_opts[b])
if requirements:
p('requirements = ('+')&&('.join(requirements)+')')
if task['task_id'] == 'pilot' and 'pilot_ids' in task:
for pilot_id in task['pilot_ids']:
p('arguments = ',' '.join(args + ['--pilot_id', pilot_id]))
p('queue')
elif 'num' in task:
p('arguments = ',' '.join(args))
p('queue {}'.format(task['num']))
else:
p('arguments = ',' '.join(args))
p('queue')
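    # Illustrative sketch of generate_submit_file output for a hypothetical
    # task requesting 1 CPU and 2 GB of memory (paths and values are examples
    # only, not a verbatim file):
    #
    #     universe = vanilla
    #     executable = /path/to/submit_dir/loader.sh
    #     log = condor.log
    #     ...
    #     +OriginalCpus = 1
    #     +OriginalMemory = 2100
    #     requirements = (...)
    #     arguments = ...
    #     queue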
@run_on_executor
def submit(self,task):
"""Submit task to queueing system."""
cmd = ['condor_submit','-terse','condor.submit']
out = check_output_clean_env(cmd, cwd=task['submit_dir'], universal_newlines=True)
grid_queue_id = []
for line in out.split('\n'):
# look for range
parts = [p.strip() for p in line.split('-') if p.strip()]
if len(parts) != 2:
continue
major = parts[0].split('.')[0]
minor_1 = int(parts[0].split('.')[1])
minor_2 = int(parts[1].split('.')[1])
for i in range(minor_1, minor_2+1):
grid_queue_id.append('{}.{}'.format(major,i))
task['grid_queue_id'] = ','.join(grid_queue_id)
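    # Illustrative example (hypothetical IDs): with 'queue 3',
    # 'condor_submit -terse' prints a ClusterId.ProcId range such as
    #
    #     1234.0 - 1234.2
    #
    # which the loop above expands to grid_queue_id = '1234.0,1234.1,1234.2'.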
@run_on_executor
def get_grid_status(self):
"""Get all tasks running on the queue system.
Returns {grid_queue_id:{status,submit_dir}}
"""
ret = {}
cmd = ['condor_q',getpass.getuser(),'-af:j','jobstatus','cmd']
out = check_output_clean_env(cmd, universal_newlines=True)
        logger.debug('get_grid_status(): %s', out)
for line in out.split('\n'):
if not line.strip():
continue
            gid,status,cmd = line.split(None,2)  # split twice; the cmd path may contain spaces
if 'loader.sh' not in cmd:
continue
if status == '1':
status = 'queued'
elif status == '2':
status = 'processing'
elif status == '4':
status = 'completed'
elif status in ('3','5','6'):
status = 'error'
else:
status = 'unknown'
ret[gid] = {'status':status,'submit_dir':os.path.dirname(cmd)}
return ret
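    # Illustrative example (hypothetical values): a condor_q output line such
    # as
    #
    #     1234.0 2 /data/iceprod/submit/abc123/loader.sh
    #
    # maps to ret['1234.0'] = {'status': 'processing',
    #                          'submit_dir': '/data/iceprod/submit/abc123'}.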
@run_on_executor
def remove(self,tasks):
"""Remove tasks from queueing system."""
if tasks:
check_call_clean_env(['condor_rm']+list(tasks))