import logging
import json
import uuid
import tornado.web
from ..base_handler import APIBase
from ..auth import authorization, attr_auth
from iceprod.server.util import nowstr
logger = logging.getLogger('rest.task_stats')
def setup(handler_cfg):
    """
    Setup method for Task Stats REST API.

    Args:
        handler_cfg (dict): args to pass to the route

    Returns:
        dict: routes, database, indexes
    """
    return {
        'routes': [
            (r'/tasks/(?P<task_id>\w+)/task_stats', MultiTaskStatsHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/bulk/task_stats', DatasetsBulkTaskStatsHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/tasks/(?P<task_id>\w+)/task_stats', DatasetsMultiTaskStatsHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/tasks/(?P<task_id>\w+)/task_stats/(?P<task_stat_id>\w+)', DatasetsTaskStatsHandler, handler_cfg),
        ],
        'database': 'task_stats',
        'indexes': {
            # NOTE(review): this was keyed 'jobs', but the database for this
            # module is 'task_stats' -- index the collection actually queried.
            'task_stats': {
                'task_stat_id_index': {'keys': 'task_stat_id', 'unique': True},
                'task_id_index': {'keys': 'task_id', 'unique': False},
                'dataset_id_index': {'keys': 'dataset_id', 'unique': False},
            }
        }
    }
class MultiTaskStatsHandler(APIBase):
    """
    Handle multi task_stats requests.
    """
    @authorization(roles=['admin', 'system'])
    async def post(self, task_id):
        """
        Create a task_stat entry.

        Body should contain the task stat data as a JSON object with at
        least a ``dataset_id`` key.

        Args:
            task_id (str): the task id for this task_stat

        Returns:
            dict: {'result': <task_stat_id>}

        Raises:
            tornado.web.HTTPError: 400 for a malformed or incomplete body
        """
        try:
            stat_data = json.loads(self.request.body)
        except json.JSONDecodeError:
            # malformed input is a client error, not a server error
            raise tornado.web.HTTPError(400, reason='Body must be valid JSON')
        if not isinstance(stat_data, dict) or 'dataset_id' not in stat_data:
            raise tornado.web.HTTPError(400, reason='Missing dataset_id in body')
        # set some fields; uuid1 keeps ids roughly time-ordered
        task_stat_id = uuid.uuid1().hex
        data = {
            'task_stat_id': task_stat_id,
            'task_id': task_id,
            'dataset_id': stat_data['dataset_id'],
            'create_date': nowstr(),
            'stats': stat_data,  # full client payload stored verbatim
        }
        await self.db.task_stats.insert_one(data)
        self.set_status(201)  # 201 Created
        self.write({'result': task_stat_id})
        self.finish()
class DatasetsBulkTaskStatsHandler(APIBase):
    """
    Handle a dataset bulk task_stats requests.

    Stream the output of all stats.
    """
    def _write_group(self, rows, keys, last):
        """
        Write one task's group of stats as newline-delimited JSON dicts.

        Args:
            rows (list): stat rows sharing a single task_id
            keys (list | str): key whitelist ('' / [] means all keys)
            last (bool): write only the most recent row of the group

        Returns:
            int: number of rows written
        """
        # FIX: the original wrote the raw `data` rows in the non-last branch,
        # discarding the sorted and key-filtered list it had just built.
        ordered = sorted(rows, key=lambda row: row['create_date'])
        if keys:
            ordered = [{k: row[k] for k in row if k in keys} for row in ordered]
        if last:
            ordered = ordered[-1:]
        for row in ordered:
            self.write(row)
            self.write('\n')
        return len(ordered)

    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id):
        """
        Get task_stats for a dataset and task.

        Args:
            dataset_id (str): dataset id

        Params (optional):
            last: bool (True: only last task_stat. False: all task_stats)
            after: only return stats created more recently than this date
            keys: | separated list of keys to return for each task_stat
            buffer_size: number of records to buffer before flushing (default 1000)

        Returns:
            dict: {<task_stat_id>: stats}
        """
        last = self.get_argument('last', 'f').lower() in ('true', 't', '1', 'yes', 'y')
        query = {'dataset_id': dataset_id}
        after = self.get_argument('after', None)
        if after:
            query['create_date'] = {"$gte": after}
        projection = {'_id': False}
        keys = self.get_argument('keys', '')
        if keys:
            keys = keys.split('|')
            projection.update({x: True for x in keys if x})
            # always project the fields needed for grouping and sorting
            projection['task_stat_id'] = True
            projection['task_id'] = True
            projection['create_date'] = True
        buffer_size = int(self.get_argument('buffer_size', '1000'))
        task_id = None
        group = []
        pending = 0
        # rows sorted by task_id so each task's stats arrive contiguously
        async for row in self.db.task_stats.find(query, projection=projection).sort([('task_id', 1)]):
            if row['task_id'] == task_id:
                group.append(row)
                continue
            # task boundary: emit the previous task's group
            if group:
                pending += self._write_group(group, keys, last)
                if pending >= buffer_size:
                    pending = 0
                    await self.flush()
            group = [row]
            task_id = row['task_id']
        if group:
            self._write_group(group, keys, last)
        self.finish()
class DatasetsMultiTaskStatsHandler(APIBase):
    """
    Handle multi task_stats requests.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id, task_id):
        """
        Get task_stats for a dataset and task.

        Args:
            dataset_id (str): dataset id
            task_id (str): task id

        Params (optional):
            last: bool (True: only last task_stat. False: all task_stats)
            keys: | separated list of keys to return for each task_stat

        Returns:
            dict: {<task_stat_id>: stats}
        """
        want_last = self.get_argument('last', 'f').lower() in ('true', 't', '1', 'yes', 'y')
        projection = {'_id': False}
        key_arg = self.get_argument('keys', '')
        if key_arg:
            for field in key_arg.split('|'):
                if field:
                    projection[field] = True
            projection['task_stat_id'] = True
            if want_last:
                # needed below to pick the most recent entry
                projection['create_date'] = True
        rows = await self.db.task_stats.find(
            {'dataset_id': dataset_id, 'task_id': task_id},
            projection=projection
        ).to_list(10000)
        if want_last:
            rows.sort(key=lambda row: row['create_date'])
            rows = rows[-1:]
        self.write({row['task_stat_id']: row for row in rows})
        self.finish()
class DatasetsTaskStatsHandler(APIBase):
    """
    Handle single task_stat requests.
    """
    @authorization(roles=['admin', 'user', 'system'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id, task_id, task_stat_id):
        """
        Get a task_stat entry.

        Args:
            dataset_id (str): dataset id
            task_id (str): task id
            task_stat_id (str): the task_stat id

        Returns:
            dict: task_stat entry

        Raises:
            tornado.web.HTTPError: 404 if the task_stat does not exist
        """
        ret = await self.db.task_stats.find_one(
            {'dataset_id': dataset_id, 'task_id': task_id, 'task_stat_id': task_stat_id},
            projection={'_id': False}  # never leak the Mongo ObjectId
        )
        if not ret:
            # consistent with the other handlers in this module, which
            # raise HTTPError rather than calling send_error directly
            raise tornado.web.HTTPError(404, reason="Task stat not found")
        self.write(ret)
        self.finish()