Source code for csky.utils

# utils.py

"""
Utility classes and functions.
"""

from __future__ import print_function
import sys
import numpy as np
_PY2 = sys.version_info[0] == 2

if _PY2:
    _str_types = (str, unicode, np.string_)
else:
    _str_types = (str, np.string_)


try:
    from astropy.table import Table
except:
    Table = None
try:
    xrange
except NameError:
    xrange = range
try:
    from pandas import DataFrame
except:
    DataFrame = None
import os
try:
    from os import errno
except:
    import errno
import healpy as hp
pi = np.pi

from . import coord, cext

try:
    from icecube import astro
except:
    astro = None


[docs] class Arrays(object): """Optimized DataFrame-like data structure. This is the baseline data structure for all event data in csky. It acts a lot like a pandas DataFrame, and can be converted to one with the ``.as_dataframe`` property. Conversion to astropy Table and to numpy structured array are supported by the ``.as_table`` and ``.as_array`` properties. Each stored array is treated as a column and can be accessed like, e.g., ``data['ra']`` **or** ``data.ra``. Assignment of new columns is **only** available by the [] syntax, e.g. ``data['something_else'] = some_new_array``. Additional dictionary-like access is possible through the ``.keys()``, ``.values()``, and ``.items()`` methods. When the [] operand is not a string, slicing/masking is performed, e.g. ``src_0_1_2 = srcs[:3]`` or ``north_data = data[data.dec > 0]``. Iteration over an ``Arrays`` yields per-row instances, that is, new ``Arrays`` instances with column lengths of 1. This class is easier than DataFrames to use for computationally-intense work because it does very little to impose and maintain column alignment. This means it can expose direct references to the underlying numpy arrays rather than routing through some kind of Series objects. """
[docs] def __init__(self, init={}, names=None, convert=False, _require_array=True, **kw): """ Construct an Arrays collection. Args: init (``numpy.ndarray`` or object with ``keys()`` or ``._dict.keys()`` method): the data names (list of str): the column names convert (bool): if True, add references converting from skylab to csky naming convention """ self._dict = _dict = {} # select columns if names is None: if isinstance(init, np.ndarray): names = init.dtype.names else: try: names = init.keys() except: try: names = init._dict.keys() except: raise TypeError('could not deduce column names') names = [name for name in names if name != 'meta'] # load data names = sorted(list(names)) for name in names: self[name] = init[name] for name in sorted(kw): self[name] = kw[name] names += sorted(kw) if convert: if 'mjd' not in names and 'time' in names: self['mjd'] = self.time if 'dec' not in names: if 'sinDec' in names: self['dec'] = np.arcsin(self.sinDec) elif 'zenith' in names and 'azimuth' in names: if astro is None: raise RuntimeError( '`astro` was not successfully loaded; conversion not possible') self['ra'], self['dec'] = astro.dir_to_equa( self.zenith, self.azimuth, self.mjd) if 'energy' not in names and 'logE' in names: self['energy'] = 10**self.logE if 'true_ra' not in names and 'trueRa' in names: self['true_ra'] = self.trueRa if 'true_dec' not in names and 'trueDec' in names: self['true_dec'] = self.trueDec if 'true_energy' not in names and 'trueE' in names: self['true_energy'] = self.trueE if 'oneweight' not in names and 'ow' in names: self['oneweight'] = self.ow #if 'dist' not in names: # self['dist'] = np.zeros_like(self.dec) if 'true_dec' in self._dict: xdec, xra = coord.rotate_source_to_xaxis( self.true_dec, self.true_ra, self.dec, self.ra, latlon=True ) xra[xra < pi] += 2*pi xra[xra > pi] -= 2*pi self['xra'] = xra self['xdec'] = xdec self['dpsi'] = np.sqrt(xra**2 + xdec**2) if 'azimuth' not in names and 'ra' in self and 'mjd' in self: self['azimuth'] = coord.switch_ra_azimuth(self.ra, self.mjd) if 'sigma' not in names and 'angErr' in self: self['sigma'] = self.angErr for k in 'sinDec logE trueRa trueDec trueE ow'.split(): if k in self: del self[k] if _require_array: for k in self.keys(): self[k] = np.atleast_1d(self[k]) self.check_columns()
def check_columns(self): # check column lengths if not self._dict: return N = len(self._dict[self.keys()[0]]) for key in self.keys()[1:]: if len(self._dict[key]) != N: raise ValueError('mismatched column length: `{}`'.format(key)) def _set(self, k, v, _require_array=True): if _require_array: v = np.asarray(v) try: setattr(self, k, v) except: raise AttributeError("can't set attribute: {}".format(k)) self._dict[k] = v def _unset(self, k): del self._dict[k] del self.__dict__[k] def compress(self, keep=[], keep32=[], from_mmap=False): for (key, a) in self.items(): if key in keep: if from_mmap: self[key] = a.copy() elif key in keep32: self[key] = a.astype(np.float32) else: del self[key] try: temp = type(self) (self.as_array) self._dict = {} for (key, a) in temp.items(): self[key] = a except ImportError: return self def _check_astropy(self, prop): if Table is None: raise ImportError('{} requires astropy, which could not be imported'.format(prop)) def keys(self): keys = sorted(self._dict) out = [key for key in keys if 'true_' not in key and key != 'oneweight' ] out += [key for key in keys if 'true_' in key] if 'oneweight' in keys: out += ['oneweight'] return out def values(self): return (self[key] for key in self.keys()) def items(self): return ((key, self[key]) for key in self.keys()) @property def as_table(self): self._check_astropy('as_table') return Table(self._dict) @property def as_array(self): self._check_astropy('as_array') return Table(self._dict).as_array() @property def as_dataframe(self): if DataFrame is None: raise ImportError('as_dataframe requires pandas, which could not be imported') return DataFrame(self._dict) def __contains__(self, k): return k in self._dict def __delitem__(self, k): self._unset(k) def __getitem__(self, k): if type(k) in _str_types: return self._dict[k] else: return self._subsample(k) def __setitem__(self, k, v): self._set(k, v) def __len__(self): return len(self._dict[self.keys()[0]]) def __repr__(self): out = '{}({} items | columns: {})'.format( self.__class__.__name__, len(self), ', '.join(self.keys())) return out def __str__(self): return repr(self) def __add__(self, other): assert sorted(self._dict) == sorted(other._dict) init = {} for k in sorted(self._dict): init[k] = np.concatenate((self[k], other[k])) return type(self) (init) def __iter__(self): for i in range(len(self)): yield self[i] def _subsample(self, cond): return type(self) ({k:self._dict[k][cond] for k in self.keys()}) @staticmethod def concatenate(arrays): for i in xrange(len(arrays) - 1): k1, k2 = arrays[i].keys(), arrays[i+1].keys() if arrays[i].keys() != arrays[i+1].keys(): raise ValueError('keys do not match:\n{}\nvs\n{}'.format(k1, k2)) init = {} keys = arrays[0].keys() for k in keys: init[k] = np.concatenate([a[k] for a in arrays]) out = type(arrays[0]) (init) return out
[docs] class Events(Arrays): """Subclass of :class:`Arrays` automatically providing certain event properties. Bonus properties include ``sindec``, ``log10energy``, ``ra_deg``, ``dec_deg``, ``sigma_deg``, and (for MC where true values are known) ``dpsi_deg``."""
[docs] def __init__(self, *a, **kw): super(Events, self).__init__(*a, **kw) if 'dec' in self: self['sindec'] = np.sin(self.dec) if 'energy' in self: self['log10energy'] = np.log10(self.energy) if 'azimuth' in self: self['azimuth'] = self.azimuth % (2*pi)
def __setitem__(self, x, v): si = super(Events, self).__setitem__ si(x, v) if isinstance(x, str): if x == 'dec': si('sindec', np.sin(self.dec)) elif x == 'sindec': si('dec', np.arcsin(self.sindec)) elif x == 'energy': si('log10energy', np.log10(self.energy)) elif x == 'log10energy': si('energy', 10**self.log10energy) elif x == 'azimuth': si('azimuth', self.azimuth % (2*pi)) def compress(self, keep=[], keep32=[]): if 'dec' in keep: keep.append('sindec') if 'dec' in keep32: keep32.append('sindec') if 'energy' in keep32: keep32.append('log10energy') if 'energy' in keep: keep.append('log10energy') return super(Events, self).compress(keep=keep, keep32=keep32) @property def dec_deg(self): return np.degrees(self.dec) @property def ra_deg(self): return np.degrees(self.ra) @property def dpsi_deg(self): return np.degrees(self.dpsi) @property def sigma_deg(self): return np.degrees(self.sigma)
[docs] class Sources(Arrays): """Subclass of :class:`Arrays` automatically providing certain source properties. Bonus properties include ``weight`` (normalization: sum to unity) and ``extension`` (zero by default). If ``deg=True`` on construction, then (ra,dec,extension) are converted from degrees to radians. """
[docs] def __init__(self, *a, **kw): kw.setdefault('_require_array', True) deg = kw.pop('deg', False) super(Sources, self).__init__(*a, **kw) if 'weight' not in self: self['weight'] = np.ones(len(self)) / len(self) if 'extension' not in self: self['extension'] = np.zeros(len(self)) if deg: self['ra'], self['dec'] = np.radians(self.ra), np.radians(self.dec) self['extension'] = np.radians(self.extension)
def __setitem__(self, x, v): si = super(Sources, self).__setitem__ si(x, v) if isinstance(x, str): if x == 'ra': si('ra_deg', np.degrees(self.ra)) elif x == 'dec': si('dec_deg', np.degrees(self.dec)) si('sindec', np.sin(self.dec)) elif x == 'extension': si('extension_deg', np.degrees(self.extension))
[docs] def sources(ra, dec, **kw): """Convenience wrapper for constructing :class:`Sources` instances.""" return Sources(ra=ra, dec=dec, **kw)
[docs] def ensure_dir(dirname): """Make sure ``dirname`` exists and is a directory.""" if not os.path.isdir(dirname): try: os.makedirs(dirname) # throws if exists as file except OSError as e: if e.errno != errno.EEXIST: raise return dirname
[docs] def get_pixval(map, ra, dec): """Get the value of a healpix map at specific equatorial coordinates. Args: map (ndarray): the healpix map ra (float or ndarray): the right ascension(s) dec (float or ndarray): the declination(s) Returns: float or ndarray: the value(s) """ nside = hp.get_nside(map) theta = pi/2 - dec idx = hp.ang2pix(nside, theta, ra) return map[idx]
[docs] def get_random(seed=None): """Get numpy RandomState.""" if seed is None: return np.random elif isinstance(seed, np.random.RandomState): return seed else: return np.random.RandomState(seed)
[docs] def get_pixmask(nside, events, radius=3): r""" Query disc of given radius around selected events. Args: events (np.ndarray): Array of data events nside (int): Resolution of the healpix map to use radius (float): Radius around each event which is used to select pixels Returns: src_pix (np.ndarray): list of pixels within 'radius' of the data events """ # Convert ra and dec of events to cartesian coordinates src_ra = events['ra'] src_dec = events['dec'] src_theta = np.pi / 2 - src_dec src_phi = src_ra src_x = np.cos(src_phi) * np.sin(src_theta) src_y = np.sin(src_phi) * np.sin(src_theta) src_z = np.cos(src_theta) # get pixels within given raidus of each event src_pix = np.empty(0) if 'sigma' in events: sigma = events['sigma'] elif 'extension' in events: sigma = events['extension'] for i in range(len (events)): src_pix = np.concatenate(( src_pix, hp.query_disc(nside, [src_x[i], src_y[i], src_z[i]], radius * events['sigma'][i]))) src_pix = np.unique(src_pix.astype(int)) return src_pix
[docs] def union_continuous_intervals(intervals): ''' Calculate the union of a given list of continuous intervals ''' sorted_intervals = sorted(intervals, key=lambda x: x[1]) union_sorted_intervals = [] for start, stop in sorted_intervals: if union_sorted_intervals and union_sorted_intervals[-1][1] >= start: union_sorted_intervals[-1][1] = max(union_sorted_intervals[-1][1], stop) else: union_sorted_intervals.append([start, stop]) return union_sorted_intervals
[docs] def intersection_time_intervals(intervals_1, intervals_2): ''' Calculate intersections between two sorted lists containing continuous, non-overlapping intervals ''' res = [] sorted_intervals_1 = sorted(intervals_1, key=lambda x: x[1]) sorted_intervals_2 = sorted(intervals_2, key=lambda x: x[1]) for start, stop in sorted_intervals_1: for start_2, stop_2 in sorted_intervals_2: if ( stop_2<start ): continue elif ( start_2<stop ): res.append( [max(start, start_2), min(stop, stop_2)] ) else: break return res
def get_frac_during_livetime_single(ana, t0, dt, cut_n_sigma, box_mode=None): t0 = t0[0] dt = dt[0] for a in ana.anas: mjd1, mjd2 = a.grl.start, a.grl.stop if box_mode is not None: if box_mode == 'center': t0 = t0 - 0.5 * dt elif box_mode == 'pre': t0 = t0 - dt if t0 + dt < mjd1[0] or mjd2[-1] < t0: return 0 else: return cext.sum_pdf_time_box(mjd1, mjd2, t0, dt) else: return cext.sum_pdf_time_normal(mjd1, mjd2, t0, dt, cut_n_sigma)