"""
A `recursive descent parser
<http://en.wikipedia.org/wiki/Recursive_descent_parser>`_ for the IceProd meta
language. Most commonly used in IceProd dataset configurations to refer to
other parts of the same configuration.
"""
from __future__ import absolute_import, division, print_function
import re
import random
import functools
import json
import builtins
import logging
import ast
import operator as op
from iceprod.core import dataclasses
logger = logging.getLogger('parser')
class safe_eval:
# supported operators
operators = {
ast.Add: op.add,
ast.Sub: op.sub,
ast.Mult: op.mul,
ast.Div: op.truediv,
ast.FloorDiv: op.floordiv,
ast.Mod: op.mod,
ast.Pow: op.pow,
ast.BitXor: op.xor,
ast.Invert: op.invert,
ast.Not: op.not_,
ast.UAdd: op.pos,
ast.USub: op.neg,
}
[docs]
@classmethod
def eval(cls,expr):
"""
Safe evaluation of arithmatic operations using
:mod:`Abstract Syntax Trees <ast>`.
"""
return cls.__eval(ast.parse(expr).body[0].value) # Module(body=[Expr(value=...)])
@classmethod
def __eval(cls,node):
if isinstance(node, ast.Num): # <number>
return node.n
elif isinstance(node, (ast.operator,ast.unaryop)): # <operator>
return cls.operators[type(node)]
elif isinstance(node, ast.BinOp): # <left> <operator> <right>
return cls.__eval(node.op)(cls.__eval(node.left), cls.__eval(node.right))
elif isinstance(node, ast.UnaryOp): # <operator> <right>
return cls.__eval(node.op)(cls.__eval(node.operand))
else:
raise TypeError(node)
class GrammarException(Exception):
pass
def getType(output):
try:
if isinstance(output,dataclasses.String) and not (output.startswith('"') and output.endswith('"')):
try:
output = json.loads(output.replace("'",'"'))
except Exception:
logging.debug('error formatting json', exc_info=True)
if output.lower() == 'true':
output = True
elif output.lower() == 'false':
output = False
elif output.isdigit():
output = int(output)
else:
output = float(output)
except Exception:
pass
return output
def parse_ret_type(ret):
if isinstance(ret, (list,dict)):
return ret
else:
return str(ret)
tokens = ["word", "name", "starter", "scopeL", "scopeR", "bracketL", "bracketR"]
def scanner(data):
"""A lexical scanner, yielding token pairs"""
word = ''
escape = False
for ch in data:
if escape:
word += ch
escape = False
elif ch == '\\':
escape = True
elif ch in '$()[]':
if word:
yield ('word', word)
word = ''
if ch == '$':
yield ('starter', '$')
elif ch == '(':
yield ('scopeL', '(')
elif ch == ')':
yield ('scopeR', ')')
elif ch == '[':
yield ('bracketL', '[')
elif ch == ']':
yield ('bracketR', ']')
else:
word += ch
if word:
yield ('word', word)
def parser(data):
"""A syntactic parser, yielding syntactically accurate token pairs"""
last_token = None
last_word = ''
nestings = 0
stack = []
for token,word in scanner(data):
logger.debug('%s,%s,%s,%s,%d,%r',token,word,last_token,last_word,nestings,stack)
if token == 'starter':
if last_token == 'word':
yield ("word", last_word)
if last_token:
stack.append(nestings)
nestings = 0
last_word = ''
yield ("starter", '$')
last_token = 'starter'
elif token == 'scopeL':
if last_token == 'starter':
yield ('name', None)
last_token = 'name'
if last_token == 'name':
yield (token,word)
last_token = 'scopeL'
last_word = ''
else:
# must be part of a broken word
if last_token == 'word':
last_word += word
else:
last_token = 'word'
last_word = word
nestings += 1
elif token == 'scopeR':
if last_token == 'word':
if nestings > 0:
last_word += word
nestings -= 1
else:
yield ("word", last_word)
yield (token,word)
if stack:
nestings = stack.pop()
else:
nestings = 0
last_word = ''
elif last_token == 'scopeL':
yield ("word", '')
yield (token,word)
last_token = 'word'
last_word = ''
else:
raise SyntaxError()
elif token == 'bracketL':
if last_token == 'word':
if last_word:
# part of a broken word
last_word += '['
nestings += 1
else:
yield ('bracketL', '[')
last_token = 'bracketL'
else:
raise SyntaxError()
elif token == 'bracketR':
if last_token == 'word':
if nestings > 0:
last_word += word
nestings -= 1
else:
yield ("word", last_word)
yield (token,word)
if stack:
nestings = stack.pop()
else:
nestings = 0
last_word = ''
elif last_token == 'bracketL':
yield ("word", '')
yield (token,word)
last_token = 'word'
last_word = ''
else:
raise SyntaxError()
elif token == 'word':
if last_token == 'starter':
yield ('name', word)
last_token = 'name'
elif last_token == 'word':
last_word += word
else:
last_token = 'word'
last_word = word
else:
# bad token
raise SyntaxError()
if last_word:
yield ("word", last_word)
if nestings or stack:
raise SyntaxError()
[docs]
class ExpParser:
"""
Expression parsing class for parameter values.
Grammar Definition::
char := any unicode character other than $()[]
word := char | char + word
starter := $
scopeL := (
scopeR := )
bracketL := [
bracketR := ]
symbol := starter | starter + word
phrase := symbol + scopeL + sentence + scopeR
lookup := word + bracketL + word + bracketR | phrase + bracketL + word + bracketR
sentence := lookup | phrase | word | lookup + sentence | phrase + sentence | word + sentence
Keywords:
* steering : A parameter from :class:`iceprod.core.dataclasses.Steering`
* system : A system value from :class:`iceprod.core.dataclasses.Steering`
* args, options : An option value from :class:`iceprod.core.dataclasses.Job`
* metadata : A value from :class:`iceprod.core.dataclasses.Dif` or
:class:`iceprod.core.dataclasses.Plus`
* eval : An arithmatic expression
* sum, min, max, len : Apply a reduction to a sequence
* choice : A random choice from a list of possibilites
* sprintf : The sprintf string syntax
Examples::
$steering(my_parameter)
$system(gpu_opts)
$args(option1)
$options(option1)
$metadata(sensor_name)
$eval(1+2)
$choice(1,2,3,4)
$sprintf("%04d",4)
"""
def __init__(self):
self.job = None
self.env = None
self.depth = 0
# dict of keyword : function mappings
self.keywords = {
'steering' : self.steering_func,
'system' : self.system_func,
'environ' : self.environ_func,
'args' : self.options_func,
'options' : self.options_func,
'metadata' : self.difplus_func,
'eval' : self.eval_func,
'choice' : self.choice_func,
'sprintf' : self.sprintf_func
}
for reduction in 'sum', 'min', 'max', 'len':
self.keywords[reduction] = functools.partial(self.reduce_func, getattr(builtins, reduction))
[docs]
def parse(self,input,job=None,env=None,depth=20):
"""
Parse the input, expanding where possible.
:param input: input string
:param job: :class:`iceprod.core.dataclasses.Job`, optional
:param env: env dictionary, optional
:param depth: how deep to recursively parse
:returns: expanded string
"""
if depth < 1:
logger.warning("recursion depth of parse exceeded")
return input
logger.debug("parse: %r",input)
if not isinstance(input,dataclasses.String) or not input:
# check for lists or dicts to recurse into
if isinstance(input, list):
logger.debug("recursing into list: %r", input)
input = [self.parse(x, job=job, env=env, depth=depth-1) for x in input]
elif isinstance(input, dict):
logger.debug("recursing into dict: %r", input)
input = {self.parse(x, job=job, env=env, depth=depth-1):self.parse(input[x],job=job,env=env,depth=depth-1) for x in input}
return input
# set job and env
if job:
self.job = job
else:
self.job = dataclasses.Job()
if env:
self.env = env
else:
self.env = {}
self.depth = 0 # start at a depth of 0
while True:
# parse input
stack = []
try:
for token,word in parser(input):
logger.debug('exp %s,%s,%r',token,word,stack)
if token in ('starter','name','word','scopeL','bracketL'):
stack.append((token,word))
elif token == 'scopeR':
# coelsce stack up to scopeL
word = ''
while stack and stack[-1][0] != 'scopeL':
word = stack.pop()[1] + word
stack.pop() # remove scopeL
name = stack.pop()[1]
stack.pop() # remove starter
# try evaluating this
try:
args = []
if name:
args.append(name)
args.append(word)
ret = self.process_phrase(*args)
if isinstance(ret, (list, dict)):
ret = json.dumps(ret)
stack.append(('word',str(ret)))
except GrammarException:
logger.debug('GrammarException')
stack.append(('word','$'+(name if name else '')+'('+word+')'))
elif token == 'bracketR':
# coelsce stack up to bracketL
word = ''
while stack and stack[-1][0] != 'bracketL':
word = stack.pop()[1] + word
stack.pop() # remove bracketL
# try evaluating this
if word.endswith(']'):
# nested bracket, so recurse
word = self.parse(word, job=job, env=env, depth=depth-1)
try:
ret = self.process_phrase(word)
except GrammarException:
ret = word
# coelsce words
word = ''
while (stack and stack[-1][0] == 'word'
and (not word.startswith('['))
and (not word.startswith('{'))):
word = stack.pop()[1] + word
# now do list/dict index
try:
innerType = getType(ret)
ret = getType(word)[innerType]
stack.append(('word',str(ret)))
except Exception:
logger.debug('cannot eval: %s[%s]', word, ret,
exc_info=True)
stack.append(('word',word+'['+ret+']'))
else:
raise SyntaxError()
except Exception:
logger.debug('SyntaxError', exc_info=True)
output = getType(input)
else:
logger.debug('joining stack: %r', ''.join(s[1] for s in stack))
output = getType(''.join(s[1] for s in stack))
if isinstance(output,dataclasses.String) and output != input:
logger.debug('reprocessing output: %r', output)
input = output
continue
break
# check for lists or dicts to recurse into
if isinstance(output, (list,dict)):
output = self.parse(output, job=job, env=env, depth=depth)
# return parsed output
logger.debug('parser out: %r',output)
return output
def process_phrase(self,keyword,param=None):
# search for keyword in special list
ret = None
if keyword in self.keywords and param is not None:
try:
ret = self.keywords[keyword](param)
except GrammarException:
pass
if ret is None and param is None:
# do general search for keyword
if 'parameters' in self.env and keyword in self.env['parameters']:
# search env params first
ret = self.env['parameters'][keyword]
elif keyword in self.job and not isinstance(self.job[keyword],dict):
# search job second
try:
ret = self.job[keyword]
except Exception:
pass
elif keyword in self.job['options']: # search options third
try:
ret = self.job['options'][keyword]
except Exception:
pass
elif self.job['steering'] and keyword in self.job['steering']['parameters']:
# search job steering last
ret = self.job['steering']['parameters'][keyword]
if ret is None:
raise GrammarException()
return parse_ret_type(ret)
def steering_func(self,param):
"""Find param in steering"""
if self.job['steering'] and param in self.job['steering']['parameters']:
return parse_ret_type(self.job['steering']['parameters'][param])
else:
raise GrammarException('steering:'+param)
def system_func(self,param):
"""Find param in steering.system"""
if self.job['steering'] and param in self.job['steering']['system']:
return parse_ret_type(self.job['steering']['system'][param])
else:
raise GrammarException('system:'+param)
def environ_func(self,param):
"""Find param in env["environment"]"""
if 'environment' in self.env and param in self.env['environment']:
return parse_ret_type(self.env['environment'][param])
else:
raise GrammarException('environ:'+param)
def options_func(self,param):
"""Find param in options"""
if param in self.job['options']:
return parse_ret_type(self.job['options'][param])
else:
raise GrammarException('options:'+param)
def difplus_func(self,param):
"""Find param in dif plus"""
try:
# try dif, then plus
return parse_ret_type(self.job['difplus']['dif'][param])
except Exception:
try:
return parse_ret_type(self.job['difplus']['plus'][param])
except Exception:
raise GrammarException('difplus:'+param)
def choice_func(self,param):
"""Evaluate param as choice expression"""
if not param:
raise GrammarException('no choices available')
try:
if isinstance(param,(tuple,list)):
return parse_ret_type(random.choice(param))
else:
return parse_ret_type(random.choice(param.split(',')))
except Exception:
raise GrammarException('not a valid choice')
def eval_func(self,param):
"""Evaluate param as arithmetic expression"""
bad = functools.reduce(lambda a, b: a or (b in param),('import','open','for','while','def','class','lambda'),False)
if bad:
raise GrammarException('Unsafe operator call')
else:
try:
return parse_ret_type(safe_eval.eval(param))
except Exception:
raise GrammarException('Eval is not basic arithmetic')
def reduce_func(self, func, param):
try:
return parse_ret_type(func(getType(param)))
except Exception:
raise GrammarException('Not a reducible sequence')
def sprintf_func(self,param):
"""Evaluate param as sprintf. param = arg_str, arg0, arg1, ... """
# separate into format string and args
strchar = param[0]
if strchar in '\'"':
pos = param.find(strchar,1)
if pos < 0:
raise GrammarException("Can't find closing quote for format string")
fmt_str = param[1:pos]
else:
pos = param.find(',',0)
if pos < 0:
raise GrammarException("Can't find end of format string")
fmt_str = param[0:pos]
args = []
pos = param.find(',',pos)
while pos >= 0:
pos2 = param.find(',',pos+1)
if pos2 < 0:
args.append(param[pos+1:])
break
else:
args.append(param[pos+1:pos2])
pos = pos2
try:
# cast args to correct type
def cast_string(fstring,arg):
"""cast string to value according to formatting character"""
if not fstring:
return arg
if fstring[-1] in 'cs':
if arg[0] in '\'"':
return str(arg[1:-1])
else:
return str(arg)
elif fstring[-1] == 'r':
return repr(arg)
elif fstring[-1].lower() in 'xo':
return int(arg)
elif fstring[-1].lower() in 'idufeg':
return float(arg)
else:
raise GrammarException('Unable to cast %s using format %s'%(arg,fstring))
fstrings = re.findall(r'\%[#0\- +]{0,1}[0-9]*\.{0,1}[0-9]*[csridufegExXo]',fmt_str)
args = list(map(cast_string,fstrings,args))[0:len(args)]
# do sprintf on fmt_str and args
if fstrings:
return fmt_str % tuple(args)
else:
return fmt_str
except Exception as e:
raise GrammarException(str(e))