Source code for csky.analysis



"""Analysis cache objects for csky.

This module provides caching of key objects required for virtually all analyses.  These
include background space parameterizations, signal acceptance parameterizations, and
energy PDF ratio models.
"""

from __future__ import print_function, absolute_import

import abc
from abc import ABCMeta, abstractmethod
import copy
import inspect
try:
    from itertools import izip
except ImportError:
    izip = zip
import os
import re
import sys

import numpy as np

from . import selections, inj, pdf, llh, utils, hyp
import healpy as hp

[docs] class SubAnalysis(object): """Analysis cache for a single dataset (i.e. "season"). This class holds references to the background space parameterizations, signal acceptance parameterizations, and energy PDF ratio models for a single dataset. These can be saved to and loaded from disk. Parameterizations for custom signal fluxes can also be requested on-demand, in which case these will be cached during execution but not included in the on-disk representation. These should not be constructed directly; instead, see :meth:`csky.conf.get_analysis`. See :mod:`csky.selections` for pre-defined :class:`csky.selections.DataSpec` options. """
[docs] def __init__(self, repo, spec, src=[],mask_plane=False,gp_width=np.radians(5),mask_radius=0.,src_tomask=[], dir='', gammas=np.r_[1:4+1e-5:2**-4], energy_kw={}, acc_kw={}, space_bg_kw={}, min_sigma=np.radians(0.2), compress=True, load_sig=True, mmap=True, energy_pdf_ratio_model_cls=pdf.EnergyPDFRatioModel, acc_param_cls=pdf.SinDecAccParameterization, bg_space_param_cls=pdf.BgSinDecParameterization): """Construct a SubAnalysis. Args: repo (:class:`csky.selections.Repository`): the repository for loading the data spec (:class:`csky.selections.DataSpec`): the dataset specification src (list of :class:`csky.utils.Sources`): if given, sources whose [t_start,t_stop] time windows must be excluded from background parameterizations dir (str): directory from which to load cached PDF parameterizations gammas (array): gammas to parameterize in energy PDFs and signal acceptance energy_kw (mapping): extra kwargs for the ``energy_pdf_ratio_model`` acc_kw (mapping): extra kwargs for the ``acc_param`` space_bg_kw (mapping): extra kwargs for the ``bg_space_param`` min_sigma (float): minimum ``sigma`` (angular uncertainty estimate) to allow compress (bool): whether to compress the dataset (drop unneeded columns and use float32 where possible) load_sig (bool): whether to load signal MC (set to False to save RAM in background trial jobs) mmap (bool): whether to use memmap disk reading (should reduce peak RAM usage) energy_pdf_ratio_model_cls (type): class to use for ``energy_pdf_ratio_model`` acc_param_cls (type): class to use for ``acc_param`` bg_space_param_cls (type): class to use for ``bg_space_param_cls`` gp_width (float) : size of galactic plane gp_width to mask in radians. """ self.repo = repo self.spec = spec self.key = key = spec.key self.version = spec.version print('Setting up {}...'.format(key)) self.ds = ds = spec.load(repo, compress=compress, load_sig=load_sig, mmap=mmap) ds.apply_min_sigma(min_sigma) self.data, self.sig = data, sig = ds.data, ds.sig self.mask_plane, self.gp_width = mask_plane, gp_width if mask_radius!=0. or mask_plane==True: self.bg_data, self.on_time_intervals, self.dOmega_corr = self._set_bg_data(spec, src, mask_plane=mask_plane, gp_width=gp_width, mask_radius=mask_radius, src_tomask=src_tomask) else: self.dOmega_corr = None self.bg_data, self.on_time_intervals = self._set_bg_data(spec, src, mask_plane=mask_plane, mask_radius=mask_radius) self.livetime, self.range_sindec = ds.livetime, ds.range_sindec self.grl = ds.grl self.bg_livetime = self.livetime - 86400*np.sum( [t_stop-t_start for t_start, t_stop in self.on_time_intervals]) self.n_data, self.n_bg_data = len(data), len(self.bg_data) if 'mjd' in data: self.mjd_min, self.mjd_max = np.min(data.mjd), np.max(data.mjd) else: self.mjd_min = self.mjd_max = None self.custom_energy_models = {} self.custom_energy_models_origin = {} self.custom_acc_params = {} # TODO: is this the right place to call init_from_file()? if os.path.isfile(self.cache_filename(dir)): self.init_from_file(dir, src) return self.spec = spec = spec() if inspect.isclass(spec) else spec self.gammas = gammas print('Energy PDF Ratio Model...') hkw = dict(bins=(spec.kw_energy['bins_sindec'], spec.kw_energy['bins_logenergy'])) self.kw_energy = kw_energy = {} kw_energy['hkw'] = hkw kw_energy['gammas'] = gammas kw_energy['bg_mc_weight'] = spec.kw_energy.get('bg_mc_weight', '') kw_energy.update(energy_kw) self.energy_pdf_ratio_model = energy_pdf_ratio_model_cls(self.bg_data, sig, **kw_energy) self.energy_pdf_ratio_model_origin = copy.deepcopy(self.energy_pdf_ratio_model) print('Signal Acceptance Model...') hkw = dict(bins=spec.bins_sindec) self.kw_acc = kw_acc = {} kw_acc['hkw'] = hkw kw_acc['gammas'] = gammas kw_acc.update(acc_kw) self.acc_param = acc_param_cls(sig, **kw_acc) self.kw_space_bg = kw_space_bg = dict(hkw=hkw) kw_space_bg.update(space_bg_kw) if mask_radius!=0. or mask_plane==True: # let's add a safety check to not accidentally use a bg-space # parameterization that does not handle source masking properly. # The explicit passing of `dOmega_corr` should already take care # of this, but maybe better to be safe than sorry. if bg_space_param_cls not in [pdf.BgSinDecParameterization]: msg = 'The bg-space PDF "{}" does not handle source masking!' raise NotImplementedError(msg.format(bg_space_param_cls)) self.bg_space_param = bg_space_param_cls( self.bg_data, dOmega_corr=self.dOmega_corr, **kw_space_bg) elif 'bg_mc_weight' in kw_space_bg: print("using mc bg") self.bg_space_param = bg_space_param_cls( self.sig, **kw_space_bg) else: print("not applying any masking") self.bg_space_param = bg_space_param_cls( self.bg_data, **kw_space_bg) self.bg_space_param_origin = copy.deepcopy(self.bg_space_param)
def __repr__(self): # TODO: make more informative? return 'SubAnalysis(key={})'.format(self.key) def cache_filename(self, dir): if self.version == 'current': version_str = '' else: version_str = '.{}'.format(self.version) return '{}/{}.subanalysis{}.npy'.format(dir, self.key, version_str) def save(self, dir): state = dict( gammas=self.gammas, kw_acc=self.kw_acc, kw_energy=self.kw_energy, kw_space_bg=self.kw_space_bg, energy_pdf_ratio_model=self.energy_pdf_ratio_model, acc_param=self.acc_param, bg_space_param=self.bg_space_param, ) filename = self.cache_filename(dir) print('->', filename, '...', end='') sys.stdout.flush() np.save(filename, state) print('\r->', filename, ' ') sys.stdout.flush() def init_from_file(self, dir, src=[]): # load dataset repo, spec = self.repo, self.spec #self.key = key = spec.key #self.ds = ds = spec.load(repo) #self.data, self.sig = data, sig = ds.data, ds.sig #self.bg_data, self.on_time_intervals = self._set_bg_data(src) #self.livetime, self.range_sindec = ds.livetime, ds.range_sindec #self.grl = ds.grl #self.bg_livetime = self.livetime - 86400*np.sum( # [t_stop-t_start for t_start, t_stop in self.on_time_intervals]) # load PDFs etc from cache filename = self.cache_filename(dir) print('<-', filename, '...', end='') sys.stdout.flush() try: state = np.load(filename, allow_pickle=True)[()] except: state = np.load(filename, allow_pickle=True, encoding='latin1')[()] print('\r<-', filename, ' ') sys.stdout.flush() self.acc_param = state['acc_param'] self.energy_pdf_ratio_model = state['energy_pdf_ratio_model'] self.gammas = state['gammas'] self.kw_acc = state['kw_acc'] self.kw_energy = state['kw_energy'] self.kw_space_bg = state['kw_space_bg'] self.bg_space_param = state['bg_space_param'] self.bg_space_param_origin = state['bg_space_param'] def energy_pdf_ratio_model(self): return self.energy_pdf_ratio_model def get_custom_flux_energy_pdf_ratio_model(self, flux): fluxs = np.atleast_1d(flux) kw = copy.deepcopy(self.kw_energy) kw.pop('gammas') #when have a list of custom flux, loop over all since they all different for flux in fluxs: models=[] if flux in self.custom_energy_models: models.append(self.custom_energy_models[flux]) if len(models)>0: if len(fluxs) != len(models): print("WARNING: length mismatching, make sure it is intended: input flux:", fluxs, "; returning models: ", models) #print("already pre-loaded fluxes: ", models) return models for flux in fluxs: energy_model = pdf.CustomFluxEnergyPDFRatioModel( self.bg_data, self.sig, flux, **kw) self.custom_energy_models[flux] = energy_model self.custom_energy_models_origin = copy.deepcopy(self.custom_energy_models) return energy_model @property def custom_flux_acc_kw(self): kw = copy.deepcopy(self.kw_acc) kw.pop('gammas') skw = kw.setdefault('skw', {}) skw.pop('ky', 0) skw.setdefault('k', skw.pop('kx', 2)) return kw def get_custom_flux_acc_parameterization(self, flux): if flux in self.custom_acc_params: return self.custom_acc_params[flux] kw = self.custom_flux_acc_kw acc_model = pdf.SinDecCustomFluxAccParameterization(self.sig, flux, **kw) self.custom_acc_params[flux] = acc_model return acc_model def _set_bg_data(self, spec, src, mask_radius, mask_plane, gp_width = np.radians(5), src_tomask=[]): data = self.data if ( all(var in src for var in ["t_start", "t_stop"]) ): on_time_intervals = utils.union_continuous_intervals(zip(src.t_start, src.t_stop)) on_time_masks = [(data.mjd>t_start) & (data.mjd<t_stop) for t_start, t_stop in on_time_intervals] if ( len(on_time_masks)>1 ): on_time_mask = np.any(on_time_masks, axis=0) else: on_time_mask = on_time_masks[0] idx_bg = np.where(~on_time_mask)[0] bg_data = data[idx_bg] if src and mask_radius!=0.: print("masking sources only: ", src) #calculate masked radius and vertex of the center of the masked region radius = mask_radius #in radians center = hp.ang2vec(np.pi/2-src.dec,src.ra) masked_pixels = hp.query_disc(512,center[0],radius) #find healpy pixels that lie within each dec band bin_edges = np.arcsin(spec.bins_sindec) total_pixels = 0 dOmega_corr = [] for i in np.arange(len(bin_edges)-1): pixels_in_band = hp.query_strip(512,np.pi/2-bin_edges[i+1],np.pi/2-bin_edges[i]) bool_array = np.isin(pixels_in_band,masked_pixels) number_true = np.count_nonzero(bool_array) total_pixels += number_true corr = float(len(pixels_in_band)/float((len(pixels_in_band)-number_true))) dOmega_corr.append(corr) #compute mask for data array for i in np.arange(len(src)): data_pix = hp.ang2pix(512,np.pi/2-data.dec,data.ra) mask = [] for pix in data_pix: if pix in masked_pixels: mask.append(False) else: mask.append(True) bg_data = data[mask] on_time_intervals = [] elif mask_plane==True or src_tomask: print("masking GP plane and chosen sources") #get empty mask nside = 512 npix = hp.nside2npix(nside) pixels = range(npix) mask_plane_map = np.zeros(npix) #get pixels/angles/coordinates (th, ph) = hp.pix2ang(nside, pixels) # healpy theta (th) and phi (ph) for each pixel [radians] r = hp.Rotator(coord=['C','G']) # rotator for equatorial to galactic throt, phrot = r(th, ph) # rotated theta (throt) and phi (phrot) for each pixel [radians] dec = np.pi/2 - th # declination of each pixel [radians] ra = ph # right ascension of each pixel [radians glat = np.pi/2 - throt # galactic latitude of each pixel [radians] glon = phrot # galactic longitude of each pixel [radians] #get mask of galactic plane if mask_plane: mask_plane_map[(glat >= -1*gp_width) & (glat <= gp_width)] = 1 mask_bool = mask_plane_map.astype(bool) #get masked pixels masked_pixs = [] for i in np.arange(0,len(mask_bool)): if mask_bool[i]==True: masked_pixs.append(i) else: continue masked_pixels = np.array(masked_pixs) if src_tomask: for src in src_tomask: center = hp.ang2vec(np.pi/2-src.dec,src.ra) tmp = hp.query_disc(nside, center[0], mask_radius) masked_pixels = np.append(masked_pixels,tmp) masked_pixels = np.unique(masked_pixels) masked_pixels = np.sort(masked_pixels) #find healpy pixels that lie within each dec band bin_edges = np.arcsin(spec.bins_sindec) total_pixels = 0 dOmega_corr = [] print("masking for bkg pdf...\n") for i in np.arange(len(bin_edges)-1): pixels_in_band = hp.query_strip(nside, np.pi/2-bin_edges[i+1], np.pi/2-bin_edges[i]) matched = np.intersect1d(pixels_in_band,masked_pixels) total_pixels += len(matched) corr = float(len(pixels_in_band)/float((len(pixels_in_band)-len(matched)))) dOmega_corr.append(corr) #compute mask for data array data_pix = hp.ang2pix(nside,np.pi/2-data.dec,data.ra) mask = [] for pix in data_pix: if pix in masked_pixels: mask.append(False) else: mask.append(True) bg_data = data[mask] on_time_intervals = [] else: bg_data = data on_time_intervals = [] dOmega_corr = np.ones(len(spec.bins_sindec)-1) if mask_radius!=0. or mask_plane==True: self.bin_edges = bin_edges self.dOmega_corr = dOmega_corr return bg_data, on_time_intervals, dOmega_corr else: return bg_data, on_time_intervals @property def plot_key(self): return re.sub('_', r'\_', self.key)
[docs] class Analysis(object): """Analysis cache for all datasets (i.e. seasons) used in an analysis. This class manages an ensemble of :class:`SubAnalysis` instances so they can be treated as a unit when producing a :class:`csky.trial.TrialRunner` in order to perform analysis tasks. These should not be constructed directly; instead, see :meth:`csky.conf.get_analysis`. """
[docs] def __init__(self, repo, specs, load_sig=True, **kw): specs = [spec() if inspect.isclass(spec) else spec for spec in specs] quiet = kw.pop('_quiet', False) if not quiet: print('Setting up Analysis for:') print(', '.join([spec.key for spec in specs])) self.repo = repo self.anas = anas = [] for spec in specs: if isinstance(spec, SubAnalysis): self.anas.append(spec) else: anas.append(SubAnalysis(repo, spec, load_sig=load_sig, **kw)) self.keys = [ana.key for ana in anas] self.plot_keys = [ana.plot_key for ana in anas] self.anasd = {ana.key: ana for ana in anas} self.has_sig = load_sig mins = [a.mjd_min for a in anas if a.mjd_min is not None] maxs = [a.mjd_max for a in anas if a.mjd_max is not None] self.mjd_min = min(mins) if mins else None self.mjd_max = max(maxs) if maxs else None if not quiet: print('Done.')
def __repr__(self): # TODO: make this more informative? return 'Analysis(keys=[{}])'.format(', '.join(map(str,self.keys))) def save(self, dir): for a in self: a.save(dir) def __len__(self): return len(self.keys) def __iter__(self): return (a for a in self.anas) def __getitem__(self, key): if not isinstance(key, slice): for (i,a) in enumerate(self): if key in (i, i - len(self), a.key): return a raise KeyError('SubAnalysis not found: {}'.format(key)) else: return self.only(self.keys[key]) def only(self, keys): anas = [self[key] for key in keys] return Analysis(self.repo, anas, _quiet=True)