Source code for iceprod.server.module

Interface for configuring modules

from __future__ import absolute_import, division, print_function

import logging

from statsd import TCPStatsClient
import requests

    import boto3
    import botocore.client
except ImportError:
    boto3 = None

from rest_tools.client import ClientCredentialsAuth

logger = logging.getLogger('module')

[docs] class FakeStatsClient(object): def __init__(self, *args, **kwargs): self._prefix = '' def __getattr__(self, name): def foo(*args, **kwargs): pass return foo
[docs] class StatsClientIgnoreErrors(object): def __init__(self, *args, **kwargs): kwargs['timeout'] = 0.1 self._statsclient = TCPStatsClient(*args, **kwargs) def __getattr__(self, name): def foo(*args, **kwargs): try: try: return getattr(self._statsclient, name)(*args, **kwargs) except BrokenPipeError: self._statsclient.reconnect() return getattr(self._statsclient, name)(*args, **kwargs) except Exception:'StatsClient dropped %s %r %r', name, args, kwargs, exc_info=True) return foo
[docs] class ElasticClient(object): def __init__(self, hostname, basename='iceprod'): self.session = requests.Session() # handle auth if '@' in hostname: self.session.auth = tuple(hostname.split('@')[0].split('://')[1].split(':')) hostname = hostname.split('://',1)[0]+'://'+hostname.split('@',1)[1] # try a connection r = self.session.get(hostname, timeout=5) r.raise_for_status() # concat hostname and basename self.hostname = hostname+'/'+basename+'_'
[docs] def head(self, name, index_name): try: r = self.session.head(self.hostname+name+'/item/'+index_name, timeout=5) r.raise_for_status() except Exception: return False else: return True
[docs] def get(self, name, index_name): r = self.session.get(self.hostname+name+'/item/'+index_name, timeout=5) r.raise_for_status() return r.json()
[docs] def post(self, name, index_name, data): r ='/item/'+index_name, timeout=5) r.raise_for_status() return r.json()
[docs] def put(self, name, index_name, data): r = None try: kwargs = {'timeout':5} if isinstance(data,dict): kwargs['json'] = data else: kwargs['data'] = data r = self.session.put(self.hostname+name+'/item/'+index_name, **kwargs) r.raise_for_status() except Exception: logger.warning('cannot put to elasticsearch: %s%s/%s', self.hostname, name, index_name, exc_info=True) if r:'%r',r.content)
[docs] class module(object): """ This is an abstract class representing a server module. :param cfg: An :class:`IceProdConfig`. :param executor: A :class:`concurrent.futures.ThreadPoolExecutor`. :param modules: A dict of other module's public services. """ def __init__(self, cfg, executor, modules): self.cfg = cfg self.executor = executor self.statsd = FakeStatsClient() self.elasticsearch = FakeStatsClient() self.s3 = None self.rest_client = None self.cred_client = None self.modules = modules self.service = {'start': self.start, 'stop': self.stop, 'kill': self.kill}
[docs] def start(self): """ Set up a module. """ logger.warning('starting module %s', self.__class__.__name__) if 'statsd' in self.cfg and self.cfg['statsd']: try: addr = self.cfg['statsd'] port = 8125 if ':' in addr: addr,port = addr.split(':') port = int(port) self.statsd = StatsClientIgnoreErrors( addr, port=port, prefix=self.cfg['site_id']+'.'+self.__class__.__name__ ) except Exception: logger.warning('failed to connect to statsd: %r', self.cfg['statsd'], exc_info=True) if 'elasticsearch' in self.cfg and self.cfg['elasticsearch']: try: self.elasticsearch = ElasticClient(self.cfg['elasticsearch']) except Exception: logger.warning('failed to connect to elasicsearch: %r', self.cfg['elasticsearch'], exc_info=True) if (boto3 and 's3' in self.cfg and 'access_key' in self.cfg['s3'] and 'secret_key' in self.cfg['s3']): try: self.s3 = boto3.client( 's3', 'us-east-1', aws_access_key_id=self.cfg['s3']['access_key'], aws_secret_access_key=self.cfg['s3']['secret_key'], config=botocore.client.Config(max_pool_connections=101) ) except Exception: logger.warning('failed to connect to s3: %r', self.cfg['s3'], exc_info=True) if ('rest_api' in self.cfg and 'url' in self.cfg['rest_api'] and 'oauth_url' in self.cfg['rest_api'] and 'oauth_client_id' in self.cfg['rest_api'] and 'oauth_client_secret' in self.cfg['rest_api']): try: self.rest_client = ClientCredentialsAuth( address=self.cfg['rest_api']['url'], token_url=self.cfg['rest_api']['oauth_url'], client_id=self.cfg['rest_api']['oauth_client_id'], client_secret=self.cfg['rest_api']['oauth_client_secret'], ) self.cred_client = ClientCredentialsAuth( address=self.cfg['rest_api']['cred_url'], token_url=self.cfg['rest_api']['oauth_url'], client_id=self.cfg['rest_api']['oauth_client_id'], client_secret=self.cfg['rest_api']['oauth_client_secret'], ) except Exception: logger.warning('failed to connect to rest api: %r', self.cfg['rest_api'].get('url',''), exc_info=True)
[docs] def stop(self): logger.warning('stopping module %s', self.__class__.__name__)
[docs] def kill(self): logger.warning('killing module %s', self.__class__.__name__)