Source code for iceprod.rest.handlers.datasets

import logging
import json
import uuid
from collections import defaultdict

import pymongo
import tornado.web

from ..base_handler import APIBase
from ..auth import authorization, attr_auth
from iceprod.server.util import nowstr, dataset_statuses, dataset_status_sort

logger = logging.getLogger('rest.datasets')


[docs] def setup(handler_cfg): """ Setup method for Dataset REST API. Args: handler_cfg (dict): args to pass to the route Returns: dict: routes, database, indexes """ return { 'routes': [ (r'/datasets', MultiDatasetHandler, handler_cfg), (r'/datasets/(?P<dataset_id>\w+)', DatasetHandler, handler_cfg), (r'/datasets/(?P<dataset_id>\w+)/description', DatasetDescriptionHandler, handler_cfg), (r'/datasets/(?P<dataset_id>\w+)/status', DatasetStatusHandler, handler_cfg), (r'/datasets/(?P<dataset_id>\w+)/priority', DatasetPriorityHandler, handler_cfg), (r'/datasets/(?P<dataset_id>\w+)/jobs_submitted', DatasetJobsSubmittedHandler, handler_cfg), (r'/dataset_summaries/status', DatasetSummariesStatusHandler, handler_cfg), ], 'database': 'datasets', 'indexes': { 'datasets': { 'dataset_id_index': {'keys': 'dataset_id', 'unique': True}, } }, }
[docs] class MultiDatasetHandler(APIBase): """ Handle multi-dataset requests. """
[docs] @authorization(roles=['admin', 'user', 'system']) async def get(self): """ Get a dict of datasets. Params (optional): status: | separated list of status filters groups: | separated list of groups to filter on users: | separated list of users to filter on keys: | separated list of keys to return for each dataset Returns: dict: {<dataset_id>: metadata} """ query = {} status = self.get_argument('status', None) if status: query['status'] = {'$in': status.split('|')} groups = self.get_argument('groups', None) if groups: query['group'] = {'$in': groups.split('|')} users = self.get_argument('users', None) if users: query['username'] = {'$in': users.split('|')} projection = {'_id': False} keys = self.get_argument('keys', None) if keys: projection.update({x:True for x in keys.split('|') if x}) projection['dataset_id'] = True # must be in projection ret = {} async for row in self.db.datasets.find(query, projection=projection): k = row['dataset_id'] ret[k] = row self.write(ret) self.finish()
[docs] @authorization(roles=['admin', 'user']) async def post(self): """ Add a dataset. Body should contain all necessary fields for a dataset. """ data = json.loads(self.request.body) # validate first req_fields = { 'description': str, 'jobs_submitted': int, 'tasks_submitted': int, 'tasks_per_job': int, 'group': str, } for k in req_fields: if k not in data: raise tornado.web.HTTPError(400, reason='missing key: '+k) if not isinstance(data[k], req_fields[k]): r = 'key "{}" should be of type {}'.format(k, req_fields[k].__name__) raise tornado.web.HTTPError(400, reason=r) opt_fields = { 'priority': float, 'debug': bool, 'jobs_immutable': bool, 'status': str, } for k in opt_fields: if k in data and not isinstance(data[k], opt_fields[k]): r = 'key "{}" should be of type {}'.format(k, opt_fields[k].__name__) raise tornado.web.HTTPError(400, reason=r) bad_fields = set(data).difference(set(opt_fields).union(req_fields)) if bad_fields: r = 'invalid keys found' raise tornado.web.HTTPError(400, reason=r) if data['jobs_submitted'] == 0 and data['tasks_per_job'] <= 0: r = '"tasks_per_job" must be > 0' raise tornado.web.HTTPError(400, reason=r) elif data['tasks_submitted'] != 0 and data['tasks_submitted'] / data['jobs_submitted'] != data['tasks_per_job']: r = '"tasks_per_job" does not match "tasks_submitted"/"jobs_submitted"' raise tornado.web.HTTPError(400, reason=r) # generate dataset number ret = await self.db.settings.find_one_and_update( {'name': 'dataset_num'}, {'$inc': {'num': 1}}, projection={'num': True, '_id': False}, upsert=True, return_document=pymongo.ReturnDocument.AFTER) dataset_num = ret['num'] # set some fields dataset_id = uuid.uuid1().hex data['dataset_id'] = dataset_id data['dataset'] = dataset_num if 'status' not in data: data['status'] = 'processing' data['start_date'] = nowstr() data['username'] = self.current_user if 'priority' not in data: data['priority'] = 0.5 if 'debug' not in data: data['debug'] = False if 'jobs_immutable' not in data: data['jobs_immutable'] = False # insert ret = await self.db.datasets.insert_one(data) # set auth rules write_groups = list({'admin', data['group']}) if data['group'] != 'users' else ['admin'] await self.set_attr_auth( 'dataset_id', data['dataset_id'], read_groups=list({'admin', data['group'], 'users'}), write_groups=write_groups, read_users=[data['username']], write_users=[data['username']], ) # make sure user prio is set try: await self.add_user(self.current_user) except pymongo.errors.DuplicateKeyError: # ignore already added users pass # return success self.set_status(201) self.set_header('Location', f'/datasets/{dataset_id}') self.write({'result': f'/datasets/{dataset_id}'}) self.finish()
[docs] class DatasetHandler(APIBase): """ Handle dataset requests. """
[docs] @authorization(roles=['admin', 'user', 'system']) @attr_auth(arg='dataset_id', role='read') async def get(self, dataset_id): """ Get a dataset. Args: dataset_id (str): the dataset Returns: dict: dataset metadata """ ret = await self.db.datasets.find_one({'dataset_id':dataset_id}, projection={'_id':False}) if not ret: self.send_error(404, reason="Dataset not found") else: self.write(ret) self.finish()
[docs] class DatasetDescriptionHandler(APIBase): """ Handle dataset description updates. """
[docs] @authorization(roles=['admin', 'user']) @attr_auth(arg='dataset_id', role='write') async def put(self, dataset_id): """ Set a dataset description. Args: dataset_id (str): the dataset Returns: dict: empty dict """ data = json.loads(self.request.body) if 'description' not in data: raise tornado.web.HTTPError(400, reason='missing description') elif not isinstance(data['description'],str): raise tornado.web.HTTPError(400, reason='bad description') ret = await self.db.datasets.find_one_and_update( {'dataset_id':dataset_id}, {'$set':{'description': data['description']}}, projection=['_id'] ) if not ret: self.send_error(404, reason="Dataset not found") else: self.write({}) self.finish()
[docs] class DatasetStatusHandler(APIBase): """ Handle dataset status updates. """
[docs] @authorization(roles=['admin', 'user', 'system']) @attr_auth(arg='dataset_id', role='write') async def put(self, dataset_id): """ Set a dataset status. Args: dataset_id (str): the dataset Returns: dict: empty dict """ data = json.loads(self.request.body) if 'status' not in data: raise tornado.web.HTTPError(400, reason='missing status') elif data['status'] not in dataset_statuses: raise tornado.web.HTTPError(400, reason='bad status') ret = await self.db.datasets.find_one_and_update( {'dataset_id':dataset_id}, {'$set':{'status': data['status']}}, projection=['_id'] ) if not ret: self.send_error(404, reason="Dataset not found") else: self.write({}) self.finish()
[docs] class DatasetPriorityHandler(APIBase): """ Handle dataset priority updates. """
[docs] @authorization(roles=['admin', 'user', 'system']) @attr_auth(arg='dataset_id', role='write') async def put(self, dataset_id): """ Set a dataset priority. Args: dataset_id (str): the dataset Returns: dict: empty dict """ data = json.loads(self.request.body) if 'priority' not in data: raise tornado.web.HTTPError(400, reason='missing priority') elif not isinstance(data['priority'], (int, float)): raise tornado.web.HTTPError(400, reason='priority is not a number') ret = await self.db.datasets.find_one_and_update( {'dataset_id':dataset_id}, {'$set':{'priority': data['priority']}}, projection=['_id'] ) if not ret: self.send_error(404, reason="Dataset not found") else: self.write({}) self.finish()
[docs] class DatasetJobsSubmittedHandler(APIBase): """ Handle dataset jobs_submitted updates. """
[docs] @authorization(roles=['admin', 'user']) @attr_auth(arg='dataset_id', role='write') async def put(self, dataset_id): """ Set a dataset's jobs_submitted. Only allows increases, if the jobs_immutable flag is not set. Args: dataset_id (str): the dataset Json body: jobs_submitted (int): the number of jobs submitted Returns: dict: empty dict """ data = json.loads(self.request.body) if 'jobs_submitted' not in data: raise tornado.web.HTTPError(400, reason='missing jobs_submitted') try: jobs_submitted = int(data['jobs_submitted']) except Exception: raise tornado.web.HTTPError(400, reason='jobs_submitted is not an int') ret = await self.db.datasets.find_one({'dataset_id':dataset_id}) if not ret: raise tornado.web.HTTPError(404, reason='Dataset not found') if ret['jobs_immutable']: raise tornado.web.HTTPError(400, reason='jobs_submitted is immutable') if ret['jobs_submitted'] > jobs_submitted: raise tornado.web.HTTPError(400, reason='jobs_submitted must be larger than before') if 'tasks_per_job' not in ret or ret['tasks_per_job'] <= 0: raise tornado.web.HTTPError(400, reason='tasks_per_job not valid') ret = await self.db.datasets.find_one_and_update( {'dataset_id':dataset_id}, {'$set':{ 'jobs_submitted': jobs_submitted, 'tasks_submitted': int(jobs_submitted*ret['tasks_per_job']), }}, projection=['_id'] ) if not ret: self.send_error(404, reason="Dataset not found") else: self.write({}) self.finish()
[docs] class DatasetSummariesStatusHandler(APIBase): """ Handle dataset summary grouping by status. """
[docs] @authorization(roles=['admin', 'user', 'system']) async def get(self): """ Get the dataset summary for all datasets, group by status. Returns: dict: {<status>: [<dataset_id>,]} """ cursor = self.db.datasets.find(projection={'_id':False,'status':True,'dataset_id':True}) ret = defaultdict(list) async for row in cursor: ret[row['status']].append(row['dataset_id']) ret2 = {} for k in sorted(ret, key=dataset_status_sort): ret2[k] = ret[k] self.write(ret2) self.finish()