Source code for iceprod.rest.handlers.jobs

import logging
import json
import uuid
from collections import defaultdict

import pymongo
import tornado.web

from ..base_handler import APIBase
from ..auth import authorization, attr_auth
from iceprod.server.util import nowstr, job_statuses, job_status_sort

logger = logging.getLogger('rest.jobs')


def setup(handler_cfg):
    """
    Setup method for Jobs REST API.

    Args:
        handler_cfg (dict): args to pass to the route

    Returns:
        dict: routes, database, indexes
    """
    return {
        'routes': [
            (r'/jobs', MultiJobsHandler, handler_cfg),
            (r'/jobs/(?P<job_id>\w+)', JobsHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/jobs', DatasetMultiJobsHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/jobs/(?P<job_id>\w+)', DatasetJobsHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/jobs/(?P<job_id>\w+)/status', DatasetJobsStatusHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/job_actions/bulk_status/(?P<status>\w+)', DatasetJobBulkStatusHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/job_summaries/status', DatasetJobSummariesStatusHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/job_counts/status', DatasetJobCountsStatusHandler, handler_cfg),
        ],
        'database': 'jobs',
        'indexes': {
            'jobs': {
                'job_id_index': {'keys': 'job_id', 'unique': True},
                'dataset_id_index': {'keys': 'dataset_id', 'unique': False},
            }
        }
    }
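
# Illustrative wiring sketch (not part of this module): the routes returned by
# setup() are plain Tornado (pattern, handler, kwargs) tuples, so they could be
# mounted roughly as below.  The handler_cfg contents and port are hypothetical;
# the real server supplies its own configuration.
#
#     import tornado.web
#     cfg = setup(handler_cfg={})  # handler_cfg is normally built by the server
#     app = tornado.web.Application(cfg['routes'])
#     app.listen(8080)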


class MultiJobsHandler(APIBase):
    """
    Handle multi jobs requests.
    """
    @authorization(roles=['admin', 'system'])
    async def post(self):
        """
        Create a job entry.

        Body should contain the job data.

        Returns:
            dict: {'result': <job_id>}
        """
        data = json.loads(self.request.body)

        # validate first
        req_fields = {
            'dataset_id': str,
            'job_index': int,
        }
        for k in req_fields:
            if k not in data:
                raise tornado.web.HTTPError(400, reason='missing key: '+k)
            if not isinstance(data[k], req_fields[k]):
                r = 'key {} should be of type {}'.format(k, req_fields[k])
                raise tornado.web.HTTPError(400, reason=r)

        # validate job_id if given
        if 'job_id' in data:
            try:
                job_id = uuid.UUID(hex=data['job_id']).hex
            except Exception:
                raise tornado.web.HTTPError(400, reason='job_id should be a valid uuid')
        else:
            job_id = uuid.uuid1().hex

        # set some fields
        data.update({
            'job_id': job_id,
            'status': 'processing',
            'status_changed': nowstr(),
        })

        await self.db.jobs.insert_one(data)
        self.set_status(201)
        self.write({'result': job_id})
        self.finish()
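
    # Illustrative client-side sketch (assumes the requests library and a locally
    # running API; the URL, token, and dataset_id are hypothetical):
    #
    #     import requests
    #     resp = requests.post(
    #         'http://localhost:8080/jobs',
    #         headers={'Authorization': 'Bearer <token>'},
    #         json={'dataset_id': 'abcd1234', 'job_index': 0},
    #     )
    #     job_id = resp.json()['result']  # the new job starts in 'processing' status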


class JobsHandler(APIBase):
    """
    Handle single job requests.
    """
    @authorization(roles=['admin', 'system'])
    async def get(self, job_id):
        """
        Get a job entry.

        Args:
            job_id (str): the job id

        Returns:
            dict: job entry
        """
        ret = await self.db.jobs.find_one({'job_id':job_id}, projection={'_id':False})
        if not ret:
            self.send_error(404, reason="Job not found")
        else:
            self.write(ret)
            self.finish()
    @authorization(roles=['admin', 'system'])
    async def patch(self, job_id):
        """
        Update a job entry.

        Body should contain the job data to update.  Note that this will
        perform a merge (not replace).

        Args:
            job_id (str): the job id

        Returns:
            dict: updated job entry
        """
        data = json.loads(self.request.body)
        if not data:
            raise tornado.web.HTTPError(400, reason='Missing update data')

        ret = await self.db.jobs.find_one_and_update(
            {'job_id':job_id},
            {'$set':data},
            projection={'_id':False},
            return_document=pymongo.ReturnDocument.AFTER
        )
        if not ret:
            self.send_error(404, reason="Job not found")
        else:
            self.write(ret)
            self.finish()
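
    # Illustrative PATCH sketch (hypothetical id and token): the body is merged
    # into the stored entry via MongoDB $set, so only the listed fields change.
    # The 'priority' field below is made up, purely to show the merge behavior.
    #
    #     import requests
    #     resp = requests.patch(
    #         'http://localhost:8080/jobs/<job_id>',
    #         headers={'Authorization': 'Bearer <token>'},
    #         json={'priority': 10},
    #     )
    #     updated = resp.json()  # the full job entry after the merge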


class DatasetMultiJobsHandler(APIBase):
    """
    Handle multi jobs requests.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get all jobs for a dataset.

        Params (optional):
            job_index: job_index to filter by
            status: | separated list of job statuses to filter by
            keys: | separated list of keys to return for each job

        Args:
            dataset_id (str): dataset id

        Returns:
            dict: {'job_id': {job_data}}
        """
        filters = {'dataset_id':dataset_id}

        status = self.get_argument('status', None)
        if status:
            filters['status'] = {'$in': status.split('|')}

        job_index = self.get_argument('job_index', None)
        if job_index:
            try:
                filters['job_index'] = int(job_index)
            except ValueError:
                raise tornado.web.HTTPError(400, reason='Bad argument "job_index": must be integer')

        projection = {'_id': False}
        keys = self.get_argument('keys','')
        if keys:
            projection.update({x:True for x in keys.split('|') if x})
            projection['job_id'] = True

        cursor = self.db.jobs.find(filters, projection=projection)
        ret = {}
        async for row in cursor:
            ret[row['job_id']] = row
        self.write(ret)
        self.finish()
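
    # Illustrative query sketch (hypothetical URL, ids, and token): filter a
    # dataset's jobs by status and trim the returned fields with keys.
    #
    #     import requests
    #     resp = requests.get(
    #         'http://localhost:8080/datasets/abcd1234/jobs',
    #         headers={'Authorization': 'Bearer <token>'},
    #         params={'status': 'processing', 'keys': 'job_index|status'},
    #     )
    #     jobs = resp.json()  # {job_id: {'job_id': ..., 'job_index': ..., 'status': ...}}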


class DatasetJobsHandler(APIBase):
    """
    Handle single job requests.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id, job_id):
        """
        Get a job entry.

        Args:
            dataset_id (str): dataset id
            job_id (str): the job id

        Returns:
            dict: job entry
        """
        ret = await self.db.jobs.find_one(
            {'job_id':job_id,'dataset_id':dataset_id},
            projection={'_id':False}
        )
        if not ret:
            self.send_error(404, reason="Job not found")
        else:
            self.write(ret)
            self.finish()


class DatasetJobsStatusHandler(APIBase):
    """
    Handle single job status updates.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='write')
    async def put(self, dataset_id, job_id):
        """
        Set a job status.

        Body should have {'status': <new_status>}

        Args:
            dataset_id (str): dataset id
            job_id (str): the job id

        Returns:
            dict: empty dict
        """
        data = json.loads(self.request.body)
        if (not data) or 'status' not in data:
            raise tornado.web.HTTPError(400, reason='Missing status in body')
        if data['status'] not in job_statuses:
            raise tornado.web.HTTPError(400, reason='Bad status')

        update_data = {
            'status': data['status'],
            'status_changed': nowstr(),
        }
        ret = await self.db.jobs.update_one(
            {'job_id':job_id,'dataset_id':dataset_id},
            {'$set':update_data}
        )
        if (not ret) or ret.modified_count < 1:
            self.send_error(404, reason="Job not found")
        else:
            self.write({})
            self.finish()
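
    # Illustrative status-update sketch (hypothetical ids and token; the status
    # value must be one of job_statuses from iceprod.server.util, and 'processing'
    # is used here only because this module itself assigns it):
    #
    #     import requests
    #     requests.put(
    #         'http://localhost:8080/datasets/abcd1234/jobs/<job_id>/status',
    #         headers={'Authorization': 'Bearer <token>'},
    #         json={'status': 'processing'},
    #     )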


class DatasetJobBulkStatusHandler(APIBase):
    """
    Update the status of multiple jobs at once.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='write')
    async def post(self, dataset_id, status):
        """
        Set multiple jobs' status.

        Body should have {'jobs': [<job_id>, <job_id>, ...]}

        Args:
            dataset_id (str): dataset id
            status (str): the status

        Returns:
            dict: empty dict
        """
        data = json.loads(self.request.body)
        if (not data) or 'jobs' not in data or not data['jobs']:
            raise tornado.web.HTTPError(400, reason='Missing jobs in body')
        jobs = list(data['jobs'])
        if len(jobs) > 100000:
            raise tornado.web.HTTPError(400, reason='Too many jobs specified (limit: 100k)')
        if status not in job_statuses:
            raise tornado.web.HTTPError(400, reason='Bad status')

        query = {
            'dataset_id': dataset_id,
            'job_id': {'$in': jobs},
        }
        update_data = {
            'status': status,
            'status_changed': nowstr(),
        }
        ret = await self.db.jobs.update_many(query, {'$set':update_data})
        if (not ret) or ret.modified_count < 1:
            self.send_error(404, reason="Jobs not found")
        else:
            self.write({})
            self.finish()
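
    # Illustrative bulk-update sketch (hypothetical ids and token): the new
    # status goes in the URL, the affected job ids (up to 100k) go in the body.
    #
    #     import requests
    #     requests.post(
    #         'http://localhost:8080/datasets/abcd1234/job_actions/bulk_status/processing',
    #         headers={'Authorization': 'Bearer <token>'},
    #         json={'jobs': ['<job_id_1>', '<job_id_2>']},
    #     )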


class DatasetJobSummariesStatusHandler(APIBase):
    """
    Handle job summary grouping by status.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get the job summary for all jobs in a dataset, grouped by status.

        Args:
            dataset_id (str): dataset id

        Returns:
            dict: {<status>: [<job_id>,]}
        """
        cursor = self.db.jobs.find(
            {'dataset_id':dataset_id},
            projection={'_id':False,'status':True,'job_id':True}
        )
        ret = defaultdict(list)
        async for row in cursor:
            ret[row['status']].append(row['job_id'])
        ret2 = {}
        for k in sorted(ret, key=job_status_sort):
            ret2[k] = ret[k]
        self.write(ret2)
        self.finish()
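
    # Illustrative response shape (status names and ids are made up), with keys
    # ordered by job_status_sort:
    #
    #     {"processing": ["<job_id_1>", "<job_id_2>"], "complete": ["<job_id_3>"]}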


class DatasetJobCountsStatusHandler(APIBase):
    """
    Handle job counts by status.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get the job counts for all jobs in a dataset, grouped by status.

        Args:
            dataset_id (str): dataset id

        Returns:
            dict: {<status>: <count>}
        """
        cursor = self.db.jobs.aggregate([
            {'$match':{'dataset_id':dataset_id}},
            {'$group':{'_id':'$status', 'total': {'$sum':1}}},
        ])
        ret = {}
        async for row in cursor:
            ret[row['_id']] = row['total']
        ret2 = {}
        for k in sorted(ret, key=job_status_sort):
            ret2[k] = ret[k]
        self.write(ret2)
        self.finish()
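
    # Illustrative response shape (status names and counts are made up), built
    # from the aggregation above and ordered by job_status_sort:
    #
    #     {"processing": 120, "complete": 3015}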