# pdf.py
r"""PDF and PDF ratio implementation.
This module provides the PDF modeling and evaluation framework for csky. Here is an overview of
the key concepts:
PDF ratio models
In csky, it is computationally convenient and efficient to work in terms of log
likelihood ratios rather than directly in terms of the likelihood itself. This
enables us to represent all PDF calculations in terms of PDF ratios. In the standard
case, it looks like:
.. math::
\Lambda(\vec\mu)
= -2 \ln \left[ \frac{L(n_s=0)}{L(\vec\mu)} \right]
= 2\sum_\text{events} \ln \left[
\left(\frac{S(\text{event}|\vec\mu)}{B(\text{event}|\vec\mu)} - 1 \right)
\cdot \frac{n_s}{N} + 1
\right],
where :math:`S` and :math:`B` are the signal and background PDFs, and the fit
parameters :math:`\vec\mu` always start with :math:`n_s` and typically also include a
spectral index :math:`\gamma`. Because we have only the ratio :math:`S/B`, we can
define PDF ratios
:math:`W(\text{event}|\vec\mu)=S(\text{event}|\vec\mu)/B(\text{event}|\vec\mu)`.
Because the :math:`\text{events}` vary with each trial, and :math:`\vec\mu` changes
with each log likelihood ratio evaluation, we want a piecemeal prescription for
evaluating :math:`W`. The first step is the PDF ratio model, which you can think of
as :math:`W(?|?)`. In csky, classes providing such a definition are derived from
:class:`PDFRatioModel`. These are the classes that the user is most likely to
interact with directly, as they must be configured prior to running any trials.
PDF ratio evaluators
We have discussed trials so far without providing a definition. In csky, a *trial* is
simply a list of one or more event ensembles (structured as :class:`utils.Events`)
along with a count of events that are present in principle but masked out of the
detailed calculations in practice.
Once we have a trial, we want to be able to evaluate :math:`\Lambda(\vec\mu)`. In
general, many evaluations will be required to find the parameters
:math:`\hat{\vec\mu}` which maximize :math:`\Lambda`. Usually, significant
optimizations can be achieved by caching certain values in between evaluations.
Therefore, we do not directly ask PDF ratio models to evaluate PDF ratios. Instead we
ask them for PDF ratio evaluators, which you can think of as
:math:`W(\text{events}|?)`. In csky, classes providing this functionality are derived
from :class:`PDFRatioEvaluator`.
Acceptance Models
One of the crucial observations during the design of this framework is that we need to
care about variable signal acceptance at many steps in the analysis process:
* For source stacking, :math:`S(\text{event}|\vec\mu)` becomes a weighted average over
the values it would take for each individual source. The weighting in this average
includes a factor :math:`w_\text{acc}`, the relative signal acceptance for each
source. This typically depends on declination, but in the case of time-dependent
analysis it also depends on the relative flux expectations for ecah source at the
event arrival times.
* For multi-dataset analysis, the overall likelihood is simply a product over the
per-dataset likelihoods. However, we do not fit :math:`(n_s)_d` independently for
each dataset :math:`d`. Instead, these values are constrained such that the ratios
:math:`(n_s)_1/(n_s)_2`, etc. are equal to the ratios of the absolute signal
acceptances for each dataset, :math:`(w_\text{acc})_1/(w_\text{acc})_2` and so on.
* For counts <--> flux/fluence conversion, we need a factor
:math:`n_s/w_\text{acc}`. If we have a signal injector for the specific spectrum
under consideration, this :math:`w_\text{acc}` can be obtained directly from the
signal MC. However, in general :math:`w_\text{acc}=w_\text{acc}(\vec\mu)`, and we
want to be able to calculate its value for arbitrary :math:`\hat{\vec\mu}`.
Because variable signal acceptance is relevant in so many places, in csky we treat it
as roughly on par with PDF ratio models and evaluators. Specifically, we only
evaluate likelihoods in terms of acceptance weighted PDF ratio model/evaluator pairs:
:class:`AccWeightedPDFRatioModel` and :class:`AccWeightedPDFRatioEvaluator`. Each
class implementing :class:`AccWeightedPDFRatioModel` must provide an acceptance
parameterization, which by convention is an instance of a nested class (e.g.
:class:`PointSourceSpacePDFRatioModel.AccModel`) that implements the :class:`AccModel`
interface.
Parameterizations
Often, we can pre-configure some of the functions needed by PDF ratio evaluators, so
that they can be re-used for many different analyses. The most common examples are
the background space PDF (e.g. :class:`BgSinDecParameterization`) and the declination-
and spectrum-dependent signal acceptance (e.g. :class:`SinDecAccParameterization`).
These parameterizations do not constitute PDF ratio models or evaluators themselves,
but they are defined in this module because they are tools used by those classes.
Note that for most applications we can also pre-compute an
:class:`EnergyPDFRatioModel`, which is indeed a proper :class:`PDFRatioModel`.
TODO:
* discuss signal subtraction
* document :class:`TransientTimePDFRatioModel` and :class:`TransientTimePDFRatioEvaluator`
"""
from __future__ import print_function
import abc
import numbers
import os
import sys
import collections
from abc import ABCMeta, abstractmethod, abstractproperty
from copy import copy, deepcopy
import healpy as hp
import numpy as np
from scipy import ndimage, sparse, stats
from . import cext, coord, hyp, inj, utils
try:
from itertools import izip
except ImportError:
izip = zip
try:
xrange
except NameError:
xrange = range
pi = np.pi
import histlite as hl
[docs]
class BgSinDecParameterization(object):
r"""Traditional parameterization of the background space PDF.
This class implements the traditional background space PDF as a function of the sin of
the declination. The PDF satisfies the condition :math:`\int d\Omega f = \int d\alpha
\int d(\sin(\delta)) f = 1`.
"""
keep = 'dec'.split()
[docs]
def __init__(self, ev, dOmega_corr=[], hkw={}, skw={},
bg_mc_weight='',
_other=None):
"""Construct a BgSinDecParameterization.
Args:
ev (:class:`utils.Events`): the background-like events
hkw (mapping) :meth:`histlite.hist` kwargs
skw (mapping): :meth:`histlite.Hist.spline_fit` kwargs
_other (BgSinDecParameterization): another similarly-constructed BgSinDecKDEParameterization
to be added to the first one.
"""
self.hkw, self.skw = hkw, skw = deepcopy(hkw), deepcopy(skw)
self.dOmega_corr = dOmega_corr = deepcopy(dOmega_corr)
if _other is not None:
for k in _other.hkw:
self.hkw[k] = _other.hkw[k]
for k in _other.skw:
self.skw[k] = _other.skw[k]
if bg_mc_weight:
if isinstance(bg_mc_weight, hyp.Flux):
weights = ev.oneweight * bg_mc_weight(ev.true_energy)
else:
weights = ev[bg_mc_weight]
else:
weights = None
if len(self.dOmega_corr) == 0:
self.h_counts = hl.hist(np.sin(ev.dec), weights=weights, **hkw)
else:
self.h_counts_nocorr = hl.hist(
np.sin(ev.dec), weights=weights, **hkw)
counts_corr = self.h_counts_nocorr.values * self.dOmega_corr
self.h_counts = hl.Hist(values=counts_corr, weights=weights, **hkw)
if _other is not None:
self.h_counts += _other.h_counts
self.h = self.h_counts.normalize(density=True) / (2*pi)
self.range = self.h.range[0]
skw.setdefault('s', 0)
skw.setdefault('k', 2)
skw.setdefault('log', True)
self.s_hl = self.h.spline_fit(**skw)
self.s = self.s_hl.spline
## compute norm correction
#self._compute_norm()
def _compute_norm(self):
"""Compute normalization correction.
This function is currently unused, but might be useful if extremely precise
background space PDF normalization is important.
"""
x = np.linspace(self.range[0], self.range[1], 100000)
self.norm = 1 / ( np.sum(np.exp(self.s(x))) * np.diff(x[:2]) * 2*pi)
[docs]
def __call__(self, ev=[], sindec=None):
"""Evaluate for given events or sindecs.
Args:
ev (:class:`utils.Events`): the events
sindec (array-like): the sin(dec) array)
Returns:
ndarray
"""
#if not hasattr(self, 'norm'):
# self._compute_norm()
if not len(ev) and sindec is None:
return np.array([])
sd1, sd2 = self.range
sindec = np.sin(ev.dec) if sindec is None else sindec
# formerly masked, now clipping
# should check if we can guarantee everything is in-bounds
#out = np.empty_like(sindec)
#mask = (sd1 <= sindec) & (sindec <= sd2)
sindec = np.clip(sindec, sd1, sd2)
out = np.exp(self.s(sindec))
#return self.norm * out
return out
def __add__(self, new_ev):
return type(self) (new_ev, hkw=self.hkw, skw=self.skw, _other=self)
[docs]
class BgSinDecKDEParameterization(BgSinDecParameterization):
"""KDE-like parameterization of the background space PDF.
This class implements a background space PDF using a pseudo-KDE, where the kernel
widths are determined by the event angular uncertainties.
"""
keep = 'dec'.split()
[docs]
def __init__(self, ev, hkw={}, skw={}, _other=None):
super(BgSinDecKDEParameterization, self).__init__(
ev, hkw=hkw, skw=skw, _other=_other)
bg_pdf = lambda sindec: np.mean([
1 / (2*pi*np.cos(ev.dec)) / np.sqrt(2*pi*ev.sigma**2)
* np.exp(-(np.arcsin(sindec)-ev.dec)**2 / (2*ev.sigma**2))])
self.sd = sd = np.linspace(-1, 1, 300)
self.h = hl.hist_from_eval(bg_pdf, vectorize=True, range=self.h.range, bins=1000)
self.s_hl = self.h.spline_fit(**self.skw)
self.s = self.s_hl.spline
[docs]
class BgAzimuthSinDecParameterization(object):
"""Background space PDF that accounts for azimuthal uncertainty.
This class implements a zenith-dependentn azimuthal asymmetry that modulates the
expectation measured by a BgSinDecParameterization.
"""
keep = 'dec azimuth'.split()
[docs]
def __init__(self, ev, hkw2={}, skw2={}, hkw={}, skw={}, smooth=None, _other=None):
# copy basic params
self.hkw2, self.skw2 = hkw2, skw2 = deepcopy(hkw2), deepcopy(skw2)
self.hkw, self.skw = hkw, skw = deepcopy(hkw), deepcopy(skw)
# take params to match _other if it's given
if _other is not None:
for k in _other.hkw:
self.hkw[k] = _other.hkw[k]
for k in _other.skw:
self.skw[k] = _other.skw[k]
smooth = _other.smooth
self.smooth = smooth
# get sindec dependence
self.sindec_param = BgSinDecParameterization(
ev, hkw=hkw, skw=skw, _other=_other)
# set 2D hist defaults
hkw2.setdefault('bins', (np.linspace(0, 2*pi, 360+1), hkw['bins']))
# get independent azimuth dependence
self.h_counts = hl.hist((ev.azimuth, np.sin(ev.dec)), **hkw2)
self.range = self.h_counts.range
if _other is not None:
self.h_counts += _other.h_counts
if smooth:
self.h_smooth = self.h_counts.gaussian_filter(smooth)
else:
self.h_smooth = self.h_counts
self.h = deepcopy(self.h_smooth)
self.h._values /= np.mean(self.h.values, axis=0)
# set spline defaults
skw2.setdefault('s', 0)
skw2.setdefault('kx', 2)
skw2.setdefault('ky', 2)
skw2['log'] = False
# build spline
self.s_hl = self.h.spline_fit(**skw2)
self.s = self.s_hl.spline
[docs]
def __call__(self, ev):
# quit if no events
if not len(ev):
return np.array([])
# get valid range
(az1, az2), (sd1, sd2) = self.range
# get event properties
sindec = np.sin(ev.dec)
mask = (sd1 <= sindec) & (sindec <= sd2)
out = self.s.ev(ev.azimuth, sindec) * self.sindec_param(ev, sindec=sindec)
eps = np.finfo(float).eps
out[~mask] = eps
return np.maximum(eps, out)
def __add__(self, new_ev):
kw = dict(
hwk2=self.hkw2, skw2=self.skw2,
hkw=self.hkw, skw=self.skw, _other=self)
return type(self) (new_ev, **kw)
[docs]
class NullBgSpaceParameterization(object):
"""Uniform background space PDF.
This class is intended to implement a uniform background space PDF. It has not been
tested thoroughly... if at all.
"""
keep = []
[docs]
def __call__(self, ev, **kw):
return np.ones(len(ev))
[docs]
class SinDecAccParameterization(object):
r"""Signal acceptance parameterization vs sin(dec) over a range of power law spectra.
This class implements a spline lookup of the signal acceptance
:math:`A(\sin(\delta),\gamma)`. This is the time-independent acceptance -- it must be
integrated over time to obtain, e.g., the relative acceptance ratios across multiple
data taking configurations.
"""
[docs]
def __init__(self, sig_ev,
hkw={}, skw={},
gammas=np.r_[1:4.01:0.125]):
"""Construct a SinDecAccParameterization.
Args:
sig_ev (utils.Events): the signal-like events
hkw (mapping): :meth:`histlite.hist` kwargs
skw (mapping): :meth:`histlite.Hist.spline_fit` kwargs
gammas (array-like): set of gammas to evaluate
"""
# construct histograms
self.hkw, self.skw, self.gammas = hkw, skw, gammas
# get starting gamma bins
dgamma = np.diff(self.gammas)
self.gamma_range = (gammas[0], gammas[-1])
# add extra gamma bins to avoid edge effects
gammas = np.r_[gammas[0] - dgamma[0], gammas, gammas[-1] + dgamma[-1]]
gammas = np.r_[gammas[0] - dgamma[0], gammas, gammas[-1] + dgamma[-1]]
self.gammas = gammas
sig_weight = lambda gamma: sig_ev.oneweight * sig_ev.true_energy**-gamma
self.hs = hs = []
print(' * gamma = {:.4f} ...'.format(self.gamma_range[0]), end='')
sys.stdout.flush()
self.h0 = h0 = hl.hist(np.sin(sig_ev.true_dec), sig_weight(gammas[0]), **hkw)
indices = h0.ravel_indices(np.sin(sig_ev.true_dec))
hs.append(h0)
for gamma in gammas[1:]:
g = np.clip(gamma, *self.gamma_range)
print('\r * gamma = {:.4f} ...'.format(g), end='')
sys.stdout.flush()
h = hl.hist_like_indices(h0, indices, sig_weight(gamma))
hs.append(h)
print()
self.hsd = {gamma: self.hs[i] for (i,gamma) in enumerate(gammas)}
# "normalize" wrt bin widths
for h in self.hs:
h._values /= np.diff(h.bins[0]) * (2*pi)
# construct big Hist
dgamma = np.diff(self.gammas)
gamma_bins = self.gammas[:-1] + 0.5 * dgamma
gamma_bins = np.r_[gamma_bins[0] - dgamma[0], gamma_bins, gamma_bins[-1] + dgamma[-1]]
bins = [gamma_bins] + h0.bins
values = np.vstack([h.values for h in self.hs])
self.h = h = hl.Hist(bins, values)
# spline fit
skw['log'] = True
skw.setdefault('s', 0)
skw.setdefault('kx', 2)
skw.setdefault('ky', 2)
self.s_hl = h.spline_fit(**skw)
self.s = self.s_hl.spline
self.bins = h.bins
self.range = h.range
[docs]
def __call__(self, a, **params):
"""Compute acceptance.
Args:
a (:class:`utils.Arrays`): object with a ``.dec`` property
params: fit arguments(should include ``gamma``)
Returns:
ndarray: the acceptance values
"""
eps = np.finfo(float).eps
gamma = params['gamma']
sindecs = np.atleast_1d(np.sin(a.dec))
gammas = np.repeat(gamma, len(sindecs))
#(g1, g2), (sd1, sd2) = self.range
#mask = (g1 <= gammas) & (gammas <= g2) & (sd1 <= sindecs) & (sindecs <= sd2)
#out = np.empty_like(sindecs)
out = np.exp(self.s.ev(gammas, sindecs))
#out[~mask] = eps
return out
[docs]
class SinDecCustomFluxAccParameterization(object):
"""Signal acceptance parameterization vs sin(dec) for a specific spectrum.
Args:
sig_ev (utils.Events): the signal-like events
flux (hyp.Flux): the spectrum of interest
hkw (mapping): :meth:`histlite.hist` kwargs
skw (mapping): :meth:`histlite.Hist.spline_fit` kwargs
"""
[docs]
def __init__(self, sig_ev, flux,
hkw={}, skw={}):
# construct histograms
self.hkw, self.skw = hkw, skw
self.flux = flux
sig_weight = sig_ev.oneweight * flux(sig_ev.true_energy)
self.h = h = hl.hist(np.sin(sig_ev.true_dec), sig_weight, **hkw)
# "normalize" wrt bin widths
h._values /= np.diff(h.bins[0]) * (2*pi)
# spline fit
skw['log'] = True
skw.setdefault('s', 0)
skw.setdefault('k', 2)
self.s_hl = h.spline_fit(**skw)
self.s = self.s_hl.spline
self.bins = h.bins
self.range = h.range
[docs]
def __call__(self, a, **params):
eps = np.finfo(float).eps
sindecs = np.atleast_1d(np.sin(a.dec))
sd1, sd2 = self.range[0]
mask = (sd1 <= sindecs) & (sindecs <= sd2)
#out = np.empty_like(sindecs)
out = np.exp(self.s(sindecs))
out[~mask] = eps
return out
[docs]
class Gauss2DAngResParameterization(object):
"""
Parameterization of the dec and ra angular resolution in reco
sin(dec) and log(energy) bins.
This is mainly here for reproducing the MESC-2yr analysis. You probably don't want or
need to use it.
"""
keep = 'dec energy'.split()
[docs]
def __init__(self, sig, flux=hyp.PowerLawFlux(2.5), hkw={},
smooth=None, smooth_bins=None, fit=True):
self.hkw = deepcopy(hkw)
flush = sys.stdout.flush
space = 20 * ' '
print('Gauss2DAngResParameterization...')
print(' * histograms...' + space, end=''), flush()
self.hdec_base = hdec = hl.hist(
(np.sin(sig.dec), np.log10 (sig.energy), np.abs(sig.xdec)),
sig.oneweight * flux(sig.true_energy), **hkw).contain(-1)
self.hra_base = hra = hl.hist(
(np.sin(sig.dec), np.log10 (sig.energy), np.abs(sig.xra)),
sig.oneweight * flux(sig.true_energy), **hkw).contain(-1)
if smooth:
if smooth_bins is not None:
print('\r * rebinning...' + space, end=''), flush()
hdec = hl.hist_from_eval(
hdec.get_values, vectorize=False,
bins=smooth_bins, range=hdec.range, ndim=2)
hra = hl.hist_from_eval(
hra.get_values, vectorize=False,
bins=smooth_bins, range=hra.range, ndim=2)
print('\r * smoothing...' + space, end=''), flush()
hdec = hdec.gaussian_filter(smooth)
hra = hra.gaussian_filter(smooth)
self.hdec = hdec
self.hra = hra
self.range = hdec.range
print('\r * calculating norm...' + space, end=''), flush()
self.hnorm = hnorm = hl.Hist(
self.hdec.bins,
self._get_norm_on_sphere(hdec.values, hra.values))
if fit:
self.sdec = self.hdec.spline_fit(log=True)
self.sra = self.hra.spline_fit(log=True)
self.snorm = self.hnorm.spline_fit(log=True)
else:
self.sdec = self.sra = self.snorm = None
print('\r * done.' + space), flush()
[docs]
def __call__(self, ev):
(sd1, sd2), (le1, le2) = self.range
sd, le = np.sin(ev.dec), np.log10 (ev.energy)
sd = np.clip(sd, sd1, sd2 - 1e-5)
le = np.clip(le, le1, le2 - 1e-5)
if self.sdec is None:
fdec, fra, fnorm = self.hdec.get_values, self.hra.get_values, self.hnorm.get_values
else:
fdec, fra, fnorm = self.sdec, self.sra, self.snorm
sigma_dec = fdec(sd, le)
sigma_ra = fra(sd, le)
norm = fnorm(sd, le)
return sigma_ra, sigma_dec, norm
@staticmethod
def _get_norm_on_sphere(sigma_dec, sigma_ra):
"""
Assume a Gaussian on a sphere; calculate the norm
"""
@np.vectorize
def get_norm(sigma_dec, sigma_ra):
sindec = np.linspace(-1, 1, 1e3)
ra = np.linspace(-pi, pi, sindec.size)
dec = np.arcsin(sindec)
dsddr = (sindec[1] - sindec[0]) * (ra[1] - ra[0])
return dsddr * np.sum(np.outer(
stats.norm.pdf(dec, 0, sigma_dec),
stats.norm.pdf(ra, 0, sigma_ra)
))
return get_norm(sigma_dec, sigma_ra)
[docs]
class AccModel(object):
"""Acceptance model base class.
This is a base class for acceptance models. Any acceptance-weighted PDFRatioModel must
implement an acceptance model that derives from this class.
"""
__metaclass__ = ABCMeta
[docs]
@abstractmethod
def get_acc_per_source(self, **params):
"""Get the absolute acceptance on a per-source basis.
:param params: fit arguments
"""
pass
[docs]
def get_acc_total(self, **params):
"""Get the absolute acceptance, summing over sources.
:param params: fit arguments
"""
return np.sum(self.get_acc_per_source(**params))
[docs]
class PDFRatioModel(object):
"""PDF ratio model base class.
This is the base class for PDF ratio models. Models based on e.g. space, energy,
and/or time information must derive from this class. These models essentially define
the signal/background discrimination that will be used in the likelihood, and they
should be aware of everything that is needed to compute the PDF ratios, once *given* a
set of events. :class:`PDFRatioEvaluator` instances, created automatically by the
trial running machinery in :mod:`trial`, are responsible for the actual evaluation
based on a given set of events.
"""
__metaclass__ = ABCMeta
[docs]
@abstractmethod
def __call__(self, ev, i):
"""Apply the model to a set of events to obtain a PDFRatioEvaluator.
Args:
ev (utils.Events): the events
i (tuple of ([array of int], [array of int])): i_ev, i_src: aligned arrays
indicating pairs of events and sources to which the evaluator should apply
Returns:
PDFRatioEvaluator
"""
pass
[docs]
def set_ra(self, ra):
"""Set the right ascension of the(presumably one-and-only) source.
Args:
ra (float): the right ascension
Users do not need to call this method explicitly.
This method sets the right ascension of a single source, enabling a powerful
optimization when computing skymaps. Specifically, space PDFs can be recomputed
at a new coordinate without repeating "band" and/or "box" masking calculations.
The default implementation here does nothing, but any space PDFRatioModel's should
update when this method is called.
"""
pass
[docs]
def get_updated(self, evs):
"""Update the PDFRatioModel in light of a set of events.
Args:
evs (sequence of utils.Events): the event ensembles
Users do not need to call this method explicitly.
This method updates the PDFRatioModel, given a sequence of event ensembles in the
form of utils.Events. This allows an optional mode in which the background PDFs
should be recomputed to account not only for data events(``evs[0]``) but also for
injected signal events(``evs[1:]``).
"""
return self
@staticmethod
def _array_to_params(key, values):
params = {}
N = values.size
for i in xrange(N):
k = '{}_{:04d}'.format(key, i)
params[k] = values[i]
return params
[docs]
class EnergyPDFRatioModel(PDFRatioModel):
r"""Model of energy PDF ratios.
This class implements the classic energy PDF ratio. Specifically, it computes 1D or
2D histograms for an ensemble of background-like and signal-like events, and then fits
a spline (by default, in terms of log-value) tto the resulting histogram ratio. This
is done on a grid of spectral indices :math:`\gamma`(see also
:class:`CustomFluxEnergyPDFRatioModel` for a fixed, non-power-law spectrum). The
dimensions in practice are typically :math:`\sin(\delta,\log_{10)(E)})`. The axes
along which to normalize can be specified; typically this is done along the
:math:`\log_{10}(E)` axis.
Formally, we can say that :math:`S=S(\log_{10}(E)|\vec{\mu})`, and similarly for
:math:`B`.
In principle, this class can be used for arbitrary 1D or 2D spline-of-histogram PDF
ratios, and it could be readily generalized to a signal weighting parameter other than
:math:`\gamma`. If these cases arise in practice, this class should be refactored
accordingly.
The 2D limitation is driven by difficulties fitting higher dimensional splines with
scipy. A motivated developer could in principle migrate these fits to photospline, in
which case arbitrary dimensionality should be possible.
"""
[docs]
def __init__(self, bg_ev, sig_ev,
features=['sindec', 'log10energy'],
normalize_axes=[1],
hkw={}, skw={},
gammas=np.r_[1:4.01:0.125],
bg_mc_weight='',
):
"""Construct an EnergyPDFRatioModel.
Args:
bg_ev (utils.Events): background-like events
sig_ev (utils.Events): signal-like events
features (list of str): the event features to histogram
normalize_axes (list of int): the axes along which to normalize
hkw (mapping): :meth:`histlite.hist` kwargs
skw (mapping): :meth:`histlite.Hist.spline_fit` kwargs
gammas (array of float): spectral indices to consider
bg_mc_weights (str): if given, background histogram is weighted by
bg_ev[bg_mc_weight]. otherwise, the default is equal weights per event.
"""
# construct bg hist
self.hkw, self.skw = deepcopy(hkw), deepcopy(skw)
self.features, self.gammas = features, gammas
self.keep = copy(self.features)
self.normalize_axes = normalize_axes
if bg_mc_weight:
bg_data = tuple(sig_ev[feat] for feat in features)
bg_weights = sig_ev[bg_mc_weight]
else:
bg_data = tuple(bg_ev[feat] for feat in features)
bg_weights = None
self.h_bg_counts = hl.hist(bg_data, weights=bg_weights, **hkw)
self.h_bg = self.h_bg_counts.normalize(normalize_axes)
# construct sig hists
sig_data = tuple(sig_ev[feat] for feat in features)
sig_weight = lambda gamma: sig_ev.oneweight * sig_ev.true_energy**-gamma
print(' * gamma = {:.4f} ...'.format(gammas[0]), end='')
sys.stdout.flush()
self.h_sig_0 = h_sig_0 = hl.hist(sig_data, sig_weight(gammas[0]), **hkw)
self.hs_sig_counts = [h_sig_0]
self.hs_sig = [h_sig_0.normalize(normalize_axes)]
if len(gammas) >= 2:
indices = h_sig_0.ravel_indices(*sig_data)
for gamma in gammas[1:]:
print('\r * gamma = {:.4f} ...'.format(gamma), end='')
sys.stdout.flush()
h_sig_counts = hl.hist_like_indices(
h_sig_0, indices, sig_weight(gamma))
self.hs_sig_counts.append(h_sig_counts)
self.hs_sig.append(h_sig_counts.normalize(normalize_axes))
print()
self.fits = {
'gamma': tuple(np.r_[
gammas[0],
gammas[0]:gammas[-1]+0.01:.25,
gammas[-1]])}
self.param_names = ['gamma']
self._set_ratio_splines()
def _set_ratio_splines(self):
"""Compute the ratio splines."""
# fill empties
bg_domain = self.h_bg_counts.values > 0
sig_domain = self.h_sig_0.values > 0
self.h_bg.values[~bg_domain] = np.min(self.h_bg.values[bg_domain])
for h in self.hs_sig:
h.values[~sig_domain] = self.h_bg.values[~sig_domain]
# take ratios
self.hs_ratio = np.array([h / self.h_bg for h in self.hs_sig])
# spline
features, skw = self.features, self.skw
skw['log'] = True
skw.setdefault('s', 0)
if len(features) == 1:
skw.setdefault('k', 1)
if len(features) == 2:
skw.setdefault('kx', 1)
skw.setdefault('ky', 1)
self.ss_hl = np.array([h.spline_fit(**skw) for h in self.hs_ratio])
self.ss = np.array([s.spline for s in self.ss_hl])
self.ssd = {gamma: self.ss[i] for (i, gamma) in enumerate(self.gammas)}
# store more metadata
self.bins = self.h_sig_0.bins
self.range = self.h_sig_0.range
[docs]
def __call__(self, ev):
return EnergyPDFRatioEvaluator(ev, self)
[docs]
def get_updated(self, evs):
out = copy(self)
for ev in evs[1:]:
bg_data = tuple(ev[feat] for feat in out.features)
out.h_bg_counts += hl.hist(bg_data, **out.hkw)
out.h_bg = out.h_bg_counts.normalize(out.normalize_axes)
out._set_ratio_splines()
return out
[docs]
class CustomFluxEnergyPDFRatioModel(PDFRatioModel):
"""Model of energy PDF ratios for a custom flux.
This class implements the energy PDF ratio similar to :class:`EnergyPDFRatioModel`,
except for a specific flux which need not be a simple unbroken power law.
"""
fits = {}
param_names = []
[docs]
def __init__(self, bg_ev, sig_ev, flux,
features=['sindec', 'log10energy'],
normalize_axes=[1],
hkw={}, skw={},
bg_mc_weight='', keep_pdfs=False
):
"""Construct a CustomFluxEnergyPDFRatioModel.
Args:
flux (hyp.Flux): the spectral hypothesis.
Other parameters: see :meth:`EnergyPDFRatioModel.__init__`.
"""
# construct bg hist
self.hkw, self.skw, self.features = deepcopy(hkw), deepcopy(skw), features
self.normalize_axes = normalize_axes
self.keep_pdfs = keep_pdfs
if bg_mc_weight:
bg_data = tuple(sig_ev[feat] for feat in features)
bg_weights = sig_ev[bg_mc_weight]
else:
bg_data = tuple(bg_ev[feat] for feat in features)
bg_weights = None
self.h_bg_counts = hl.hist(bg_data, weights=bg_weights, **hkw)
self.h_bg = self.h_bg_counts.normalize(normalize_axes)
# construct sig hists
sig_data = tuple(sig_ev[feat] for feat in features)
sig_weight = sig_ev.oneweight * flux(sig_ev.true_energy)
self.h_sig_counts = hl.hist(sig_data, sig_weight, **hkw)
self.h_sig = self.h_sig_counts.normalize(normalize_axes)
self.keep = copy(self.features)
self._set_ratio_spline()
def _set_ratio_spline(self):
"""Compute the ratio splines."""
# fill empties
bg_domain = self.h_bg.values > 0
sig_domain = self.h_sig.values > 0
self.h_bg.values[~bg_domain] = np.min(self.h_bg.values[bg_domain])
self.h_sig.values[~sig_domain] = self.h_bg.values[~sig_domain]
# take ratios
self.h_ratio = self.h_sig / self.h_bg
# spline
skw = self.skw
skw['log'] = True
skw.setdefault('s', 0)
if len(self.features) == 1:
skw.setdefault('k', 1)
if len(self.features) == 2:
skw.setdefault('kx', 1)
skw.setdefault('ky', 1)
self.s_hl = self.h_ratio.spline_fit(**skw)
self.s = self.s_hl.spline
# store more metadata
self.bins = self.h_sig.bins
self.range = self.h_sig.range
[docs]
def __call__(self, ev):
return CustomFluxEnergyPDFRatioEvaluator(ev, self)
[docs]
def get_updated(self, evs):
self = deepcopy(self)
for ev in evs[1:]:
bg_data = tuple(ev[feat] for feat in self.features)
self.h_bg_counts += hl.hist(bg_data, **self.hkw)
self.h_bg = self.h_bg_counts.normalize(self.normalize_axes)
self._set_ratio_spline()
return self
[docs]
class NullEnergyPDFRatioModel(PDFRatioModel):
"""Uniform energy PDF ratio model.
This class is intended to implement a uniform energy PDF ratio, i.e. simply
:math:`S/B=1`. It has not been tested thoroughly... if at all. It also is likely
useless; if you don't want an energy PDF to be used, you should build your likelihood
without one.
"""
fits = {}
param_names = []
[docs]
def __call__(self, ev):
return NullEnergyPDFRatioEvaluator(ev, self)
[docs]
class GenericPDFRatioModel(PDFRatioModel):
"""
Generic PDF ratio model.
This class allows an arbitrary callable to be used as a PDFRatioModel.
"""
[docs]
def __init__(self, func, features, fits, ana=None, src=None):
"""
Construct a GenericPDFRatioModel.
Args:
func (callable): python object that is callable and returns S/B.
If `update_bg` setting is set to True, then the object must
also implement a `get_updated` method.
features (str->str mapping): mapping of func argname => Events column name
fits (str->number-or-sequence mapping): mapping with str keys and number (for
fixed values) or sequence (for fittable values) pairs. sequence form
should be like (min_value, seed1, seed2, ..., max_value)
"""
self.ana = ana
self.src = src
self.func = func
self.features = features
self.fits = fits
self.param_names = list(fits.keys())
self.keep = list(features.values())
self.N_bg = len(ana.data)
[docs]
def __call__(self, ev):
return GenericPDFRatioEvaluator(ev, self)
[docs]
def get_updated(self, evs):
"""Update the GenericPDFRatioModel in light of a set of events.
:type evs: sequence of utils.Events
:param evs: the event ensembles
"""
out = copy(self)
out.func = self.func.get_updated(evs, N_bg=self.N_bg)
return out
[docs]
class AccWeightedPDFRatioModel(PDFRatioModel):
"""Base class for acceptance-weighted PDF ratio models.
This is the base class for PDF ratio models which track acceptance weighting.
Examples include:
:class:`PointSourceSpacePDFRatioModel`
Tracks declination-dependent signal acceptance.
:class:`SpaceTimePDFRatioModel`
Integrates declination-dependent signal acceptance over per-source time PDFs.
:class:`MultiPDFRatioModel`
Combines exactly one :class:`AccWeightedPDFRatioModel` with one or more additional
non-acceptance-weighted :class:`PDFRatioModels`(such as
:class:`EnergyPDFRatioModel`).
"""
@abstractproperty
def acc_model(self):
"""Acceptance model.
Returns:
:class:`AccModel`: the acceptance model
"""
pass
[docs]
class AccWeightedGenericPDFRatioModel(AccWeightedPDFRatioModel):
[docs]
def __init__(self, func, features, fits, ana=None, src=None, acc_param=None, bg_param=None, kent_min=np.radians(7), cut_n_sigma=5,sigsub=False):
"""
Construct an AccGenericPDFRatioModel. Intended for use with funcs that
already take acceptance weighting into account.
Args:
func (callable): python object that is callable and returns S/B.
If `update_bg` setting is set to True, then the object must
also implement a `get_updated` method.
features (str->str mapping): mapping of func argname => Events column name
fits (str->number-or-sequence mapping): mapping with str keys and number (for
fixed values) or sequence (for fittable values) pairs. sequence form
should be like (min_value, seed1, seed2, ..., max_value)
"""
self.ana = ana
self.src = src
self.func = func
self.features = features
self.fits = fits
self.param_names = list(fits.keys())
self.keep = list(features.values())
self.N_bg = len(ana.data)
self.bg_param = bg_param = bg_param or ana.bg_space_param
self.acc_param = acc_param = acc_param or ana.acc_param
self._acc_model = self.AccModel(src, acc_param, ana.livetime)
self.cut_n_sigma = cut_n_sigma
self.cut_pdf_threshold = stats.norm.sf(self.cut_n_sigma)
self.kent_min = kent_min
self.sigsub = sigsub
[docs]
def __call__(self, ev, i=(None, None)):
i_ev, i_src = i
return GenericPDFRatioEvaluator(ev, self, (i_ev, i_src))
[docs]
def get_updated(self, evs):
"""Update the GenericPDFRatioModel in light of a set of events.
Args:
evs (utils.Events): the event ensembles
"""
out = copy(self)
out.func = self.func.get_updated(evs, N_bg=self.N_bg)
return out
@abstractproperty
def acc_model(self):
"""The acceptance model (just a formality in this case).
"""
return self._acc_model
[docs]
class AccModel(AccModel):
"""Acceptance model for PointSourceSpacePDFRatioModel.
This acceptance model evaluates the time-independent acceptance for each source,
weights these by the per-source intrinsic weights, and finally multiplies by the
livetime.
"""
[docs]
def __init__(self, src, acc_param, livetime):
self.src, self.acc_param, self.livetime = src, acc_param, livetime
[docs]
def get_acc_per_source(self, **params):
return self.livetime * self.src.weight * self.acc_param(self.src, **params)
[docs]
class MultiPDFRatioModel(AccWeightedPDFRatioModel):
"""Model of a product of separable PDF ratio models."""
[docs]
def __init__(self, acc_weighted_model, *other_models, **kwargs):
"""Construct a MultiPDFRatioModel.
Args:
acc_weighted_model (:class:`AccWeightedPDFRatioModel`): the acceptance-weighted model
*other_models (:class:`PDFRatioModel`): one or more additional models
"""
self.acc_weighted_model = acc_weighted_model
self.other_models = other_models
self.kwargs = kwargs
self.keep_pdfs = self.kwargs.get('keep_pdfs', False)
for mod in other_models:
try:
if mod.keep_pdfs==True: self.keep_pdfs = True
except:
print("keep pdfs is not implemented")
if not isinstance(acc_weighted_model, AccWeightedPDFRatioModel):
raise TypeError(
'acc_weighted_model must be an instance of AccWeightedPDFRatioModel')
for model in other_models:
if isinstance(model, AccWeightedPDFRatioModel):
raise TypeError(
'other models must not be instances of AccWeightedPDFRatioModel')
self._acc_model = acc_weighted_model.acc_model
self.models = [acc_weighted_model] + list(other_models)
self.fits = fits = {}
self.param_names = param_names = []
self.keep = keep = []
for model in self.models:
fits.update(model.fits)
param_names += model.param_names
keep += model.keep
def __repr__(self):
n = len(self.models)
return 'MultiPDFRatioModel({} PDF ratio model{})'.format(
n, '' if n == 1 else 's')
@property
def acc_model(self):
return self._acc_model
def get_time_model(self) -> PDFRatioModel:
return self.acc_weighted_model.get_time_model()
[docs]
def set_ra(self, ra):
for model in self.models:
model.set_ra(ra)
[docs]
def __call__(self, ev):
return MultiPDFRatioEvaluator(ev, self)
[docs]
def get_updated(self, evs):
models = [model.get_updated(evs) for model in self.models]
self = type(self) (*models)
return self
[docs]
class PointSourceSpacePDFRatioModel(AccWeightedPDFRatioModel):
r"""Space PDF ratio model for one or more point or point-like sources.
This class implements the space PDF ratio; it is one of the most important workhorses
of csky. The source list should give the coordinates :math:`(\alpha,\delta)` for one
or more sources; multiple sources will result in a stacking analysis. Per-source
extensions and intrinsic weights may be given as well.
A declination band cut is applied at a given :math:`n\sigma` from each event, using
the per-event angular error estimates. If signal subtraction is not enabled, an
additional right ascension cut is applied(this is the so-called "box" cut).
"""
[docs]
def __init__(self, ana, src, bg_param=None, acc_param=None, cut_n_sigma=5,
sigsub=False, mc_pdf=False, kent_min=np.radians(7)):
"""Construct a PointSourceSpacePDFRatioModel.
Args:
ana (:class:`analysis.SubAnalysis`): the sub analysis
src (:class:`utils.Sources`): the source list. required keys: ``ra`` and ``dec``.
optional keys: ``weight``, ``extension``
bg_param: the background space PDF parameterization
acc_param: the signal acceptance parameterization
cut_n_sigma (float): number of sigmas away from a source an event must be within
to be included
sigsub (bool): whether signal subtraction will be enabled
"""
self.ana, self.src = ana, src
self.bg_param = bg_param = bg_param or ana.bg_space_param
self.acc_param = acc_param = acc_param or ana.acc_param
self._acc_model = self.AccModel(src, acc_param, ana.livetime)
self.cut_n_sigma = cut_n_sigma
self.cut_pdf_threshold = stats.norm.sf(self.cut_n_sigma)
self.kent_min = kent_min
self.sigsub = sigsub
self.keep = 'ra dec sigma'.split()
for k in bg_param.keep:
if k not in self.keep:
self.keep.append(k)
self.mc_pdf = mc_pdf
self.fits = {}
self.param_names = []
if mc_pdf:
print('Setting up {}...'.format(ana.key))
gammas = self.ana.gammas
bins = np.r_[
0 : 1 : .1,
1 : self.cut_n_sigma : 0.25,
self.cut_n_sigma : 10 * self.cut_n_sigma + 1e-5 : 1.
]
self.splines = []
for source in src:
sd = np.sin(source.dec[0])
ev_sd = np.sin(ana.sig.dec)
sig = ana.sig[((sd - 0.05) <= ev_sd)
& (ev_sd <= sd + 0.05)]
dpsi = np.hypot(sig.xra, sig.xdec)
hs = []
for gamma in gammas:
h = hl.hist(
dpsi / sig.sigma,
sig.oneweight * sig.true_energy**-gamma,
bins=bins)
h = h.normalize(density=True)
h /= (2 * pi * h.centers[0])
#h = h.gaussian_filter(1)
hs.append(h)
values = np.array([h.values for h in hs])
dgamma = gammas[1] - gammas[0]
bins = np.r_[gammas - 0.5*dgamma, gammas[-1] + 0.5*dgamma]
bins = bins, hs[0].bins[0]
h = hl.Hist(bins, values)
spline = h.spline_fit(kx=2, ky=2, log=True)
self.splines.append(spline)
[docs]
def set_ra(self, ra):
self.src['ra'] = ra
@property
def acc_model(self):
return self._acc_model
[docs]
def get_updated(self, evs):
self = copy(self)
for ev in evs[1:]:
self.bg_param = self.bg_param + ev
return self
[docs]
class AccModel(AccModel):
"""Acceptance model for PointSourceSpacePDFRatioModel.
This acceptance model evaluates the time-independent acceptance for each source,
weights these by the per-source intrinsic weights, and finally multiplies by the
livetime.
"""
[docs]
def __init__(self, src, acc_param, livetime):
self.src, self.acc_param, self.livetime = src, acc_param, livetime
[docs]
def get_acc_per_source(self, **params):
self.acc_param = np.atleast_1d(self.acc_param)
if len(self.acc_param) ==1:
return self.livetime * self.src.weight * self.acc_param[0](self.src, **params)
acc_params = []
# for sources with custom or mixed types of fluxes, better to handle separately
for weight, acc_param, src in zip(self.src.weight, self.acc_param, self.src):
acc_params.append(self.livetime * weight * acc_param(src, **params))
return acc_params
[docs]
def __call__(self, ev, i=(None, None)):
i_ev, i_src = i
if self.mc_pdf:
return MCPointSourceSpacePDFRatioEvaluator(ev, self, (i_ev, i_src))
else:
return PointSourceSpacePDFRatioEvaluator(ev, self, (i_ev, i_src))
[docs]
class FitPointSourceSpacePDFRatioModel(AccWeightedPDFRatioModel):
"""Space PDF ratio model that fits the sources.
This model, which is implemented internally in terms of PointSourceSpacePDFRatioModel,
adds per source cooordinates :math:`(\alpha,\delta)` to the fitter parameters. This
is mainly useful for finding the true hottest spot in a small, already-identified
region, in which case the user can ask for such refinement without mentioning this
class explicitly by name.
Do not expect good performance for multiple sources. This class may be broken if a time PDF
ratio model is used.
"""
[docs]
def __init__(self, ana, src,
bg_param=None, acc_param=None,
cut_n_sigma=5, cut_extension=np.radians(3),
sigma=None,
sigsub=False):
"""Construct a FitPointSourceSpacePDFRatioModel
Args:
ana (:class:`analysis.SubAnalysis`): the sub analysis
src (:class:`utils.Sources`): the source list. required keys: ``ra`` and ``dec``.
optional keys: ``weight``, ``extension``
bg_param: the background space PDF parameterization
acc_param: the signal acceptance parameterization
cut_n_sigma (float): number of sigmas away from a source an event must be within
to be included
cut_extension (float): additional Gaussian smearing width(in radians) to apply
before pre-computing band and/or box cuts
sigma (float): offset(in radians) of additional seed grid points in (ra,dec)
sigsub (bool): whether signal subtraction will be enabled
"""
self.ana, self.src = ana, src
self.n_src = len(src)
self.livetime = ana.livetime
self.bg_param = bg_param or ana.bg_space_param
self.acc_param = acc_param or ana.acc_param
self._acc_model = self.AccModel(self)
self.cut_n_sigma = cut_n_sigma
self.cut_extension = cut_extension
self.cut_pdf_threshold = stats.norm.sf(self.cut_n_sigma)
self.sigsub = sigsub
self.fits = fits = {}
self.param_names = param_names = []
sigma = np.radians(0.25) if sigma is None else sigma
for i in xrange(self.n_src):
ra, dec = src.ra[i], src.dec[i]
kra, kdec = 'ra_{:04d}'.format(i), 'dec_{:04d}'.format(i)
seed_ra, seed_dec = ra, dec
if sigma and not ana.spec.cascades:
seed_ra = np.r_[ra - sigma, ra, ra + sigma]
seed_dec = np.r_[dec - sigma, dec, dec + sigma]
fits[kra] = tuple(np.r_[ra-pi, seed_ra, ra+pi])
fits[kdec] = tuple(np.r_[-pi/2, seed_dec, pi/2])
param_names += [kra, kdec]
self.keep = 'ra dec sigma'.split()
for k in bg_param.keep:
if k not in self.keep:
self.keep.append(k)
@property
def acc_model(self):
return self._acc_model
[docs]
class AccModel(AccModel):
"""Acceptance model for FitPointSourceSpacePDFRatioModel.
This acceptance model functions similarly to
:class:`PointSourceSpacePDFRatioModel.AccModel`, but it requires the per-source
coordinates to be specified in order to perform the evaluation.
"""
[docs]
def __init__(self, model):
self.model = model
self.weight = model.src.weight
self.acc_param = model.acc_param
self.livetime = model.ana.livetime
[docs]
def get_acc_per_source(self, **params):
src = self.model.params_to_src(self.model.src, **params)
return self.livetime * src.weight * self.acc_param(src, **params)
[docs]
def __call__(self, ev, i=(None, None)):
i_ev, i_src = i
return FitPointSourceSpacePDFRatioEvaluator(ev, self, (i_ev, i_src))
[docs]
@staticmethod
def params_to_src(src, **params):
"""Extract source coordinate fit parameters into a source list.
Args:
src (:class:`utils.Sources`): base source list(may include weights and extensions)
Returns:
:class:`utils.Sources`: the source list
"""
src = deepcopy(src)
n_src = len(src)
ra, dec = src.ra, src.dec
for i in xrange(n_src):
ra[i] = params['ra_{:04d}'.format(i)]
dec[i] = params['dec_{:04d}'.format(i)]
src['ra'], src['dec'] = ra, dec
return src
[docs]
@staticmethod
def src_to_params(src):
"""Convert a source list into a fit parameters mapping.
Args:
src (:class:`utils.Sources`): source list
Returns:
dict: the fit parameters mapping
"""
params = {}
for i in xrange(n_src):
params['ra_{:04d}'.format(i)] = src.ra[i]
params['dec_{:04d}'.format(i)] = src.dec[i]
return params
[docs]
class TemplateSpacePDFRatioModel(AccWeightedPDFRatioModel):
"""Template space PDF ratio model.
This class implements a template space PDF ratio using healpy [1]. The template
should be specified either as a 1D array of shape ``(npix,)`` or as a 2D array of
shape ``(npix,nEbins)``. In the former case, a spectrum must be given, wheres in the
latter case the energy bin edges must also be given. Because the Gaussian smoothing
calculations can be computationally intense(despite the relatively efficient
implementation in healpy), it is possible to specify a directory in which to keep a
cache.
If per-pixel binned spectra are given, the multiplication of the map by the signal
acceptance is computed in declination bands using the signal MC directly unless
requested otherwise(in which case the pixel-averaged spectrum is used).
[1] https://healpy.readthedocs.io/en/latest/
"""
fits = {}
param_names = []
[docs]
def __init__(self, ana, template, bg_param=None, acc_param=None,
flux=None, bins_energy=None, sigmas=None,
cut_n_sigma=5, sigsub=False, fast_weight=False,
sindec_bandwidth=np.radians(1), dir='', quiet=False,
floor=1e-12, keep_pdfs=False):
"""Construct a TemplateSpacePDFRatioModel.
Args:
ana (:class:`analysis.SubAnalysis`): the sub analysis
template (ndarray): (npix,) array of per-pixel weights, or (npix,nEbins) array of
per-pixel and per-energy-bin dN/dE values
bg_param: the background space PDF parameterization
acc_param: the signal acceptance parameterization
flux (:class:`hyp.Flux`): the signal spectrum
bins_energy (kind): the energy bin left edges
sigmas (ndarray): if given, the Gaussian smoothing grid to consider. if not given,
reasonable defaults are chosen depending on whether ``ana`` includes tracks or
cascades
cut_n_sigma (float): number of sigmas away from a source an event must be within
to be included
sigsub (bool): whether signal subtraction will be enabled
fast_weight (bool): whether to compute the acceptance-weighted maps very fast
using the ``acc_param``; otherwise, by default small declination bands of signal
MC are used for each row of constant declination pixels
sindec_bandwidth (float): width of declination bands for computing the
acceptance-weighted map.
dir (str): path to a directory in which to cache this template's state(filename
will include ``ana.key``)
quiet (bool): if true, suppress status printouts during construction
floor (float): minimum allowed value for the normalized template and smoothed
signal space PDFs
keep_pdfs (bool): when have more pdfs have option of keep bkg pdfs for custom fluxes
"""
from .hyp import BinnedFlux
# save inputs
self.ana = ana
self.flux, self.bins_energy = flux, bins_energy
self.bg_param = bg_param = bg_param or ana.bg_space_param
self.acc_param = acc_param = acc_param or ana.acc_param
self.sigsub = sigsub
self.cut_n_sigma, self.fast_weight = cut_n_sigma, fast_weight
self.cut_pdf_threshold = stats.norm.sf(self.cut_n_sigma)
self.sindec_bandwidth = sindec_bandwidth
# TODO: should have a more robust way to get dec range
self.min_sindec, self.max_sindec = self.bg_param.range
self.floor = floor
self.keep_pdfs = keep_pdfs
self.template = template = np.maximum(
template, template[template > 0].min() * self.floor)
self.per_pixel_flux = (len(self.template.shape) == 2)
self.dir, self.quiet = dir, quiet
self._check_template()
# note healpy map parameters
self.npix = npix = template.shape[0]
self.nside = nside = hp.npix2nside(npix)
self.pixarea = pixarea = hp.nside2pixarea(nside)
# if per-pixel flux given, calculate total flux
if self.per_pixel_flux:
self.flux = BinnedFlux(bins_energy, np.sum(self.template, axis=0) * pixarea)
# save pixel coordinate arrays
pix_zenith, self.pix_ra = hp.pix2ang(self.nside, np.r_[:self.npix])
self.pix_dec = pi/2 - pix_zenith
# calculate space-only template
if len(template.shape) == 2:
#self.template_space = np.sum(self.template, axis=1)
self.template_space = np.sum(template[:,1:] * np.diff(bins_energy), axis=-1)
else:
self.template /= self.template.sum() * pixarea
self.template_space = self.template
self.src = src = utils.Sources(
ra=self.pix_ra, dec=self.pix_dec,
weight=self.template_space / self.template_space.sum())
self._acc_model = self.AccModel(
src, acc_param, ana.livetime,
sindec_range=(self.min_sindec, self.max_sindec))
if sigmas is None:
if ana.ds.cascades:
sigmas = np.radians(np.r_[3:20, 20:40:2, 40:60:4, 60:91:5])
else:
sigmas = np.radians(np.r_[0.5:3.01:.125])
self.sigmas = np.unique(np.r_[0, sigmas])
# set up PDFs
self.keep = 'ra dec sigma'.split()
if not self._load_pdfs():
self._calculate_pdfs()
for k in bg_param.keep:
if k not in self.keep:
self.keep.append(k)
[docs]
def get_updated(self, evs):
self = copy(self)
for ev in evs[1:]:
self.bg_param = self.bg_param + ev
return self
@property
def acc_model(self):
return self._acc_model
[docs]
class AccModel(AccModel):
[docs]
def __init__(self, src, acc_param, livetime, sindec_range):
self.src, self.acc_param, self.livetime = src, acc_param, livetime
min_sindec, max_sindec = sindec_range
# get sources at unique decs
udec = np.unique(self.src.dec)
sin_udec = np.sin(udec)
udec = udec[(min_sindec <= sin_udec) & (sin_udec <= max_sindec)]
self.uniq_dec_src = utils.Sources(dec=udec)
self.idx_to_uniq = np.searchsorted(self.uniq_dec_src.dec, self.src.dec)
[docs]
def get_acc_per_source(self, **params):
# following is per-pixel rather than per-dec (non-accelerated version)
#return self.livetime * self.src.weight * self.acc_param(self.src, **params)
uniq_dec_acc = self.acc_param(self.uniq_dec_src, **params)
acc = uniq_dec_acc[self.idx_to_uniq]
return self.livetime * self.src.weight * acc
[docs]
def __call__(self, ev):
return TemplateSpacePDFRatioEvaluator(ev, self)
def _check_template(self):
# check template, flux, bins_energy for self-consistency
flux, bins_energy = self.flux, self.bins_energy
if self.per_pixel_flux:
if flux is not None:
raise ValueError(
'cannot specify `flux` when template include per-pixel flux')
if bins_energy is None:
raise ValueError(
'must specify `bins_energy` when template includes per-pixel flux')
else:
if flux is None:
raise ValueError(
'must specify `flux` when template does not include per-pixel flux')
if bins_energy is not None:
raise ValueError(
'cannot specify `bins_energy` when template does not include per-pixel flux')
def cache_filename(self, dir):
return '{}/{}.template{}.npy'.format(
dir, self.ana.key, '.fastweight' if self.fast_weight else '')
def _load_pdfs(self):
dir = self.dir
filename = self.cache_filename(dir)
if not os.path.isfile(filename):
return False
print('<-', filename, '...', end='')
sys.stdout.flush()
state = np.load(filename, allow_pickle=True, encoding='latin1')[()]
print('\r<-', filename, ' ')
sys.stdout.flush()
try:
template = state['template']
template_space = state['template_space']
sigmas_deg = state['sigmas_deg']
if not np.all(np.isclose(template, self.template)):
print('Restore failed: template mismatch')
return False
if not np.all(np.isclose(template_space, self.template_space)):
print('Restore failed space-only template mismatch')
return False
if not np.all(np.isclose(sigmas_deg, np.degrees(self.sigmas))):
print('Restore failed: sigma mismatch')
return False
self.pix_probs_normed = state['pix_probs_normed']
self.pdf_space_sig = state['pdf_space_sig']
self.pdf_space_sigsub = state['pdf_space_sigsub']
except KeyboardInterrupt:
raise
except Exception as e:
print('Restore failed:', e)
return False
print('Restore successful.')
return True
def _save_pdfs(self):
dir = self.dir
if not dir:
return
utils.ensure_dir(dir)
state = dict(
template = self.template,
template_space = self.template_space,
sigmas_deg = np.degrees(self.sigmas),
pix_probs_normed = self.pix_probs_normed,
pdf_space_sig = self.pdf_space_sig,
pdf_space_sigsub = self.pdf_space_sigsub,
)
filename = self.cache_filename(dir)
print('->', filename, '...', end='')
sys.stdout.flush()
np.save(filename, state)
print('\r->', filename, ' ')
sys.stdout.flush()
def save(self, dir):
dir, self.dir = self.dir, dir
self._save_pdfs()
#dir, self.dir = self.dir, dir
@property
def pix_ra_dec(self):
# "zenith" here with "celestial" coordinates:
# zenith=0 is the north pole
return self.pix_ra, self.pix_dec
def get_mask_and_i_energy(self, E):
bins_energy = self.bins_energy
assert bins_energy is not None
mask = (bins_energy[0] < E) & (E < bins_energy[-1])
i_energy = np.searchsorted(bins_energy, E) - 1
i_energy[i_energy < 0] = 0
return mask, i_energy
def _calculate_pdfs(self):
from .hyp import PowerLawFlux
template, template_space = self.template, self.template_space
flux, bins_energy = self.flux, self.bins_energy
sigmas = self.sigmas
npix, nside, pixarea = self.npix, self.nside, self.pixarea
min_sindec, max_sindec = self.min_sindec, self.max_sindec
pix_ra, pix_dec = self.pix_ra_dec
pix_sindec = np.sin(pix_dec)
dec_mask = (min_sindec <= pix_sindec) & (pix_sindec <= max_sindec)
pix_sindec = np.sin(pix_dec)
# first calculate acceptance-weighted map
pix_weight = np.zeros_like(pix_dec)
prefix = '{:25} | '.format(self.ana.key)
if self.fast_weight:
sig = self.ana.sig
if not self.quiet:
print('{}Computing acceptance-weighted template...'.format(prefix), end='')
sys.stdout.flush()
# parameterization for whole sky given flux
acc_param = self.ana.get_custom_flux_acc_parameterization(flux)
# acc weight
src_before_acc = utils.Sources(ra=pix_ra[dec_mask], dec=pix_dec[dec_mask])
acc_orig = np.zeros_like(template_space)
acc_orig[dec_mask] = acc_param(src_before_acc)
# acc + spatial template weight
pix_weight = self.ana.livetime * template_space * acc_orig
else:
for dec in reversed(np.unique(pix_dec)):
if not (min_sindec <= np.sin(dec) <= max_sindec):
continue
pixels = np.where(pix_dec == dec)[0]
src = utils.Sources(dec=dec, ra=0)
if not self.quiet:
print('\r{}Acceptance weighting dec = {:.3f} deg...'.format(
prefix, src.dec_deg[0]), end='')
if bins_energy is None:
# weight whole dec "row" to given flux
psi = inj.PointSourceInjector(
self.ana, src, flux,
sindec_bandwidth=self.sindec_bandwidth,
keep=self.keep)
ev_weights = psi.src_weights[0]
pix_weight[pixels] = ev_weights * template[pixels]
else:
# weight dec band to E^0, i.e. no spectrum yet
psi = inj.PointSourceInjector(
self.ana, src, PowerLawFlux(0),
sindec_bandwidth=self.sindec_bandwidth,
keep=self.keep)
ev_weights = psi.weights[0]
ev_indices = psi.indices[0]
# find events within range of energy bins, discard the rest
ev_true_energy = psi.sig.true_energy[ev_indices]
ev_mask, i_energy = self.get_mask_and_i_energy(ev_true_energy)
ev_weights = ev_weights[ev_mask]
i_energy = i_energy[ev_mask]
# simultaneously weight each MC event
# to the flux of each pixel in the "row"
# and then sum over events to get per-pixel weights
pix_weight[pixels] = np.sum(
ev_weights[None] * template[pixels][:,i_energy], axis=-1)
if not self.quiet:
print('\r{}Acceptance weighting complete.'.format(prefix) + 20 * ' ')
# normalize
pix_weight *= pixarea
pix_probs = pix_weight / np.sum(pix_weight)
self.pix_probs_normed = pix_probs / pixarea
pix_probs_normed = self.pix_probs_normed.copy()
# then calculate smeared maps
mask = (template_space > 0) & (pix_probs_normed <= 0)
pix_probs_normed[mask] = hp.UNSEEN
pdf_space_sig = []
pdf_space_sigsub = []
prefix = '{:25} | '.format(self.ana.key)
for sigma in sigmas:
print('\r{}Smearing with sigma = {:.3f} deg...'.format(
prefix, np.degrees(sigma)), end='')
sys.stdout.flush()
pdf_sig = pix_probs_normed.copy()
if sigma:
pdf_sig = hp.smoothing(pdf_sig, sigma=sigma)
pdf_sig[mask] = 0
pdf_sig[pdf_sig <= self.floor] = self.floor
pdf_sig /= np.sum(pdf_sig[dec_mask]) * pixarea
pdf_sigsub = np.concatenate([
np.repeat(np.mean(pdf_sig[pix_dec == dec]),
np.sum(pix_dec == dec))
for dec in reversed(np.unique(pix_dec)) ])
pdf_sigsub /= np.sum(pdf_sigsub) * pixarea
pdf_space_sig.append(pdf_sig)
pdf_space_sigsub.append(pdf_sigsub)
print('\r{}Smearing complete.'.format(prefix) + 30 * ' ')
sys.stdout.flush()
self.pdf_space_sig = np.array(pdf_space_sig)
self.pdf_space_sigsub = np.array(pdf_space_sigsub)
self._save_pdfs()
[docs]
class PriorSpacePDFRatioModel(PointSourceSpacePDFRatioModel):
r"""Space PDF ratio model for one or more point or point-like sources whose
position is described by a spatial prior
The source list should contain the healpix prior for each of the sources.
"""
[docs]
def __init__(self, ana, src, bg_param=None, acc_param=None, cut_n_sigma=5,
sigsub=False, mc_pdf=False, kent_min=np.radians(7)):
super(PriorSpacePDFRatioModel, self).__init__(
ana, src, bg_param=bg_param, acc_param=acc_param,
cut_n_sigma=cut_n_sigma, sigsub=sigsub, mc_pdf=mc_pdf,
kent_min=kent_min)
# Fit source pos with same conf (but using ps for "space")
from .conf import get_spatial_prior_trial_runner, CONF
Configuration = CONF.copy()
Configuration["space"] = "ps"
self.sptrs_conf = Configuration
self.sptrs = []
for src in self.src:
prior = src.prior[0]
if( prior is None ):
self.sptrs.append( None )
else:
# Needs to be modified if using more than one subanalysis!
self.sptrs.append( get_spatial_prior_trial_runner(
conf=Configuration, src_tr=src, llh_priors=prior.map,
refine_max=False) )
[docs]
def __call__(self, ev, i=(None, None)):
i_ev, i_src = i
return PriorSpacePDFRatioEvaluator(ev, self, (i_ev, i_src))
[docs]
class AccModel(AccModel):
"""Acceptance model for PriorSpacePDFRatioModel.
This acceptance model evaluates the time-independent acceptance for each source,
weights these by the per-source weights given the fitted positions, and finally
multiplies by the livetime.
"""
[docs]
def __init__(self, src, acc_param, livetime):
self.src, self.acc_param, self.livetime = src, acc_param, livetime
[docs]
def get_acc_per_source(self, **params):
fit_src = deepcopy(self.src)
fit_src["dec"] = params.pop("dec")
return self.livetime * self.src.weight * self.acc_param(fit_src, **params)
[docs]
class TimePDFRatioModel(PDFRatioModel):
"""Base class for time PDF ratio models.
Time PDF ratio models must provide an additional method,
:meth:`TimePDFRatioModel.get_frac_during_livetime`, which gives the fraction of the
total integrated signal lightcurve that is covered by active livetime for a given
:class:`analysis.SubAnalysis`.
"""
keep = ['mjd']
[docs]
@abstractmethod
def get_frac_during_livetime(self, **params):
"""Get the fraction of the total lightcurve covered by a given dataset.
Returns:
float: fraction of the total integrated signal lightcurve that is covered by
active livetime for a given :class:`analysis.SubAnalysis`
"""
pass
[docs]
class UntriggeredTimePDFRatioModel(TimePDFRatioModel):
"""Untriggered flare time PDF ratio model.
This class implements a Gaussian or box-time-window flare signal hypothesis for which
the peak time and flare duration are free to float. Gaussian is the default, but
``box=True`` can easily be selected. The anchor time is the center of the flare by
default, but ``box_mode='pre'`` and ``box_mode='post'`` allow precursor and afterglow
fits, respectively. The user can specify the allowed range for each of these values.
In principle it is possible to fit for flares from multiple source candidates
simultaneously in a stacking analysis, though this is not well tested if at all.
"""
[docs]
def __init__(self, ana, src, t0_min=None, t0_max=None,
dt_min=1e-6/86400, dt_max=200,
livetime=None,
n_seeds_t0=6, n_seeds_dt=5,
box=False, box_mode='center',
use_grl=True,
cut_n_sigma=5):
"""Construct a UntriggeredTimePDFRatioModel.
Args:
ana (:class:`analysis.SubAnalysis`): the sub analysis
src (:class:`utils.Sources`): the source list(this class just needs to know it's length)
t0_min (float): earliest allowed t0 (default: time of first event in the sub
subanalysis
t0_max (float): latest allowed t0 (default: time of first event in the sub
subanalysis
dt_min (float): smallest allowed dt
dt_max (float): largest allowed dt
n_seed_t0 (int): number of t0 seeds to try
n_seeds_dt (int): number of dt seeds to try
cut_n_sigma (float): number of sigmas away from a source an event must be within
to be included
"""
self.ana, self.src = ana, deepcopy(src)
self.n_src = len(self.src)
if self.n_src >= 2:
#raise NotImplementedError('time-dep untriggered flare fitting not yet implemented')
# TODO: true multiflare stacking probably doesn't work yet
# this needs to be tested, but in the mean time...
# by allowing this to proceed we enable MultiTrialRunner use cases
pass
mjd = ana.data.mjd
if t0_min is None:
t0_min = np.min(mjd)
if t0_max is None:
t0_max = np.max(mjd)
self.t0_min, self.t0_max = t0_min, t0_max
if dt_max is None:
dt_max = t0_max - t0_min
self.dt_min, self.dt_max = dt_min, dt_max
self.mjd_min, self.mjd_max = np.min(mjd), np.max(mjd)
self.livetime = livetime or ana.livetime
self.grl = ana.grl if use_grl else None
self.cut_n_sigma = cut_n_sigma
self.cut_pdf_threshold = stats.norm.sf(self.cut_n_sigma)
self.box = box
if box:
if box_mode not in 'pre center post'.split():
raise ValueError('invalid box mode: "{}"'.format(box_mode))
self.box_mode = box_mode
else:
self.box_mode = None
self.fits = fits = {}
self.param_names = param_names = []
for i in xrange(self.n_src):
if self.n_src == 1:
pad = ''
else:
pad = '_{:04d}'.format(i)
kt0, kdt = 't0{}'.format(pad), 'dt{}'.format(pad)
fits[kt0] = tuple(np.linspace(t0_min, t0_max, n_seeds_t0 + 2))
fits[kdt] = tuple(np.linspace(dt_min, dt_max, n_seeds_dt + 2))
param_names += [kt0, kdt]
[docs]
def __call__(self, ev):
return UntriggeredTimePDFRatioEvaluator(ev, self)
[docs]
@staticmethod
def get_t0_dt(self=None, n_src=None, box_mode='center', **params):
"""Extract timing parameters as aligned arrays ``(t0,dt)``
Returns:
tuple of (ndarray,ndarray): the t0 and dt arrays
"""
# first handle explicit start and stop times
if self is not None:
n_src, box_mode = self.n_src, self.box_mode
if 't1' in params and 't2' in params:
t0 = params['t1']
dt = params['t2'] - t0
if box_mode == 'center':
t0 = t0 + 0.5 * dt
elif box_mode == 'pre':
t0 = t0 + dt
return np.atleast_1d(t0), np.atleast_1d(dt)
# otherwise handle arrays or per-src params
if 't0' in params:
t0 = np.atleast_1d(params['t0'])
else:
t0 = np.array([params['t0_{:04d}'.format(i)] for i in xrange(n_src)])
if 'dt' in params:
dt = np.atleast_1d(params['dt'])
else:
dt = np.array([params['dt_{:04d}'.format(i)] for i in xrange(n_src)])
return t0, dt
def _get_frac_during_livetime_single(self, t0, dt, mjd1, mjd2):
t0 = t0[0]
dt = dt[0]
if self.box:
if self.box_mode == 'center':
t0 = t0 - 0.5 * dt
elif self.box_mode == 'pre':
t0 = t0 - dt
if t0 + dt < mjd1[0] or mjd2[-1] < t0:
return 0
else:
return cext.sum_pdf_time_box(mjd1, mjd2, t0, dt)
else:
return cext.sum_pdf_time_normal(mjd1, mjd2, t0, dt, self.cut_n_sigma)
def _get_frac_during_livetime_multi(self, t0, dt, mjd1, mjd2):
if self.box:
if self.box_mode == 'center':
t0 = t0 - 0.5 * dt
elif self.box_mode == 'pre':
t0 = t0 - dt
out = []
for i in xrange(self.n_src):
if self.box:
if t0 + dt < mjd1[0] or mjd2[-1] < t0:
out.append(0)
else:
out.append(cext.sum_pdf_time_box(mjd1, mjd2, t0, dt))
else:
out.append(cext.sum_pdf_time_normal(mjd1, mjd2, t0, dt, self.cut_n_sigma))
return np.array(out)
[docs]
def get_frac_during_livetime(self, t0_array=None, dt_array=None, **params):
box, box_mode = self.box, self.box_mode
# get start-stop times
grl = self.grl
if (grl is None) or (not params.get('use_grl', True)):
mjd1 = np.array([self.mjd_min])
mjd2 = np.array([self.mjd_max])
else:
mjd1, mjd2 = grl.start, grl.stop
# determine signal time PDF parameters
if t0_array is not None:
t0, dt = t0_array, dt_array
else:
t0, dt = self.get_t0_dt(self, **params)
if self.n_src == 1:
return self._get_frac_during_livetime_single(t0, dt, mjd1, mjd2)
else:
return self._get_frac_during_livetime_multi(t0, dt, mjd1, mjd2)
[docs]
class BinnedTimePDFRatioModel(TimePDFRatioModel):
"""Binned lightcurve time PDF ratio model.
This class implements a binned lightcurve signal time PDF. The PDFs are specified as
histlite histograms. By default the flux threshold is a fit parameter. In principle,
stacking of multiple sources should be supported, though this has not been well
tested, if at all. Note that in the stacking case, this implementation fits the
threshold for each source independently; some refactoring would be required to
constrain the thresholds to match.
Todo:
* probably want to (optionally?) correct
:meth:`BinnedTimePDFRatioModel.get_frac_during_livetime` to integrate only over true
livetime.
"""
[docs]
def __init__(self, ana, src, lcs,
n_seeds_thresh=10,
n_seeds_lag=10,
range_lag=(-1,1),
thresh_seeding='quantiles',
lag_seeding = 'uniform',
use_grl=True,
cut_n_sigma=None,thresh_seeds=None,lag_seeds=None):
"""Construct a BinnedTimePDFRatioModel
Args:
ana (:class:`analysis.SubAnalysis`): the sub analysis
src (:class:`utils.Sources`): the source list(this class just needs to know it's length)
lcs (array of :class:`histlite.Hist`): the lightcurves. values are flux; bin edges
are in units of MJD
n_seeds_thresh (int): number of flux threshold seeds to try
n_seeds_lag (int): number of lag seeds to try
range_lag (tuple of (float, float)): range of lag times to fit
thresh_seeding (str): one of 'quantiles' (recommended), 'log', 'linear','custom';
determines how threshold seeds are selected
lag_seeding (str): one of 'uniform','custom';
determines how lag seeds are selected
cut_n_sigma (float): ignored (but present for consistency with other time models)
thresh_seeds (list): list of threshold seeds if thresh_seeding is 'custom'
lag_seeds (list): list of lag seeds if lag_seeding is 'custom'
"""
lcs = np.atleast_1d(lcs)
self.ana, self.src, self.lcs = ana, src, lcs
self.grl = ana.grl
self.use_grl = use_grl
self.n_src = len(src)
assert self.n_src == len(lcs)
mjd = ana.data.mjd
self.mjd_min, self.mjd_max = np.min(mjd), np.max(mjd)
self.livetime = ana.livetime
self.fits = fits = {}
self.param_names = param_names = []
for i in xrange(self.n_src):
if self.n_src == 1:
pad = ''
else:
pad = '_{:04d}'.format(i)
kthresh, klag = 'thresh{}'.format(pad), 'lag{}'.format(pad)
# threshold
#lc_top = lcs[i].values.max()
lc_top = np.sort(deepcopy(lcs[i].values))[-2]
if thresh_seeding == 'quantiles':
hlc = hl.kde(lcs[i].values, lcs[i].volumes)
threshs = [hlc.contain(0, q).values
for q in np.linspace(0, .999, n_seeds_thresh)]
elif thresh_seeding == 'log':
lc_thresh_max = .99 * lc_top
log10_lc_thresh_max = np.log10(lc_thresh_max)
threshs = np.logspace(
log10_lc_thresh_max - 3, log10_lc_thresh_max, n_seeds_thresh)
elif thresh_seeding == 'linear':
threshs = np.linspace(
0, .99 * lcs[i].values.max(), n_seeds_thresh)
elif thresh_seeding == 'custom':
threshs = thresh_seeds
#fits[kthresh] = tuple(np.r_[0, threshs, .9999 * lc_top])
fits[kthresh] = tuple(np.r_[0, threshs, lc_top])
self.param_names += [kthresh]
# lag
if lag_seeding == 'uniform':
fits[klag] = tuple(np.r_[
range_lag[0],
np.linspace(range_lag[0], range_lag[1], n_seeds_lag),
range_lag[1]])
elif lag_seeding == 'custom':
fits[klag] = tuple(np.r_[range_lag[0],lag_seeds,range_lag[1]])
self.param_names += [klag]
[docs]
def __call__(self, ev):
return BinnedTimePDFRatioEvaluator(ev, self)
[docs]
def get_threshs(self, **params):
"""Extract per-source thresholds.
Returns:
ndarray of float: the flux thresholds for each source
"""
n_src = self.n_src
if 'thresh' in params:
threshs = np.atleast_1d(params['thresh'])
else:
threshs = np.array([params['thresh_{:04d}'.format(i)] for i in xrange(n_src)])
return threshs
[docs]
def get_lags(self, **params):
"""Extract per-source lagolds.
Returns:
ndarray of float: the flux lags for each source
"""
n_src = self.n_src
if 'lag' in params:
lags = np.atleast_1d(params['lag'])
else:
lags = np.array([params['lag_{:04d}'.format(i)] for i in xrange(n_src)])
return lags
[docs]
def get_lc_pdfs(self, threshs_array=None, lags_array=None, **params):
"""Generate normalized per-source time PDFs from lightcurves, given the per-source
thresholds.
Args:
threshs_array (ndarray of float): the thresholds
lags_array (ndarray of float): the lags in MJD
params: if ``threshs_array`` and/or lags_array are not given, they are
extracted from the fit parameters which are given as kwargs
Returns:
ndarray of :class:`histlite.Hist`: the signal time PDFs
"""
if threshs_array is None:
threshs = self.get_threshs(**params)
else:
threshs = threshs_array
if lags_array is None:
lags = self.get_lags(**params)
else:
lags = lags_array
# normalize manually to avoid higher-complexity checks
# in histlite's builtin normalization routines
pdfs = [
hl.Hist([lc.bins[0] + lag], np.maximum(0, lc.values - thresh))
for (lc, thresh, lag) in izip(self.lcs, threshs, lags) ]
for pdf in pdfs:
pdf._log = np.array([False])
norms = np.array([np.sum(np.diff(pdf.bins) * pdf.values) for pdf in pdfs ])
return [pdf / norm for (pdf, norm) in izip(pdfs, norms)]
[docs]
def get_lc_pdfs_cropped(self, **params):
"""Get cropped signal time PDFs spanning only the livetime of the originally
specified sub analysis.
Crop to sub analysis, or edge of lc pdf
Returns:
ndarray of :class:`histlite.Hist`: the cropped signal time PDFs
The PDFs are not renormalized after being cropped.
"""
uncropped_pdfs = self.get_lc_pdfs(**params)
# Need to crop pdf down to correct time bounds, bins and values.
new_pdfs = [pdf for pdf in uncropped_pdfs ]
pdf_list = []
for pdf in new_pdfs:
bins = pdf.bins
values = pdf.values
new_start = max(self.mjd_min, pdf.bins[0][0])
new_end = min(self.mjd_max, pdf.bins[0][-1])
new_bins = []
new_vals = []
if math.isclose(pdf.bins[0][0], new_start, rel_tol = 1e-6):
new_bins.append( pdf.bins[0][0])
new_vals.append( pdf.values[0])
else:
new_bins.append( new_start)
for i in range(len(bins[0]) - 1):
if pdf.bins[0][i] < new_start < pdf.bins[0][i + 1]:
new_vals.append( pdf.values[i] )
for i in range(1, len(bins[0]) - 1): # First/last index case handled outside
if new_start < pdf.bins[0][i] < new_end:
new_bins.append(pdf.bins[0][i])
new_vals.append(pdf.values[i])
if math.isclose(pdf.bins[0][-1], new_end, rel_tol = 1e-6):
new_bins.append( pdf.bins[0][-1])
else:
new_bins.append( new_end)
for i in range(len(bins[0]) - 1):
if pdf.bins[0][i] < new_end < pdf.bins[0][i + 1]:
new_vals[-1] = pdf.values[i]
bins[0] = new_bins
values = new_vals
pdf_list.append( hl.Hist(new_bins, new_vals) )
return pdf_list
[docs]
def get_frac_during_livetime(self, **params):
#pdfs = self.get_lc_pdfs_cropped(**params)
pdfs = self.get_lc_pdfs(**params)
grl = self.grl
values = {}
for pdf in pdfs:
t, p = pdf.bins[0], pdf.values
dt = np.diff(t)
v = np.r_[0, (p * dt).cumsum()]
values[pdf] = v
def cdf(mjd, pdf):
v = values[pdf]
bins = pdf.bins[0]
return np.interp(mjd, bins, v, left=0, right=1)
def frac(pdf, start, stop):
if (not len(pdf.values)) or (pdf.values.max() == 0):
return 0
else:
return np.sum(cdf(stop, pdf) - cdf(start, pdf))
if self.use_grl:
fracs = np.array([frac(pdf, grl.start, grl.stop) for pdf in pdfs])
else:
fracs = np.array([frac(pdf, self.mjd_min, self.mjd_max) for pdf in pdfs])
return fracs
[docs]
class TransientTimePDFRatioModel(TimePDFRatioModel):
fits = {}
param_names = []
_root_2pi = np.sqrt(2*pi)
[docs]
def __init__(self, ana, src, use_grl=True, cut_n_sigma=4):
self.ana, self.src = ana, src
self.n_src = n_src = len(src)
if( np.isinf(cut_n_sigma) ):
# Set to inf for injection over full sky, but obviously not needed here
cut_n_sigma = 5
self.cut_n_sigma = cut_n_sigma
self.cut_pdf_threshold = stats.norm.sf(cut_n_sigma)
src_mjd, sigma_t, t_100 = src.mjd, src.sigma_t, src.t_100
self.mjd_min, self.mjd_max = ana.mjd_min, ana.mjd_max
self.src_mjd_min = src_mjd_min = src_mjd - cut_n_sigma * sigma_t
self.src_mjd = src_mjd
self.src_mjd_t_100 = src_mjd_t_100 = src_mjd + t_100
self.src_mjd_max = src_mjd_max = src_mjd_t_100 + cut_n_sigma * sigma_t
self.src_duration = src_duration = src_mjd_max - src_mjd_min
self.src_max_pdf_ratio = src_duration / (t_100 + self._root_2pi * sigma_t)
self.src_gauss_norm = self._root_2pi * sigma_t / (t_100 + self._root_2pi * sigma_t)
self.avg_rate_Hz = avg_rate_Hz = ana.n_bg_data / ana.bg_livetime
self.avg_rate_perday = avg_rate_perday = avg_rate_Hz * 86400
self.src_rate = src_rate = np.repeat(avg_rate_perday, n_src)
if use_grl and (ana.grl is not None):
# TODO
# verify this treatment and consciously choose
# whether to keep it or to migrate to the methods used for
# UntriggeredTimePDFRatioModel and BinnedTimePDFRatioModel
self.active_time_windows, self.active_time_per_source = self._get_active_time_windows()
src_bg_time = np.zeros((len(src)))
#used_intervals = []
for i, active_time in enumerate(self.active_time_per_source):
for interval in active_time:
# if ( interval not in used_intervals ):
src_bg_time[i] += interval[1] - interval[0]
# used_intervals.append(interval)
self.src_nb = src_nb = src_rate * src_bg_time
else:
self.src_nb = src_nb = src_rate * src_duration
self.nb = np.sum(src_nb)
self.has_sigma_t = np.any(sigma_t)
[docs]
def __call__(self, ev):
return TransientTimePDFRatioEvaluator(ev, self)
def _get_active_time_windows(self):
'''
Construct the union of time windows during which at least one of the sources
was active and the detector was taking data
'''
grl = self.ana.grl
grl_times = utils.union_continuous_intervals(zip(grl["start"], grl["stop"]))
GRB_times = utils.union_continuous_intervals(zip(self.src_mjd_min, self.src_mjd_max))
active_times = utils.intersection_time_intervals(grl_times, GRB_times)
active_time_per_source = []
for i, src_i in enumerate(self.src):
src_time = [(self.src_mjd_min[i], self.src_mjd_max[i])]
src_time = utils.intersection_time_intervals(src_time, active_times)
active_time_per_source.append(src_time)
return active_times, active_time_per_source
[docs]
def get_frac_during_livetime(self, **params):
if ( self.ana.grl is not None ):
fracs = []
for i, src_i in enumerate(self.src):
src_time = [(self.src_mjd_min[i], self.src_mjd_max[i])]
active_times = self.active_time_windows
src_time = utils.intersection_time_intervals(src_time, active_times)
fracs.append( sum( [t_max-t_min for t_min, t_max in src_time] )/(self.src_mjd_max[i]-self.src_mjd_min[i]) )
return np.array(fracs)
else:
# approximating this as in or out of the livetime
mjd_min, mjd_max, src = self.mjd_min, self.mjd_max, self.src
return np.where((mjd_min < src.mjd) & (src.mjd < mjd_max), 1., 0.)
[docs]
class SpaceTimePDFRatioModel(AccWeightedPDFRatioModel):
"""Space * Time acceptance weighted PDF ratio model.
This class implements a space * time PDF ratio model, where the acceptance weight is
computed by integrating the declination-dependent signal acceptance over the
per-source time PDFs.
"""
[docs]
def __init__(self, space_model, time_model, transient=False):
"""Consstruct a SpaceTimePDFRatioModel.
Args:
space_model (:class:`AccWeightedPDFRatioModel`): the space model(should be a
point-like source model, not a template model)
time_model (:class:`TimePDFRatioModel`): the time model
transient (bool): whether the time model should be understood as a transient
source model. TODO: why is this here? probably a never-finished idea for some
optimization?
Todo:
* why is transient here? probably a never-finished idea for some optimization?
"""
self.space_model = space_model
self.time_model = time_model
self.src = space_model.src
self.fits = deepcopy(space_model.fits)
self.fits.update(deepcopy(time_model.fits))
self.param_names = space_model.param_names + time_model.param_names
self.keep = space_model.keep + time_model.keep
self.transient = transient
self._acc_model = self.AccModel(self)
[docs]
def set_ra(self, ra):
self.space_model.set_ra(ra)
self.time_model.set_ra(ra)
@property
def acc_model(self):
return self._acc_model
def get_time_model(self) -> PDFRatioModel:
return self.time_model
[docs]
class AccModel(AccModel):
"""Acceptance model for SpaceTimePDFRatioModel.
This acceptance model corrects the acceptance given by the space model's
``acc_model``, accounting for the fraction of the total livetime covered by the
signal time PDF.
"""
[docs]
def __init__(self, spacetime_model):
self.spacetime_model = spacetime_model
self.space_model = space_model = spacetime_model.space_model
self.time_model = time_model = spacetime_model.time_model
self.base_acc_model = space_model.acc_model
self.livetime = self.base_acc_model.livetime
[docs]
def get_acc_per_source(self, **params):
base_acc = self.base_acc_model.get_acc_per_source(**params)
frac_during_livetime = self.time_model.get_frac_during_livetime(**params)
return base_acc * frac_during_livetime / self.livetime
[docs]
def __call__(self, ev):
return SpaceTimePDFRatioEvaluator(ev, self)
[docs]
def get_updated(self, evs):
space_model = self.space_model.get_updated(evs)
return type(self) (space_model, self.time_model, transient=self.transient)
[docs]
class PDFRatioEvaluator(object):
"""PDF ratio evaluator base class.
This is the base class for PDF ratio evaluators. Evaluators based on e.g. space,
energy, and/or time information must derive from this class. Subclasses apply
corresponding PDFRatioModels to a given set of events such that, once further given a
set of fit parameter kwargs, they can provide the S/B PDF ratios, as well as the
S-tilde/B scrambled signal PDF ratios used in the signal subtracting likelihood.
Users should not need to make direct, explicit use of any PDFRatioEvaluators; they are
created automatically by the trial running machinery in :mod:`trial`.
"""
__metaclass__ = ABCMeta
[docs]
@abstractmethod
def __call__(self, mask=None, **params):
pass
@abstractproperty
def n_excluded(self):
pass
[docs]
class AccWeightedPDFRatioEvaluator(PDFRatioEvaluator):
@abstractmethod
def get_kept_i_ev(self):
pass
@abstractmethod
def finalize_i_ev(self, i_ev=None):
pass
[docs]
class MultiPDFRatioEvaluator(AccWeightedPDFRatioEvaluator):
[docs]
def __init__(self, ev, model):
self.orig_ev = orig_ev = ev
self.multi_model = multi_model = model
self.keep_pdfs = model.keep_pdfs
self.acc_weighted_model = multi_model.acc_weighted_model
self.other_models = multi_model.other_models
self.acc_weighted_evaluator = acc_weighted_evaluator = self.acc_weighted_model(ev)
self.kept_i_ev = i_ev = acc_weighted_evaluator.get_kept_i_ev()
acc_weighted_evaluator.finalize_i_ev(i_ev)
self.ev = ev = orig_ev[i_ev]
self.other_evaluators = other_evaluators = [model(ev) for model in self.other_models]
self.evaluators = [acc_weighted_evaluator] + other_evaluators
self.n_selected = len(ev)
self._n_excluded = len(orig_ev) - self.n_selected
def get_kept_i_ev(self):
return self.kept_i_ev
def finalize_i_ev(self, i_ev=None):
if i_ev is not None:
raise NotImplementedError(
'please do not apply `finalize_i_ev`, which is for internal use, '
'to MultiPDFRatioEvaluator instance')
@property
def n_excluded(self):
return self._n_excluded
[docs]
def __call__(self, _mask=None, **params):
evaluators = self.evaluators
#ones = lambda dtype: np.ones(len(self.ev), dtype=float)
#W, W_sigsub = ones(self.n_selected), ones(self.n_selected)
if _mask is not None:
n_selected = np.sum(_mask)
else:
n_selected = self.n_selected
W, W_sigsub = evaluators[0](_mask=_mask, **params)
W = W.copy()
W_sigsub = W_sigsub.copy()
list_W, list_W_sigsub = [], []
#b_list = []
for evaluator in evaluators[1:]:
this_W, this_W_sigsub = evaluator(_mask=_mask, **params)
#b_list.append(this_W)
#this_W = this_W.copy()
#this_W_sigsub = this_W # not the acc-weighted one: W_sigsub should be equal to W?
if self.keep_pdfs==True:
tmp_W = np.multiply(W, this_W)
tmp_W_sigsub = np.multiply(W_sigsub, this_W_sigsub)
tmp_W = tmp_W.copy()
tmp_W_sigsub = tmp_W_sigsub.copy()
list_W.append(tmp_W)
list_W_sigsub.append(tmp_W_sigsub)
else:
np.multiply(W, this_W, out=W)
np.multiply(W_sigsub, this_W_sigsub, out=W_sigsub)
#W *= this_W
#W_sigsub *= this_W_sigsub
#for debugging or inspection:
#print('saving b_pdfs for ', b_list)
#np.save('./b_pdfs.npy', b_list)
if self.keep_pdfs==True:
return list_W, list_W_sigsub
return W, W_sigsub
[docs]
class PointSourceSpacePDFRatioEvaluator(AccWeightedPDFRatioEvaluator):
[docs]
def __init__(self, ev, model, i=(None,None)):
i_ev, i_src = i
self.ev, self.model = ev, model
self.src = model.src
self.n_ev, self.n_src = len(ev), len(self.src)
# TODO: confirm that sparse always works
# allow manual setting, and maybe only turn on
# by default for like >= 10 sources
self.sparse = self.n_src >= 2
self.bg_param = model.bg_param
self.acc_model = model.acc_model
self.cut_n_sigma = model.cut_n_sigma
self.cut_pdf_threshold = model.cut_pdf_threshold
self.kent_min = model.kent_min
self.sigsub = model.sigsub
self.pdf_bg = self.bg_param(ev)
self._evaluate_src(i_ev, i_src)
self._n_excluded = 0
def _evaluate_src(self, i_ev, i_src):
ev, src = self.ev, self.src
n_ev, n_src = self.n_ev, self.n_src
sigsub = self.sigsub
kent = cext.kent_1D_space_pdf
if i_ev is None:
sig_data, sigsub_data, row, column = kent(
ev.dec, ev.ra, ev.sigma,
src.dec, src.ra, src.extension,
sigsub, self.cut_n_sigma, self.cut_pdf_threshold,
self.kent_min)
else:
sig_data, sigsub_data, row, column = [[]], [[]], [[]], [[]]
for j in np.unique(i_src):
i = i_ev[j == i_src]
s, ss, r, c = kent(
ev.dec[i], ev.ra[i], ev.sigma[i],
src.dec[j:j+1], src.ra[j:j+1], src.extension[j:j+1],
sigsub, self.cut_n_sigma, self.cut_pdf_threshold,
self.kent_min)
sig_data.append(s)
sigsub_data.append(ss)
row.append(i[r])
column.append(np.repeat(j, len(s)))
sig_data = np.concatenate(sig_data)
sigsub_data = np.concatenate(sigsub_data)
row = np.concatenate(row).astype(int)
column = np.concatenate(column).astype(int)
self.i_ev, self.i_src = row, column
self.kept_i_ev = np.unique(self.i_ev)
if len(sig_data):
sig_data /= self.pdf_bg[row]
sigsub_data /= self.pdf_bg[row]
if self.sparse:
self.pdf_ratio_sig_ev_src = sparse.csr_matrix(
(sig_data, (row, column)), shape=(n_ev, n_src))
self.pdf_ratio_sigsub_ev_src = sparse.csr_matrix(
(sigsub_data, (row, column)), shape=(n_ev, n_src))
elif n_src >= 2:
pdf_ratio_sig_ev_src = np.zeros((n_ev,n_src))
pdf_ratio_sig_ev_src[row,column] = sig_data
pdf_ratio_sigsub_ev_src = np.zeros((n_ev,n_src))
pdf_ratio_sigsub_ev_src[row,column] = sigsub_data
self.pdf_ratio_sig_ev_src = pdf_ratio_sig_ev_src
self.pdf_ratio_sigsub_ev_src = pdf_ratio_sigsub_ev_src
else:
pdf_ratio_sig_ev_src = np.zeros(n_ev)
pdf_ratio_sig_ev_src[row] = sig_data
pdf_ratio_sigsub_ev_src = np.zeros(n_ev)
pdf_ratio_sigsub_ev_src[row] = sigsub_data
self.pdf_ratio_sig_ev_src = pdf_ratio_sig_ev_src
self.pdf_ratio_sigsub_ev_src = pdf_ratio_sigsub_ev_src
@property
def n_excluded(self):
return self._n_excluded
def get_kept_i_ev(self):
return self.kept_i_ev
def finalize_i_ev(self, i_ev=None):
if i_ev is not None:
self.pdf_ratio_sig_ev_src = self.pdf_ratio_sig_ev_src[i_ev]
self.pdf_ratio_sigsub_ev_src = self.pdf_ratio_sigsub_ev_src[i_ev]
self._n_excluded = self.n_ev - self.pdf_ratio_sig_ev_src.shape[0]
self.final_ev = self.ev[i_ev]
def evaluate_sparse(self, **params):
src = self.src
ratio_sig = self.pdf_ratio_sig_ev_src
ratio_sigsub = self.pdf_ratio_sigsub_ev_src
return ratio_sig, ratio_sigsub
[docs]
def __call__(self, _mask=None, **params):
ratio_sig, ratio_sigsub = self.evaluate_sparse(**params)
if self.n_src >= 2:
weight = self.acc_model.get_acc_per_source(**params)
weight_sum = np.sum(weight)
if self.sparse:
W_sig = np.ravel(ratio_sig.dot(weight) / weight_sum)
W_sigsub = np.ravel(ratio_sigsub.dot(weight) / weight_sum)
else:
W_sig = np.dot(ratio_sig, weight) / weight_sum
W_sigsub = np.dot(ratio_sigsub, weight) / weight_sum
else:
W_sig = ratio_sig
W_sigsub = ratio_sigsub
if _mask is not None:
W_sig = W_sig[mask]
W_sigsub = W_sigsub[mask]
return W_sig, W_sigsub
[docs]
class MCPointSourceSpacePDFRatioEvaluator(AccWeightedPDFRatioEvaluator):
[docs]
def __init__(self, ev, model, i=(None,None)):
i_ev, i_src = i
self.ev, self.model = ev, model
self.src = model.src
self.n_ev, self.n_src = len(ev), len(self.src)
# TODO: sparse = True not implemented yet:
# how to align with the per-source splines?
self.sparse = False
self.bg_param = model.bg_param
self.acc_model = model.acc_model
self.cut_n_sigma = model.cut_n_sigma
self.cut_pdf_threshold = model.cut_pdf_threshold
self.pdf_bg = self.bg_param(ev)
self._evaluate_src(i_ev, i_src)
self._n_excluded = 0
def _evaluate_src(self, i_ev, i_src):
ev, src = self.ev, self.src
n_ev, n_src = self.n_ev, self.n_src
if i_ev is None:
dpsis_by_sigmas_data, row, column = cext.get_dpsis_by_sigmas_for_space_pdf(
ev.dec, ev.ra, ev.sigma,
src.dec, src.ra, src.extension,
self.cut_n_sigma)
else:
raise NotImplementedError(
'per-ev-per-src evaluation not yet supported for MC space PDF')
sig_data, sigsub_data, row, column = [[]], [[]], [[]], [[]]
for j in np.unique(i_src):
i = i_ev[j == i_src]
s, ss, r, c = kent(
ev.dec[i], ev.ra[i], ev.sigma[i],
src.dec[j:j+1], src.ra[j:j+1], src.extension[j:j+1],
sigsub, self.cut_n_sigma, self.cut_pdf_threshold,
self.kent_min)
sig_data.append(s)
sigsub_data.append(ss)
row.append(i[r])
column.append(np.repeat(j, len(s)))
sig_data = np.concatenate(sig_data)
sigsub_data = np.concatenate(sigsub_data)
row = np.concatenate(row).astype(int)
column = np.concatenate(column).astype(int)
self.i_ev, self.i_src = row, column
self.kept_i_ev = np.unique(self.i_ev)
if self.sparse:
self.dpsi_by_sigma_ev_src = sparse.csr_matrix(
(dpsis_by_sigmas_by_sigmas_data, (row, column)), shape=(n_ev, n_src))
elif n_src >= 2:
dpsi_by_sigma_ev_src = np.zeros((n_ev,n_src))
dpsi_by_sigma_ev_src[row,column] = sig_data
self.dpsi_by_sigma_ev_src = dpsi_by_sigma_ev_src
else:
dpsi_by_sigma_ev_src = np.zeros(n_ev)
dpsi_by_sigma_ev_src[row] = dpsis_by_sigmas_data
self.dpsi_by_sigma_ev_src = dpsi_by_sigma_ev_src
@property
def n_excluded(self):
return self._n_excluded
def get_kept_i_ev(self):
return self.kept_i_ev
def finalize_i_ev(self, i_ev=None):
if i_ev is not None:
self.dpsi_by_sigma_ev_src = self.dpsi_by_sigma_ev_src[i_ev]
self.pdf_bg = self.pdf_bg[i_ev]
self._n_excluded = self.n_ev - self.dpsi_by_sigma_ev_src.shape[0]
self.final_ev = self.ev[i_ev]
def evaluate_sparse(self, **params):
src = self.src
dpsi_by_sigma = self.dpsi_by_sigma_ev_src
sigma = self.final_ev.sigma
gamma = params['gamma']
# loop over sources
if self.n_src >= 2:
ratios = []
for (j,source) in enumerate(src):
source_dpsi_by_sigma = dpsi_by_sigma[:,j]
gamma = np.repeat(gamma, len(source_dpsi_by_sigma))
pdf_sig = self.model.splines[j](gamma, source_dpsi_by_sigma) / sigma**2
ratio = pdf_sig / self.pdf_bg
ratios.append(ratio)
ratios = np.array(ratios)
else:
gamma = np.repeat(gamma, len(dpsi_by_sigma))
pdf_sig = self.model.splines[0](gamma, dpsi_by_sigma) / sigma**2
ratios = pdf_sig / self.pdf_bg
#if len(self.final_ev) < 20:
# print(gamma[0], dpsi_by_sigma)
return ratios, ratios
[docs]
def __call__(self, _mask=None, **params):
ratio_sig, ratio_sigsub = self.evaluate_sparse(**params)
if self.n_src >= 2:
#weight = self.src.weight * self.acc_model.get_acc_per_source(**params)
weight = self.acc_model.get_acc_per_source(**params)
weight_sum = np.sum(weight)
if self.sparse:
W_sig = np.ravel(ratio_sig.dot(weight) / weight_sum)
W_sigsub = np.ravel(ratio_sigsub.dot(weight) / weight_sum)
else:
W_sig = np.dot(ratio_sig, weight) / weight_sum
W_sigsub = np.dot(ratio_sigsub, weight) / weight_sum
else:
W_sig = ratio_sig
W_sigsub = ratio_sigsub
if _mask is not None:
W_sig = W_sig[mask]
W_sigsub = W_sigsub[mask]
return W_sig, W_sigsub
[docs]
class Gauss2DPointSourceSpacePDFRatioEvaluator(PointSourceSpacePDFRatioEvaluator):
[docs]
def __init__(self, ev, model, i=(None, None)):
super(Gauss2DPointSourceSpacePDFRatioEvaluator, self).__init__(
ev, model, i=i)
def _evaluate_src(self, i_ev, i_src):
self.sparse = False
ev = self.ev
src = self.src
n_ev = len(ev)
n_src = len(src)
sigma_ra, sigma_dec, norm = self.model.sigma_param(ev)
# TODO: handle differing extensions?
extension = np.mean(src.extension)
if extension:
sigma_ra = np.sqrt(sigma_ra**2 + extension**2)
sigma_dec = np.sqrt(sigma_dec**2 + extension**2)
# TODO: handle sigsub precisely?
#sigma_sigsub = 0.5 * (sigma_ra + sigma_dec)
sigma_sigsub = sigma_dec
sigma2_ra = sigma_ra**2
sigma2_dec = sigma_dec**2
one_by_root2pi = 1 / np.sqrt(2*pi)
one_by_root2pi_sigma2_ra = one_by_root2pi / sigma_ra
one_by_root2pi_sigma2_dec = one_by_root2pi / sigma_dec
one_by_root2pi_sigma2_sigsub_cosdec = \
one_by_root2pi / (2 * pi * np.cos(ev.dec) * sigma_sigsub)
one_by_minus2sigma2_ra = -1 / (2 * sigma2_ra)
one_by_minus2sigma2_dec = -1 / (2 * sigma2_dec)
one_by_minus2sigma2_sigsub = -1 / (2 * sigma_sigsub**2)
sig_data_ra = []
sig_data_dec = []
sigsub_data = []
for (ra, dec) in izip(src.ra, src.dec):
rot_dec, rot_ra = coord.rotate_source_to_xaxis(
dec, ra, ev.dec, ev.ra, latlon=True)
rot_ra[rot_ra > pi] -= 2*pi
rot_dec2 = rot_dec**2
sig_data_ra.append(
one_by_root2pi_sigma2_ra
* np.exp(one_by_minus2sigma2_ra * rot_ra**2))
sig_data_dec.append(
one_by_root2pi_sigma2_dec
* np.exp(one_by_minus2sigma2_dec * rot_dec2))
sigsub_data.append(
one_by_root2pi_sigma2_sigsub_cosdec
* np.exp(one_by_minus2sigma2_sigsub * rot_dec2))
sig_data_ra = np.reshape(sig_data_ra, (n_src, n_ev))
sig_data_dec = np.reshape(sig_data_dec, (n_src, n_ev))
sig_data = sig_data_ra * sig_data_dec / norm
sigsub_data = np.reshape(sigsub_data, (n_src, n_ev))
sig_data /= self.pdf_bg
sigsub_data /= self.pdf_bg
sig_data = sig_data.T.copy()
sigsub_data = sigsub_data.T.copy()
if n_src >= 2:
self.pdf_ratio_sig_ev_src = sig_data
self.pdf_ratio_sigsub_ev_src = sigsub_data
self.hard_mask = np.any(self.pdf_ratio_sig_ev_src != 0, axis=1)
else:
self.pdf_ratio_sig_ev_src = sig_data[:,0]
self.pdf_ratio_sigsub_ev_src = sigsub_data[:,0]
self.hard_mask = self.pdf_ratio_sig_ev_src != 0
[docs]
class TemplateSpacePDFRatioEvaluator(PDFRatioEvaluator):
[docs]
def __init__(self, ev, model):
# save refs to things
self.ev, self.model = ev, model
self.N_orig = len(ev)
self.bg_param = model.bg_param
self.acc_model = model.acc_model
self.cut_n_sigma = model.cut_n_sigma
self.cut_pdf_threshold = model.cut_pdf_threshold
self.sigsub = model.sigsub
self.pdf_bg = pdf_bg = self.bg_param(ev)
# find(i_pixel, i_sigma) lookup indices for each event
sigmas = model.sigmas
self.i_pixel = i_pixel = hp.ang2pix(model.nside, pi/2 - ev.dec, ev.ra)
self.i_sigma = i_sigma = np.searchsorted(sigmas, ev.sigma)
i_sigma[i_sigma == sigmas.shape[0]] = sigmas.shape[0] - 1
self.W = model.pdf_space_sig[i_sigma, i_pixel] / pdf_bg
self.W_sigsub = model.pdf_space_sigsub[i_sigma, i_pixel] / pdf_bg
if self.sigsub:
self.kept_i_ev = np.arange(self.N_orig)
else:
mask = self.W >= self.cut_pdf_threshold
self.kept_i_ev = np.where(mask)[0]
self._n_excluded = 0
def get_acc_factor(self, **params):
#if params or True:
#if params:
if not self.model.per_pixel_flux:
pix_probs_orig = self.model.pix_probs_normed
ana = self.model.ana
model = self.model
pixarea = model.pixarea
pix_weight = self.acc_model.get_acc_per_source(**params)
pix_probs_new = pix_weight / (np.sum(pix_weight) * pixarea)
factor = np.ones_like(pix_probs_orig)
pix_mask = pix_probs_orig > 0
factor[pix_mask] = pix_probs_new[pix_mask] / pix_probs_orig[pix_mask]
else:
factor = np.ones(self.model.npix)
return factor
[docs]
def __call__(self, _mask=None, **params):
factor = self.get_acc_factor(**params)
factor = factor[self.i_pixel]
W, W_sigsub = factor * self.W, factor * self.W_sigsub
if _mask is not None:
W = W[_mask]
W_sigsub = W_sigsub[_mask]
return W, W_sigsub
@property
def n_excluded(self):
return self._n_excluded
def get_kept_i_ev(self):
return self.kept_i_ev
def finalize_i_ev(self, i_ev=None):
if i_ev is not None:
self.W = self.W[i_ev]
self.W_sigsub = self.W_sigsub[i_ev]
self.i_pixel = self.i_pixel[i_ev]
# TODO: should more things get masked?
self._n_excluded = self.N_orig - self.W.shape[0]
[docs]
class PriorSpacePDFRatioEvaluator(PointSourceSpacePDFRatioEvaluator):
[docs]
def __init__(self, ev, model, i=(None, None)):
super(PriorSpacePDFRatioEvaluator, self).__init__(ev, model, i=i)
def _evaluate_src(self, i_ev, i_src):
ev, src = self.ev, self.src
model = self.model
n_ev, n_src = self.n_ev, self.n_src
sigsub = self.sigsub
sig_data, sigsub_data, row, column = [[]], [[]], [[]], [[]]
self.fit_decs = src.dec
if( i_src is None ):
i_src = np.arange(len(src))
for j in np.unique(i_src):
if( i_ev is None ):
i = np.arange(len(ev))
ev_j = ev
else:
i = i_ev[j == i_src]
ev_j = ev[i]
prior = src.prior[j]
if( prior is None ):
# Use known position
fit_ra = src.ra[j]
fit_dec = src.dec[j]
else:
# Fit the position
sptr = model.sptrs[j]
SpatialPriorTrial = collections.namedtuple('SpatialPriorTrial', 'evss n_exs ra dec')
trial = SpatialPriorTrial(*(([[ev_j]], [0]) + (0.0, 0.0)))
fit_res = sptr.get_one_fit_from_trial(
trial, mp_cpus=model.sptrs_conf["mp_cpus"], logging=True)
fit_ra, fit_dec = fit_res[-2:]
#print("Fit results:", fit_res)
#print("Best fit pos in deg:", np.degrees([fit_dec, fit_ra]))
self.fit_decs[j] = fit_dec
s, ss, r, c = cext.kent_1D_space_pdf(
ev_j.dec, ev_j.ra, ev_j.sigma, np.array([fit_dec]),
np.array([fit_ra]), src.extension[j:j+1], sigsub,
self.cut_n_sigma, self.cut_pdf_threshold, self.kent_min)
sig_data.append(s)
sigsub_data.append(ss)
row.append(i[r])
column.append(np.repeat(j, len(s)))
sig_data = np.concatenate(sig_data)
sigsub_data = np.concatenate(sigsub_data)
row = np.concatenate(row).astype(int)
column = np.concatenate(column).astype(int)
self.i_ev, self.i_src = row, column
self.kept_i_ev = np.unique(self.i_ev)
if len(sig_data):
sig_data /= self.pdf_bg[row]
sigsub_data /= self.pdf_bg[row]
if self.sparse:
self.pdf_ratio_sig_ev_src = sparse.csr_matrix(
(sig_data, (row, column)), shape=(n_ev, n_src))
self.pdf_ratio_sigsub_ev_src = sparse.csr_matrix(
(sigsub_data, (row, column)), shape=(n_ev, n_src))
elif n_src >= 2:
pdf_ratio_sig_ev_src = np.zeros((n_ev,n_src))
pdf_ratio_sig_ev_src[row,column] = sig_data
pdf_ratio_sigsub_ev_src = np.zeros((n_ev,n_src))
pdf_ratio_sigsub_ev_src[row,column] = sigsub_data
self.pdf_ratio_sig_ev_src = pdf_ratio_sig_ev_src
self.pdf_ratio_sigsub_ev_src = pdf_ratio_sigsub_ev_src
else:
pdf_ratio_sig_ev_src = np.zeros(n_ev)
pdf_ratio_sig_ev_src[row] = sig_data
pdf_ratio_sigsub_ev_src = np.zeros(n_ev)
pdf_ratio_sigsub_ev_src[row] = sigsub_data
self.pdf_ratio_sig_ev_src = pdf_ratio_sig_ev_src
self.pdf_ratio_sigsub_ev_src = pdf_ratio_sigsub_ev_src
def evaluate_sparse(self, **params):
ratio_sig = self.pdf_ratio_sig_ev_src
ratio_sigsub = self.pdf_ratio_sigsub_ev_src
return ratio_sig, ratio_sigsub
[docs]
def __call__(self, _mask=None, **params):
ratio_sig, ratio_sigsub = self.evaluate_sparse(**params)
if self.n_src >= 2:
params["dec"] = self.fit_decs
weight = self.acc_model.get_acc_per_source(**params)
weight_sum = np.sum(weight)
if self.sparse:
W_sig = np.ravel(ratio_sig.dot(weight) / weight_sum)
W_sigsub = np.ravel(ratio_sigsub.dot(weight) / weight_sum)
else:
W_sig = np.dot(ratio_sig, weight) / weight_sum
W_sigsub = np.dot(ratio_sigsub, weight) / weight_sum
else:
W_sig = ratio_sig
W_sigsub = ratio_sigsub
if _mask is not None:
W_sig = W_sig[mask]
W_sigsub = W_sigsub[mask]
return W_sig, W_sigsub
[docs]
class EnergyPDFRatioEvaluator(PDFRatioEvaluator):
[docs]
def __init__(self, ev, model):
self.ev, self.model = ev, model
self.gammas, self.ss, self.ssd = model.gammas, model.ss, model.ssd
self.ev_data = tuple(
np.maximum(low, np.minimum(high, self.ev[feature]))
for (feature, (low, high)) in zip(model.features, model.range) )
self._W = {}
self._logW = {}
#for gamma in self.gammas:
# self.get_W(gamma)
def get_logW(self, gamma, _mask=None):
if gamma not in self._logW:
self._logW[gamma] = self.ssd[gamma].ev(*self.ev_data)
out = self._logW[gamma]
if _mask is not None:
out = out[_mask]
return out
def get_W(self, gamma, _mask=None):
if gamma not in self._W:
self._W[gamma] = np.exp(self.get_logW(gamma))
out = self._W[gamma]
if _mask is not None:
out = out[_mask]
return out
[docs]
def __call__(self, _mask=None, **params):
gamma = params['gamma']
if gamma in self.gammas:
W = self.get_W(gamma, _mask=_mask)
return W, W
gammas = self.gammas
i1, i2, i3 = np.sort(np.argsort(np.abs(gammas - gamma))[:3])
#gamma1, gamma2, gamma3 = gammas[[i1, i2, i3]]
gamma1 = gammas[i1]
gamma2 = gammas[i2]
gamma3 = gammas[i3]
r1 = self.get_logW(gamma1, _mask=_mask)
r2 = self.get_logW(gamma2, _mask=_mask)
r3 = self.get_logW(gamma3, _mask=_mask)
W = cext.pdf_ratio_energy(
r1, r2, r3,
gamma1, gamma2, gamma3,
gamma2 - gamma1, gamma - gamma2)
return W, W
@property
def n_excluded(self):
return 0
[docs]
class CustomFluxEnergyPDFRatioEvaluator(PDFRatioEvaluator):
[docs]
def __init__(self, ev, model):
self.ev, self.model = ev, model
self.s = model.s
self.ev_data = tuple(
np.clip(self.ev[feature], low, high)
for (feature, (low, high)) in zip(model.features, model.range) )
self.W = np.exp(self.s.ev(*self.ev_data))
[docs]
def __call__(self, **params):
W = self.W
return W, W
@property
def n_excluded(self):
return 0
[docs]
class NullEnergyPDFRatioEvaluator(PDFRatioEvaluator):
[docs]
def __init__(self, ev, model):
self.ev, self.model = ev, model
self.N = len(self.ev)
[docs]
def __call__(self, **params):
return np.ones(self.N)
[docs]
class GenericPDFRatioEvaluator(PDFRatioEvaluator):
[docs]
def __init__(self, ev, model, i=(None, None)):
i_ev, i_src = i
self.ev, self.model = ev, model
self.func, self.features = model.func, model.features
self.src = model.src
self.n_ev, self.n_src = len(ev), len(self.src)
self.sparse = False
self.bg_param = model.bg_param
self.acc_model = model.acc_model
self.cut_n_sigma = model.cut_n_sigma
self.cut_pdf_threshold = model.cut_pdf_threshold
self.kent_min = model.kent_min
self.sigsub = model.sigsub
self.pdf_bg = self.bg_param(ev)
self._evaluate_src(i_ev, i_src)
self._n_excluded = 0
def _evaluate_src(self, i_ev, i_src):
ev, src = self.ev, self.src
n_ev, n_src = self.n_ev, self.n_src
sigsub = self.sigsub
kent = cext.kent_1D_space_pdf
if i_ev is None:
sig_data, sigsub_data, row, column = kent(
ev.dec, ev.ra, ev.sigma,
src.dec, src.ra, src.extension,
sigsub, self.cut_n_sigma, self.cut_pdf_threshold,
self.kent_min)
else:
sig_data, sigsub_data, row, column = [[]], [[]], [[]], [[]]
for j in np.unique(i_src):
i = i_ev[j == i_src]
s, ss, r, c = kent(
ev.dec[i], ev.ra[i], ev.sigma[i],
src.dec[j:j+1], src.ra[j:j+1], src.extension[j:j+1],
sigsub, self.cut_n_sigma, self.cut_pdf_threshold,
self.kent_min)
sig_data.append(s)
sigsub_data.append(ss)
row.append(i[r])
column.append(np.repeat(j, len(s)))
sig_data = np.concatenate(sig_data)
sigsub_data = np.concatenate(sigsub_data)
row = np.concatenate(row).astype(int)
column = np.concatenate(column).astype(int)
self.i_ev, self.i_src = row, column
self.kept_i_ev = np.unique(self.i_ev)
if len(sig_data):
sig_data /= self.pdf_bg[row]
sigsub_data /= self.pdf_bg[row]
if self.sparse:
self.pdf_ratio_sig_ev_src = sparse.csr_matrix(
(sig_data, (row, column)), shape=(n_ev, n_src))
self.pdf_ratio_sigsub_ev_src = sparse.csr_matrix(
(sigsub_data, (row, column)), shape=(n_ev, n_src))
elif n_src >= 2:
pdf_ratio_sig_ev_src = np.zeros((n_ev,n_src))
pdf_ratio_sig_ev_src[row,column] = sig_data
pdf_ratio_sigsub_ev_src = np.zeros((n_ev,n_src))
pdf_ratio_sigsub_ev_src[row,column] = sigsub_data
self.pdf_ratio_sig_ev_src = pdf_ratio_sig_ev_src
self.pdf_ratio_sigsub_ev_src = pdf_ratio_sigsub_ev_src
else:
pdf_ratio_sig_ev_src = np.zeros(n_ev)
pdf_ratio_sig_ev_src[row] = sig_data
pdf_ratio_sigsub_ev_src = np.zeros(n_ev)
pdf_ratio_sigsub_ev_src[row] = sigsub_data
self.pdf_ratio_sig_ev_src = pdf_ratio_sig_ev_src
self.pdf_ratio_sigsub_ev_src = pdf_ratio_sigsub_ev_src
[docs]
def __call__(self, **params):
ev = self.ev
kwargs = dict(**params)
kwargs.update({argname: ev[feature] for (argname, feature) in self.features.items()})
kwargs['src'] = self.model.src
if kwargs['_mask'] is None:
kwargs.pop('_mask')
result = self.func(**kwargs)
if isinstance(result, tuple):
return result
else:
return result, result
@property
def n_excluded(self):
return self._n_excluded
def get_kept_i_ev(self):
return self.kept_i_ev
def finalize_i_ev(self, i_ev=None):
if i_ev is not None:
self.pdf_ratio_sig_ev_src = self.pdf_ratio_sig_ev_src[i_ev]
self.pdf_ratio_sigsub_ev_src = self.pdf_ratio_sigsub_ev_src[i_ev]
self._n_excluded = self.n_ev - self.pdf_ratio_sig_ev_src.shape[0]
self.final_ev = self.ev[i_ev]
def evaluate_sparse(self,_mask=None, **params):
ratio_sig = self(_mask=_mask, **params)[0]
ratio_sigsub = self.pdf_ratio_sigsub_ev_src
return ratio_sig, ratio_sigsub
[docs]
class FitPointSourceSpacePDFRatioEvaluator(AccWeightedPDFRatioEvaluator):
[docs]
def __init__(self, ev, model, i=(None,None)):
i_ev, i_src = i
self.ev, self.model = ev, model
self.i_ev, self.i_src = i
def _get_ps_space_model(self, src):
model = self.model
ana, bg_param, acc_param = model.ana, model.bg_param, model.acc_param
return PointSourceSpacePDFRatioModel(
ana, src, bg_param, acc_param,
cut_n_sigma=model.cut_n_sigma, sigsub=model.sigsub)
[docs]
def __call__(self, **params):
model = self.model
src = model.params_to_src(model.src, **params)
ps_space_model = self._get_ps_space_model(src)
evaluator = ps_space_model(self.ev, (self.i_ev, self.i_src))
out = evaluator(**params)
return out
def get_kept_i_ev(self):
model = self.model
src = model.src[:]
src['extension'] = np.sqrt(src.extension**2 + model.cut_extension**2)
ps_space_model = self._get_ps_space_model(src)
evaluator = ps_space_model(self.ev)
return evaluator.get_kept_i_ev()
def finalize_i_ev(self, i_ev=None):
if i_ev is not None:
N = len(self.ev)
self.ev = self.ev[i_ev]
self._n_excluded = N - len(self.ev)
def evaluate_sparse(self, **params):
model = self.model
src = model.params_to_src(model.src, **params)
ps_space_model = self._get_ps_space_model(src)
evaluator = ps_space_model(self.ev, (self.i_ev, self.i_src))
return evaluator.evaluate_sparse(**params)
@property
def n_excluded(self):
return self._n_excluded
[docs]
class UntriggeredTimePDFRatioEvaluator(PDFRatioEvaluator):
[docs]
def __init__(self, ev, model):
self.mjd, self.model = ev.mjd, model
self.src = model.src
self.n_src = len(self.src)
self.min_mjd, self.max_mjd = np.min(self.mjd), np.max(self.mjd)
self.kept_i_ev_i_src = None, None
self._n_excluded = 0
self.N = self.mjd.shape[0]
self.prev_t0_dt, self.prev_pdf_ratio = (None, None), None
@property
def n_excluded(self):
return self._n_excluded
def get_kept_i_ev(self):
return np.arange(self.N)
def finalize_i_ev(self, i_ev=None):
if i_ev is not None:
self.mjd = self.mjd[i_ev]
orig_N = self.N
self.N = self.mjd.shape[0]
self._n_excluded = orig_N - self.N
def evaluate_sparse(self, **params):
# use cached values?
model = self.model
t0, dt = model.get_t0_dt(model, **params)
(prev_t0, prev_dt), prev_pdf_ratio = self.prev_t0_dt, self.prev_pdf_ratio
if prev_pdf_ratio is not None:
if np.all(t0 == prev_t0) and np.all(dt == prev_dt):
return prev_pdf_ratio, prev_pdf_ratio
mjd, mjd_min, mjd_max = self.mjd, model.mjd_min, model.mjd_max
livetime_days = model.livetime / 86400.
frac_during_livetime = model.get_frac_during_livetime(**params)
cut_n_sigma = model.cut_n_sigma
n_ev = len(mjd)
n_src = self.n_src
box = model.box
if box:
if model.box_mode == 'center':
t1, t2 = t0 - .5 * dt, t0 + .5 * dt
elif model.box_mode == 'post':
t1, t2 = t0, t0 + dt
elif model.box_mode == 'pre':
t1, t2 = t0 - dt, t0
else:
raise RuntimeError('box mode is invalid')
t1 = np.nextafter(t1, t1 - 1)
t2 = np.nextafter(t2, t2 + 1)
if n_src >= 2:
data, row, column = [[]], [[]], [[]]
for i in xrange(n_src):
delta_t = np.abs(mjd - t0[i])
if box:
idx = np.where((t1[i] <= mjd) & (mjd <= t2[i]))[0]
else:
idx = np.where(np.abs(mjd - t0[i]) < cut_n_sigma * dt[i])[0]
n = len(idx)
if not n:
continue
if box:
data_i = livetime_days / np.repeat(dt[i], n)
else:
data_i = livetime_days * stats.norm.pdf(mjd[idx], t0[i], dt[i])
data.append(data_i)
row.append(idx)
column.append(np.repeat(i, n))
data = np.concatenate(data)
row = np.concatenate(row)
column = np.concatenate(column)
pdf_ratio = sparse.csr_matrix(
(data, (row, column)), shape=(n_ev, n_src))
else:
pdf_ratio = np.zeros(n_ev)
if box:
if not (t1 > mjd_max or t2 < mjd_min):
idx = np.where((t1[0] <= mjd) & (mjd <= t2[0]))[0]
pdf_ratio[idx] = livetime_days / (dt[0])
else:
#idx = np.where(np.abs(mjd - t0[0]) < cut_n_sigma * dt[0])[0]
idx = np.abs(mjd - t0[0]) < cut_n_sigma * dt[0]
pdf_ratio[idx] = livetime_days * stats.norm.pdf(mjd[idx], t0[0], dt[0])
if frac_during_livetime:
pdf_ratio /= frac_during_livetime
self.prev_t0_dt = t0, dt
self.prev_pdf_ratio = pdf_ratio
return pdf_ratio, pdf_ratio
[docs]
def __call__(self, **params):
if self.n_src == 1:
return self.evaluate_sparse(**params)
else:
raise ValueError(
'cannot compute time-only, per-event results (try `evaluate_sparse()`)')
[docs]
class BinnedTimePDFRatioEvaluator(PDFRatioEvaluator):
[docs]
def __init__(self, ev, model):
self.ev, self.model = ev, model
self.mjd = ev.mjd
self.src = model.src
self.n_src = len(self.src)
self.min_mjd, self.max_mjd = np.min(self.mjd), np.max(self.mjd)
# masking-related stuff
self.N = len(self.ev)
self._n_excluded = 0
self.kept_i_ev_i_src = None, None
@property
def n_excluded(self):
return self._n_excluded
def get_kept_i_ev(self):
return np.arange(self.N)
def finalize_i_ev(self, i_ev=None):
if i_ev is not None:
self.mjd = self.mjd[i_ev]
orig_N = self.N
self.N = self.mjd.shape[0]
self._n_excluded = orig_N - self.N
def evaluate_sparse(self, **params):
model = self.model
mjd = self.mjd
livetime_days = model.livetime / 86400.
n_ev = len(mjd)
n_src = self.n_src
pdfs = model.get_lc_pdfs_cropped(**params)
frac_during_livetime = model.get_frac_during_livetime(**params)
#print(sorted(params.items()))
data, row, column = [[]], [[]], [[]]
for i in xrange(n_src):
prenorm_pdf = pdfs[i]
frac_during_livetime_i = frac_during_livetime[i]
pdf = prenorm_pdf
mjd_min, mjd_max = prenorm_pdf.range[0]
data_i = np.zeros_like(mjd)
### TODO: do we need to normlize again?
##norm = prenorm_pdf.integrate().values
##if norm <= 0 or ~np.isfinite(norm):
## continue
##pdf = prenorm_pdf / norm
# evaluate PDF for in-bounds events
idx0 = (mjd_min <= mjd) & (mjd < mjd_max)
#values = pdf.get_values([mjd[idx0]])
if not np.any(idx0):
continue
bin_idx = pdf.indices(mjd[idx0])
values = pdf.values[bin_idx]
## TEST: can we obtain smoother lag distributions by
## inducing a small preference for the centers of bins?
#centers = pdf.centers[0][bin_idx]
#half_widths = .5 * pdf.volumes[bin_idx]
#eps = 1e-2
#caps = 1 + eps - eps * ((mjd[idx0] - centers)/(half_widths))**2
#values *= caps
## TEST: can we obtain smoother lag distributions by
## fitting a spline to the PDF?
#spline = pdf.spline_fit(s=0, k=2)
#values = np.maximum(0, spline(mjd[idx0]))
data_i[idx0] = livetime_days * values
if frac_during_livetime_i:
data_i /= frac_during_livetime_i
if np.any(~np.isfinite(data_i)):
print('warning')
print(prenorm_pdf.integrate().values)
print(pdf.values.min(), pdf.values.max())
idx = np.where(data_i > 0)[0]
n = len(idx)
if not n:
continue
data.append(data_i[idx])
row.append(idx)
column.append(np.repeat(i, n))
data = np.concatenate(data)
row = np.concatenate(row)
column = np.concatenate(column)
if n_src >= 2:
pdf_ratio = sparse.csr_matrix(
(data, (row, column)), shape=(n_ev, n_src))
else:
pdf_ratio = data_i
pdf_ratio = pdf_ratio #[:,None]
return pdf_ratio, pdf_ratio
[docs]
def __call__(self, **params):
if self.n_src == 1:
return self.evaluate_sparse(**params)
else:
raise ValueError(
'cannot compute time-only, per-event results (try `evaluate_sparse()`)')
[docs]
class TransientTimePDFRatioEvaluator(PDFRatioEvaluator):
[docs]
def __init__(self, ev, model):
self.ev, self.model = ev, model
self.src = src = model.src
self.n_src = len(self.src)
self.sparse = self.n_src >= 2
self._evaluate_src()
self._n_excluded = 0
@property
def n_excluded(self):
return self._n_excluded
def get_kept_i_ev(self):
return self.kept_i_ev
def finalize_i_ev(self, i_ev=None):
if i_ev is not None:
N = self.pdf_ratio.shape[0]
self.pdf_ratio = self.pdf_ratio[i_ev]
self._n_excluded = N - self.pdf_ratio.shape[0]
def _evaluate_src(self):
model, src, ev = self.model, self.src, self.ev
n_ev = len(ev)
n_src = len(src)
data, row, column = [[]], [[]], [[]]
for i in xrange(n_src):
idx = (model.src_mjd_min[i] <= ev.mjd) & (ev.mjd <= model.src_mjd_max[i])
idx = np.where(idx)[0]
n = len(idx)
if not n:
continue
ev_mjd = ev.mjd[idx]
src_mjd = src.mjd[i]
src_sigma_t = src.sigma_t[i]
src_duration = model.src_duration[i]
src_gauss_norm = model.src_gauss_norm[i]
src_mjd_t_100 = model.src_mjd_t_100[i]
data_i = np.repeat(src_duration * src_gauss_norm, len(ev_mjd))
if model.has_sigma_t:
mask_pre = ev_mjd < src_mjd
if np.any(mask_pre):
data_i[mask_pre] *= stats.norm.pdf(
ev_mjd[mask_pre], src_mjd, src_sigma_t)
mask_post = ev_mjd > src_mjd_t_100
if np.any(mask_post):
data_i[mask_post] *= stats.norm.pdf(
ev_mjd[mask_post], src_mjd_t_100, src_sigma_t)
mask_mid = (~mask_pre) & (~mask_post)
else:
mask_mid = slice(0, data_i.size)
if np.any(mask_mid):
data_i[mask_mid] = model.src_max_pdf_ratio[i]
data.append(data_i)
row.append(idx)
column.append(np.repeat(i, n))
data = np.concatenate(data)
row = np.concatenate(row).astype(int)
column = np.concatenate(column).astype(int)
if self.sparse:
pdf_ratio = sparse.csr_matrix(
(data, (row, column)), shape=(n_ev, n_src))
elif n_src >= 2:
pdf_ratio = np.zeros((n_ev, n_src))
pdf_ratio[row, column] = data
else:
pdf_ratio = np.zeros(n_ev)
pdf_ratio[row] = data
self.pdf_ratio = pdf_ratio
#self.hard_mask = np.in1d(
# np.arange(len(self.ev)),
# row[data > 0])
self.kept_i_ev_i_src = row, column
self.kept_i_ev = np.unique(row)
def evaluate_sparse(self, **params):
return self.pdf_ratio, self.pdf_ratio
[docs]
def __call__(self, **params):
if self.n_src == 1:
return self.evaluate_sparse(**params)
else:
raise ValueError(
'cannot compute time-only, per-event results (try `evaluate_sparse()`)')
[docs]
class SpaceTimePDFRatioEvaluator(PDFRatioEvaluator):
[docs]
def __init__(self, ev, model):
self.ev, self.model = ev, model
self.space_model, self.time_model = model.space_model, model.time_model
self.time_evaluator = self.time_model(ev)
i_ev, i_src = self.time_evaluator.kept_i_ev_i_src
self.space_evaluator = self.space_model(ev, (i_ev, i_src))
## TODO: try to cut again based on space PDF? should make things much faster?
@property
def n_excluded(self):
return self.space_evaluator.n_excluded
def get_kept_i_ev(self):
return self.space_evaluator.get_kept_i_ev()
def finalize_i_ev(self, i_ev=None):
self.space_evaluator.finalize_i_ev(i_ev)
self.time_evaluator.finalize_i_ev(i_ev)
[docs]
def __call__(self, _mask=None, **params):
se, te = self.space_evaluator, self.time_evaluator
tW, tW_sigsub = te.evaluate_sparse(**params)
if _mask is not None:
sW, sW_sigsub = se.evaluate_sparse(_mask=_mask, **params)
else:
sW, sW_sigsub = se.evaluate_sparse(**params)
if len(sW.shape) == len(tW.shape) == 1:
# short-circuit for single source case
W = sW * tW
W_sigsub = sW_sigsub * tW_sigsub
if _mask is not None:
return W[_mask], W_sigsub[_mask]
else:
return W, W_sigsub
if len(sW.shape) == 1:
sW = sW[:,None]
sW_sigsub = sW_sigsub[:,None]
if len(tW.shape) == 1:
tW = tW[:,None]
tW_sigsub = tW_sigsub[:,None]
if hasattr(sW, 'multiply'):
W = sW.multiply(tW)
W_sigsub = sW_sigsub.multiply(tW_sigsub)
elif hasattr(tW, 'multiply'):
W = tW.multiply(sW)
W_sigsub = tW_sigsub.multiply(sW_sigsub)
else:
W = sW * tW
W_sigsub = sW_sigsub * tW_sigsub
# W, W_sigsub are definitely shaped (n_ev, n_src) by now
n_ev, n_src = W.shape
if n_src >= 2:
if( type(se) is PriorSpacePDFRatioEvaluator ):
params["dec"] = se.fit_decs
weight = self.model.acc_model.get_acc_per_source(**params)
tweight = tW.astype(bool)
if hasattr(tW, 'multiply'):
tweight = tweight.multiply(weight)
weight_sums = np.ravel(tweight.sum(axis=1))
else:
tweight = tweight * weight
weight_sums = tweight.sum(axis=1)
weight_sum = np.sum(weight_sums)
else:
weight_sum = 1
if weight_sum == 0:
W = np.zeros(W.shape[0])
W_sigsub = np.zeros(W.shape[0])
elif W.shape[1] >= 2:
w = np.where(weight_sums > 0, weight_sums, np.inf)
W = np.ravel(W.dot(weight)) / w
W_sigsub = np.ravel(W_sigsub.dot(weight)) / w
elif hasattr(W, 'toarray'):
W = W.toarray()[:,0]
W_sigsub = W_sigsub.toarray()[:,0]
else:
#print(W.shape)
W = W[:,0]
W_sigsub = W_sigsub[:,0]
#else:
#if hasattr(W, 'toarray'):
# W = W.toarray()
# W_sigsub = W_sigsub.toarray()
#if W.shape[1] >= 2:
# mask = weight > 0
# W = np.ravel(W[:,mask].dot(weight[mask]) / weight_sum)
# W_sigsub = np.ravel(W_sigsum[:,mask].dot(weight[mask]) / weight_sum)
#else:
# W = W[:,0]
# W_sigsub = W_sigsub[:,0]
#print(W)
if _mask is not None:
W, W_sigsub = W[_mask], W_sigsub[_mask]
return W, W_sigsub