# inj.py
"""Event injectors and randomization methods."""
from __future__ import print_function
import abc
from abc import ABCMeta, abstractmethod, abstractproperty
try:
from collections.abc import Iterable
except:
from collections import Iterable
from copy import deepcopy
try:
from itertools import izip
except ImportError:
izip = zip
try:
xrange
except NameError:
xrange = range
import sys
import healpy as hp
import numpy as np
pi = np.pi
from . import coord, utils
from .utils import get_random
[docs]
class Randomizer(object):
"""Base class for event randomizers.
Classes which randomize(or scramble) data events should derive from this class.
"""
__metaclass__ = ABCMeta
[docs]
def __init__(self):
pass
[docs]
@abstractmethod
def __call__(self, ev, seed=None):
"""Randomize events in place.
:type ev: :class:`utils.Events`
:param ev: the events
:type seed: int or :class:`numpy.random.RandomState`
:param seed: seed spec
"""
pass
[docs]
class RARandomizer(Randomizer):
"""Randomize events uniformly in right ascension."""
def __repr__(self):
return 'RARandomizer()'
[docs]
def __call__(self, ev, seed=None):
random = get_random(seed)
ev['ra'] = random.uniform(0, 2*pi, len(ev))
[docs]
class DecRandomizer(Randomizer):
"""Randomize events in declination.
This class applies randomization to the event declinations.
Multiple randomization methods are available, which need to be
specified via the `dec_rand_method` keyword.
Additionally, events near the poles can be excluded from the
randomization by setting the pole exclusion radius via the
keyword `dec_rand_pole_exlusion`.
"""
[docs]
def __init__(
self,
dec_rand_method='gaussian_sigma',
dec_rand_kwargs=None,
dec_rand_pole_exlusion=None,
):
"""Construct a DecRandomizer
:type dec_rand_method: str
:param dec_rand_method: The randomization method to use.
'gaussian_sigma':
Applies Gaussian-distributed noise to the event declinations.
The width of the Gaussians is based on the per-event angular
uncertainties ``sigma``.
Additional kwargs (to be passed via `dec_rand_kwargs`):
'sigma_scaling':
Scaling factor that will be applied to the per-event
angular uncertainties prior to randomization. This only
affects the randomization - event angular uncertainties
will not be modified.
'uniform_sigma':
Applies uniform jitter to the event declinations.
The width of the jitter is based on the per-event angular
uncertainties ``sigma``.
Additional kwargs (to be passed via `dec_rand_kwargs`):
'sigma_scaling':
Scaling factor that will be applied to the per-event
angular uncertainties prior to randomization. This only
affects the randomization - event angular uncertainties
will not be modified.
'gaussian_fixed':
Applies Gaussian-distributed noise to the event declinations.
The width of the Gaussians is set to `randomization_width`.
Additional kwargs (to be passed via `dec_rand_kwargs`):
'randomization_width':
The sigma of the Gaussian.
'uniform_fixed':
Applies uniform jitter to the event declinations.
The width of the Gaussians is set to `randomization_width`.
Additional kwargs (to be passed via `dec_rand_kwargs`):
'randomization_width':
The width for the jitter. Events will be
randomized to dec +- width.
:type dec_rand_kwargs: dict
:param dec_rand_kwargs:
Additional keyword arguments provided as a dictionary.
See `dec_rand_method` for details on which kwargs are
necessary for the specified randomization method.
:type dec_rand_pole_exlusion: float
:param dec_rand_pole_exlusion:
Radius of the polar regions(in radians), which will be ignored.
Events with declinations in this pole region will not be
randomized. If `None`(default), then all events are randomized.
"""
self.dec_rand_method = dec_rand_method.lower()
self.dec_rand_pole_exlusion = dec_rand_pole_exlusion
self.dec_rand_kwargs = deepcopy(dec_rand_kwargs)
self.half_pi = np.pi / 2.
# to save some space we'll define an error message template
if self.dec_rand_kwargs:
keys = sorted(list(self.dec_rand_kwargs.keys()))
else:
keys = []
msg = 'Only expected {}, but got the following kwargs: ' + str(keys)
# Make sure the randomization method is understood and that all
# necessary keyword arguments are provided and no additional ones
if self.dec_rand_method in ['gaussian_sigma', 'uniform_sigma']:
# set default if None
if self.dec_rand_kwargs is None:
self.dec_rand_kwargs = {'sigma_scaling': None}
# check if correct keywords are passed
elif keys != ['sigma_scaling']:
raise ValueError(msg.format(['sigma_scaling']))
elif self.dec_rand_method in ['gaussian_fixed', 'uniform_fixed']:
if keys != ['randomization_width']:
raise ValueError(msg.format(['randomization_width']))
else:
raise ValueError(
'Unknown `DecRandomizer` randomization method: {}'.format(
self.dec_rand_method))
def __repr__(self):
"""Get string representation of the class
"""
msg = 'DecRandomizer(dec_rand_method={}, dec_rand_pole_exlusion={})'
return msg.format(self.dec_rand_method, self.dec_rand_pole_exlusion)
[docs]
def __call__(self, ev, seed=None):
"""Randomize events
"""
random = get_random(seed)
# -------------------
# Apply randomization
# -------------------
# Sigma-based randomization
if self.dec_rand_method in ['gaussian_sigma', 'uniform_sigma']:
# get sigma scaling factor
if self.dec_rand_kwargs['sigma_scaling']:
scale = ev.sigma * self.dec_rand_kwargs['sigma_scaling']
else:
scale = ev.sigma
if self.dec_rand_method == 'gaussian_sigma':
decs = random.normal(loc=ev.dec, scale=scale)
elif self.dec_rand_method == 'uniform_sigma':
decs = ev.dec + random.uniform(low=-scale, high=scale)
else:
raise ValueError('Should never happen!')
# Fixed Gaussian randomization
elif self.dec_rand_method == 'gaussian_fixed':
decs = random.normal(
loc=ev.dec, scale=self.dec_rand_kwargs['randomization_width'])
# Fixed uniform randomization
elif self.dec_rand_method == 'uniform_fixed':
width = self.dec_rand_kwargs['randomization_width']
decs = ev.dec + random.uniform(
low=-width, high=width, size=len(ev.dec))
# safety check
else:
raise ValueError('Should never happen!')
# move dec back into range by reflecting at pole.
# Note: this is technically only valid if RA is scrambled as well,
# otherwise the azimuth should change when flipping over the pole
# make sure everything is within [-pi/2, pi/2]
m_above = decs > self.half_pi
m_below = decs < -self.half_pi
decs[m_above] = np.pi - decs[m_above]
decs[m_below] = -np.pi - decs[m_below]
# -------------------------------------
# Exclude randomization for pole region
# -------------------------------------
if self.dec_rand_pole_exlusion is not None:
mask = np.logical_or(
ev['dec'] < -self.half_pi + self.dec_rand_pole_exlusion,
ev['dec'] > self.half_pi - self.dec_rand_pole_exlusion,
)
decs[mask] = ev['dec'][mask]
# assign new values
ev['dec'] = decs
[docs]
class SinDecRandomizer(Randomizer):
"""Randomize events in sin declination.
This class applies a jitter of specified strength to the
sin(dec) values. The jitter strength should not be chosen much larger
than the sin(dec) bin-widths in the PDFs.
Randomization should not change the joint PDFs of the observables, since
the background PDF is only created once at initialization (and optionally
updated with the injected signal via the `update_bg` parameter).
If binning is performed uniformly in sin(dec), a jitter strength of the
order of the bin-width is probably fine as long as there are no big jumps
in the PDF between consecutive bins. Otherwise, a smaller jitter value
should be chosen.
Because the SinDecRandomizer modifies the sin(dec) values instead of the
declination directly, it applies a stronger randomization in terms of
modified declination at the poles.
"""
[docs]
def __init__(self, sindec_jitter):
"""Construct a SinDecRandomizer
:type sindec_jitter: float
:param sindec_jitter: jitter strength in sin(dec).
The sin(dec) values will be modified via:
sin(dec) += uniform(-sindec_jitter, sindec_jitter)
"""
self.sindec_jitter = sindec_jitter
def __repr__(self):
return 'SinDecRandomizer(sindec_jitter={:.3f})'.format(
self.sindec_jitter)
[docs]
def __call__(self, ev, seed=None):
random = get_random(seed)
sin_decs = (
np.sin(ev['dec']) + random.uniform(
-self.sindec_jitter, self.sindec_jitter, size=len(ev['dec']))
)
# make sure everything is within [-1, 1]
m_above = sin_decs > 1.
m_below = sin_decs < -1.
sin_decs[m_above] = 2. - sin_decs[m_above]
sin_decs[m_below] = -2 - sin_decs[m_below]
# additional clip for numerical issues right at the boundary
ev['dec'] = np.arcsin(np.clip(sin_decs, -1, 1))
[docs]
class PoleRandomizer(Randomizer):
"""Randomize declinations of events near the North and South poles.
This class randomizes the declinations of events near the poles. This is mainly useful for
cascade analysis, where it is non-obvious how much of the polar regions should simply be
neglected. It may also be useful in track analyses if the user is specifically interested in
sources near one or both of the poles.
"""
[docs]
def __init__(self, pole_radius, shuffle=True, sample_dec=False):
"""Construct a PoleRandomizer.
:type pole_radius: float
:param pole_radius: radius of the polar regions(in radians)
:type shuffle: bool
:param shuffle: if false, scramble polar events uniformly
throughout the poles in which they each actually originated;
if true(default), then instead randomize polar events'
declinations by applying a random permutation of these.
if `sample_dec` is set to true, then the declination values are
additionally smeared by a Gaussian (see description for
`sample_dec`).
:type sample_dec: bool
:param sample_dec: if true, the event declinations at the
poles are additionally randomized by sampling from a Gaussian with
the width set to the event sigma. Note that this setting may push
events from the poles away towards the equator and therefore
modify the joint PDFs! If false(default), only the permutation
of event declination across events at the pole is performed.
This setting is only relevant if `shuffle` is set to True.
"""
self.pole_radius = pole_radius
self.pole_dec = pi/2 - pole_radius
self.pole_sindec = np.sin(self.pole_dec)
self.shuffle = shuffle
self.sample_dec = sample_dec
def __repr__(self):
return (
'PoleRandomizer(pole_radius={:.1f}deg, shuffle={}, '
'sample_dec={})'
).format(
np.degrees(self.pole_radius),
self.shuffle,
self.sample_dec,
)
[docs]
def __call__(self, ev, seed=None):
random = get_random(seed)
pole_mask = np.abs(ev.dec) > self.pole_dec
n = np.sum(pole_mask)
if n:
ev['dec'] = ev.dec.copy()
sign = np.sign(ev.dec[pole_mask])
if self.shuffle:
pole_decs = ev.dec[pole_mask]
if self.sample_dec:
pole_decs = random.normal(pole_decs, ev.sigma[pole_mask])
north_mask = sign > 0
pole_decs[north_mask] = random.permutation(pole_decs[north_mask])
pole_decs[~north_mask] = random.permutation(pole_decs[~north_mask])
# keep declinations in bounds
pole_decs %= 2*pi
pole_decs[pole_decs > pi] -= 2*pi
bad_north = pole_decs > pi/2
pole_decs[bad_north] -= 2 * (pole_decs[bad_north] - pi/2)
bad_south = pole_decs < -pi/2
pole_decs[bad_south] -= 2 * (pole_decs[bad_south] + pi/2)
ev['dec'][pole_mask] = pole_decs
else:
sd = random.uniform(self.pole_sindec, 1, n)
ev['dec'][pole_mask] = sign * np.arcsin(sd)
[docs]
class EnergyRandomizer(Randomizer):
"""Randomize event energies.
This class randomizes log10(E/GeV). This may be useful if some events are causing intense
spikes in the background distributions at particular declinations. Note that this feature has
not been thoroughly tested, if at all.
"""
[docs]
def __init__(self, sigma_log10):
"""Construct an EnergyRandomizer.
:type sigma_log10: float
:param sigma_log10: width of the Gaussian noise to apply to log10(E/GeV)
"""
self.sigma_log10 = sigma_log10
def __repr__(self):
return 'EnergyRandomizer(sigma_log10={:.3f})'.format(self.sigma_log10)
[docs]
def __call__(self, ev, seed=None):
random = get_random(seed)
ev['energy'] *= np.abs(random.normal(1, self.sigma_log10, size=len(ev)))
ev['log10energy'] = np.log10 (ev.energy)
[docs]
class MJDShuffleRandomizer(Randomizer):
"""Shuffle event times.
This class shuffles the arrival times(in MJD) of the events.
"""
def __repr__(self):
return 'MJDShuffleRandomizer()'
[docs]
def __call__(self, ev, seed=None):
random = get_random(seed)
ev['mjd'] = mjd = random.permutation(ev.mjd)
ev['ra'] = coord.switch_ra_azimuth(ev.azimuth, mjd)
[docs]
class MJDGRLRandomizer(Randomizer):
"""Randomize event times according to GRL."""
[docs]
def __init__(self, grl, presample=0, rates_by='events'):
self.grl, self.presample = grl, presample
self.livetime = grl.livetime.sum()
if rates_by == 'livetime':
self.p = self.grl.livetime / self.livetime
elif rates_by == 'events':
self.p = self.grl.events/float(self.grl.events.sum())
else:
raise NotImplementedError('unsupported rates_by="{}"'.format(rates_by))
self.n_runs = len(self.grl)
if presample:
raise NotImplementedError('presampling from GRL not yet implemented')
def __repr__(self):
return 'MJDGRLRandomizer({} runs over {} years{})'.format(
len(self.grl), self.livetime / 365.25,
', presampled' if self.presample else '')
def get_times(self, N, seed=None):
random = get_random(seed)
grl = self.grl
i_runs = random.choice(self.n_runs, size=N, p=self.p)
start, stop = grl.start[i_runs], grl.stop[i_runs]
return random.uniform(start, stop)
[docs]
def __call__(self, ev, seed=None):
ev['mjd'] = mjd = self.get_times(len(ev), seed=seed)
ev['ra'] = coord.switch_ra_azimuth(ev.azimuth, mjd)
[docs]
class Selector(object):
"""Base class for event selectors.
This is an interface for pre-emptively selecting events prior to [any randomizations as well
as] likelihood evaluation. For per-trial masking, see :class:`PDFRatioEvaluator`.
TODO: actually go and add documentation for masking functionality under :class:`PDFRatioEvaluator`
"""
__metaclass__ = ABCMeta
[docs]
@abstractmethod
def mask(self, arr):
"""Get a boolean mask for selected elements of ``arr``.
:type arr: :class:`utils.Arrays`
:param arr: the Arrays to mask
:rtype: ndarray of bool
:return: the mask
"""
pass
[docs]
@abstractmethod
def where(self, arr):
"""Get the indices of selected elements of ``arr``.
:type arr: :class:`utils.Arrays`
:param arr: the Arrays to mask
:rtype: ndarray of bool
:return: the indices
"""
pass
[docs]
@abstractmethod
def __call__(self, arr):
"""Get a downselected copy of ``arr``.
:type arr: :class:`utils.Arrays`
:param arr: the Arrays to mask
:rtype: same as ``arr``
:return: downselected copy(or potentially view) of ``arr``
"""
pass
[docs]
class DecBandSelector(Selector):
"""Select events based on proximity in declination to source(s).
This class allows pre-emptive application of the so-called "band" cut(see
:class:`PointSourceSpacePDFRatioModel` for more).
"""
[docs]
def __init__(self, src, cut_n_sigma=5):
"""Construct a DecBandSelector.
:type src: :class:`utis.Sources`
:param src: the source list. required keys: ``ra`` and ``dec``. optional keys:
``weight``, ``extension``
:type cut_n_sigma: float
:param cut_n_sigma: number of sigmas away from a source an event must be within to be
included
"""
self.dec = src.dec
self.cut_n_sigma = cut_n_sigma
if np.any(src.extension > 0):
self.sigma = src.extension
else:
self.sigma = None
[docs]
def mask(self, arr):
out = np.zeros(len(arr), dtype=bool)
for (i_dec, dec) in enumerate(self.dec):
if self.sigma is None:
sigma = arr.sigma
else:
sigma = np.sqrt(arr.sigma**2 + self.sigma[i_dec]**2)
out |= np.abs(arr.dec - dec) / sigma < self.cut_n_sigma
return out
[docs]
def where(self, arr):
return np.where(self.mask(arr))[0]
[docs]
def __call__(self, arr):
return arr[self.mask(arr)]
[docs]
class TimeWindowSelector(Selector):
"""Select events within a given time window.
You probably don't want to use this class; you might be looking for :class:`DataOnOffInjector`.
"""
[docs]
def __init__(self, mjd_min, mjd_max):
self.mjd_min, self.mjd_max = mjd_min, mjd_max
[docs]
def mask(self, arr):
out = np.zeros(len(arr), dtype=bool)
for (i_dec, (mjd_min, mjd_max)) in enumerate(izip(self.mjd_min, self.mjd_max)):
out |= (mjd_min <= arr.mjd) & (arr.mjd <= mjd_max)
return out
[docs]
def where(self, arr):
return np.where(self.mask(arr))[0]
[docs]
def __call__(self, arr):
return arr[self.mask(arr)]
[docs]
class Injector(object):
"""Base class for event injectors.
Injectors produce event ensembles along with counts of events that could have been injected but
were instead pre-emptively excluded using a :class:`Selector`.
TODO: revisit hierarchy. Background injectors don't seem to want an ``n_inj`` argument at all,
in the :meth:`Injector.inject` method.
"""
__metaclass__ = ABCMeta
[docs]
@abstractmethod
def inject(self, n_inj, seed=None):
"""Get some number of events.
:type n_inj: int
:param n_inj: number of events to inject
:type seed: int or :class:`numpy.random.RandomState`
:param seed: seed spec
:rtype: (list of :class:`utils.Events`, int)
:return: (event ensembles, number of pre-emptively excluded events)
"""
pass
[docs]
class DataInjector(Injector):
"""Inject real data events.
This class injects real events from the data, possibly(in fact, almost always) with one or
more randomizations applied.
"""
[docs]
def __init__(self, ana, data, keep, randomizers=None, bg_replace=False):
"""Construct a DataInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type data: :class:`utils.Events`
:param data: the data events to inject
:type keep: list of str
:param keep: names of event features to keep
:type randomizers: list of :class:`Randomizer`
:param randomizers: the randomizers to apply; if not given, RARandomizer is used.
:type bg_replace: bool
:param bg_replace: If true, sampling with replacement will be used.
"""
self.ana, self.data = ana, data
self.keep = keep
if randomizers is None:
randomizers = [RARandomizer()]
self.randomizers = randomizers
self.replace = bg_replace
self.N = ana.n_data
[docs]
def inject(self, seed=None):
random = get_random(seed)
out = utils.Events({key: self.data[key] for key in self.keep})
size = len(out)
out['idx'] = np.arange(size)
if self.replace:
# sample event indices with replacement
idx = random.choice(out['idx'], size, replace=self.replace)
out = out[idx]
out['inj'] = np.repeat(self, size)
n_selected = len(out)
n_excluded = len(self.ana.data) - n_selected
#out.compress(self.keep + 'idx inj'.split())
for randomize in self.randomizers:
randomize(out, seed=seed)
return [out], n_excluded
[docs]
class DataOnOffInjector(Injector):
"""Inject off-time events into on-time windows.
This class performs the sort of background injection typically used in transient analyses,
where the background is taken from off-time data while the analysis is performed in specific
on-time windows.
"""
[docs]
def __init__(self, ana, time_model, keep, randomizers=None, cut_n_sigma=5, full_sky=False, bg_replace=True):
"""Construct a DataOnOffInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type time_model: :class:`pdf.TimePDFRatioModel`
:param time_model: the time PDF ratio model
:type keep: list of str
:param keep: names of event features to keep
:type randomizers: list of :class:`Randomizer`
:param randomizers: the randomizers to apply. you probably don't want to apply any(TODO:
reconsider, why exactly is this argument here?)
:type cut_n_sigma: float
:param cut_n_sigma: number of sigmas away from a source an event must be within to be
included
:type bg_replace: bool
:param bg_replace: If true, sampling with replacement will be used.
"""
self.ana, self.time_model, self.keep = ana, time_model, deepcopy(keep)
if 'azimuth' not in keep:
self.keep.append('azimuth')
self.data, self.src = ana.bg_data, time_model.src
self.cut_n_sigma = cut_n_sigma
self.full_sky = full_sky
self.arange = np.arange(len(self.src))
self._set_indices(full_sky=full_sky)
self.randomizers = randomizers or []
self.replace = bg_replace
[docs]
def inject(self, seed=None):
# note: default is replace=True because it's faster
random = get_random(seed)
n_injs = random.poisson(self.n_injs_poisson)
i_src_injs = np.where(n_injs > 0)[0]
if len(i_src_injs) == 0:
return [], 0
data, indices, time_model = self.data, self.indices, self.time_model
idx = np.concatenate([
random.choice(indices[i_src], n, replace=self.replace)
for (i_src, n) in izip(i_src_injs, n_injs[i_src_injs]) ])
out = utils.Events({key: data[key][idx] for key in self.keep})
out['idx'] = idx
out['inj'] = np.repeat(self, len(out))
if ( self.ana.grl is not None ):
out['mjd'] = []
active_time_per_source = time_model.active_time_per_source
for i, n in enumerate(n_injs):
if( n>0 ):
if ( len(active_time_per_source[i])==1 ):
start, stop = active_time_per_source[i][0]
out['mjd'] = np.append(out['mjd'], random.uniform(start, stop, n))
else:
probs = np.array([t_stop-t_start for t_start, t_stop in active_time_per_source[i]])
probs = probs/sum(probs)
idx = random.choice(np.arange(len(probs)), n, p=probs, replace=True)
for index in idx:
start, stop = active_time_per_source[i][index]
out['mjd'] = np.append(out['mjd'], random.uniform(start, stop, 1))
else:
out['mjd'] = np.append(out['mjd'], np.array([],dtype=float) )
else:
start_mjds = np.repeat(time_model.src_mjd_min, n_injs)
end_mjds = np.repeat(time_model.src_mjd_max, n_injs)
out['mjd'] = random.uniform(start_mjds, end_mjds)
for randomize in self.randomizers:
randomize(out, seed=seed)
out['ra'] = coord.switch_ra_azimuth(out.azimuth, out.mjd)
return [out], 0
def _set_indices(self, full_sky=False):
data, src, cut_n_sigma = self.data, self.src, self.cut_n_sigma
time_model = self.time_model
indices = []
n_injs_poisson = []
N = 1. * len(data)
for i in xrange(len(src)):
if(full_sky):
idx = np.array(xrange(len(data)))
else:
selector = DecBandSelector(src[i:i+1], cut_n_sigma=cut_n_sigma)
idx = selector.where(data)
rate_frac = len(idx) / N
n_inj = rate_frac * time_model.src_nb[i]
indices.append(idx)
n_injs_poisson.append(n_inj)
self.indices = indices
self.n_injs_poisson = np.array(n_injs_poisson)
[docs]
class MCBackgroundInjector(Injector):
"""Inject background events using weighted MC events to model the background instead of scrambling data in RA.
This class injects simulated events from the monte carlo, possibly(in fact, almost always) with one or
more randomizations applied.
"""
[docs]
def __init__(self, ana, bg_weight_names, keep, randomizers = None):
"""Construct a PointSourceInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type mc: :class:`utils.Events`
:param mc: the monte carlo events to inject
:type bg_weight_names: list of str
:param bg_weight_names: names of the monte carlo weights saved in the signal events. Weights should
be in Hz and already have flux incorperated.
:type keep: list of str
:param keep: names of event features to keep
:type randomizers: list of :class:`Randomizer`
:param randomizers: the randomizers to apply to the monte carlo events
"""
self.ana, self.mc, self.keep = ana, ana.sig, deepcopy(keep)
if 'azimuth' not in keep:
self.keep.append('azimuth')
if randomizers is None:
randomizers = [RARandomizer()]
self.randomizers = randomizers
self.N = ana.n_data
self.weight_names = bg_weight_names
if 'idx' not in keep:
self.keep.append('idx')
if 'inj' not in keep:
self.keep.append('inj')
self._set_weights()
def _set_weights(self):
weights = []
mc = self.mc
weight_names = self.weight_names
self.weights = []
self.n_exp = []
self.n_exp_err = []
self.n_w = len(weight_names)
for weight_name in weight_names:
#weights should already have flux incorperated and in units of Hz
weights = mc[weight_name] * self.ana.livetime
self.weights.append(weights)
self.n_exp.append(sum(weights))
self.n_exp_err.append(np.sqrt(sum(weights**2)))
self.probs = [self.weights[i] / np.sum(self.weights[i]) for i in xrange(self.n_w)]
self.mc['idx'] = xrange(len(mc['dec']))
self.mc['inj'] = np.zeros(len(mc['dec']))
[docs]
def inject(self, replace=False, seed=None):
random = get_random(seed)
mc = self.mc
mc_evts = utils.Events({key: mc[key] for key in self.keep})
outs = []
for i in xrange(self.n_w):
# grab a random map for each of the weight types, use the weights to get the uncertainty
n_bg_mcfluct = random.normal(self.n_exp[i], scale = self.n_exp_err[i])
n_bg_mcfluct = int(n_bg_mcfluct) if n_bg_mcfluct > 0. else 0
n_bg = random.poisson(n_bg_mcfluct)
probs = self.probs[i]
indices = mc['idx']
# draw events
idx = random.choice(indices, n_bg, p=probs, replace = True)
out = mc_evts[idx]
out['inj'] = i*np.ones(len(out['dec']))
outs.append(out)
outs = utils.Arrays.concatenate(outs)
# randomize drawn events to further the randomness of maps
for randomize in self.randomizers:
randomize(outs, seed=seed)
return [outs], 0
[docs]
class SigInjector(Injector):
"""Base class for signal injectors.
Signal injectors, unlike background injectors, must also be able to provide their total
acceptance. This class enforces that requirement using :meth:`SigInjector.acc_total`.
"""
@abstractproperty
def acc_total(self):
"""Total signal acceptance.
This is similar to :meth:`LLHEvaluatorBase.get_acc_total`, but evaluated in light of all
specified signal injection details and typically in terms of the actual simulated signal
events that will be used.
"""
pass
[docs]
class PointSourceInjector(SigInjector):
"""Inject one or more point-like sources.
This class implements the signal injection for classic time-integrated analyses. It
transparently supports stacking of multiple sources as well as per-source Gaussian spatial
extension.
"""
[docs]
def __init__(self, ana, src, flux, keep,
sindec_bandwidth=np.radians(2),):
"""Construct a PointSourceInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type src: :class:`utis.Sources`
:param src: the source list. required keys: ``ra`` and ``dec``. optional keys:
``weight``, ``extension``
:type flux: :class:`hyp.Flux`
:param flux: the signal spectrum
:type keep: list of str
:param keep: names of event features to keep
:type sindec_bandwidth: float
:param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to
draw simulated signal events
"""
from .hyp import Flux
self.ana, self.sig, self.src, self.keep = ana, ana.sig, src, deepcopy(keep)
if 'idx' not in keep:
self.keep.append('idx')
if 'inj' not in keep:
self.keep.append('inj')
if isinstance(flux, Iterable):
self.flux = flux
else:
self.flux = len(src) * [flux]
self.sindec_bandwidth = sindec_bandwidth
self.solid_angle = 2 * pi * self.sindec_bandwidth
self._set_indices()
self._set_weights()
def _set_indices(self):
sig, src, sindec_bandwidth = self.sig, self.src, self.sindec_bandwidth
src_sindecs = np.sin(src.dec)
min_sindecs = np.maximum(-1, src_sindecs - .5*sindec_bandwidth)
max_sindecs = np.minimum(+1, src_sindecs + .5*sindec_bandwidth)
min_sindecs[max_sindecs == +1] = +1 - sindec_bandwidth
max_sindecs[min_sindecs == -1] = -1 + sindec_bandwidth
min_decs, max_decs = np.arcsin(min_sindecs), np.arcsin(max_sindecs)
indices = []
N = len(src)
logging = N > 50
flux0 = self.flux[0]
energy_min, energy_max = flux0.energy_range
mask_energy = ((energy_min <= self.sig.true_energy)
& (self.sig.true_energy <= energy_max))
flush = sys.stdout.flush
for (i, (min_dec, max_dec, flux)) in enumerate(izip(min_decs, max_decs, self.flux)):
if flux.energy_range != (energy_min, energy_max):
energy_min, energy_max = flux.energy_range
this_mask_energy = ((energy_min <= self.sig.true_energy)
& (self.sig.true_energy <= energy_max))
else:
this_mask_energy = mask_energy
mask_dec = (min_dec <= sig.true_dec) & (sig.true_dec <= max_dec)
mask = mask_dec & this_mask_energy
indices.append(np.where(mask)[0])
if logging:
print('\r{:<40s}{:4.0f}% ...'.format(
self.ana.key, 100. * i / N), end=''), flush()
if logging:
print('\r{:<40s}{:4.0f}%. '.format(self.ana.key, 100))
self.indices = indices
def _set_weights(self):
weights = []
sig, src = self.sig, self.src
ow, E = self.ana.livetime / self.solid_angle * sig.oneweight, sig.true_energy
n_src = len(self.src)
for i_src in xrange(n_src):
indices = self.indices[i_src]
weight = ow[indices] * self.flux[i_src](E[indices])
weights.append(weight)
self.weights = weights # Can be removed to reduce memory consumption?
self.src_weights = src.weight * np.array([np.sum(weight) for weight in weights])
self.probs = [weights[i] / np.sum(weights[i]) for i in xrange(n_src)]
self.src_prob = self.src_weights / np.sum(self.src_weights)
self._acc_total = np.sum(self.src_weights)
@property
def acc_total(self):
return self._acc_total
[docs]
def inject(self, n_inj, seed=None):
random = get_random(seed)
src_n_injs = random.multinomial(n_inj, self.src_prob)
sig, src = self.sig, self.src
n_src = len(self.src)
outs = []
for i_src in xrange(n_src):
src_n_inj = src_n_injs[i_src]
if src_n_inj == 0:
continue
# collect source info
indices = self.indices[i_src]
probs = self.probs[i_src]
extension = src.extension[i_src]
src_ra, src_dec = src.ra[i_src], src.dec[i_src]
# draw events
idx = random.choice(indices, src_n_inj, p=probs)
ev_ra, ev_dec = sig.xra[idx], sig.xdec[idx]
# smear if necessary
if extension:
ev_ra += random.normal(0, extension, src_n_inj)
ev_dec += random.normal(0, extension, src_n_inj)
# rotate to source position
ev_dec, ev_ra = coord.rotate_xaxis_to_source(
src_dec, src_ra, ev_dec, ev_ra, latlon=True)
out = sig[idx]
out['ra'], out['dec'] = ev_ra, ev_dec
out['idx'] = idx
out['inj'] = np.repeat(self, len(out))
for key in out.keys():
if key not in self.keep:
del out[key]
#out.compress(self.keep + 'idx inj'.split())
outs.append(out)
return outs, 0
[docs]
class PointSourceTimeDepInjector(SigInjector):
"""Base class for for time-dependent injection from point-like sources.
This is an abstract base class for time-dependent injection from point-like sources. Most of
the injection work is done in terms of per-source instances of :class:`PointSourceInjector`;
time-dependence is handled by derived classes' implementations of the abstract methods
:meth:`PointSourceTimeDepInjector.get_time_model`,
:meth:`PointSourceTimeDepInjector.get_frac_during_livetime`, and
:meth:`PointSourceTimeDepInjector.sample_mjd`.
"""
[docs]
def __init__(self, ana, src, flux, keep,
sindec_bandwidth=np.radians(2),
spatial_prior=False):
"""Construct a PointSourceTimeDepInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type src: :class:`utis.Sources`
:param src: the source list. required keys: ``ra`` and ``dec``. optional keys:
``weight``, ``extension``
:type flux: :class:`hyp.Flux`
:param flux: the signal spectrum
:type keep: list of str
:param keep: names of event features to keep
:type sindec_bandwidth: float
:param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to
draw simulated signal events
:type spatial_prior: bool
:param spatial_prior: option to use a spatial prior healpix map for source localisations
"""
self.ana, self.src, self.keep = ana, src, deepcopy(keep)
self.spatial_prior = spatial_prior
#if 'azimuth' not in keep:
# self.keep.append('azimuth')
self.sig = ana.sig
if( spatial_prior ):
if isinstance(flux, Iterable):
self.acc_param = [ana.custom_acc_params[f] for f in flux]
else:
self.acc_param = len(src) * [ana.custom_acc_params[flux]]
if isinstance(flux, Iterable):
self.flux = flux = flux
else:
self.flux = flux = len(src) * [flux]
self.n_src = n_src = len(src)
if spatial_prior:
psis = []
nside = [prior.nside for prior in src.prior if( prior is not None )]
N_nsides = len(np.unique(nside))
if( N_nsides==1 ):
self.nside = nside[0]
self.npix = 12 * self.nside**2
self._set_hp_idx(nside[0])
self.pix_idx = list(xrange(self.npix))
elif( N_nsides>1 ):
raise Exception("Handling of maps with varying nside not implemented yet")
for i in xrange(n_src):
src_i = src[i:i+1]
if( src_i.prior[0] is not None ):
psi = SpatialPriorInjector(
ana, src_i, flux[i], self.keep,
self.hp_idx, sindec_bandwidth=sindec_bandwidth)
else:
psi = PointSourceInjector(
ana, src_i, flux[i],
self.keep, sindec_bandwidth=sindec_bandwidth)
psis.append(psi)
self.psis = psis
else:
self.psis = psis = [
PointSourceInjector(
ana, src[i:i+1], flux[i], self.keep, sindec_bandwidth=sindec_bandwidth)
for i in xrange(n_src)]
self.time_model = time_model = self.get_time_model()
self.base_src_weights = np.array([psi.src_weights[0] for psi in psis])
self.frac_during_livetime = self.get_frac_during_livetime()
self.src_weights = self.base_src_weights * self.frac_during_livetime / ana.livetime
# all sources have zero probability if none have S_time > 0 for this dataset
if np.sum(self.src_weights):
self.src_prob = self.src_weights / np.sum(self.src_weights)
else:
self.src_prob = self.src_weights
self._acc_total = np.sum(self.src_weights)
[docs]
@abstractmethod
def get_time_model(self):
"""Get the time PDF ratio model.
:rtype: :class:`pdf.TimePDFRatioModel`
"""
pass
[docs]
@abstractmethod
def get_frac_during_livetime(self):
"""Get the fraction of the total lightcurve covered by a given dataset.
See :meth:`pdf.TimePDFRatioModel.get_frac_during_livetime`.
"""
pass
[docs]
@abstractmethod
def sample_mjd(self, src_n_injs, seed=None):
"""Get per-source ``mjd`` arrays.
:type src_n_injs: ndarray of int
:param src_n_injs: per-source number of events to inject
:type seed: int or :class:`numpy.random.RandomState`
:param seed: seed spec
"""
pass
@property
def acc_total(self):
return self._acc_total
[docs]
def inject(self, n_inj, seed=None):
random = get_random(seed)
sig, src, ana = self.sig, self.src, self.ana
if( self.spatial_prior ):
base_src_weights = np.zeros(self.n_src)
src_inj_ra, src_inj_dec = np.zeros(self.n_src), np.zeros(self.n_src)
for i_src in xrange(self.n_src):
prior = src.prior[i_src]
if( prior is None ):
src_inj_dec[i_src] = src.dec[i_src] #not used
src_inj_ra[i_src] = src.ra[i_src] #not used
base_src_weights[i_src] = self.base_src_weights[i_src]
else:
# Draw source positions from prior
pix_src = random.choice(self.pix_idx, p=prior.map)
theta_pix_center, ra_pix_center = hp.pix2ang(self.nside, pix_src)
src_outside_pix = True
while( src_outside_pix ):
d_theta, d_ra = 2*random.uniform(0, hp.nside2resol(self.nside), 2)
d_theta += np.pi/2
src_theta, src_ra = coord.rotate_xaxis_to_source(theta_pix_center, ra_pix_center, d_theta, d_ra,latlon=False)
pix_shifted_src = hp.ang2pix(self.nside,src_theta,src_ra)
if( pix_shifted_src==pix_src ):
src_outside_pix = False
src_inj_dec[i_src] = src_dec = pi/2 - src_theta
src_inj_ra[i_src] = src_ra
# Compute src weight for that position
log_acc = self.acc_param[i_src].s(np.sin(src_dec))
base_src_weights[i_src] = np.exp(log_acc)*ana.livetime
base_src_weights[i_src] *= src.weight[i_src]
src_weights = base_src_weights * self.frac_during_livetime / ana.livetime
if np.sum(self.src_weights):
src_prob = src_weights / np.sum(src_weights)
else:
src_prob = src_weights
else:
src_prob = self.src_prob
src_n_injs = random.multinomial(n_inj, src_prob)
next_seed = lambda: random.randint(2**32)
mjds = self.sample_mjd(src_n_injs, seed=next_seed())
n_src = len(self.src)
outs = []
for i_src in xrange(n_src):
src_n_inj = src_n_injs[i_src]
if src_n_inj == 0:
continue
if( self.spatial_prior and (src.prior[i_src] is not None) ):
ev = self.psis[i_src].inject(src_n_inj, seed=next_seed(),
ra=[src_inj_ra[i_src]],
dec=[src_inj_dec[i_src]])[0][0]
else:
ev = self.psis[i_src].inject(src_n_inj, seed=next_seed())[0][0]
ev['mjd'] = mjds[i_src]
outs.append(ev)
return outs, 0
def is_during_ontime(self, mjd):
grl = self.ana.grl
@np.vectorize
def f(t):
return np.any((grl.start <= t) & (t <= grl.stop))
return f(mjd)
def _set_hp_idx(self, nside):
# Set indices healpix maps here, otherwise this becomes
# too memory consuming when stacking several 100 sources
ra = self.ana.sig.true_ra
dec = self.ana.sig.true_dec
hp_idx = hp.ang2pix(nside, pi/2-dec, ra)
self.hp_idx = np.array(hp_idx, dtype="uint32")
[docs]
class PointSourceTimeWindowInjector(PointSourceTimeDepInjector):
"""Inject Gaussian or time box flares.
This class implements Gaussian or top-hat time window flare injection for one or more(possibly
spatially-extended) sources.
"""
[docs]
def __init__(self, ana, src, flux, keep,
t0, dt,
box=False, box_mode='center', endpoints=False,
sindec_bandwidth=np.radians(2),):
"""Construct a PointSourceTimeInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type src: :class:`utis.Sources`
:param src: the source list. required keys: ``ra`` and ``dec``. optional keys:
``weight``, ``extension``
:type flux: :class:`hyp.Flux`
:param flux: the signal spectrum
:type keep: list of str
:param keep: names of event features to keep
:type t0: ndarray of float
:param t0: per-source anchor time-of-flare
:type dt: ndarray of float
:param dt: per-source width of flare(in days)
:type box: bool
:param box: whether to use box rather than Gaussian flare
:type box_mode: str
:param box_mode: one of 'center', 'pre', or 'post'
:type endpoints: bool
:param endpoints: injects and flare start and stop times, rather than throughout
flare
:type sindec_bandwidth: float
:param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to
draw simulated signal events
"""
self.t0, self.dt = np.atleast_1d(t0), np.atleast_1d(dt)
self.box, self.box_mode = box, box_mode
self.endpoints = endpoints
super(PointSourceTimeWindowInjector, self).__init__(
ana, src, flux, keep, sindec_bandwidth=sindec_bandwidth)
if self.box:
if self.box_mode == 'pre':
t1, t2 = t0 - dt, t0
elif self.box_mode == 'post':
t1, t2 = t0, t0 + dt
elif self.box_mode == 'center':
t1, t2 = t0 - 0.5 * dt, t0 + 0.5 * dt
else:
raise RuntimeError('unexpected box_mode: {}'.format(self.box_mode))
self.t1, self.t2 = t1, t2
# validate endpoints mode
if endpoints:
if not box:
raise ValueError(
'cannot invoke "endpoints" mode')
if not (self.is_during_ontime(t1) or self.is_during_ontime(t2)):
raise ValueError(
'cannot inject at endpoints if neither flare bound is during detector on-time')
[docs]
def get_time_model(self):
from .pdf import UntriggeredTimePDFRatioModel
return UntriggeredTimePDFRatioModel(self.ana, self.src, box=self.box, box_mode=self.box_mode)
[docs]
def get_frac_during_livetime(self):
return self.time_model.get_frac_during_livetime(self.t0, self.dt)
[docs]
def sample_mjd(self, src_n_injs, seed=None):
random = get_random(seed)
out = []
mjd_min, mjd_max = self.ana.mjd_min, self.ana.mjd_max
for (i, n) in enumerate(src_n_injs):
t0, dt = self.t0[i], self.dt[i]
if self.box:
t1, t2 = self.t1, self.t2
if self.endpoints:
if not self.is_during_ontime(t1):
p = 1
elif not self.is_during_ontime(t2):
p = 0
else:
p = 0.5
mjd = np.where(random.binomial(1, p, n), t1, t2)
if t1==t2==0:
#This is a bit of a bandaid fix because the multiflare trial runner was written
#before active-livetime-only event injection was implemented. This ensures the
#multiflare flare injection uses the legacy behavior prior to rev 180591
mjd = random.uniform(t1, t2, n)
else:
mjd = []
while len(mjd) < n:
mjd = np.r_[mjd, random.uniform(t1, t2, n - len(mjd))]
ontime_mask = self.is_during_ontime(mjd)
mjd = mjd[ontime_mask]
else:
mjd = []
while len(mjd) < n:
mjd = np.r_[mjd, random.normal(self.t0[i], self.dt[i], n - len(mjd))]
#mjds = np.r_[mjds, pdf.sample(n - len(mjds), random=random)[0]]
ontime_mask = self.is_during_ontime(mjd)
mjd = mjd[ontime_mask]
out.append(mjd)
return out
[docs]
class PointSourceRedNoiseInjector(PointSourceTimeDepInjector):
"""Inject red noise as signal.
This class implements white/pink/red/black noise injector
as a signal light-curve for one source.
"""
[docs]
def __init__(self, ana, src, flux, keep, dt, beta=2,
sindec_bandwidth=np.radians(2),):
"""Construct a PointSourceRedNoiseInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type src: :class:`utils.Sources`
:param src: the source list. required keys: ``ra`` and ``dec``.
optional keys: ``weight``, ``extension``
:type flux: :class:`hyp.Flux`
:param flux: the signal spectrum
:type keep: list of str
:param keep: names of event features to keep
:type dt: float (single number)
:param dt: spacing between time-steps/noise-curve in units of days
(recommended < 50 days)
:type beta: integer (single number)
:param beta: power-law index of noise (default=2 for red noise)
:type sindec_bandwidth: float
:param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands
from which to draw simulated signal events
"""
self.dt = dt
self.beta = beta
self.flare_size = (ana.mjd_max - ana.mjd_min)
self.flare_center = (ana.mjd_min + ana.mjd_max)/2
self.N = int(self.flare_size/dt)
self.box = True
self.box_mode = 'center'
self._frac_during_livetime = self.time_model.get_frac_during_livetime(self.flare_center, self.flare_size)
super(PointSourceRedNoiseInjector, self).__init__(
ana, src, flux, keep, sindec_bandwidth=sindec_bandwidth)
[docs]
def get_time_model(self):
from .pdf import UntriggeredTimePDFRatioModel
return UntriggeredTimePDFRatioModel(self.ana, self.src, box=self.box, box_mode=self.box_mode)
[docs]
def get_frac_during_livetime(self):
return self._frac_during_livetime
[docs]
def sample_mjd(self, src_n_injs, seed=None):
random = get_random(seed)
out = []
mjd_min, mjd_max = self.ana.mjd_min, self.ana.mjd_max
for (i, n) in enumerate(src_n_injs):
dt, N, beta = self.dt, self.N, self.beta
mjd = []
p = self.generate_rednoise(N, dt, beta, random)
while len(mjd) < n:
mjd = np.r_[mjd, random.choice(np.linspace(mjd_min, mjd_max, N), n - len(mjd), p=p)]
ontime_mask = self.is_during_ontime(mjd)
mjd = mjd[ontime_mask]
out.append(mjd)
return out
[docs]
def generate_rednoise(self, N, dt, beta, random, generate_complex=False):
"""Generate a power-law lightcurve
This uses the method from Timmer & Koenig [1]
:type N : integer
:param N : number of equal-spaced time steps to generate
:type dt : float
:param dt : spacing between time-steps
:type beta : float
:param beta : power-law index, the spectrum will be (1 / f)^beta
white noise is beta=0
pink noise is beta=1
red noise is beta=2
black noise is beta>2
:type generate_complex : boolean (optional)
:param generate_complex : if True, generate a complex time series rather
than a real time series
References
----------
[1] Timmer, J. & Koenig, M. On Generating Power Law Noise. A&A 300:707
"""
dt = float(dt)
N = int(N)
Npos = int(N / 2)
# Nneg = int((N - 1) / 2)
domega = (2 * pi / dt / N)
if generate_complex:
omega = domega * np.fft.ifftshift(np.arange(N) - int(N / 2))
else:
omega = domega * np.arange(Npos + 1)
x_fft = np.zeros(len(omega), dtype=complex)
x_fft.real[1:] = random.normal(0, 1, len(omega) - 1)
x_fft.imag[1:] = random.normal(0, 1, len(omega) - 1)
x_fft[1:] *= (1. / omega[1:]) ** (0.5 * beta)
x_fft[1:] *= (1. / np.sqrt(2))
# by symmetry, the Nyquist frequency is real if x is real
if (not generate_complex) and (N % 2 == 0):
x_fft.imag[-1] = 0
if generate_complex:
x = np.fft.ifft(x_fft)
else:
x = np.fft.irfft(x_fft, N)
x = x - np.min(x)
return x/np.sum(x)
[docs]
class PointSourceBinnedTimeInjector(PointSourceTimeDepInjector):
"""Inject events distributed according to a binned lightcurve."""
[docs]
def __init__(self, ana, src, flux, keep,
lcs, threshs=None, lags=None,
sindec_bandwidth=np.radians(2),):
"""Construct a PointSourceBinnedTimeInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type src: :class:`utis.Sources`
:param src: the source list. required keys: ``ra`` and ``dec``. optional keys:
``weight``, ``extension``
:type flux: :class:`hyp.Flux`
:param flux: the signal spectrum
:type keep: list of str
:param keep: names of event features to keep
:type lcs: list of :class:`histlite.Hist`
:param lcs: the per-source lightcurves(see :class:`pdf.BinnedTimePDFRatioModel`)
:type threshs: ndarray of float
:param threshs: per-source thresholds
:type sindec_bandwidth: float
:param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to
draw simulated signal events
"""
self.lcs = np.atleast_1d(lcs)
if threshs is None:
self.threshs = np.zeros(len(src))
else:
self.threshs = np.atleast_1d(threshs)
if lags is None:
self.lags = np.zeros(len(src))
else:
self.lags = np.atleast_1d(lags)
super(PointSourceBinnedTimeInjector, self).__init__(
ana, src, flux, keep, sindec_bandwidth=sindec_bandwidth)
self.pdfs = self.time_model.get_lc_pdfs_cropped(
threshs_array=self.threshs, lags_array=self.lags)
[docs]
def get_time_model(self):
from .pdf import BinnedTimePDFRatioModel
return BinnedTimePDFRatioModel(self.ana, self.src, self.lcs)
[docs]
def get_frac_during_livetime(self):
return self.time_model.get_frac_during_livetime(
threshs_array=self.threshs, lags_array=self.lags)
[docs]
def sample_mjd(self, src_n_injs, seed=None):
random = get_random(seed)
out = []
for (i, n) in enumerate(src_n_injs):
mjds = []
pdf = self.pdfs[i]
#j = 0
while len(mjds) < n:
mjds = np.r_[mjds, pdf.sample(n - len(mjds), random=random)[0]]
ontime_mask = self.is_during_ontime(mjds)
mjds = mjds[ontime_mask]
#j += 1
out.append(mjds)
return out
[docs]
class TransientInjector(PointSourceTimeDepInjector):
"""Inject events from transient sources."""
[docs]
def __init__(self, ana, time_model, flux, keep,
sindec_bandwidth=np.radians(2),
spatial_prior=False):
"""Construct a TransientInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type time_model: :class:`pdf.TimePDFRatioModel`
:param time_model: the time PDF ratio model
:type flux: :class:`hyp.Flux`
:param flux: the signal spectrum
:type keep: list of str
:param keep: names of event features to keep
:type sindec_bandwidth: float
:param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to
draw simulated signal events
:type spatial_prior: bool
:param spatial_prior: option to use a spatial prior healpix map for source localisations
"""
self.time_model = time_model
src = time_model.src
super(TransientInjector, self).__init__(
ana, src, flux, keep, sindec_bandwidth=sindec_bandwidth,
spatial_prior=spatial_prior)
[docs]
def get_time_model(self):
return self.time_model
[docs]
def get_frac_during_livetime(self):
return self.time_model.get_frac_during_livetime()
[docs]
def sample_mjd(self, src_n_injs, seed=None):
random = get_random(seed)
time_model = self.time_model
out = []
if ( self.ana.grl is not None ):
active_time_per_source = time_model.active_time_per_source
for i, n in enumerate(src_n_injs):
if( n>0 ):
if ( len(active_time_per_source[i])==1 ):
start, stop = active_time_per_source[i][0]
out.append(random.uniform(start, stop, n))
else:
out_i = []
probs = np.array([t_stop-t_start for t_start, t_stop in active_time_per_source[i]])
probs = probs/sum(probs)
idx = random.choice(np.arange(len(probs)), n, p=probs, replace=True)
for index in idx:
start, stop = active_time_per_source[i][index]
out_i.extend(random.uniform(start, stop, 1))
out.append(out_i)
else:
out.append( np.array([],dtype=float) )
else:
# Just sample uniformly during t_100
start_mjds = time_model.src_mjd
end_mjds = time_model.src_mjd_t_100
for (start_mjd, end_mjd, n) in izip(start_mjds, end_mjds, src_n_injs):
out.append(random.uniform(start_mjd, end_mjd, n))
return out
[docs]
class TemplateInjector(SigInjector):
"""Inject a highly-extended source according to a spatial template.
This class implements injection according to an all-sky source template(such as for the
Galactic "plane"). If per-pixel energy-binned spectra are available then they are used,
in line with the principle that "you can do whatever you want in your likelihood but you'd
better use everything you've got for signal injection", despite the fact that this is
relatively slow computationally.
"""
[docs]
def __init__(self, ana, template_model, keep, flux=None, sig_replace=True):
"""Construct a TemplateInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type template_model: :class:`pdf.TemplateSpacePDFRatioModel`
:param template_model: the template space PDF ratio model
:type keep: list of str
:param keep: names of event features to keep
:type flux: :class:`hyp.Flux`
:param flux: if given, the signal spectrum. otherwise, inject according to per-pixel
spectra if available, or use ``template_model.flux``
:type sig_replace: bool
:param sig_replace: If true, sampling with replacement will be used, means an element can be reused multiple times.
"""
# save properties
if flux is None:
flux = template_model.flux
self.ana, self.template_model, self.keep, self.flux = ana, template_model, keep, flux
self.replace = sig_replace
tm = template_model
# weight as much as possible up front
livetime = ana.livetime
# if per-pixel flux, mask signal into allowed energy range
if tm.bins_energy is None:
self.sig = sig = ana.sig
else:
mask, i_energy = tm.get_mask_and_i_energy(ana.sig.true_energy)
self.sig = sig = ana.sig[mask]
# pixel indices
pixels = hp.ang2pix(tm.nside, pi/2 - sig.true_dec, sig.true_ra)
# build up weights from livetime, oneweight, and if possible, template
self.weights_noflux = livetime * sig.oneweight
if tm.bins_energy is None:
# we can cache the event weights
self.flux_weights = self.weights_noflux * flux(sig.true_energy)
weights = self.flux_weights * tm.template[pixels]
else:
# we just cache the energy bin indices
self.flux_weights = None
mask, i_energy = tm.get_mask_and_i_energy(sig.true_energy)
assert np.all(mask), 'should have restricted energy range already'
self.mask, self.i_energy = mask, i_energy
phi = tm.template[pixels, i_energy]
weights = self.weights_noflux * phi
# total acceptance from weighted events
self._acc_total = np.sum(weights)
@property
def acc_total(self):
return self._acc_total
[docs]
def inject(self, n_inj, seed=None):
flush = sys.stdout.flush
random = get_random(seed)
tm = self.template_model
sig = self.sig
# scramble signal MC
#print('scrambling'), flush()
delta_ra = random.uniform(0, 2*pi, len(sig))
pixels = hp.ang2pix(tm.nside, pi/2 - sig.true_dec, sig.true_ra + delta_ra)
# weight events
#print('weighting'), flush()
if self.flux_weights is None:
# TODO: ideally find something faster than weighting
# every MC event every trial
i_energy = self.i_energy
phi = tm.template[pixels, i_energy]
weights = self.weights_noflux * phi
else:
weights = self.flux_weights * tm.template[pixels]
weights_sum = np.sum(weights)
probs = weights / weights_sum
#print('selecting'), flush()
idx = random.choice(len(weights), n_inj, replace=self.replace, p=probs)
ra = sig.ra[idx] + delta_ra[idx]
out = sig[idx]
out['ra'] = ra
for key in out.keys():
if key not in self.keep:
del out[key]
out['idx'], out['inj'] = idx, np.repeat(self, len(out))
#print('dunzo'), flush()
return [out], 0
[docs]
class TemplateBGInjector(Injector):
"""Inject typical types of background along with a highly-extended source according to a spatial template.
This class implements addition injection to the typical background (scrambled or MC) according to an all-sky
source template(such as for the Galactic "plane"). If per-pixel energy-binned spectra are available then they are used.
"""
[docs]
def __init__(self, ana, template_model, keep, bg_weight_names, flux=None, sig_replace=True, randomizers = None, mcbg = False):
"""Construct a TemplateInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type template_model: :class:`pdf.TemplateSpacePDFRatioModel`
:param template_model: the template space PDF ratio model
:type keep: list of str
:param keep: names of event features to keep
:type flux: :class:`hyp.Flux`
:param flux: if given, the signal spectrum. otherwise, inject according to per-pixel
spectra if available, or use ``template_model.flux``
:type sig_replace: bool
:param sig_replace: If true, sampling with replacement will be used, means an element can be reused multiple times.
"""
# save properties
if flux is None:
flux = template_model.flux
self.ana, self.template_model, self.keep, self.flux, self.mcbg = ana, template_model, keep, flux, mcbg
self.replace = sig_replace
tm = template_model
# weight as much as possible up front
livetime = ana.livetime
# if per-pixel flux, mask signal into allowed energy range
if tm.bins_energy is None:
self.sig = sig = ana.sig
else:
mask, i_energy = tm.get_mask_and_i_energy(ana.sig.true_energy)
self.sig = sig = ana.sig[mask]
self.bin_edges = []
if self.ana.dOmega_corr is not None:
self.dOmega_corr = self.ana.dOmega_corr
self.bin_edges = self.ana.bin_edges
# pixel indices
pixels = hp.ang2pix(tm.nside, pi/2 - sig.true_dec, sig.true_ra)
# build up weights from livetime, oneweight, and if possible, template
self.weights_noflux = livetime * sig.oneweight
if tm.bins_energy is None:
# we can cache the event weights
self.flux_weights = self.weights_noflux * flux(sig.true_energy)
weights = self.flux_weights * tm.template[pixels]
else:
# we just cache the energy bin indices
self.flux_weights = None
mask, i_energy = tm.get_mask_and_i_energy(sig.true_energy)
assert np.all(mask), 'should have restricted energy range already'
self.mask, self.i_energy = mask, i_energy
phi = tm.template[pixels, i_energy]
weights = self.weights_noflux * phi
# total acceptance from weighted events
self._acc_total = np.sum(weights)
# Get MCBG if that is what wanted
if self.mcbg:
print("TemplateMCBGInjector has not been tested with the mcbg option")
self.bg = ana.sig
else:
self.bg = ana.bg_data
self.data=ana.data
if 'azimuth' not in keep:
self.keep.append('azimuth')
if randomizers is None:
randomizers = [RARandomizer()]
self.randomizers = randomizers
self.N = ana.n_data
self.weight_names = bg_weight_names
if 'energy' not in keep:
self.keep.append('energy')
if 'dec' not in keep:
self.keep.append('dec')
if self.mcbg:
if 'idx' not in keep:
self.keep.append('idx')
if 'inj' not in keep:
self.keep.append('inj')
self._set_weights()
def _set_weights(self):
weights = []
bg = self.bg
weight_names = self.weight_names
self.weights = []
self.n_exp = []
self.n_exp_err = []
self.n_w = len(weight_names)
for weight_name in weight_names:
#weights should already have flux incorperated and in units of Hz
weights = bg[weight_name] * self.ana.livetime
self.weights.append(weights)
self.n_exp.append(sum(weights))
self.n_exp_err.append(np.sqrt(sum(weights**2)))
self.probs = [self.weights[i] / np.sum(self.weights[i]) for i in xrange(self.n_w)]
self.bg['idx'] = xrange(len(bg['dec']))
self.bg['inj'] = np.zeros(len(bg['dec']))
@property
def acc_total(self):
return self._acc_total
[docs]
def inject(self, gp_norm, n_inj=None, seed=None, replace=False, truth=False, debug=False):
random = get_random(seed)
bg = self.bg
bg_evts = utils.Events({key: bg[key] for key in self.keep})
size = len(bg_evts)
bg_evts['idx'] = np.arange(size)
# inject template using signal mc evts first
tm = self.template_model
sig = self.sig
# scramble signal MC
#randamize by ra
delta_ra = random.uniform(0, 2*pi, len(sig))
pixels = hp.ang2pix(tm.nside, pi/2 - sig.true_dec, sig.true_ra + delta_ra)
# weight events
if self.flux_weights is None:
# TODO: ideally find something faster than weighting
# every MC event every trial
i_energy = self.i_energy
phi = tm.template[pixels, i_energy]
weights = self.weights_noflux * phi
else:
weights = self.flux_weights * tm.template[pixels]
weights_sum = np.sum(weights)
probs = weights / weights_sum
#print('selecting'), flush()
gp_n_inj = int(weights_sum * gp_norm) # total num of gp evts scaled by weight
if gp_norm < 1e-3: #pi0 template flux norm factor need special treatment
gp_n_inj = int(round(self.flux.to_ns(flux=gp_norm, acc_total=self.acc_total, E0=100, unit=1e3, use_E2dNdE=True), 0))
idx = random.choice(len(weights), gp_n_inj, replace=self.replace, p=probs)
ra = sig.ra[idx] + delta_ra[idx]
tmp_out = sig[idx]
tmp_out['ra'] = ra
tmp_out['dec'] = sig.dec[idx]
for key in tmp_out.keys():
if key not in self.keep:
del tmp_out[key]
tmp_out['idx'], tmp_out['inj'] = idx, np.repeat(self, len(tmp_out))
n_tmp = len(tmp_out)
if len(tmp_out)>0:
tmp_out = utils.Arrays.concatenate(tmp_out)
if truth:
return [tmp_out], []
#regular mc/data bg
outs = []
if self.mcbg:
for i in xrange(self.n_w):
# grab a random map for each of the weight types, use the weights to get the uncertainty
n_bg_mcfluct = random.normal(self.n_exp[i], scale = self.n_exp_err[i])
n_bg_mcfluct = int(n_bg_mcfluct) if n_bg_mcfluct > 0. else 0
n_bg = random.poisson(n_bg_mcfluct)
probs = self.probs[i]
indices = bg['idx']
# draw events
n_bg =n_bg - gp_n_inj
if n_bg<0:
n_bg=0
idx = random.choice(indices, n_bg, p=probs, replace = replace)
out = bg_evts[idx]
out['inj'] = i*np.ones(len(out['dec']))
outs.append(out)
outs = utils.Arrays.concatenate(outs)
# scramble the background
for randomize in self.randomizers:
randomize(outs, seed=seed)
outs = utils.Arrays.concatenate((tmp_out, outs))
return [outs], 0
else:
# sample event indices with replacement as False
idx = random.choice(bg_evts['idx'], size, replace=replace)
outs = bg_evts[idx]
outs['inj'] = np.repeat(self, size)
n_selected = len(outs)
# sample events from off-mask region to compensate
oversamples = []
if len(self.bin_edges) > 0:
for i in np.arange(len(self.bin_edges)-1):
corr = self.dOmega_corr[i]
dec_mask = (bg_evts['dec'] < self.bin_edges[i+1]) & (bg_evts['dec'] > self.bin_edges[i])
n_indec = sum(dec_mask)
re_size = int((corr - 1) * n_indec)
# allow same events
re_idx = random.choice(bg_evts['idx'][dec_mask], re_size, replace=True)
oversample = bg_evts[re_idx]
oversample['inj'] = np.repeat(self, re_size)
n_selected += len(oversample)
if len(oversample) > 0:
alldata = (self.data['dec'] < self.bin_edges[i+1]) & (self.data['dec'] > self.bin_edges[i])
if debug:
print("in dec band: ", np.degrees(self.bin_edges[i]), np.degrees(self.bin_edges[i+1]))
print("total events before masking: ", sum(alldata))
print("offmask events:", n_indec)
print("correction from solid angle: ", corr)
print("sample n events:", re_size)
oversamples.append(oversample)
oversamples = utils.Arrays.concatenate(oversamples)
n_selected += n_tmp
if debug:
print("injected GP events: ", n_tmp)
print("over sampling:", len(oversamples))
n_excluded = len(self.ana.data) - n_selected
# scramble the background
for randomize in self.randomizers:
randomize(outs, seed=seed)
if len(self.bin_edges) > 0:
randomize(oversamples, seed=seed)
if len(oversamples) > 0:
outs = utils.Arrays.concatenate((tmp_out, outs, oversamples))
if debug:
saved = [outs['energy'], outs['dec'], outs['ra'], outs['log10energy'], outs['sigma'], outs['azimuth']]
np.save("oversampled_{}.npy".format(len(oversamples)), np.asarray(saved))
np.save("gp_{}.npy".format(gp_n_inj), np.asarray(saved))
print("inject gp evts: ", n_tmp)
return [outs], n_excluded
[docs]
class SpatialPriorInjector(SigInjector):
"""Injection of one or more point-like sources, whose position is not exactly known,
but given in the form of a spatial prior (healpix map).
"""
[docs]
def __init__(self, ana, src, flux, keep, hp_idx,
sindec_bandwidth=np.radians(2),):
"""Construct a SpatialPriorInjector.
:type ana: :class:`analysis.SubAnalysis`
:param ana: the sub analysis
:type src: :class:`utis.Sources`
:param src: the source list. required keys: ``ra``, ``dec``, ``prior``.
optional keys: ``weight``, ``extension``
:type flux: :class:`hyp.Flux`
:param flux: the signal spectrum
:type keep: list of str
:param keep: names of event features to keep
"""
from .hyp import Flux
self.ana, self.sig, self.src, self.keep = ana, ana.sig, src, deepcopy(keep)
self.hp_idx = hp_idx
livetime = ana.livetime
if 'idx' not in keep:
self.keep.append('idx')
if 'inj' not in keep:
self.keep.append('inj')
if isinstance(flux, Iterable):
self.acc_param = [ana.custom_acc_params[f] for f in flux]
else:
self.acc_param = len(src) * [ana.custom_acc_params[flux]]
if isinstance(flux, Iterable):
self.flux = flux
else:
self.flux = len(src) * [flux]
nsides = np.array([prior.nside for prior in src.prior])
self.nside = nsides[0]
if( not np.all(self.nside==nsides) ):
raise Exception("Please provide a source list with where all priors\
have the same nside")
self.npix = src.prior[0].npix
self.sindec_bandwidth = sindec_bandwidth
self._set_E_masks()
self._set_weights()
def _set_E_masks(self):
E_masks = []
N = len(self.src)
logging = N > 50
flux0 = self.flux[0]
energy_min, energy_max = flux0.energy_range
mask_energy = ((energy_min <= self.sig.true_energy)
& (self.sig.true_energy <= energy_max))
flush = sys.stdout.flush
for i, flux in enumerate(self.flux):
if flux.energy_range != (energy_min, energy_max):
energy_min, energy_max = flux.energy_range
this_mask_energy = ((energy_min <= self.sig.true_energy)
& (self.sig.true_energy <= energy_max))
else:
this_mask_energy = mask_energy
if( np.sum(this_mask_energy)==len(self.sig.ra) ):
# i.e. no masking
# This is done to reduce memory in stacking analyses
E_masks.append(True)
else:
E_masks.append(this_mask_energy)
if logging:
print('\r{:<40s}{:4.0f}% ...'.format(
self.ana.key, 100. * i / N), end=''), flush()
if logging:
print('\r{:<40s}{:4.0f}%. '.format(self.ana.key, 100))
self.E_masks = np.array(E_masks)
def _set_weights(self):
weights = []
sig, src = self.sig, self.src
ow, E = self.ana.livetime / (4*pi) * sig.oneweight, sig.true_energy
n_src = len(self.src)
for i_src in xrange(n_src):
prior = src.prior[i_src]
E_mask = self.E_masks[i_src]
if( E_mask is True ):
weight = ow*self.flux[i_src](E)*prior.map[self.hp_idx]*prior.npix
else:
weight = ow[E_mask] * self.flux[i_src](E[E_mask])\
* prior.map[self.hp_idx[E_mask]] * prior.npix
weights.append(weight)
self.src_weights = src.weight * np.array([np.sum(weight) for weight in weights])
self.src_prob = self.src_weights / np.sum(self.src_weights)
self._acc_total = np.sum(self.src_weights)
def _MC_event_weight(self, i_ev, i_src):
ow = self.ana.livetime/(4*pi)*self.sig.oneweight[i_ev]
E = self.sig.true_energy[i_ev]
return ow * self.flux[i_src](E)
@property
def acc_total(self):
return self._acc_total
[docs]
def inject(self, n_inj, seed=None, ra=None, dec=None):
random = get_random(seed)
sig, src = self.sig, self.src
n_src = len(self.src)
nside = self.nside
# Draw src positions if no given by ra & dec
if( ra is None ):
pix_idx = list(xrange(self.npix))
ra, dec = [], []
for i_src in xrange(n_src):
# Sample a pixel, src location will be a random position
# in that pixel
prior = src.prior[i_src]
if( prior is None ):
src_ra, src_dec = src.ra[i_src], src.dec[i_src]
else:
pix_src = random.choice(pix_idx, p=prior.map)
theta_pix_center, ra_pix_center = hp.pix2ang(nside, pix_src)
src_outside_pix = True
while( src_outside_pix ):
d_theta, d_ra = 2*random.uniform(0, hp.nside2resol(nside), 2)
d_theta += np.pi/2
src_theta, src_ra = coord.rotate_xaxis_to_source(theta_pix_center, ra_pix_center, d_theta, d_ra,latlon=False)
pix_shifted_src = hp.ang2pix(nside, src_theta,src_ra)
if( pix_shifted_src==pix_src ):
src_outside_pix = False
src_dec = np.pi/2 - src_theta
ra.append(src_ra)
dec.append(src_dec)
# Split n_inj over all sources
if( n_src==1 ):
src_n_injs = [n_inj]
else:
# Set source weights & split src_n_injs
base_src_weights = np.zeros(n_src)
for i_src in xrange(n_src):
log_acc = self.acc_param[i_src].s(np.sin(dec[i_src]))
base_src_weights[i_src] = np.exp(log_acc)*src.weight[i_src]
if np.sum(base_src_weights):
src_prob = base_src_weights / np.sum(base_src_weights)
else:
src_prob = base_src_weights
src_n_injs = random.multinomial(n_inj, src_prob)
outs = []
for i_src in xrange(n_src):
src_n_inj = src_n_injs[i_src]
if src_n_inj == 0:
continue
# collect source info
extension = src.extension[i_src]
prior = src.prior[i_src]
src_dec, src_ra = dec[i_src], ra[i_src]
# Select MC events to be used
# (pre-binning MC events can probably speed this up)
src_sindec = np.sin(src_dec)
min_sindec = max(-1, src_sindec - .5*self.sindec_bandwidth)
max_sindec = min(+1, src_sindec + .5*self.sindec_bandwidth)
if( max_sindec==1 ):
min_sindec = 1 - self.sindec_bandwidth
elif( min_sindec==-1 ):
max_sindec = -1 + self.sindec_bandwidth
min_dec, max_dec = np.arcsin(min_sindec), np.arcsin(max_sindec)
mask_dec = (min_dec <= sig.true_dec) & (sig.true_dec <= max_dec)
mask = mask_dec & self.E_masks[i_src]
indices = np.where(mask)[0]
# draw events
weights = self._MC_event_weight(indices, i_src)
probs = weights/np.sum(weights)
idx = random.choice(indices, src_n_inj, p=probs)
ev_ra, ev_dec = sig.xra[idx], sig.xdec[idx]
# smear if necessary
if extension:
ev_ra += random.normal(0, extension, src_n_inj)
ev_dec += random.normal(0, extension, src_n_inj)
# rotate to source position
ev_dec, ev_ra = coord.rotate_xaxis_to_source(
src_dec, src_ra, ev_dec, ev_ra, latlon=True)
out = sig[idx]
out['ra'], out['dec'] = ev_ra, ev_dec
out['idx'] = idx
out['inj'] = np.repeat(self, len(out))
for key in out.keys():
if key not in self.keep:
del out[key]
outs.append(out)
return outs, 0