"""Analysis cache objects for csky.
This module provides caching of key objects required for virtually all analyses. These
include background space parameterizations, signal acceptance parameterizations, and
energy PDF ratio models.
"""
from __future__ import print_function, absolute_import
import abc
from abc import ABCMeta, abstractmethod
import copy
import inspect
try:
from itertools import izip
except ImportError:
izip = zip
import os
import re
import sys
import numpy as np
from . import selections, inj, pdf, llh, utils, hyp
import healpy as hp
[docs]
class SubAnalysis(object):
"""Analysis cache for a single dataset (i.e. "season").
This class holds references to the background space parameterizations, signal
acceptance parameterizations, and energy PDF ratio models for a single dataset. These
can be saved to and loaded from disk. Parameterizations for custom signal fluxes can
also be requested on-demand, in which case these will be cached during execution but
not included in the on-disk representation.
These should not be constructed directly; instead, see :meth:`csky.conf.get_analysis`.
See :mod:`csky.selections` for pre-defined :class:`csky.selections.DataSpec` options.
"""
[docs]
def __init__(self, repo, spec,
src=[],mask_plane=False,gp_width=np.radians(5),mask_radius=0.,src_tomask=[],
dir='',
gammas=np.r_[1:4+1e-5:2**-4],
energy_kw={}, acc_kw={}, space_bg_kw={},
min_sigma=np.radians(0.2),
compress=True, load_sig=True, mmap=True,
energy_pdf_ratio_model_cls=pdf.EnergyPDFRatioModel,
acc_param_cls=pdf.SinDecAccParameterization,
bg_space_param_cls=pdf.BgSinDecParameterization):
"""Construct a SubAnalysis.
Args:
repo (:class:`csky.selections.Repository`): the repository for loading the data
spec (:class:`csky.selections.DataSpec`): the dataset specification
src (list of :class:`csky.utils.Sources`): if given, sources whose [t_start,t_stop]
time windows must be excluded from background parameterizations
dir (str): directory from which to load cached PDF parameterizations
gammas (array): gammas to parameterize in energy PDFs and signal acceptance
energy_kw (mapping): extra kwargs for the ``energy_pdf_ratio_model``
acc_kw (mapping): extra kwargs for the ``acc_param``
space_bg_kw (mapping): extra kwargs for the ``bg_space_param``
min_sigma (float): minimum ``sigma`` (angular uncertainty estimate) to allow
compress (bool): whether to compress the dataset (drop unneeded columns and use
float32 where possible)
load_sig (bool): whether to load signal MC (set to False to save RAM in
background trial jobs)
mmap (bool): whether to use memmap disk reading (should reduce peak RAM usage)
energy_pdf_ratio_model_cls (type): class to use for ``energy_pdf_ratio_model``
acc_param_cls (type): class to use for ``acc_param``
bg_space_param_cls (type): class to use for ``bg_space_param_cls``
gp_width (float) : size of galactic plane gp_width to mask in radians.
"""
self.repo = repo
self.spec = spec
self.key = key = spec.key
self.version = spec.version
print('Setting up {}...'.format(key))
self.ds = ds = spec.load(repo, compress=compress, load_sig=load_sig, mmap=mmap)
ds.apply_min_sigma(min_sigma)
self.data, self.sig = data, sig = ds.data, ds.sig
self.mask_plane, self.gp_width = mask_plane, gp_width
if mask_radius!=0. or mask_plane==True:
self.bg_data, self.on_time_intervals, self.dOmega_corr = self._set_bg_data(spec, src, mask_plane=mask_plane, gp_width=gp_width, mask_radius=mask_radius, src_tomask=src_tomask)
else:
self.dOmega_corr = None
self.bg_data, self.on_time_intervals = self._set_bg_data(spec, src, mask_plane=mask_plane, mask_radius=mask_radius)
self.livetime, self.range_sindec = ds.livetime, ds.range_sindec
self.grl = ds.grl
self.bg_livetime = self.livetime - 86400*np.sum(
[t_stop-t_start for t_start, t_stop in self.on_time_intervals])
self.n_data, self.n_bg_data = len(data), len(self.bg_data)
if 'mjd' in data:
self.mjd_min, self.mjd_max = np.min(data.mjd), np.max(data.mjd)
else:
self.mjd_min = self.mjd_max = None
self.custom_energy_models = {}
self.custom_energy_models_origin = {}
self.custom_acc_params = {}
# TODO: is this the right place to call init_from_file()?
if os.path.isfile(self.cache_filename(dir)):
self.init_from_file(dir, src)
return
self.spec = spec = spec() if inspect.isclass(spec) else spec
self.gammas = gammas
print('Energy PDF Ratio Model...')
hkw = dict(bins=(spec.kw_energy['bins_sindec'], spec.kw_energy['bins_logenergy']))
self.kw_energy = kw_energy = {}
kw_energy['hkw'] = hkw
kw_energy['gammas'] = gammas
kw_energy['bg_mc_weight'] = spec.kw_energy.get('bg_mc_weight', '')
kw_energy.update(energy_kw)
self.energy_pdf_ratio_model = energy_pdf_ratio_model_cls(self.bg_data, sig, **kw_energy)
self.energy_pdf_ratio_model_origin = copy.deepcopy(self.energy_pdf_ratio_model)
print('Signal Acceptance Model...')
hkw = dict(bins=spec.bins_sindec)
self.kw_acc = kw_acc = {}
kw_acc['hkw'] = hkw
kw_acc['gammas'] = gammas
kw_acc.update(acc_kw)
self.acc_param = acc_param_cls(sig, **kw_acc)
self.kw_space_bg = kw_space_bg = dict(hkw=hkw)
kw_space_bg.update(space_bg_kw)
if mask_radius!=0. or mask_plane==True:
# let's add a safety check to not accidentally use a bg-space
# parameterization that does not handle source masking properly.
# The explicit passing of `dOmega_corr` should already take care
# of this, but maybe better to be safe than sorry.
if bg_space_param_cls not in [pdf.BgSinDecParameterization]:
msg = 'The bg-space PDF "{}" does not handle source masking!'
raise NotImplementedError(msg.format(bg_space_param_cls))
self.bg_space_param = bg_space_param_cls(
self.bg_data, dOmega_corr=self.dOmega_corr, **kw_space_bg)
elif 'bg_mc_weight' in kw_space_bg:
print("using mc bg")
self.bg_space_param = bg_space_param_cls(
self.sig, **kw_space_bg)
else:
print("not applying any masking")
self.bg_space_param = bg_space_param_cls(
self.bg_data, **kw_space_bg)
self.bg_space_param_origin = copy.deepcopy(self.bg_space_param)
def __repr__(self):
# TODO: make more informative?
return 'SubAnalysis(key={})'.format(self.key)
def cache_filename(self, dir):
if self.version == 'current':
version_str = ''
else:
version_str = '.{}'.format(self.version)
return '{}/{}.subanalysis{}.npy'.format(dir, self.key, version_str)
def save(self, dir):
state = dict(
gammas=self.gammas,
kw_acc=self.kw_acc,
kw_energy=self.kw_energy,
kw_space_bg=self.kw_space_bg,
energy_pdf_ratio_model=self.energy_pdf_ratio_model,
acc_param=self.acc_param,
bg_space_param=self.bg_space_param,
)
filename = self.cache_filename(dir)
print('->', filename, '...', end='')
sys.stdout.flush()
np.save(filename, state)
print('\r->', filename, ' ')
sys.stdout.flush()
def init_from_file(self, dir, src=[]):
# load dataset
repo, spec = self.repo, self.spec
#self.key = key = spec.key
#self.ds = ds = spec.load(repo)
#self.data, self.sig = data, sig = ds.data, ds.sig
#self.bg_data, self.on_time_intervals = self._set_bg_data(src)
#self.livetime, self.range_sindec = ds.livetime, ds.range_sindec
#self.grl = ds.grl
#self.bg_livetime = self.livetime - 86400*np.sum(
# [t_stop-t_start for t_start, t_stop in self.on_time_intervals])
# load PDFs etc from cache
filename = self.cache_filename(dir)
print('<-', filename, '...', end='')
sys.stdout.flush()
try:
state = np.load(filename, allow_pickle=True)[()]
except:
state = np.load(filename, allow_pickle=True, encoding='latin1')[()]
print('\r<-', filename, ' ')
sys.stdout.flush()
self.acc_param = state['acc_param']
self.energy_pdf_ratio_model = state['energy_pdf_ratio_model']
self.gammas = state['gammas']
self.kw_acc = state['kw_acc']
self.kw_energy = state['kw_energy']
self.kw_space_bg = state['kw_space_bg']
self.bg_space_param = state['bg_space_param']
self.bg_space_param_origin = state['bg_space_param']
def energy_pdf_ratio_model(self):
return self.energy_pdf_ratio_model
def get_custom_flux_energy_pdf_ratio_model(self, flux):
fluxs = np.atleast_1d(flux)
kw = copy.deepcopy(self.kw_energy)
kw.pop('gammas')
#when have a list of custom flux, loop over all since they all different
for flux in fluxs:
models=[]
if flux in self.custom_energy_models:
models.append(self.custom_energy_models[flux])
if len(models)>0:
if len(fluxs) != len(models):
print("WARNING: length mismatching, make sure it is intended: input flux:", fluxs, "; returning models: ", models)
#print("already pre-loaded fluxes: ", models)
return models
for flux in fluxs:
energy_model = pdf.CustomFluxEnergyPDFRatioModel(
self.bg_data, self.sig, flux, **kw)
self.custom_energy_models[flux] = energy_model
self.custom_energy_models_origin = copy.deepcopy(self.custom_energy_models)
return energy_model
@property
def custom_flux_acc_kw(self):
kw = copy.deepcopy(self.kw_acc)
kw.pop('gammas')
skw = kw.setdefault('skw', {})
skw.pop('ky', 0)
skw.setdefault('k', skw.pop('kx', 2))
return kw
def get_custom_flux_acc_parameterization(self, flux):
if flux in self.custom_acc_params:
return self.custom_acc_params[flux]
kw = self.custom_flux_acc_kw
acc_model = pdf.SinDecCustomFluxAccParameterization(self.sig, flux, **kw)
self.custom_acc_params[flux] = acc_model
return acc_model
def _set_bg_data(self, spec, src, mask_radius, mask_plane, gp_width = np.radians(5), src_tomask=[]):
data = self.data
if ( all(var in src for var in ["t_start", "t_stop"]) ):
on_time_intervals = utils.union_continuous_intervals(zip(src.t_start, src.t_stop))
on_time_masks = [(data.mjd>t_start) & (data.mjd<t_stop) for t_start, t_stop in on_time_intervals]
if ( len(on_time_masks)>1 ):
on_time_mask = np.any(on_time_masks, axis=0)
else:
on_time_mask = on_time_masks[0]
idx_bg = np.where(~on_time_mask)[0]
bg_data = data[idx_bg]
if src and mask_radius!=0.:
print("masking sources only: ", src)
#calculate masked radius and vertex of the center of the masked region
radius = mask_radius #in radians
center = hp.ang2vec(np.pi/2-src.dec,src.ra)
masked_pixels = hp.query_disc(512,center[0],radius)
#find healpy pixels that lie within each dec band
bin_edges = np.arcsin(spec.bins_sindec)
total_pixels = 0
dOmega_corr = []
for i in np.arange(len(bin_edges)-1):
pixels_in_band = hp.query_strip(512,np.pi/2-bin_edges[i+1],np.pi/2-bin_edges[i])
bool_array = np.isin(pixels_in_band,masked_pixels)
number_true = np.count_nonzero(bool_array)
total_pixels += number_true
corr = float(len(pixels_in_band)/float((len(pixels_in_band)-number_true)))
dOmega_corr.append(corr)
#compute mask for data array
for i in np.arange(len(src)):
data_pix = hp.ang2pix(512,np.pi/2-data.dec,data.ra)
mask = []
for pix in data_pix:
if pix in masked_pixels:
mask.append(False)
else:
mask.append(True)
bg_data = data[mask]
on_time_intervals = []
elif mask_plane==True or src_tomask:
print("masking GP plane and chosen sources")
#get empty mask
nside = 512
npix = hp.nside2npix(nside)
pixels = range(npix)
mask_plane_map = np.zeros(npix)
#get pixels/angles/coordinates
(th, ph) = hp.pix2ang(nside, pixels) # healpy theta (th) and phi (ph) for each pixel [radians]
r = hp.Rotator(coord=['C','G']) # rotator for equatorial to galactic
throt, phrot = r(th, ph) # rotated theta (throt) and phi (phrot) for each pixel [radians]
dec = np.pi/2 - th # declination of each pixel [radians]
ra = ph # right ascension of each pixel [radians
glat = np.pi/2 - throt # galactic latitude of each pixel [radians]
glon = phrot # galactic longitude of each pixel [radians]
#get mask of galactic plane
if mask_plane:
mask_plane_map[(glat >= -1*gp_width) & (glat <= gp_width)] = 1
mask_bool = mask_plane_map.astype(bool)
#get masked pixels
masked_pixs = []
for i in np.arange(0,len(mask_bool)):
if mask_bool[i]==True:
masked_pixs.append(i)
else:
continue
masked_pixels = np.array(masked_pixs)
if src_tomask:
for src in src_tomask:
center = hp.ang2vec(np.pi/2-src.dec,src.ra)
tmp = hp.query_disc(nside, center[0], mask_radius)
masked_pixels = np.append(masked_pixels,tmp)
masked_pixels = np.unique(masked_pixels)
masked_pixels = np.sort(masked_pixels)
#find healpy pixels that lie within each dec band
bin_edges = np.arcsin(spec.bins_sindec)
total_pixels = 0
dOmega_corr = []
print("masking for bkg pdf...\n")
for i in np.arange(len(bin_edges)-1):
pixels_in_band = hp.query_strip(nside, np.pi/2-bin_edges[i+1], np.pi/2-bin_edges[i])
matched = np.intersect1d(pixels_in_band,masked_pixels)
total_pixels += len(matched)
corr = float(len(pixels_in_band)/float((len(pixels_in_band)-len(matched))))
dOmega_corr.append(corr)
#compute mask for data array
data_pix = hp.ang2pix(nside,np.pi/2-data.dec,data.ra)
mask = []
for pix in data_pix:
if pix in masked_pixels:
mask.append(False)
else:
mask.append(True)
bg_data = data[mask]
on_time_intervals = []
else:
bg_data = data
on_time_intervals = []
dOmega_corr = np.ones(len(spec.bins_sindec)-1)
if mask_radius!=0. or mask_plane==True:
self.bin_edges = bin_edges
self.dOmega_corr = dOmega_corr
return bg_data, on_time_intervals, dOmega_corr
else:
return bg_data, on_time_intervals
@property
def plot_key(self):
return re.sub('_', r'\_', self.key)
[docs]
class Analysis(object):
"""Analysis cache for all datasets (i.e. seasons) used in an analysis.
This class manages an ensemble of :class:`SubAnalysis` instances so they can be
treated as a unit when producing a :class:`csky.trial.TrialRunner` in order to perform
analysis tasks.
These should not be constructed directly; instead, see :meth:`csky.conf.get_analysis`.
"""
[docs]
def __init__(self, repo, specs, load_sig=True, **kw):
specs = [spec() if inspect.isclass(spec) else spec for spec in specs]
quiet = kw.pop('_quiet', False)
if not quiet:
print('Setting up Analysis for:')
print(', '.join([spec.key for spec in specs]))
self.repo = repo
self.anas = anas = []
for spec in specs:
if isinstance(spec, SubAnalysis):
self.anas.append(spec)
else:
anas.append(SubAnalysis(repo, spec, load_sig=load_sig, **kw))
self.keys = [ana.key for ana in anas]
self.plot_keys = [ana.plot_key for ana in anas]
self.anasd = {ana.key: ana for ana in anas}
self.has_sig = load_sig
mins = [a.mjd_min for a in anas if a.mjd_min is not None]
maxs = [a.mjd_max for a in anas if a.mjd_max is not None]
self.mjd_min = min(mins) if mins else None
self.mjd_max = max(maxs) if maxs else None
if not quiet:
print('Done.')
def __repr__(self):
# TODO: make this more informative?
return 'Analysis(keys=[{}])'.format(', '.join(map(str,self.keys)))
def save(self, dir):
for a in self:
a.save(dir)
def __len__(self):
return len(self.keys)
def __iter__(self):
return (a for a in self.anas)
def __getitem__(self, key):
if not isinstance(key, slice):
for (i,a) in enumerate(self):
if key in (i, i - len(self), a.key):
return a
raise KeyError('SubAnalysis not found: {}'.format(key))
else:
return self.only(self.keys[key])
def only(self, keys):
anas = [self[key] for key in keys]
return Analysis(self.repo, anas, _quiet=True)