Source code for iceprod.rest.handlers.logs

import json
import logging
import uuid

import tornado.web

from ..base_handler import APIBase
from ..auth import authorization, attr_auth
from iceprod.server.util import nowstr

logger = logging.getLogger('rest.logs')


def setup(handler_cfg):
    """
    Setup method for Logs REST API.

    Args:
        handler_cfg (dict): args to pass to the route

    Returns:
        dict: routes, database, indexes
    """
    return {
        'routes': [
            (r'/logs', MultiLogsHandler, handler_cfg),
            (r'/logs/(?P<log_id>\w+)', LogsHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/logs', DatasetMultiLogsHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/logs/(?P<log_id>\w+)', DatasetLogsHandler, handler_cfg),
            (r'/datasets/(?P<dataset_id>\w+)/tasks/(?P<task_id>\w+)/logs', DatasetTaskLogsHandler, handler_cfg),
        ],
        'database': 'logs',
        'indexes': {
            'logs': {
                'log_id_index': {'keys': 'log_id', 'unique': True},
                'task_id_index': {'keys': 'task_id', 'unique': False},
                'dataset_id_index': {'keys': 'dataset_id', 'unique': False},
            }
        }
    }

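# Illustrative sketch (not part of the module): how the 'indexes' spec
# returned by setup() could map onto Motor/pymongo index creation. The
# ``db`` handle and the helper name are assumptions for the example.
#
#     async def ensure_log_indexes(db):
#         for name, spec in setup({})['indexes']['logs'].items():
#             await db.logs.create_index(spec['keys'], name=name, unique=spec['unique'])
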
class MultiLogsHandler(APIBase):
    """
    Handle logs requests.
    """

    @authorization(roles=['admin', 'system'])
    async def get(self):
        """
        Get multiple log entries based on search arguments.

        Params (optional):
            from (str): timestamp to start searching
            to (str): timestamp to end searching
            name (str): name (type) of log
            dataset_id (str): dataset_id
            task_id (str): task_id
            limit (int): maximum number of logs to return
            keys: | separated list of keys to return for each log

        Returns:
            dict: {log_id: {keys}}
        """
        query = {}
        projection = {'_id': False, 'log_id': True}
        try:
            limit = int(self.get_argument('limit', 0))
        except Exception:
            raise tornado.web.HTTPError(400, reason='non-integer limit')
        date_to = self.get_argument('to', None)
        date_from = self.get_argument('from', None)
        if date_to and date_from:
            query['timestamp'] = {'$gte': date_from, '$lte': date_to}
        elif date_from:
            query['timestamp'] = {'$gte': date_from}
        elif date_to:
            query['timestamp'] = {'$lte': date_to}
        for k in ('name', 'dataset_id', 'task_id'):
            val = self.get_argument(k, None)
            if val:
                query[k] = val
        keys = self.get_argument('keys', 'name|data|timestamp|task_id|dataset_id')
        projection.update({x: True for x in keys.split('|') if x})

        logger.debug('query: %r', query)
        logger.debug('projection: %r', projection)

        ret = {}
        f = self.db.logs.find(query, projection=projection, limit=limit)
        async for row in f:
            if 'data' in projection and 'data' not in row:
                # large log bodies are offloaded to S3; fetch them back on read
                if self.s3:
                    row['data'] = await self.s3.get(row['log_id'])
                else:
                    raise tornado.web.HTTPError(500, reason='no data field and s3 disabled')
            ret[row['log_id']] = row
        self.write(ret)
        self.finish()

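    # Usage sketch (illustrative, not part of the module): searching logs over
    # a time window with an HTTP client. The base URL and token are assumptions.
    #
    #     import requests
    #     r = requests.get(
    #         'https://iceprod.example.com/logs',
    #         params={'from': '2024-01-01T00:00:00', 'to': '2024-01-02T00:00:00',
    #                 'name': 'stdout', 'limit': 100, 'keys': 'name|timestamp|task_id'},
    #         headers={'Authorization': 'bearer TOKEN'},
    #     )
    #     r.raise_for_status()
    #     logs = r.json()  # {log_id: {'name': ..., 'timestamp': ..., ...}, ...}
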
    @authorization(roles=['admin', 'system'])
    async def post(self):
        """
        Create a log entry.

        Body should contain the following fields:
            data: str

        Optional fields:
            dataset_id: str
            task_id: str
            name: str

        Returns:
            dict: {'result': <log_id>}
        """
        data = json.loads(self.request.body)
        if 'data' not in data:
            raise tornado.web.HTTPError(400, reason='data field not in body')
        if 'name' not in data:
            data['name'] = 'log'
        log_id = uuid.uuid1().hex
        data['log_id'] = log_id
        data['timestamp'] = nowstr()
        if self.s3 and len(data['data']) > 1000000:
            # store large log bodies in S3 instead of the database
            await self.s3.put(log_id, data['data'])
            del data['data']
        await self.db.logs.insert_one(data)
        self.set_status(201)
        self.write({'result': log_id})
        self.finish()

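# Usage sketch (illustrative, not part of the module): creating a log entry.
# Note the handler offloads bodies larger than 1 MB to S3 when configured.
# The base URL and token are assumptions.
#
#     import requests
#     r = requests.post(
#         'https://iceprod.example.com/logs',
#         json={'data': 'stdout text here', 'name': 'stdout', 'task_id': 'abc123'},
#         headers={'Authorization': 'bearer TOKEN'},
#     )
#     r.raise_for_status()
#     log_id = r.json()['result']
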
class LogsHandler(APIBase):
    """
    Handle logs requests.
    """

    @authorization(roles=['admin', 'system'])
    async def get(self, log_id):
        """
        Get a log entry.

        Args:
            log_id (str): the log id of the entry

        Returns:
            dict: all body fields
        """
        ret = await self.db.logs.find_one({'log_id': log_id}, projection={'_id': False})
        if not ret:
            self.send_error(404, reason="Log not found")
        else:
            if 'data' not in ret:
                if self.s3:
                    ret['data'] = await self.s3.get(ret['log_id'])
                else:
                    raise tornado.web.HTTPError(500, reason='no data field and s3 disabled')
            self.write(ret)
            self.finish()

    @authorization(roles=['admin', 'system'])
    async def delete(self, log_id):
        """
        Delete a log entry.

        Args:
            log_id (str): the log id of the entry

        Returns:
            dict: empty dict on success
        """
        ret = await self.db.logs.find_one_and_delete({'log_id': log_id})
        if ret and 'data' not in ret:
            # the log body may live in S3; clean that up as well
            if self.s3:
                if await self.s3.exists(log_id):
                    await self.s3.delete(log_id)
            else:
                logger.warning('no data field and s3 disabled')
        self.write({})
        self.finish()

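# Usage sketch (illustrative, not part of the module): fetching and then
# deleting a single log entry by id. Base URL and token are assumptions.
#
#     import requests
#     headers = {'Authorization': 'bearer TOKEN'}
#     url = f'https://iceprod.example.com/logs/{log_id}'
#     entry = requests.get(url, headers=headers).json()   # includes 'data'
#     requests.delete(url, headers=headers)               # also removes any S3 object
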
class DatasetMultiLogsHandler(APIBase):
    """
    Handle logs requests.
    """

    @authorization(roles=['admin', 'user'])
    @attr_auth(arg='dataset_id', role='write')
    async def post(self, dataset_id):
        """
        Create a log entry.

        Body should contain the following fields:
            data: str

        Optional fields:
            task_id: str
            name: str

        Args:
            dataset_id (str): the dataset id

        Returns:
            dict: {'result': <log_id>}
        """
        data = json.loads(self.request.body)
        if 'data' not in data:
            raise tornado.web.HTTPError(400, reason='data field not in body')
        log_id = uuid.uuid1().hex
        data['log_id'] = log_id
        data['dataset_id'] = dataset_id
        data['timestamp'] = nowstr()
        if self.s3 and len(data['data']) > 1000000:
            await self.s3.put(log_id, data['data'])
            del data['data']
        await self.db.logs.insert_one(data)
        self.set_status(201)
        self.write({'result': log_id})
        self.finish()

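# Usage sketch (illustrative, not part of the module): the dataset-scoped
# route requires write access to the dataset (enforced by attr_auth) and
# fills in dataset_id from the URL. Base URL and token are assumptions.
#
#     import requests
#     r = requests.post(
#         f'https://iceprod.example.com/datasets/{dataset_id}/logs',
#         json={'data': 'log text', 'name': 'stderr', 'task_id': 'abc123'},
#         headers={'Authorization': 'bearer TOKEN'},
#     )
#     log_id = r.json()['result']
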
class DatasetLogsHandler(APIBase):
    """
    Handle logs requests.
    """

    @authorization(roles=['admin', 'user'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id, log_id):
        """
        Get a log.

        Args:
            dataset_id (str): the dataset id
            log_id (str): the log id of the entry

        Returns:
            dict: all body fields
        """
        ret = await self.db.logs.find_one(
            {'dataset_id': dataset_id, 'log_id': log_id},
            projection={'_id': False}
        )
        if not ret:
            self.send_error(404, reason="Log not found")
        else:
            if 'data' not in ret:
                if self.s3:
                    ret['data'] = await self.s3.get(ret['log_id'])
                else:
                    raise tornado.web.HTTPError(500, reason='no data field and s3 disabled')
            self.write(ret)
            self.finish()

class DatasetTaskLogsHandler(APIBase):
    """
    Handle log requests for a task.
    """

    @authorization(roles=['admin', 'user'])
    @attr_auth(arg='dataset_id', role='read')
    async def get(self, dataset_id, task_id):
        """
        Get logs for a dataset and task.

        Note: "num" and "group" are generally not used together.

        Params (optional):
            num (int): number of logs, or groups of logs, to return
            group {true, false}: group by log name
            order {asc, desc}: order by time
            keys: | separated list of keys to return for each log

        Args:
            dataset_id (str): the dataset id
            task_id (str): the task id

        Returns:
            dict: {'logs': [log entry dict, log entry dict]}
        """
        filters = {'dataset_id': dataset_id, 'task_id': task_id}
        num = self.get_argument('num', None)
        if num:
            try:
                num = int(num)
            except Exception:
                raise tornado.web.HTTPError(400, reason='bad num param. must be int')
        group = self.get_argument('group', 'false').lower() == 'true'
        order = self.get_argument('order', 'desc').lower()
        if order not in ('asc', 'desc'):
            raise tornado.web.HTTPError(400, reason='bad order param. should be "asc" or "desc".')
        projection = {'_id': False}
        keys = self.get_argument('keys', None)
        if keys:
            projection.update({x: True for x in keys.split('|') if x})

        steps = [
            {'$match': filters},
            {'$sort': {'timestamp': -1 if order == 'desc' else 1}},
        ]
        if group:
            # sorting before $group means $first picks the newest (or oldest)
            # log entry for each name
            if not keys:
                keys = 'log_id|name|task_id|dataset_id|data|timestamp'
            grouping = {x: {'$first': '$'+x} for x in keys.split('|') if x}
            grouping['_id'] = '$name'
            if 'timestamp' not in grouping:
                grouping['timestamp'] = {'$first': '$timestamp'}
            steps.extend([
                {'$group': grouping},
                {'$sort': {'timestamp': -1 if order == 'desc' else 1}},
            ])
        steps.append({'$project': projection})
        if num:
            steps.append({'$limit': num})
        logger.debug('steps: %r', steps)

        cur = self.db.logs.aggregate(steps, allowDiskUse=True)
        ret = []
        async for entry in cur:
            ret.append(entry)
        if not ret:
            self.send_error(404, reason="Log not found")
        else:
            for log in ret:
                if 'data' not in log and ((not keys) or 'data' in keys.split('|')):
                    try:
                        if self.s3:
                            log['data'] = await self.s3.get(log['log_id'])
                        else:
                            raise Exception('no data field and s3 disabled')
                    except Exception as e:
                        self.send_error(500, reason=str(e))
                        return
            self.write({'logs': ret})
            self.finish()

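    # Usage sketch (illustrative, not part of the module): with group=true the
    # aggregation returns only the most recent log per name (stdout, stderr,
    # etc.) for the task. Base URL and token are assumptions.
    #
    #     import requests
    #     r = requests.get(
    #         f'https://iceprod.example.com/datasets/{dataset_id}/tasks/{task_id}/logs',
    #         params={'group': 'true', 'keys': 'log_id|name|timestamp'},
    #         headers={'Authorization': 'bearer TOKEN'},
    #     )
    #     latest_by_name = {e['name']: e for e in r.json()['logs']}
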
    @authorization(roles=['admin', 'user'])
    @attr_auth(arg='dataset_id', role='write')
    async def delete(self, dataset_id, task_id):
        """
        Delete all logs for a dataset and task.

        Args:
            dataset_id (str): the dataset id
            task_id (str): the task id

        Returns:
            dict: empty dict on success
        """
        async for row in self.db.logs.find({'dataset_id': dataset_id, 'task_id': task_id}):
            log_id = row['log_id']
            await self.db.logs.delete_one({'log_id': log_id})
            if 'data' not in row:
                # the log body may live in S3; clean that up as well
                if self.s3:
                    if await self.s3.exists(log_id):
                        await self.s3.delete(log_id)
                else:
                    logger.warning('no data field and s3 disabled')
        self.write({})
        self.finish()