"""
Some JSON encoding and decoding utilities.
"""
from __future__ import absolute_import, division, print_function

import ast
import base64
import inspect
import json
import logging
import zlib
# OrderedDict is resolved by name through globals() in repr_converter.loads.
from collections import OrderedDict  # noqa: F401
from datetime import date, datetime, time

from iceprod.core import dataclasses
from iceprod.core import util
# Module-level logger shared by the encode/decode helpers in this file;
# it is registered under the fixed name 'jsonUtil'.
logger = logging.getLogger('jsonUtil')
class json_compressor:
    """Compress payloads for transport inside JSON.

    Used for files and other large things sent over JSON (great for log
    files). A payload is zlib-compressed and then base64-encoded.
    """

    @staticmethod
    def compress(obj):
        """Return the base64-encoded, zlib-compressed form of ``obj``.

        Args:
            obj: Bytes-like payload, or text. Text is encoded as UTF-8
                first, mirroring the UTF-8 decode done by ``uncompress``.

        Returns:
            The encoded payload as a byte string, or ``b''`` when ``obj``
            is empty or None.
        """
        if not obj:
            return b''
        # type(u'') is the text type on both Python 2 and Python 3.
        # zlib.compress only accepts bytes-like input, so text must be
        # encoded before it is compressed.
        if isinstance(obj, type(u'')):
            obj = obj.encode('utf-8')
        return base64.b64encode(zlib.compress(obj))

    @staticmethod
    def uncompress(obj):
        """Reverse ``compress`` and return the payload as text.

        Args:
            obj: Base64 data produced by ``compress``.

        Returns:
            The decompressed payload decoded as UTF-8, or ``''`` when
            ``obj`` is empty or None.
        """
        if not obj:
            return ''
        return zlib.decompress(base64.b64decode(obj)).decode('utf-8')
class datetime_converter:
    """Convert datetime values to and from ISO-style strings."""

    @staticmethod
    def dumps(obj):
        """Return ``obj.isoformat()``."""
        return obj.isoformat()

    @staticmethod
    def loads(obj, name=None):
        """Parse a date, time, or datetime string into a ``datetime``.

        The layout is picked from the characters present in ``obj``:
        no ``:`` means a plain date; a ``T`` or space separator means a
        full datetime; anything else is a time of day. A ``.`` selects
        the variant with fractional seconds. ``name`` is accepted for
        interface parity with the other converters and is not used.
        """
        if ':' not in obj:
            # must be date
            return datetime.strptime(obj, "%Y-%m-%d")

        fraction = ".%f" if '.' in obj else ""
        if 'T' in obj:
            # must be datetime, ISO "T" separator
            layout = "%Y-%m-%dT%H:%M:%S"
        elif ' ' in obj:
            # must be datetime, space separator
            layout = "%Y-%m-%d %H:%M:%S"
        else:
            # must be time
            layout = "%H:%M:%S"
        return datetime.strptime(obj, layout + fraction)
class date_converter(datetime_converter):
    """Variant of datetime_converter whose ``loads`` yields a ``date``."""

    @staticmethod
    def loads(obj, name=None):
        """Parse ``obj`` with datetime_converter and keep only the date part."""
        parsed = datetime_converter.loads(obj)
        return parsed.date()
class time_converter(datetime_converter):
    """Variant of datetime_converter whose ``loads`` yields a ``time``."""

    @staticmethod
    def loads(obj, name=None):
        """Parse ``obj`` with datetime_converter and keep only the time part."""
        parsed = datetime_converter.loads(obj)
        return parsed.time()
class binary_converter:
    """Base64 converter for binary payloads.

    Note that this is really only for decoding JSON, since Python bytes
    are strings.
    """

    @staticmethod
    def dumps(obj, name=None):
        """Return ``obj`` base64-encoded."""
        encoded = base64.b64encode(obj)
        return encoded

    @staticmethod
    def loads(obj, name=None):
        """Base64-decode ``obj`` and return the result decoded as UTF-8."""
        raw = base64.b64decode(obj)
        return raw.decode('utf-8')
class bytearray_converter:
    """Convert ``bytearray`` values to and from base64 text."""

    @staticmethod
    def dumps(obj, name=None):
        """Return the content of ``obj`` as base64 text.

        Args:
            obj: The bytearray (or other bytes-like value) to encode.
            name: Accepted for interface parity with the other
                converters; not used.

        Returns:
            The base64 encoding as ASCII text, so the value can be placed
            directly in a JSON document.
        """
        # bytes(obj) copies the raw content. On Python 3, str(obj) would
        # yield the repr text "bytearray(b'...')", which b64encode rejects.
        return base64.b64encode(bytes(obj)).decode('ascii')

    @staticmethod
    def loads(obj, name=None):
        """Base64-decode ``obj`` and return the content as a ``bytearray``."""
        return bytearray(base64.b64decode(obj))
class set_converter:
    """Convert sets to lists for encoding, and back to sets on decoding."""

    @staticmethod
    def dumps(obj):
        """Return the members of ``obj`` as a list."""
        members = list(obj)
        return members

    @staticmethod
    def loads(obj, name=None):
        """Return a set built from the items of ``obj``."""
        members = set(obj)
        return members
# JSON conversion for the classes found in the dataclasses module
class var_converter:
    """Convert dataclasses objects through their attribute dictionary."""

    @staticmethod
    def dumps(obj):
        """Return the attribute dictionary (``__dict__``) of ``obj``."""
        attributes = obj.__dict__
        return attributes

    @staticmethod
    def loads(obj, name=None):
        """Rebuild the ``dataclasses`` class called ``name`` from ``obj``.

        The class is looked up by name on the ``dataclasses`` module and
        instantiated without arguments; every key of ``obj`` is then set
        as an attribute on the new instance.
        """
        factory = getattr(dataclasses, name)
        instance = factory()
        for attr in obj:
            setattr(instance, attr, obj[attr])
        return instance
# JSON conversion for util.IFace objects
class iface_converter:
    """Convert ``util.IFace`` objects through their attribute dictionary."""

    @staticmethod
    def dumps(obj):
        """Return the attribute dictionary (``__dict__``) of ``obj``."""
        attributes = obj.__dict__
        return attributes

    @staticmethod
    def loads(obj, name=None):
        """Create a ``util.IFace`` and set every key of ``obj`` on it.

        ``name`` is accepted for interface parity with the other
        converters and is not used.
        """
        instance = util.IFace()
        for attr in obj:
            setattr(instance, attr, obj[attr])
        return instance
# do some default conversions
# for things like OrderedDict
class repr_converter:
    """Convert objects through their ``repr`` text, e.g. ``OrderedDict([...])``."""

    @staticmethod
    def dumps(obj):
        """Return ``repr(obj)``."""
        return repr(obj)

    @staticmethod
    def loads(obj, name=None):
        """Rebuild an object from text of the form ``TypeName(<literal>)``.

        Args:
            obj: The repr text. The part before the first ``(`` names a
                callable in this module's globals; the part between that
                ``(`` and the last ``)`` must be a Python literal.
            name: Accepted for interface parity with the other
                converters; not used.

        Returns:
            The result of calling the named type. A tuple literal is
            unpacked into positional arguments, any other literal is
            passed as a single argument, and an empty argument list
            calls the type with no arguments.

        Raises:
            Exception: If the type name is not in this module's globals
                or ``obj`` contains no ``(``. Errors from parsing the
                literal or from calling the type propagate unchanged.
        """
        type_name, paren, remainder = obj.partition('(')
        if type_name not in globals():
            raise Exception('type %r not found in module globals' % type_name)
        if not paren:
            raise Exception('malformed repr %r: missing "("' % obj)
        # NOTE(review): any callable in this module's globals can be named
        # here, so this must not be relied on as a security boundary.
        # The arguments are limited to literals by ast.literal_eval.
        factory = globals()[type_name]
        args_text = remainder.rsplit(')', 1)[0]
        if not args_text.strip():
            # e.g. "OrderedDict()": literal_eval cannot parse empty text
            return factory()
        args = ast.literal_eval(args_text)
        if isinstance(args, tuple):
            return factory(*args)
        return factory(args)
# Registry that maps a ``__jsonclass__`` type name to its converter class.
JSONConverters = dict(
    datetime=datetime_converter,
    date=date_converter,
    time=time_converter,
    binary=binary_converter,
    bytearray=bytearray_converter,
    OrderedDict=repr_converter,
    set=set_converter,
    IFace=iface_converter,
)
# Every class found in the dataclasses module is handled by var_converter.
for k in dict(inspect.getmembers(dataclasses, inspect.isclass)):
    JSONConverters[k] = var_converter
def objToJSON(obj):
    """Turn ``obj`` into something the json module can encode.

    Values of the basic JSON-compatible types (and None) are returned
    unchanged. Any other object is looked up in ``JSONConverters`` by its
    class name and wrapped as ``{'__jsonclass__': [name, dumped]}``.

    Raises:
        Exception: If no converter is registered for the object's class.
    """
    passthrough_types = (dict, list, tuple, str, int, float, bool)
    if obj is None or isinstance(obj, passthrough_types):
        return obj

    name = obj.__class__.__name__
    if name not in JSONConverters:
        logger.error('name: %s, obj: %r', name, obj)
        raise Exception('Cannot encode %s class to JSON'%name)
    dumped = JSONConverters[name].dumps(obj)
    return {'__jsonclass__': [name, dumped]}
def JSONToObj(obj):
    """Decode a ``__jsonclass__`` wrapper back into an object.

    Anything that is not a dict containing a ``'__jsonclass__'`` key is
    returned unchanged. For a wrapper, the named converter's ``loads`` is
    called on the payload. If that fails for any reason the error is
    logged as a warning and the original dict is returned.
    """
    if not (isinstance(obj, dict) and '__jsonclass__' in obj):
        return obj

    logger.info('try unpacking class')
    try:
        name = obj['__jsonclass__'][0]
        if name not in JSONConverters:
            raise Exception('class %r not found in converters'%name)
        payload = obj['__jsonclass__'][1]
        return JSONConverters[name].loads(payload, name=name)
    except Exception as e:
        # best effort: fall through and hand back the undecoded dict
        logger.warning('error making json class: %r',e,exc_info=True)
    return obj
# copied from tornado.escape so we don't have to include that project
def recursive_unicode(obj):
    """Return a copy of a simple data structure with bytes decoded to text.

    Dictionaries (keys and values), sets, lists and tuples are walked
    recursively. Byte strings are decoded as UTF-8. Every other value is
    returned as-is.
    """
    if isinstance(obj, dict):
        return {
            recursive_unicode(key): recursive_unicode(value)
            for key, value in obj.items()
        }
    if isinstance(obj, set):
        return {recursive_unicode(member) for member in obj}
    if isinstance(obj, list):
        return [recursive_unicode(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(recursive_unicode(item) for item in obj)
    if isinstance(obj, bytes):
        return obj.decode("utf-8")
    return obj
def json_encode(value, indent=None):
    """JSON-encode the given Python object.

    Byte strings inside ``value`` are first converted to text with
    ``recursive_unicode``. Objects the json module cannot encode are
    passed to ``objToJSON``. Every ``</`` in the output is rewritten as
    ``<\\/``.
    """
    text_value = recursive_unicode(value)
    encoded = json.dumps(
        text_value,
        default=objToJSON,
        separators=(',', ':'),
        indent=indent,
    )
    return encoded.replace("</", "<\\/")
def json_decode(value):
    """Return the Python objects for the given JSON string.

    Each decoded JSON object is passed through ``JSONToObj``.
    """
    decoded = json.loads(value, object_hook=JSONToObj)
    return decoded