Source code for iceprod.server

"""
Some general functions used by the iceprod server
"""

from __future__ import absolute_import, division, print_function

import os
import sys
import logging
from pkgutil import get_loader
import importlib
import subprocess


[docs] def find_module_recursive(name, path=None): """ Recursively search for submodule. Submodules must be separated with a '.' """ import imp res = None for x in name.split('.'): res = imp.find_module(x, path) path = [res[1]] return res
[docs] def listmodules(package_name=''): """List modules in a package or directory""" file, pathname, description = find_module_recursive(package_name) if file: # Not a package return [] ret = [] for module in os.listdir(pathname): if module.endswith('.py') and module != '__init__.py': tmp = os.path.splitext(module)[0] ret.append(package_name+'.'+tmp) return ret
[docs] def run_module(name,*args,**kwargs): """Import and start the module""" class_name = name.rsplit('.',1)[1] x = importlib.import_module(name) return (getattr(x,class_name))(*args,**kwargs)
[docs] class GlobalID(object): """Global ID configuration and generation""" import string # never change these settings, otherwise all old ids will fail CHARS = string.ascii_letters+string.digits CHARS_LEN = len(CHARS) # define dict to make reverse lookup super fast INTS_DICT = {c:i for i,c in enumerate(CHARS)} IDLEN = 15 MAXSITEID = 10**10 MAXLOCALID = 10**15
[docs] @classmethod def int2char(cls,i): if not isinstance(i,int) or i < 0: # only deal with positive ints logging.warning('bad input to int2char: %r',i) raise Exception('bad input to int2char') out = '' while i >= 0: out += cls.CHARS[i%cls.CHARS_LEN] i = i//cls.CHARS_LEN - 1 return out[::-1]
[docs] @classmethod def char2int(cls,c): if not isinstance(c,str) or len(c) < 1: # only deal with string logging.warning('bad input to char2int: %r',c) raise Exception('bad input to char2int') out = -1 for i,cc in enumerate(reversed(c)): if cc not in cls.CHARS: raise Exception('non-char input to chars2int') out += (cls.INTS_DICT[cc]+1)*(cls.CHARS_LEN**i) return out
[docs] @classmethod def siteID_gen(cls): """Generate a new site id""" import random return cls.int2char(random.randint(0,cls.MAXSITEID-1))
[docs] @classmethod def globalID_gen(cls,id,site_id): """Generate a new global id given a local id and site id""" if isinstance(id,str): id = cls.char2int(id) elif not isinstance(id,int): raise Exception('id is not a string, int, or long') if isinstance(site_id,str): return cls.int2char(cls.char2int(site_id)*cls.MAXLOCALID+id) elif isinstance(site_id,int): return cls.int2char(site_id*cls.MAXLOCALID+id) else: raise Exception('Site id is not a string, int, or long')
[docs] @classmethod def localID_ret(cls,id,type='str'): """Retrieve a local id from a global id""" ret = cls.char2int(id) % cls.MAXLOCALID if type == 'str': ret = cls.int2char(ret) return ret
[docs] @classmethod def siteID_ret(cls,id,type='str'): """Retrieve a site id from a global id""" ret = cls.char2int(id) // cls.MAXLOCALID if type == 'str': ret = cls.int2char(ret) return ret
[docs] def salt(length=2): """Returns a string of random letters""" import string import random letters = string.ascii_letters+string.digits return ''.join([random.SystemRandom().choice(letters) for _ in range(length)])
[docs] class KwargConfig(object): """A way to validate kwargs passed in to a class""" def __init__(self): # defaults self._cfg = {} self._cfg_types = {}
[docs] def validate(self,kwargs): # setup cfg variables for s in kwargs.keys(): v = kwargs[s] if not isinstance(s,str): raise Exception('parameter name %s is not a string'%(str(s))) if s not in self._cfg: logging.warning('%s is not a valid arg',s) continue t = self._cfg_types[s] if t in ('str','file','dir'): if not isinstance(v,str): raise Exception('%s is not a string'%(str(s))) if t in ('file','dir'): v = os.path.expanduser(os.path.expandvars(v)) if t == 'file' and not ('_file' in s or '_log' in s): try: os.path.exists(v) except Exception: raise Exception('parameter %s with filepath %s does not exist'%(s,v)) elif t == 'int': if not isinstance(v,int): raise Exception('%s is not an int'%(str(s))) elif t == 'float': if not isinstance(v,float): raise Exception('%s is not a float'%(str(s))) else: raise Exception('%s has an unknown type'%(str(s))) self._cfg[s] = v # make directories for c in self._cfg_types: if self._cfg_types[c] == 'file': d = os.path.dirname(self._cfg[c]) if not os.path.isdir(d): os.makedirs(d) if self._cfg_types[c] == 'dir': d = self._cfg[c] if not os.path.isdir(d): os.makedirs(d)
[docs] def get_pkgdata_filename(package, resource): """Get a filename for a resource bundled within the package""" loader = get_loader(package) if loader is None or not hasattr(loader, 'get_data'): return None mod = sys.modules.get(package) or loader.load_module(package) if mod is None or not hasattr(mod, '__file__'): return None # Modify the resource name to be compatible with the loader.get_data # signature - an os.path format "filename" starting with the dirname of # the package's __file__ parts = resource.split('/') parts.insert(0, os.path.dirname(mod.__file__)) return os.path.join(*parts)
[docs] def get_pkg_binary(package, binary): """Try finding the binary path based on the python package""" try: loader = get_loader(package) f = loader.get_filename() while f and 'lib' in f: f = os.path.dirname(f) filepath = os.path.join(f,'bin',binary) if os.path.exists(filepath): return filepath filepath = os.path.join(f,'sbin',binary) if os.path.exists(filepath): return filepath except Exception: pass # try going up from sys.argv[0] try: f = os.path.abspath(sys.argv[0]) while f and 'iceprod' in f: filepath = os.path.join(f,'bin',binary) if os.path.exists(filepath): return filepath f = os.path.dirname(f) filepath = os.path.join(f,'bin',binary) if os.path.exists(filepath): return filepath except Exception: pass # try just asking the shell try: filepath = subprocess.check_output(["which",binary]).decode('utf-8').strip('\n') if os.path.exists(filepath): return filepath except Exception: pass return None