import logging
import json
import uuid
import math
from collections import defaultdict
import pymongo
import tornado.web
from ..base_handler import APIBase
from ..auth import authorization, attr_auth
from iceprod.core import dataclasses
from iceprod.core.resources import Resources
from iceprod.server.util import nowstr, task_statuses, task_status_sort
logger = logging.getLogger('rest.tasks')
def setup(handler_cfg):
    """
    Setup method for Tasks REST API.

    Args:
        handler_cfg (dict): args to pass to the route

    Returns:
        dict: routes, database, indexes
    """
    routes = [
        (r'/tasks', MultiTasksHandler, handler_cfg),
        (r'/tasks/(?P<task_id>\w+)', TasksHandler, handler_cfg),
        (r'/tasks/(?P<task_id>\w+)/status', TasksStatusHandler, handler_cfg),
        (r'/task_actions/queue', TasksActionsQueueHandler, handler_cfg),
        (r'/task_actions/bulk_status/(?P<status>\w+)', TaskBulkStatusHandler, handler_cfg),
        (r'/task_actions/process', TasksActionsProcessingHandler, handler_cfg),
        (r'/task_counts/status', TaskCountsStatusHandler, handler_cfg),
        (r'/tasks/(?P<task_id>\w+)/task_actions/reset', TasksActionsErrorHandler, handler_cfg),
        (r'/tasks/(?P<task_id>\w+)/task_actions/failed', TasksActionsFailedHandler, handler_cfg),
        (r'/tasks/(?P<task_id>\w+)/task_actions/complete', TasksActionsCompleteHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/tasks', DatasetMultiTasksHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/tasks/(?P<task_id>\w+)', DatasetTasksHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/tasks/(?P<task_id>\w+)/status', DatasetTasksStatusHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/task_summaries/status', DatasetTaskSummaryStatusHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/task_counts/status', DatasetTaskCountsStatusHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/task_counts/name_status', DatasetTaskCountsNameStatusHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/task_actions/bulk_status/(?P<status>\w+)', DatasetTaskBulkStatusHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/task_actions/bulk_requirements/(?P<name>[^\/\?\#]+)', DatasetTaskBulkRequirementsHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/task_stats', DatasetTaskStatsHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/files', DatasetMultiFilesHandler, handler_cfg),
        (r'/datasets/(?P<dataset_id>\w+)/files/(?P<task_id>\w+)', DatasetTaskFilesHandler, handler_cfg),
    ]
    # indexes for the "tasks" collection; the compound (status, priority)
    # index backs the queue/process actions, which sort by priority
    task_indexes = {
        'task_id_index': {'keys': 'task_id', 'unique': True},
        'dataset_id_index': {'keys': 'dataset_id', 'unique': False},
        'job_id_index': {'keys': 'job_id', 'unique': False},
        'status_index': {'keys': 'status', 'unique': False},
        'priority_index': {'keys': [('status', pymongo.ASCENDING), ('priority', pymongo.DESCENDING)], 'unique': False},
    }
    file_indexes = {
        'dataset_id_index': {'keys': 'dataset_id', 'unique': False},
        'task_id_index': {'keys': 'task_id', 'unique': False},
    }
    return {
        'routes': routes,
        'database': 'tasks',
        'indexes': {
            'tasks': task_indexes,
            'dataset_files': file_indexes,
        },
    }
class MultiTasksHandler(APIBase):
    """
    Handle multi tasks requests.
    """
    @authorization(roles=['admin', 'system'])
    async def get(self):
        """
        Get task entries.

        Params (optional):
            status: | separated list of task status to filter by
            keys: | separated list of keys to return for each task
            sort: | separated list of sort key=values, with values of 1 or -1
            limit: number of tasks to return

        Returns:
            dict: {'tasks': [<task>]}
        """
        filters = {}
        status = self.get_argument('status', None)
        if status:
            filters['status'] = {'$in': status.split('|')}

        # build the mongo sort spec: "key=-1" sorts descending,
        # anything else (including a bare key) ascending
        mongo_sort = []
        sort = self.get_argument('sort', None)
        if sort:
            for entry in sort.split('|'):
                name, _, order = entry.partition('=')
                direction = pymongo.DESCENDING if order == '-1' else pymongo.ASCENDING
                mongo_sort.append((name, direction))

        # a malformed limit silently falls back to "no limit" (0)
        limit = 0
        raw_limit = self.get_argument('limit', 0)
        if raw_limit:
            try:
                limit = int(raw_limit)
            except (TypeError, ValueError):
                limit = 0

        projection = {}
        for key in self.get_argument('keys', '').split('|'):
            if key:
                projection[key] = True
        projection['_id'] = False

        tasks = [row async for row in self.db.tasks.find(filters, projection=projection, sort=mongo_sort, limit=limit)]
        self.write({'tasks': tasks})

    @authorization(roles=['admin', 'system'])
    async def post(self):
        """
        Create a task entry.

        Body should contain the task data.

        Returns:
            dict: {'result': <task_id>}
        """
        data = json.loads(self.request.body)
        # validate required fields before touching the database
        req_fields = {
            'dataset_id': str,
            'job_id': str,
            'task_index': int,
            'job_index': int,
            'name': str,
            'depends': list,
            'requirements': dict,
        }
        for field, field_type in req_fields.items():
            if field not in data:
                raise tornado.web.HTTPError(400, reason='missing key: '+field)
            if not isinstance(data[field], field_type):
                r = 'key {} should be of type {}'.format(field, field_type)
                raise tornado.web.HTTPError(400, reason=r)

        # server-assigned bookkeeping fields
        task_id = uuid.uuid1().hex
        data['task_id'] = task_id
        data['status_changed'] = nowstr()
        data['failures'] = 0
        data['evictions'] = 0
        data['walltime'] = 0.0
        data['walltime_err'] = 0.0
        data['walltime_err_n'] = 0
        data['site'] = ''
        # caller may override status/priority; otherwise use defaults
        data.setdefault('status', 'waiting')
        data.setdefault('priority', 1.)

        await self.db.tasks.insert_one(data)
        self.set_status(201)
        self.write({'result': task_id})
        self.finish()
class TasksHandler(APIBase):
    """
    Handle single task requests.
    """
    @authorization(roles=['admin', 'system'])
    async def get(self, task_id):
        """
        Get a task entry.

        Args:
            task_id (str): the task id

        Returns:
            dict: task entry
        """
        task = await self.db.tasks.find_one({'task_id': task_id}, projection={'_id': False})
        if task:
            self.write(task)
            self.finish()
        else:
            self.send_error(404, reason="Task not found")

    @authorization(roles=['admin', 'system'])
    async def patch(self, task_id):
        """
        Update a task entry.

        Body should contain the task data to update. Note that this will
        perform a merge (not replace).

        Args:
            task_id (str): the task id

        Returns:
            dict: updated task entry
        """
        data = json.loads(self.request.body)
        if not data:
            raise tornado.web.HTTPError(400, reason='Missing update data')
        updated = await self.db.tasks.find_one_and_update(
            {'task_id': task_id},
            {'$set': data},
            projection={'_id': False},
            return_document=pymongo.ReturnDocument.AFTER,
        )
        if updated:
            self.write(updated)
            self.finish()
        else:
            self.send_error(404, reason="Task not found")
class TasksStatusHandler(APIBase):
    """
    Handle single task requests.
    """
    @authorization(roles=['admin', 'system'])
    async def put(self, task_id):
        """
        Set a task status.

        Body should have {'status': <new_status>}

        Args:
            task_id (str): the task id

        Returns:
            dict: empty dict
        """
        data = json.loads(self.request.body)
        if not data or 'status' not in data:
            raise tornado.web.HTTPError(400, reason='Missing status in body')
        new_status = data['status']
        if new_status not in task_statuses:
            raise tornado.web.HTTPError(400, reason='Bad status')
        ret = await self.db.tasks.update_one(
            {'task_id': task_id},
            {'$set': {'status': new_status, 'status_changed': nowstr()}},
        )
        if ret and ret.modified_count >= 1:
            self.write({})
            self.finish()
        else:
            self.send_error(404, reason="Task not found")
class TaskCountsStatusHandler(APIBase):
    """
    Handle task summary grouping by status.
    """
    @authorization(roles=['admin', 'system'])
    async def get(self):
        """
        Get the task counts for all tasks, group by status.

        Returns:
            dict: {<status>: num}
        """
        # One $group aggregation instead of one count_documents() round
        # trip per status. Statuses with no tasks keep a 0 entry, and
        # unknown statuses in the db are ignored, matching the previous
        # per-status counting behavior.
        counts = {status: 0 for status in task_statuses}
        cursor = self.db.tasks.aggregate([
            {'$group': {'_id': '$status', 'total': {'$sum': 1}}},
        ])
        async for row in cursor:
            if row['_id'] in counts:
                counts[row['_id']] = row['total']
        # present statuses in their canonical ordering
        ret = {k: counts[k] for k in sorted(counts, key=task_status_sort)}
        self.write(ret)
        self.finish()
class DatasetMultiTasksHandler(APIBase):
    """
    Handle multi tasks requests.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get task entries.

        Params (optional):
            status: | separated list of task status to filter by
            job_id: job_id to filter by
            job_index: job_index to filter by
            keys: | separated list of keys to return for each task

        Args:
            dataset_id (str): dataset id

        Returns:
            dict: {'task_id': {task data}}
        """
        filters = {'dataset_id': dataset_id}
        status = self.get_argument('status', None)
        if status:
            filters['status'] = {'$in': status.split('|')}
        job_id = self.get_argument('job_id', None)
        if job_id:
            filters['job_id'] = job_id
        job_index = self.get_argument('job_index', None)
        if job_index:
            try:
                filters['job_index'] = int(job_index)
            except ValueError:
                raise tornado.web.HTTPError(400, reason='Bad argument "job_index": must be integer')

        projection = {'_id': False}
        keys = self.get_argument('keys', '')
        if keys:
            for key in keys.split('|'):
                if key:
                    projection[key] = True
            # task_id is always needed: it is the key of the result dict
            projection['task_id'] = True

        tasks = {}
        async for row in self.db.tasks.find(filters, projection=projection):
            tasks[row['task_id']] = row
        self.write(tasks)
class DatasetTasksHandler(APIBase):
    """
    Handle single task requests.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id, task_id):
        """
        Get a task entry.

        Args:
            dataset_id (str): dataset id
            task_id (str): the task id

        Params (optional):
            keys: | separated list of keys to return for each task

        Returns:
            dict: task entry
        """
        projection = {'_id': False}
        keys = self.get_argument('keys', '')
        if keys:
            for key in keys.split('|'):
                if key:
                    projection[key] = True
            # task_id is always returned when a key filter is given
            projection['task_id'] = True
        task = await self.db.tasks.find_one(
            {'task_id': task_id, 'dataset_id': dataset_id},
            projection=projection,
        )
        if task:
            self.write(task)
            self.finish()
        else:
            self.send_error(404, reason="Task not found")
class DatasetTasksStatusHandler(APIBase):
    """
    Handle single task requests.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='write')
    async def put(self, dataset_id, task_id):
        """
        Set a task status.

        Body should have {'status': <new_status>}

        Args:
            dataset_id (str): dataset id
            task_id (str): the task id

        Returns:
            dict: empty dict
        """
        data = json.loads(self.request.body)
        if not data or 'status' not in data:
            raise tornado.web.HTTPError(400, reason='Missing status in body')
        new_status = data['status']
        if new_status not in task_statuses:
            raise tornado.web.HTTPError(400, reason='Bad status')
        update = {'status': new_status, 'status_changed': nowstr()}
        if new_status == 'reset':
            # a reset task starts over with a clean failure count
            update['failures'] = 0
        ret = await self.db.tasks.update_one(
            {'task_id': task_id, 'dataset_id': dataset_id},
            {'$set': update},
        )
        if ret and ret.modified_count >= 1:
            self.write({})
            self.finish()
        else:
            self.send_error(404, reason="Task not found")
class DatasetTaskSummaryStatusHandler(APIBase):
    """
    Handle task summary grouping by status.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get the task summary for all tasks in a dataset, group by status.

        Args:
            dataset_id (str): dataset id

        Returns:
            dict: {<status>: [<task_id>,]}
        """
        by_status = defaultdict(list)
        cursor = self.db.tasks.find(
            {'dataset_id': dataset_id},
            projection={'_id': False, 'status': True, 'task_id': True},
        )
        async for row in cursor:
            by_status[row['status']].append(row['task_id'])
        # present statuses in their canonical ordering
        ret = {k: by_status[k] for k in sorted(by_status, key=task_status_sort)}
        self.write(ret)
        self.finish()
class DatasetTaskCountsStatusHandler(APIBase):
    """
    Handle task summary grouping by status.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get the task counts for all tasks in a dataset, group by status.

        Args:
            dataset_id (str): dataset id

        Returns:
            dict: {<status>: num}
        """
        counts = {}
        cursor = self.db.tasks.aggregate([
            {'$match': {'dataset_id': dataset_id}},
            {'$group': {'_id': '$status', 'total': {'$sum': 1}}},
        ])
        async for row in cursor:
            counts[row['_id']] = row['total']
        # present statuses in their canonical ordering
        ret = {k: counts[k] for k in sorted(counts, key=task_status_sort)}
        self.write(ret)
        self.finish()
class DatasetTaskCountsNameStatusHandler(APIBase):
    """
    Handle task summary grouping by name and status.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get the task counts for all tasks in a dataset, group by name,status.

        Args:
            dataset_id (str): dataset id

        Returns:
            dict: {<name>: {<status>: num}}
        """
        cursor = self.db.tasks.aggregate([
            {'$match': {'dataset_id': dataset_id}},
            {'$group': {
                '_id': {'name': '$name', 'status': '$status'},
                'ordering': {'$first': '$task_index'},
                'total': {'$sum': 1},
            }},
        ])
        counts = defaultdict(dict)
        ordering = {}
        async for row in cursor:
            name = row['_id']['name']
            counts[name][row['_id']['status']] = row['total']
            ordering[name] = row['ordering']
        # order task names by their task_index
        ret = {name: counts[name] for name in sorted(ordering, key=ordering.get)}
        self.write(ret)
        self.finish()
class DatasetTaskStatsHandler(APIBase):
    """
    Handle task stats
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get the task statistics for all tasks in a dataset, group by name.

        Args:
            dataset_id (str): dataset id

        Returns:
            dict: {<name>: {<stat>: <value>}}
        """
        # only completed tasks contribute to the statistics
        cursor = self.db.tasks.aggregate([
            {'$match': {'dataset_id': dataset_id, 'status': 'complete'}},
            {'$group': {
                '_id': '$name',
                'count': {'$sum': 1},
                'gpu': {'$sum': '$requirements.gpu'},
                'total_hrs': {'$sum': '$walltime'},
                'total_err_hrs': {'$sum': '$walltime_err'},
                'avg_hrs': {'$avg': '$walltime'},
                'stddev_hrs': {'$stdDevSamp': '$walltime'},
                'min_hrs': {'$min': '$walltime'},
                'max_hrs': {'$max': '$walltime'},
                'ordering': {'$first': '$task_index'},
            }},
        ])
        stats = {}
        ordering = {}
        async for row in cursor:
            # efficiency = successful walltime / (successful + error walltime)
            denom = row['total_hrs'] + row['total_err_hrs']
            row['efficiency'] = row['total_hrs']/denom if denom > 0 else 0.0
            name = row.pop('_id')
            ordering[name] = row.pop('ordering')
            stats[name] = row
        # order task names by their task_index
        ret = {name: stats[name] for name in sorted(ordering, key=ordering.get)}
        self.write(ret)
        self.finish()
class TasksActionsQueueHandler(APIBase):
    """
    Handle task action for waiting -> queued.
    """
    @authorization(roles=['admin', 'system'])
    async def post(self):
        """
        Take a number of waiting tasks and queue them.

        Order by priority.

        Body args (json):
            num_tasks: int

        Returns:
            dict: {queued: num tasks queued}
        """
        data = json.loads(self.request.body)
        num_tasks = data.get('num_tasks', 100)
        queued = 0
        # claim one task per iteration so the priority sort applies
        # to each individual claim
        while queued < num_tasks:
            claimed = await self.db.tasks.find_one_and_update(
                {'status': 'waiting'},
                {'$set': {'status': 'queued'}},
                projection={'_id': False, 'task_id': True},
                sort=[('priority', -1)],
            )
            if not claimed:
                logger.debug('no more tasks to queue')
                break
            queued += 1
        logger.info(f'queued {queued} tasks')
        self.write({'queued': queued})
class TasksActionsProcessingHandler(APIBase):
    """
    Handle task action for queued -> processing.
    """
    @authorization(roles=['admin', 'system'])
    async def post(self):
        """
        Take one queued task, set its status to processing, and return it.

        Body args (json):
            requirements: dict
            query_params: (optional) dict of mongodb params

        Returns:
            dict: <task dict>
        """
        filter_query = {'status': 'queued'}
        sort_by = [('priority', -1)]
        site = 'unknown'
        if self.request.body:
            data = json.loads(self.request.body)
            # translate the offered resource requirements into mongo filters
            reqs = data.get('requirements', {})
            req_filters = []
            for k in reqs:
                if k == 'gpu' and reqs[k] > 0:
                    # gpu offer: only match tasks that request at least one gpu
                    val = {'$lte': reqs[k], '$gte': 1}
                    req_filters.append({'requirements.'+k: val})
                    continue
                elif isinstance(reqs[k], (int, float)):
                    # numeric offer: task requirement must fit within it
                    val = {'$lte': reqs[k]}
                else:
                    val = reqs[k]
                # a task without this requirement also matches
                req_filters.append({'$or': [
                    {'requirements.'+k: {'$exists': False}},
                    {'requirements.'+k: val},
                ]})
            if req_filters:
                filter_query['$and'] = req_filters
            if 'site' in reqs:
                site = reqs['site']
            # extra query_params may add filters, but must not clobber
            # filters already set above
            params = data.get('query_params', {})
            for k in params:
                if k in filter_query:
                    raise tornado.web.HTTPError(400, reason=f'param {k} would override an already set filter')
                filter_query[k] = params[k]
        # fix: removed leftover debug print of filter_query; the query is
        # already logged below when no task matches
        ret = await self.db.tasks.find_one_and_update(
            filter_query,
            {'$set': {'status': 'processing'}},
            projection={'_id': False},
            sort=sort_by,
            return_document=pymongo.ReturnDocument.AFTER
        )
        if not ret:
            logger.info('filter_query: %r', filter_query)
            self.send_error(404, reason="Task not found")
        else:
            self.statsd.incr('site.{}.task_processing'.format(site))
            self.write(ret)
            self.finish()
class TasksActionsErrorHandler(APIBase):
    """
    Handle task action on error (* -> reset).
    """
    # subclasses override this to record a different terminal status
    final_status = 'reset'

    @authorization(roles=['admin', 'system'])
    async def post(self, task_id):
        """
        Take one task, set its status to reset.

        Args:
            task_id (str): task id

        Body args (json):
            time_used (int): (optional) time used to run task, in seconds
            resources (dict): (optional) resources used by task
            site (str): (optional) site the task was running at
            reason (str): (optional) reason for error

        Returns:
            dict: {} empty dict
        """
        # a completed task cannot be reset/failed
        filter_query = {'task_id': task_id, 'status': {'$ne': 'complete'}}
        # defaultdict(dict) lets optional '$max' entries be added lazily below
        update_query = defaultdict(dict, {
            '$set': {
                'status': self.final_status,
                'status_changed': nowstr(),
            },
            '$inc': {
                'failures': 1,
            },
        })
        if self.request.body:
            data = json.loads(self.request.body)
            task = await self.db.tasks.find_one(filter_query)
            # accumulate error walltime
            if 'time_used' in data:
                update_query['$inc']['walltime_err_n'] = 1
                update_query['$inc']['walltime_err'] = data['time_used']/3600.
            elif 'resources' in data and 'time' in data['resources']:
                # NOTE(review): resources['time'] is added as-is, while
                # time_used is converted from seconds to hours — confirm
                # that resources time is already in hours
                update_query['$inc']['walltime_err_n'] = 1
                update_query['$inc']['walltime_err'] = data['resources']['time']
            # bump the task's resource requirements based on reported usage.
            # fix: guard on task existing — previously a missing/complete task
            # made task['requirements'] raise inside the broad except below,
            # logging a misleading "error converting requirement" for every
            # resource. If the task is gone, the final update returns 400 anyway.
            if task:
                for k in ('cpu', 'memory', 'disk', 'time'):
                    if 'resources' in data and k in data['resources']:
                        try:
                            new_val = float(data['resources'][k])
                            old_val = task['requirements'][k] if k in task['requirements'] else Resources.defaults[k]
                            if k == 'cpu':  # special handling for cpu
                                if new_val <= 1.1 or new_val > 20:
                                    continue
                                if new_val < old_val*1.1:
                                    continue
                                new_val = old_val+1  # increase linearly
                            elif new_val < 0.5:
                                logger.info('ignoring val below 0.5 for %s: %f', k, new_val)
                                continue
                            else:
                                new_val *= 1.5  # increase new request by 1.5
                            if isinstance(Resources.defaults[k], (int, list)):
                                new_val = math.ceil(new_val)
                        except Exception:
                            logger.info('error converting requirement %r',
                                        data['resources'][k], exc_info=True)
                        else:
                            # $max so requirements only ever increase
                            update_query['$max']['requirements.'+k] = new_val
            site = 'unknown'
            if 'site' in data:
                site = data['site']
                update_query['$set']['site'] = site
            # categorize the failure reason for statsd metrics
            if self.statsd and 'reason' in data and data['reason']:
                reason = 'other'
                reasons = [
                    ('Exception: failed to download', 'download_failure'),
                    ('Exception: failed to upload', 'upload_failure'),
                    ('Exception: module failed', 'module_failure'),
                    ('Resource overusage for cpu', 'cpu_overuse'),
                    ('Resource overusage for gpu', 'gpu_overuse'),
                    ('Resource overusage for memory', 'memory_overuse'),
                    ('Resource overusage for disk', 'disk_overuse'),
                    ('Resource overusage for time', 'time_overuse'),
                    ('pilot SIGTERM', 'sigterm'),
                    ('killed', 'killed'),
                ]
                for text, r in reasons:
                    if text in data['reason']:
                        reason = r
                        break
                self.statsd.incr('site.{}.task_{}.{}'.format(site, self.final_status, reason))
        ret = await self.db.tasks.find_one_and_update(
            filter_query,
            update_query,
            projection={'_id': False}
        )
        if not ret:
            logger.info('filter_query: %r', filter_query)
            self.send_error(400, reason="Task not found")
        else:
            self.write(ret)
            self.finish()
[docs]
class TasksActionsFailedHandler(TasksActionsErrorHandler):
    """
    Handle task action on permanent failure (* -> failed).

    Same behavior as the error/reset action, but records the task
    with a final status of ``failed`` instead of ``reset``.
    """
    final_status = 'failed'
class TasksActionsCompleteHandler(APIBase):
    """
    Handle task action on processing -> complete.
    """
    @authorization(roles=['admin', 'system'])
    async def post(self, task_id):
        """
        Take one task, set its status to complete.

        Args:
            task_id (str): task id

        Body args (json):
            time_used (int): (optional) time used to run task, in seconds
            site (str): (optional) site the task was running at

        Returns:
            dict: {} empty dict
        """
        # only a processing task may be completed
        filter_query = {'task_id': task_id, 'status': 'processing'}
        set_fields = {
            'status': 'complete',
            'status_changed': nowstr(),
        }
        if self.request.body:
            data = json.loads(self.request.body)
            if 'time_used' in data:
                # store walltime in hours
                set_fields['walltime'] = data['time_used']/3600.
            site = data.get('site', 'unknown')
            if 'site' in data:
                set_fields['site'] = site
            self.statsd.incr('site.{}.task_complete'.format(site))
        ret = await self.db.tasks.find_one_and_update(
            filter_query,
            {'$set': set_fields},
            projection={'_id': False}
        )
        if ret:
            self.write(ret)
            self.finish()
        else:
            logger.info('filter_query: %r', filter_query)
            self.send_error(400, reason="Task not found or not processing")
class TaskBulkStatusHandler(APIBase):
    """
    Update the status of multiple tasks at once.
    """
    @authorization(roles=['admin', 'system'])
    async def post(self, status):
        """
        Set multiple tasks' status.

        Body should have {'tasks': [<task_id>, <task_id>, ...]}

        Args:
            status (str): the status

        Returns:
            dict: empty dict
        """
        data = json.loads(self.request.body)
        if (not data) or 'tasks' not in data or not data['tasks']:
            raise tornado.web.HTTPError(400, reason='Missing tasks in body')
        tasks = list(data['tasks'])
        if len(tasks) > 100000:
            raise tornado.web.HTTPError(400, reason='Too many tasks specified (limit: 100k)')
        if status not in task_statuses:
            raise tornado.web.HTTPError(400, reason='Bad status')
        update = {
            'status': status,
            'status_changed': nowstr(),
        }
        if status == 'reset':
            # reset tasks start over with a clean failure count
            update['failures'] = 0
        ret = await self.db.tasks.update_many(
            {'task_id': {'$in': tasks}},
            {'$set': update},
        )
        if ret and ret.modified_count >= 1:
            self.write({})
            self.finish()
        else:
            self.send_error(404, reason="Tasks not found")
class DatasetTaskBulkStatusHandler(APIBase):
    """
    Update the status of multiple tasks at once.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='write')
    async def post(self, dataset_id, status):
        """
        Set multiple tasks' status.

        Body should have {'tasks': [<task_id>, <task_id>, ...]}

        Args:
            dataset_id (str): dataset id
            status (str): the status

        Returns:
            dict: empty dict
        """
        data = json.loads(self.request.body)
        if (not data) or 'tasks' not in data or not data['tasks']:
            raise tornado.web.HTTPError(400, reason='Missing tasks in body')
        tasks = list(data['tasks'])
        if len(tasks) > 100000:
            raise tornado.web.HTTPError(400, reason='Too many tasks specified (limit: 100k)')
        if status not in task_statuses:
            raise tornado.web.HTTPError(400, reason='Bad status')
        update = {
            'status': status,
            'status_changed': nowstr(),
        }
        if status == 'reset':
            # reset tasks start over with a clean failure count
            update['failures'] = 0
        ret = await self.db.tasks.update_many(
            {'dataset_id': dataset_id, 'task_id': {'$in': tasks}},
            {'$set': update},
        )
        if ret and ret.modified_count >= 1:
            self.write({})
            self.finish()
        else:
            self.send_error(404, reason="Tasks not found")
class DatasetTaskBulkRequirementsHandler(APIBase):
    """
    Update the requirements of multiple tasks at once.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='write')
    async def patch(self, dataset_id, name):
        """
        Set multiple tasks' requirements. Sets for all tasks in a dataset
        with the specified name.

        Body should have {<resource>: <requirement>}.

        Args:
            dataset_id (str): dataset id
            name (str): the task name

        Returns:
            dict: empty dict
        """
        # allowed keys are the known resources, plus os and site
        valid_req_keys = set(Resources.defaults)
        valid_req_keys.add('os')
        valid_req_keys.add('site')
        data = json.loads(self.request.body)
        if not data:
            raise tornado.web.HTTPError(400, reason='Missing body')
        if set(data) - valid_req_keys:
            raise tornado.web.HTTPError(400, reason='Invalid resource types')
        reqs = {}
        for key in valid_req_keys.intersection(data):
            val = data[key]
            # type-check each value against the default for that resource
            if key == 'os':
                if not isinstance(val, list):
                    raise tornado.web.HTTPError(400, reason='Bad type for {}, should be list'.format(key))
            elif key in Resources.defaults and isinstance(Resources.defaults[key], (int, list)):
                if not isinstance(val, int):
                    raise tornado.web.HTTPError(400, reason='Bad type for {}, should be int'.format(key))
            elif key in Resources.defaults and isinstance(Resources.defaults[key], float):
                if not isinstance(val, (int, float)):
                    raise tornado.web.HTTPError(400, reason='Bad type for {}, should be float'.format(key))
            else:
                val = str(val)
            reqs['requirements.'+key] = val
        # $max: requirements are only ever raised, never lowered
        ret = await self.db.tasks.update_many(
            {'dataset_id': dataset_id, 'name': name},
            {'$max': reqs},
        )
        if ret and ret.matched_count >= 1:
            self.write({})
            self.finish()
        else:
            self.send_error(404, reason="Tasks not found")
class DatasetMultiFilesHandler(APIBase):
    """
    Handle multi files requests, by dataset.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get dataset_files entries.

        Args:
            dataset_id (str): dataset id

        Returns:
            dict: {'files': [<file>]}
        """
        projection = {'_id': False, 'dataset_id': False, 'task_id': False}
        files = [row async for row in self.db.dataset_files.find({'dataset_id': dataset_id}, projection=projection)]
        self.write({'files': files})

    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='write')
    async def post(self, dataset_id):
        """
        Create a dataset_files entry.

        Body should contain the file data.

        Parameters:
            filename (str): the full url filename
            movement (str): [input | output | both]
            job_index (int): the job index to add to
            task_name (str): the name of the task
            local (str): (optional) the local filename the task sees
            transfer (str): whether to transfer the file (can be bool or str)
            compression (str): whether to automatically compress/decompress the file

        Returns:
            dict: {'result': <task_id>}
        """
        data = json.loads(self.request.body)
        # validate required fields
        req_fields = {
            'filename': str,
            'movement': str,
            'job_index': int,
            'task_name': str,
        }
        for field, field_type in req_fields.items():
            if field not in data:
                raise tornado.web.HTTPError(400, reason='missing key: '+field)
            if not isinstance(data[field], field_type):
                r = 'key {} should be of type {}'.format(field, field_type)
                raise tornado.web.HTTPError(400, reason=r)
        # look up the task this file belongs to
        task = await self.db.tasks.find_one({
            'dataset_id': dataset_id,
            'job_index': data['job_index'],
            'name': data['task_name'],
        })
        if not task:
            raise tornado.web.HTTPError(400, reason='task referred to not found')
        # assemble the file record
        file_data = dataclasses.Data()
        file_data.update({
            'task_id': task['task_id'],
            'dataset_id': dataset_id,
            'remote': data['filename'],
            'movement': data['movement'],
        })
        # optional fields, each type-checked before being copied over
        for key, types in (('local', str), ('transfer', (str, bool)), ('compression', (str, bool))):
            if key in data:
                if not isinstance(data[key], types):
                    r = 'key {} should be of type {}'.format(key, str)
                    raise tornado.web.HTTPError(400, reason=r)
                file_data[key] = data[key]
        if not file_data.valid():
            raise tornado.web.HTTPError(400, reason='invalid file data')
        await self.db.dataset_files.insert_one(dict(file_data))
        self.set_status(201)
        self.write({'result': file_data['task_id']})
        self.finish()
class DatasetTaskFilesHandler(APIBase):
    """
    Handle multi files requests, by task.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id, task_id):
        """
        Get dataset_files entries.

        Args:
            dataset_id (str): dataset id
            task_id (str): task_id

        Returns:
            dict: {'files': [<file>]}
        """
        projection = {'_id': False, 'dataset_id': False, 'task_id': False}
        files = [row async for row in self.db.dataset_files.find({'dataset_id': dataset_id, 'task_id': task_id}, projection=projection)]
        self.write({'files': files})

    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='write')
    async def post(self, dataset_id, task_id):
        """
        Create a dataset_files entry.

        Body should contain the file data.

        Parameters:
            filename (str): the full url filename
            movement (str): [input | output | both]
            local (str): (optional) the local filename the task sees
            transfer (str): whether to transfer the file (can be bool or str)
            compression (str): whether to automatically compress/decompress the file

        Returns:
            dict: {}
        """
        data = json.loads(self.request.body)
        # validate required fields
        req_fields = {
            'filename': str,
            'movement': str,
        }
        for field, field_type in req_fields.items():
            if field not in data:
                raise tornado.web.HTTPError(400, reason='missing key: '+field)
            if not isinstance(data[field], field_type):
                r = 'key {} should be of type {}'.format(field, field_type)
                raise tornado.web.HTTPError(400, reason=r)
        # assemble the file record
        file_data = dataclasses.Data()
        file_data.update({
            'task_id': task_id,
            'dataset_id': dataset_id,
            'remote': data['filename'],
            'movement': data['movement'],
        })
        # optional fields, each type-checked before being copied over
        for key, types in (('local', str), ('transfer', (str, bool)), ('compression', (str, bool))):
            if key in data:
                if not isinstance(data[key], types):
                    r = 'key {} should be of type {}'.format(key, str)
                    raise tornado.web.HTTPError(400, reason=r)
                file_data[key] = data[key]
        if not file_data.valid():
            raise tornado.web.HTTPError(400, reason='invalid file data')
        await self.db.dataset_files.insert_one(dict(file_data))
        self.set_status(201)
        self.write({})
        self.finish()

    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='write')
    async def delete(self, dataset_id, task_id):
        """
        Delete dataset_files entries.

        Args:
            dataset_id (str): dataset id
            task_id (str): task_id

        Returns:
            dict: {}
        """
        await self.db.dataset_files.delete_many({'dataset_id': dataset_id, 'task_id': task_id})
        self.write({})