# utils.py
"""
Utility classes and functions.
"""
from __future__ import print_function
import sys
import numpy as np
_PY2 = sys.version_info[0] == 2
if _PY2:
_str_types = (str, unicode, np.string_)
else:
_str_types = (str, np.string_)
try:
from astropy.table import Table
except:
Table = None
try:
xrange
except NameError:
xrange = range
try:
from pandas import DataFrame
except:
DataFrame = None
import os
try:
from os import errno
except:
import errno
import healpy as hp
pi = np.pi
from . import coord, cext
try:
from icecube import astro
except:
astro = None
[docs]
class Arrays(object):
"""Optimized DataFrame-like data structure.
This is the baseline data structure for all event data in csky. It acts a lot like a
pandas DataFrame, and can be converted to one with the ``.as_dataframe`` property.
Conversion to astropy Table and to numpy structured array are supported by the
``.as_table`` and ``.as_array`` properties.
Each stored array is treated as a column and can be accessed like, e.g.,
``data['ra']`` **or** ``data.ra``. Assignment of new columns is **only** available by
the [] syntax, e.g. ``data['something_else'] = some_new_array``.
Additional dictionary-like access is possible through the ``.keys()``, ``.values()``,
and ``.items()`` methods.
When the [] operand is not a string, slicing/masking is performed, e.g. ``src_0_1_2 =
srcs[:3]`` or ``north_data = data[data.dec > 0]``. Iteration over an ``Arrays``
yields per-row instances, that is, new ``Arrays`` instances with column lengths of 1.
This class is easier than DataFrames to use for computationally-intense work because
it does very little to impose and maintain column alignment. This means it can expose
direct references to the underlying numpy arrays rather than routing through some kind
of Series objects.
"""
[docs]
def __init__(self, init={}, names=None, convert=False, _require_array=True, **kw):
"""
Construct an Arrays collection.
Args:
init (``numpy.ndarray`` or object with ``keys()`` or ``._dict.keys()`` method): the data
names (list of str): the column names
convert (bool): if True, add references converting from skylab to csky
naming convention
"""
self._dict = _dict = {}
# select columns
if names is None:
if isinstance(init, np.ndarray):
names = init.dtype.names
else:
try:
names = init.keys()
except:
try:
names = init._dict.keys()
except:
raise TypeError('could not deduce column names')
names = [name for name in names if name != 'meta']
# load data
names = sorted(list(names))
for name in names:
self[name] = init[name]
for name in sorted(kw):
self[name] = kw[name]
names += sorted(kw)
if convert:
if 'mjd' not in names and 'time' in names:
self['mjd'] = self.time
if 'dec' not in names:
if 'sinDec' in names:
self['dec'] = np.arcsin(self.sinDec)
elif 'zenith' in names and 'azimuth' in names:
if astro is None:
raise RuntimeError(
'`astro` was not successfully loaded; conversion not possible')
self['ra'], self['dec'] = astro.dir_to_equa(
self.zenith, self.azimuth, self.mjd)
if 'energy' not in names and 'logE' in names:
self['energy'] = 10**self.logE
if 'true_ra' not in names and 'trueRa' in names:
self['true_ra'] = self.trueRa
if 'true_dec' not in names and 'trueDec' in names:
self['true_dec'] = self.trueDec
if 'true_energy' not in names and 'trueE' in names:
self['true_energy'] = self.trueE
if 'oneweight' not in names and 'ow' in names:
self['oneweight'] = self.ow
#if 'dist' not in names:
# self['dist'] = np.zeros_like(self.dec)
if 'true_dec' in self._dict:
xdec, xra = coord.rotate_source_to_xaxis(
self.true_dec, self.true_ra, self.dec, self.ra, latlon=True
)
xra[xra < pi] += 2*pi
xra[xra > pi] -= 2*pi
self['xra'] = xra
self['xdec'] = xdec
self['dpsi'] = np.sqrt(xra**2 + xdec**2)
if 'azimuth' not in names and 'ra' in self and 'mjd' in self:
self['azimuth'] = coord.switch_ra_azimuth(self.ra, self.mjd)
if 'sigma' not in names and 'angErr' in self:
self['sigma'] = self.angErr
for k in 'sinDec logE trueRa trueDec trueE ow'.split():
if k in self:
del self[k]
if _require_array:
for k in self.keys():
self[k] = np.atleast_1d(self[k])
self.check_columns()
def check_columns(self):
# check column lengths
if not self._dict:
return
N = len(self._dict[self.keys()[0]])
for key in self.keys()[1:]:
if len(self._dict[key]) != N:
raise ValueError('mismatched column length: `{}`'.format(key))
def _set(self, k, v, _require_array=True):
if _require_array:
v = np.asarray(v)
try:
setattr(self, k, v)
except:
raise AttributeError("can't set attribute: {}".format(k))
self._dict[k] = v
def _unset(self, k):
del self._dict[k]
del self.__dict__[k]
def compress(self, keep=[], keep32=[], from_mmap=False):
for (key, a) in self.items():
if key in keep:
if from_mmap:
self[key] = a.copy()
elif key in keep32:
self[key] = a.astype(np.float32)
else:
del self[key]
try:
temp = type(self) (self.as_array)
self._dict = {}
for (key, a) in temp.items():
self[key] = a
except ImportError:
return self
def _check_astropy(self, prop):
if Table is None:
raise ImportError('{} requires astropy, which could not be imported'.format(prop))
def keys(self):
keys = sorted(self._dict)
out = [key for key in keys if 'true_' not in key and key != 'oneweight' ]
out += [key for key in keys if 'true_' in key]
if 'oneweight' in keys:
out += ['oneweight']
return out
def values(self):
return (self[key] for key in self.keys())
def items(self):
return ((key, self[key]) for key in self.keys())
@property
def as_table(self):
self._check_astropy('as_table')
return Table(self._dict)
@property
def as_array(self):
self._check_astropy('as_array')
return Table(self._dict).as_array()
@property
def as_dataframe(self):
if DataFrame is None:
raise ImportError('as_dataframe requires pandas, which could not be imported')
return DataFrame(self._dict)
def __contains__(self, k):
return k in self._dict
def __delitem__(self, k):
self._unset(k)
def __getitem__(self, k):
if type(k) in _str_types:
return self._dict[k]
else:
return self._subsample(k)
def __setitem__(self, k, v):
self._set(k, v)
def __len__(self):
return len(self._dict[self.keys()[0]])
def __repr__(self):
out = '{}({} items | columns: {})'.format(
self.__class__.__name__, len(self), ', '.join(self.keys()))
return out
def __str__(self):
return repr(self)
def __add__(self, other):
assert sorted(self._dict) == sorted(other._dict)
init = {}
for k in sorted(self._dict):
init[k] = np.concatenate((self[k], other[k]))
return type(self) (init)
def __iter__(self):
for i in range(len(self)):
yield self[i]
def _subsample(self, cond):
return type(self) ({k:self._dict[k][cond] for k in self.keys()})
@staticmethod
def concatenate(arrays):
for i in xrange(len(arrays) - 1):
k1, k2 = arrays[i].keys(), arrays[i+1].keys()
if arrays[i].keys() != arrays[i+1].keys():
raise ValueError('keys do not match:\n{}\nvs\n{}'.format(k1, k2))
init = {}
keys = arrays[0].keys()
for k in keys:
init[k] = np.concatenate([a[k] for a in arrays])
out = type(arrays[0]) (init)
return out
[docs]
class Events(Arrays):
"""Subclass of :class:`Arrays` automatically providing certain event properties.
Bonus properties include ``sindec``, ``log10energy``, ``ra_deg``, ``dec_deg``,
``sigma_deg``, and (for MC where true values are known) ``dpsi_deg``."""
[docs]
def __init__(self, *a, **kw):
super(Events, self).__init__(*a, **kw)
if 'dec' in self:
self['sindec'] = np.sin(self.dec)
if 'energy' in self:
self['log10energy'] = np.log10(self.energy)
if 'azimuth' in self:
self['azimuth'] = self.azimuth % (2*pi)
def __setitem__(self, x, v):
si = super(Events, self).__setitem__
si(x, v)
if isinstance(x, str):
if x == 'dec':
si('sindec', np.sin(self.dec))
elif x == 'sindec':
si('dec', np.arcsin(self.sindec))
elif x == 'energy':
si('log10energy', np.log10(self.energy))
elif x == 'log10energy':
si('energy', 10**self.log10energy)
elif x == 'azimuth':
si('azimuth', self.azimuth % (2*pi))
def compress(self, keep=[], keep32=[]):
if 'dec' in keep:
keep.append('sindec')
if 'dec' in keep32:
keep32.append('sindec')
if 'energy' in keep32:
keep32.append('log10energy')
if 'energy' in keep:
keep.append('log10energy')
return super(Events, self).compress(keep=keep, keep32=keep32)
@property
def dec_deg(self):
return np.degrees(self.dec)
@property
def ra_deg(self):
return np.degrees(self.ra)
@property
def dpsi_deg(self):
return np.degrees(self.dpsi)
@property
def sigma_deg(self):
return np.degrees(self.sigma)
[docs]
class Sources(Arrays):
"""Subclass of :class:`Arrays` automatically providing certain source properties.
Bonus properties include ``weight`` (normalization: sum to unity) and ``extension``
(zero by default). If ``deg=True`` on construction, then (ra,dec,extension) are
converted from degrees to radians.
"""
[docs]
def __init__(self, *a, **kw):
kw.setdefault('_require_array', True)
deg = kw.pop('deg', False)
super(Sources, self).__init__(*a, **kw)
if 'weight' not in self:
self['weight'] = np.ones(len(self)) / len(self)
if 'extension' not in self:
self['extension'] = np.zeros(len(self))
if deg:
self['ra'], self['dec'] = np.radians(self.ra), np.radians(self.dec)
self['extension'] = np.radians(self.extension)
def __setitem__(self, x, v):
si = super(Sources, self).__setitem__
si(x, v)
if isinstance(x, str):
if x == 'ra':
si('ra_deg', np.degrees(self.ra))
elif x == 'dec':
si('dec_deg', np.degrees(self.dec))
si('sindec', np.sin(self.dec))
elif x == 'extension':
si('extension_deg', np.degrees(self.extension))
[docs]
def sources(ra, dec, **kw):
"""Convenience wrapper for constructing :class:`Sources` instances."""
return Sources(ra=ra, dec=dec, **kw)
[docs]
def ensure_dir(dirname):
"""Make sure ``dirname`` exists and is a directory."""
if not os.path.isdir(dirname):
try:
os.makedirs(dirname) # throws if exists as file
except OSError as e:
if e.errno != errno.EEXIST:
raise
return dirname
[docs]
def get_pixval(map, ra, dec):
"""Get the value of a healpix map at specific equatorial coordinates.
Args:
map (ndarray): the healpix map
ra (float or ndarray): the right ascension(s)
dec (float or ndarray): the declination(s)
Returns:
float or ndarray: the value(s)
"""
nside = hp.get_nside(map)
theta = pi/2 - dec
idx = hp.ang2pix(nside, theta, ra)
return map[idx]
[docs]
def get_random(seed=None):
"""Get numpy RandomState."""
if seed is None:
return np.random
elif isinstance(seed, np.random.RandomState):
return seed
else:
return np.random.RandomState(seed)
[docs]
def get_pixmask(nside, events, radius=3):
r""" Query disc of given radius around selected events.
Args:
events (np.ndarray): Array of data events
nside (int): Resolution of the healpix map to use
radius (float): Radius around each event which is used to
select pixels
Returns:
src_pix (np.ndarray): list of pixels within 'radius' of the data events
"""
# Convert ra and dec of events to cartesian coordinates
src_ra = events['ra']
src_dec = events['dec']
src_theta = np.pi / 2 - src_dec
src_phi = src_ra
src_x = np.cos(src_phi) * np.sin(src_theta)
src_y = np.sin(src_phi) * np.sin(src_theta)
src_z = np.cos(src_theta)
# get pixels within given raidus of each event
src_pix = np.empty(0)
if 'sigma' in events:
sigma = events['sigma']
elif 'extension' in events:
sigma = events['extension']
for i in range(len (events)):
src_pix = np.concatenate((
src_pix,
hp.query_disc(nside, [src_x[i], src_y[i], src_z[i]], radius * events['sigma'][i])))
src_pix = np.unique(src_pix.astype(int))
return src_pix
[docs]
def union_continuous_intervals(intervals):
'''
Calculate the union of a given list of continuous intervals
'''
sorted_intervals = sorted(intervals, key=lambda x: x[1])
union_sorted_intervals = []
for start, stop in sorted_intervals:
if union_sorted_intervals and union_sorted_intervals[-1][1] >= start:
union_sorted_intervals[-1][1] = max(union_sorted_intervals[-1][1], stop)
else:
union_sorted_intervals.append([start, stop])
return union_sorted_intervals
[docs]
def intersection_time_intervals(intervals_1, intervals_2):
'''
Calculate intersections between two sorted lists containing continuous, non-overlapping intervals
'''
res = []
sorted_intervals_1 = sorted(intervals_1, key=lambda x: x[1])
sorted_intervals_2 = sorted(intervals_2, key=lambda x: x[1])
for start, stop in sorted_intervals_1:
for start_2, stop_2 in sorted_intervals_2:
if ( stop_2<start ):
continue
elif ( start_2<stop ):
res.append( [max(start, start_2), min(stop, stop_2)] )
else:
break
return res
def get_frac_during_livetime_single(ana, t0, dt, cut_n_sigma, box_mode=None):
t0 = t0[0]
dt = dt[0]
for a in ana.anas:
mjd1, mjd2 = a.grl.start, a.grl.stop
if box_mode is not None:
if box_mode == 'center':
t0 = t0 - 0.5 * dt
elif box_mode == 'pre':
t0 = t0 - dt
if t0 + dt < mjd1[0] or mjd2[-1] < t0:
return 0
else:
return cext.sum_pdf_time_box(mjd1, mjd2, t0, dt)
else:
return cext.sum_pdf_time_normal(mjd1, mjd2, t0, dt, cut_n_sigma)