Source code for csky.inj

# inj.py

"""Event injectors and randomization methods."""

from __future__ import print_function

import abc
from abc import ABCMeta, abstractmethod, abstractproperty
try:
    from collections.abc import Iterable
except:
    from collections import Iterable
from copy import deepcopy
try:
    from itertools import izip
except ImportError:
    izip = zip
try:
    xrange
except NameError:
    xrange = range
import sys

import healpy as hp
import numpy as np
pi = np.pi

from . import coord, utils
from .utils import get_random


[docs] class Randomizer(object): """Base class for event randomizers. Classes which randomize(or scramble) data events should derive from this class. """ __metaclass__ = ABCMeta
[docs] def __init__(self): pass
[docs] @abstractmethod def __call__(self, ev, seed=None): """Randomize events in place. :type ev: :class:`utils.Events` :param ev: the events :type seed: int or :class:`numpy.random.RandomState` :param seed: seed spec """ pass
[docs] class RARandomizer(Randomizer): """Randomize events uniformly in right ascension.""" def __repr__(self): return 'RARandomizer()'
[docs] def __call__(self, ev, seed=None): random = get_random(seed) ev['ra'] = random.uniform(0, 2*pi, len(ev))
[docs] class DecRandomizer(Randomizer): """Randomize events in declination. This class applies randomization to the event declinations. Multiple randomization methods are available, which need to be specified via the `dec_rand_method` keyword. Additionally, events near the poles can be excluded from the randomization by setting the pole exclusion radius via the keyword `dec_rand_pole_exlusion`. """
[docs] def __init__( self, dec_rand_method='gaussian_sigma', dec_rand_kwargs=None, dec_rand_pole_exlusion=None, ): """Construct a DecRandomizer :type dec_rand_method: str :param dec_rand_method: The randomization method to use. 'gaussian_sigma': Applies Gaussian-distributed noise to the event declinations. The width of the Gaussians is based on the per-event angular uncertainties ``sigma``. Additional kwargs (to be passed via `dec_rand_kwargs`): 'sigma_scaling': Scaling factor that will be applied to the per-event angular uncertainties prior to randomization. This only affects the randomization - event angular uncertainties will not be modified. 'uniform_sigma': Applies uniform jitter to the event declinations. The width of the jitter is based on the per-event angular uncertainties ``sigma``. Additional kwargs (to be passed via `dec_rand_kwargs`): 'sigma_scaling': Scaling factor that will be applied to the per-event angular uncertainties prior to randomization. This only affects the randomization - event angular uncertainties will not be modified. 'gaussian_fixed': Applies Gaussian-distributed noise to the event declinations. The width of the Gaussians is set to `randomization_width`. Additional kwargs (to be passed via `dec_rand_kwargs`): 'randomization_width': The sigma of the Gaussian. 'uniform_fixed': Applies uniform jitter to the event declinations. The width of the Gaussians is set to `randomization_width`. Additional kwargs (to be passed via `dec_rand_kwargs`): 'randomization_width': The width for the jitter. Events will be randomized to dec +- width. :type dec_rand_kwargs: dict :param dec_rand_kwargs: Additional keyword arguments provided as a dictionary. See `dec_rand_method` for details on which kwargs are necessary for the specified randomization method. :type dec_rand_pole_exlusion: float :param dec_rand_pole_exlusion: Radius of the polar regions(in radians), which will be ignored. Events with declinations in this pole region will not be randomized. If `None`(default), then all events are randomized. """ self.dec_rand_method = dec_rand_method.lower() self.dec_rand_pole_exlusion = dec_rand_pole_exlusion self.dec_rand_kwargs = deepcopy(dec_rand_kwargs) self.half_pi = np.pi / 2. # to save some space we'll define an error message template if self.dec_rand_kwargs: keys = sorted(list(self.dec_rand_kwargs.keys())) else: keys = [] msg = 'Only expected {}, but got the following kwargs: ' + str(keys) # Make sure the randomization method is understood and that all # necessary keyword arguments are provided and no additional ones if self.dec_rand_method in ['gaussian_sigma', 'uniform_sigma']: # set default if None if self.dec_rand_kwargs is None: self.dec_rand_kwargs = {'sigma_scaling': None} # check if correct keywords are passed elif keys != ['sigma_scaling']: raise ValueError(msg.format(['sigma_scaling'])) elif self.dec_rand_method in ['gaussian_fixed', 'uniform_fixed']: if keys != ['randomization_width']: raise ValueError(msg.format(['randomization_width'])) else: raise ValueError( 'Unknown `DecRandomizer` randomization method: {}'.format( self.dec_rand_method))
def __repr__(self): """Get string representation of the class """ msg = 'DecRandomizer(dec_rand_method={}, dec_rand_pole_exlusion={})' return msg.format(self.dec_rand_method, self.dec_rand_pole_exlusion)
[docs] def __call__(self, ev, seed=None): """Randomize events """ random = get_random(seed) # ------------------- # Apply randomization # ------------------- # Sigma-based randomization if self.dec_rand_method in ['gaussian_sigma', 'uniform_sigma']: # get sigma scaling factor if self.dec_rand_kwargs['sigma_scaling']: scale = ev.sigma * self.dec_rand_kwargs['sigma_scaling'] else: scale = ev.sigma if self.dec_rand_method == 'gaussian_sigma': decs = random.normal(loc=ev.dec, scale=scale) elif self.dec_rand_method == 'uniform_sigma': decs = ev.dec + random.uniform(low=-scale, high=scale) else: raise ValueError('Should never happen!') # Fixed Gaussian randomization elif self.dec_rand_method == 'gaussian_fixed': decs = random.normal( loc=ev.dec, scale=self.dec_rand_kwargs['randomization_width']) # Fixed uniform randomization elif self.dec_rand_method == 'uniform_fixed': width = self.dec_rand_kwargs['randomization_width'] decs = ev.dec + random.uniform( low=-width, high=width, size=len(ev.dec)) # safety check else: raise ValueError('Should never happen!') # move dec back into range by reflecting at pole. # Note: this is technically only valid if RA is scrambled as well, # otherwise the azimuth should change when flipping over the pole # make sure everything is within [-pi/2, pi/2] m_above = decs > self.half_pi m_below = decs < -self.half_pi decs[m_above] = np.pi - decs[m_above] decs[m_below] = -np.pi - decs[m_below] # ------------------------------------- # Exclude randomization for pole region # ------------------------------------- if self.dec_rand_pole_exlusion is not None: mask = np.logical_or( ev['dec'] < -self.half_pi + self.dec_rand_pole_exlusion, ev['dec'] > self.half_pi - self.dec_rand_pole_exlusion, ) decs[mask] = ev['dec'][mask] # assign new values ev['dec'] = decs
[docs] class SinDecRandomizer(Randomizer): """Randomize events in sin declination. This class applies a jitter of specified strength to the sin(dec) values. The jitter strength should not be chosen much larger than the sin(dec) bin-widths in the PDFs. Randomization should not change the joint PDFs of the observables, since the background PDF is only created once at initialization (and optionally updated with the injected signal via the `update_bg` parameter). If binning is performed uniformly in sin(dec), a jitter strength of the order of the bin-width is probably fine as long as there are no big jumps in the PDF between consecutive bins. Otherwise, a smaller jitter value should be chosen. Because the SinDecRandomizer modifies the sin(dec) values instead of the declination directly, it applies a stronger randomization in terms of modified declination at the poles. """
[docs] def __init__(self, sindec_jitter): """Construct a SinDecRandomizer :type sindec_jitter: float :param sindec_jitter: jitter strength in sin(dec). The sin(dec) values will be modified via: sin(dec) += uniform(-sindec_jitter, sindec_jitter) """ self.sindec_jitter = sindec_jitter
def __repr__(self): return 'SinDecRandomizer(sindec_jitter={:.3f})'.format( self.sindec_jitter)
[docs] def __call__(self, ev, seed=None): random = get_random(seed) sin_decs = ( np.sin(ev['dec']) + random.uniform( -self.sindec_jitter, self.sindec_jitter, size=len(ev['dec'])) ) # make sure everything is within [-1, 1] m_above = sin_decs > 1. m_below = sin_decs < -1. sin_decs[m_above] = 2. - sin_decs[m_above] sin_decs[m_below] = -2 - sin_decs[m_below] # additional clip for numerical issues right at the boundary ev['dec'] = np.arcsin(np.clip(sin_decs, -1, 1))
[docs] class PoleRandomizer(Randomizer): """Randomize declinations of events near the North and South poles. This class randomizes the declinations of events near the poles. This is mainly useful for cascade analysis, where it is non-obvious how much of the polar regions should simply be neglected. It may also be useful in track analyses if the user is specifically interested in sources near one or both of the poles. """
[docs] def __init__(self, pole_radius, shuffle=True, sample_dec=False): """Construct a PoleRandomizer. :type pole_radius: float :param pole_radius: radius of the polar regions(in radians) :type shuffle: bool :param shuffle: if false, scramble polar events uniformly throughout the poles in which they each actually originated; if true(default), then instead randomize polar events' declinations by applying a random permutation of these. if `sample_dec` is set to true, then the declination values are additionally smeared by a Gaussian (see description for `sample_dec`). :type sample_dec: bool :param sample_dec: if true, the event declinations at the poles are additionally randomized by sampling from a Gaussian with the width set to the event sigma. Note that this setting may push events from the poles away towards the equator and therefore modify the joint PDFs! If false(default), only the permutation of event declination across events at the pole is performed. This setting is only relevant if `shuffle` is set to True. """ self.pole_radius = pole_radius self.pole_dec = pi/2 - pole_radius self.pole_sindec = np.sin(self.pole_dec) self.shuffle = shuffle self.sample_dec = sample_dec
def __repr__(self): return ( 'PoleRandomizer(pole_radius={:.1f}deg, shuffle={}, ' 'sample_dec={})' ).format( np.degrees(self.pole_radius), self.shuffle, self.sample_dec, )
[docs] def __call__(self, ev, seed=None): random = get_random(seed) pole_mask = np.abs(ev.dec) > self.pole_dec n = np.sum(pole_mask) if n: ev['dec'] = ev.dec.copy() sign = np.sign(ev.dec[pole_mask]) if self.shuffle: pole_decs = ev.dec[pole_mask] if self.sample_dec: pole_decs = random.normal(pole_decs, ev.sigma[pole_mask]) north_mask = sign > 0 pole_decs[north_mask] = random.permutation(pole_decs[north_mask]) pole_decs[~north_mask] = random.permutation(pole_decs[~north_mask]) # keep declinations in bounds pole_decs %= 2*pi pole_decs[pole_decs > pi] -= 2*pi bad_north = pole_decs > pi/2 pole_decs[bad_north] -= 2 * (pole_decs[bad_north] - pi/2) bad_south = pole_decs < -pi/2 pole_decs[bad_south] -= 2 * (pole_decs[bad_south] + pi/2) ev['dec'][pole_mask] = pole_decs else: sd = random.uniform(self.pole_sindec, 1, n) ev['dec'][pole_mask] = sign * np.arcsin(sd)
[docs] class EnergyRandomizer(Randomizer): """Randomize event energies. This class randomizes log10(E/GeV). This may be useful if some events are causing intense spikes in the background distributions at particular declinations. Note that this feature has not been thoroughly tested, if at all. """
[docs] def __init__(self, sigma_log10): """Construct an EnergyRandomizer. :type sigma_log10: float :param sigma_log10: width of the Gaussian noise to apply to log10(E/GeV) """ self.sigma_log10 = sigma_log10
def __repr__(self): return 'EnergyRandomizer(sigma_log10={:.3f})'.format(self.sigma_log10)
[docs] def __call__(self, ev, seed=None): random = get_random(seed) ev['energy'] *= np.abs(random.normal(1, self.sigma_log10, size=len(ev))) ev['log10energy'] = np.log10 (ev.energy)
[docs] class MJDShuffleRandomizer(Randomizer): """Shuffle event times. This class shuffles the arrival times(in MJD) of the events. """ def __repr__(self): return 'MJDShuffleRandomizer()'
[docs] def __call__(self, ev, seed=None): random = get_random(seed) ev['mjd'] = mjd = random.permutation(ev.mjd) ev['ra'] = coord.switch_ra_azimuth(ev.azimuth, mjd)
[docs] class MJDGRLRandomizer(Randomizer): """Randomize event times according to GRL."""
[docs] def __init__(self, grl, presample=0, rates_by='events'): self.grl, self.presample = grl, presample self.livetime = grl.livetime.sum() if rates_by == 'livetime': self.p = self.grl.livetime / self.livetime elif rates_by == 'events': self.p = self.grl.events/float(self.grl.events.sum()) else: raise NotImplementedError('unsupported rates_by="{}"'.format(rates_by)) self.n_runs = len(self.grl) if presample: raise NotImplementedError('presampling from GRL not yet implemented')
def __repr__(self): return 'MJDGRLRandomizer({} runs over {} years{})'.format( len(self.grl), self.livetime / 365.25, ', presampled' if self.presample else '') def get_times(self, N, seed=None): random = get_random(seed) grl = self.grl i_runs = random.choice(self.n_runs, size=N, p=self.p) start, stop = grl.start[i_runs], grl.stop[i_runs] return random.uniform(start, stop)
[docs] def __call__(self, ev, seed=None): ev['mjd'] = mjd = self.get_times(len(ev), seed=seed) ev['ra'] = coord.switch_ra_azimuth(ev.azimuth, mjd)
[docs] class Selector(object): """Base class for event selectors. This is an interface for pre-emptively selecting events prior to [any randomizations as well as] likelihood evaluation. For per-trial masking, see :class:`PDFRatioEvaluator`. TODO: actually go and add documentation for masking functionality under :class:`PDFRatioEvaluator` """ __metaclass__ = ABCMeta
[docs] @abstractmethod def mask(self, arr): """Get a boolean mask for selected elements of ``arr``. :type arr: :class:`utils.Arrays` :param arr: the Arrays to mask :rtype: ndarray of bool :return: the mask """ pass
[docs] @abstractmethod def where(self, arr): """Get the indices of selected elements of ``arr``. :type arr: :class:`utils.Arrays` :param arr: the Arrays to mask :rtype: ndarray of bool :return: the indices """ pass
[docs] @abstractmethod def __call__(self, arr): """Get a downselected copy of ``arr``. :type arr: :class:`utils.Arrays` :param arr: the Arrays to mask :rtype: same as ``arr`` :return: downselected copy(or potentially view) of ``arr`` """ pass
[docs] class DecBandSelector(Selector): """Select events based on proximity in declination to source(s). This class allows pre-emptive application of the so-called "band" cut(see :class:`PointSourceSpacePDFRatioModel` for more). """
[docs] def __init__(self, src, cut_n_sigma=5): """Construct a DecBandSelector. :type src: :class:`utis.Sources` :param src: the source list. required keys: ``ra`` and ``dec``. optional keys: ``weight``, ``extension`` :type cut_n_sigma: float :param cut_n_sigma: number of sigmas away from a source an event must be within to be included """ self.dec = src.dec self.cut_n_sigma = cut_n_sigma if np.any(src.extension > 0): self.sigma = src.extension else: self.sigma = None
[docs] def mask(self, arr): out = np.zeros(len(arr), dtype=bool) for (i_dec, dec) in enumerate(self.dec): if self.sigma is None: sigma = arr.sigma else: sigma = np.sqrt(arr.sigma**2 + self.sigma[i_dec]**2) out |= np.abs(arr.dec - dec) / sigma < self.cut_n_sigma return out
[docs] def where(self, arr): return np.where(self.mask(arr))[0]
[docs] def __call__(self, arr): return arr[self.mask(arr)]
[docs] class TimeWindowSelector(Selector): """Select events within a given time window. You probably don't want to use this class; you might be looking for :class:`DataOnOffInjector`. """
[docs] def __init__(self, mjd_min, mjd_max): self.mjd_min, self.mjd_max = mjd_min, mjd_max
[docs] def mask(self, arr): out = np.zeros(len(arr), dtype=bool) for (i_dec, (mjd_min, mjd_max)) in enumerate(izip(self.mjd_min, self.mjd_max)): out |= (mjd_min <= arr.mjd) & (arr.mjd <= mjd_max) return out
[docs] def where(self, arr): return np.where(self.mask(arr))[0]
[docs] def __call__(self, arr): return arr[self.mask(arr)]
[docs] class Injector(object): """Base class for event injectors. Injectors produce event ensembles along with counts of events that could have been injected but were instead pre-emptively excluded using a :class:`Selector`. TODO: revisit hierarchy. Background injectors don't seem to want an ``n_inj`` argument at all, in the :meth:`Injector.inject` method. """ __metaclass__ = ABCMeta
[docs] @abstractmethod def inject(self, n_inj, seed=None): """Get some number of events. :type n_inj: int :param n_inj: number of events to inject :type seed: int or :class:`numpy.random.RandomState` :param seed: seed spec :rtype: (list of :class:`utils.Events`, int) :return: (event ensembles, number of pre-emptively excluded events) """ pass
[docs] class DataInjector(Injector): """Inject real data events. This class injects real events from the data, possibly(in fact, almost always) with one or more randomizations applied. """
[docs] def __init__(self, ana, data, keep, randomizers=None, bg_replace=False): """Construct a DataInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type data: :class:`utils.Events` :param data: the data events to inject :type keep: list of str :param keep: names of event features to keep :type randomizers: list of :class:`Randomizer` :param randomizers: the randomizers to apply; if not given, RARandomizer is used. :type bg_replace: bool :param bg_replace: If true, sampling with replacement will be used. """ self.ana, self.data = ana, data self.keep = keep if randomizers is None: randomizers = [RARandomizer()] self.randomizers = randomizers self.replace = bg_replace self.N = ana.n_data
[docs] def inject(self, seed=None): random = get_random(seed) out = utils.Events({key: self.data[key] for key in self.keep}) size = len(out) out['idx'] = np.arange(size) if self.replace: # sample event indices with replacement idx = random.choice(out['idx'], size, replace=self.replace) out = out[idx] out['inj'] = np.repeat(self, size) n_selected = len(out) n_excluded = len(self.ana.data) - n_selected #out.compress(self.keep + 'idx inj'.split()) for randomize in self.randomizers: randomize(out, seed=seed) return [out], n_excluded
[docs] class DataOnOffInjector(Injector): """Inject off-time events into on-time windows. This class performs the sort of background injection typically used in transient analyses, where the background is taken from off-time data while the analysis is performed in specific on-time windows. """
[docs] def __init__(self, ana, time_model, keep, randomizers=None, cut_n_sigma=5, full_sky=False, bg_replace=True): """Construct a DataOnOffInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type time_model: :class:`pdf.TimePDFRatioModel` :param time_model: the time PDF ratio model :type keep: list of str :param keep: names of event features to keep :type randomizers: list of :class:`Randomizer` :param randomizers: the randomizers to apply. you probably don't want to apply any(TODO: reconsider, why exactly is this argument here?) :type cut_n_sigma: float :param cut_n_sigma: number of sigmas away from a source an event must be within to be included :type bg_replace: bool :param bg_replace: If true, sampling with replacement will be used. """ self.ana, self.time_model, self.keep = ana, time_model, deepcopy(keep) if 'azimuth' not in keep: self.keep.append('azimuth') self.data, self.src = ana.bg_data, time_model.src self.cut_n_sigma = cut_n_sigma self.full_sky = full_sky self.arange = np.arange(len(self.src)) self._set_indices(full_sky=full_sky) self.randomizers = randomizers or [] self.replace = bg_replace
[docs] def inject(self, seed=None): # note: default is replace=True because it's faster random = get_random(seed) n_injs = random.poisson(self.n_injs_poisson) i_src_injs = np.where(n_injs > 0)[0] if len(i_src_injs) == 0: return [], 0 data, indices, time_model = self.data, self.indices, self.time_model idx = np.concatenate([ random.choice(indices[i_src], n, replace=self.replace) for (i_src, n) in izip(i_src_injs, n_injs[i_src_injs]) ]) out = utils.Events({key: data[key][idx] for key in self.keep}) out['idx'] = idx out['inj'] = np.repeat(self, len(out)) if ( self.ana.grl is not None ): out['mjd'] = [] active_time_per_source = time_model.active_time_per_source for i, n in enumerate(n_injs): if( n>0 ): if ( len(active_time_per_source[i])==1 ): start, stop = active_time_per_source[i][0] out['mjd'] = np.append(out['mjd'], random.uniform(start, stop, n)) else: probs = np.array([t_stop-t_start for t_start, t_stop in active_time_per_source[i]]) probs = probs/sum(probs) idx = random.choice(np.arange(len(probs)), n, p=probs, replace=True) for index in idx: start, stop = active_time_per_source[i][index] out['mjd'] = np.append(out['mjd'], random.uniform(start, stop, 1)) else: out['mjd'] = np.append(out['mjd'], np.array([],dtype=float) ) else: start_mjds = np.repeat(time_model.src_mjd_min, n_injs) end_mjds = np.repeat(time_model.src_mjd_max, n_injs) out['mjd'] = random.uniform(start_mjds, end_mjds) for randomize in self.randomizers: randomize(out, seed=seed) out['ra'] = coord.switch_ra_azimuth(out.azimuth, out.mjd) return [out], 0
def _set_indices(self, full_sky=False): data, src, cut_n_sigma = self.data, self.src, self.cut_n_sigma time_model = self.time_model indices = [] n_injs_poisson = [] N = 1. * len(data) for i in xrange(len(src)): if(full_sky): idx = np.array(xrange(len(data))) else: selector = DecBandSelector(src[i:i+1], cut_n_sigma=cut_n_sigma) idx = selector.where(data) rate_frac = len(idx) / N n_inj = rate_frac * time_model.src_nb[i] indices.append(idx) n_injs_poisson.append(n_inj) self.indices = indices self.n_injs_poisson = np.array(n_injs_poisson)
[docs] class MCBackgroundInjector(Injector): """Inject background events using weighted MC events to model the background instead of scrambling data in RA. This class injects simulated events from the monte carlo, possibly(in fact, almost always) with one or more randomizations applied. """
[docs] def __init__(self, ana, bg_weight_names, keep, randomizers = None): """Construct a PointSourceInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type mc: :class:`utils.Events` :param mc: the monte carlo events to inject :type bg_weight_names: list of str :param bg_weight_names: names of the monte carlo weights saved in the signal events. Weights should be in Hz and already have flux incorperated. :type keep: list of str :param keep: names of event features to keep :type randomizers: list of :class:`Randomizer` :param randomizers: the randomizers to apply to the monte carlo events """ self.ana, self.mc, self.keep = ana, ana.sig, deepcopy(keep) if 'azimuth' not in keep: self.keep.append('azimuth') if randomizers is None: randomizers = [RARandomizer()] self.randomizers = randomizers self.N = ana.n_data self.weight_names = bg_weight_names if 'idx' not in keep: self.keep.append('idx') if 'inj' not in keep: self.keep.append('inj') self._set_weights()
def _set_weights(self): weights = [] mc = self.mc weight_names = self.weight_names self.weights = [] self.n_exp = [] self.n_exp_err = [] self.n_w = len(weight_names) for weight_name in weight_names: #weights should already have flux incorperated and in units of Hz weights = mc[weight_name] * self.ana.livetime self.weights.append(weights) self.n_exp.append(sum(weights)) self.n_exp_err.append(np.sqrt(sum(weights**2))) self.probs = [self.weights[i] / np.sum(self.weights[i]) for i in xrange(self.n_w)] self.mc['idx'] = xrange(len(mc['dec'])) self.mc['inj'] = np.zeros(len(mc['dec']))
[docs] def inject(self, replace=False, seed=None): random = get_random(seed) mc = self.mc mc_evts = utils.Events({key: mc[key] for key in self.keep}) outs = [] for i in xrange(self.n_w): # grab a random map for each of the weight types, use the weights to get the uncertainty n_bg_mcfluct = random.normal(self.n_exp[i], scale = self.n_exp_err[i]) n_bg_mcfluct = int(n_bg_mcfluct) if n_bg_mcfluct > 0. else 0 n_bg = random.poisson(n_bg_mcfluct) probs = self.probs[i] indices = mc['idx'] # draw events idx = random.choice(indices, n_bg, p=probs, replace = True) out = mc_evts[idx] out['inj'] = i*np.ones(len(out['dec'])) outs.append(out) outs = utils.Arrays.concatenate(outs) # randomize drawn events to further the randomness of maps for randomize in self.randomizers: randomize(outs, seed=seed) return [outs], 0
[docs] class SigInjector(Injector): """Base class for signal injectors. Signal injectors, unlike background injectors, must also be able to provide their total acceptance. This class enforces that requirement using :meth:`SigInjector.acc_total`. """ @abstractproperty def acc_total(self): """Total signal acceptance. This is similar to :meth:`LLHEvaluatorBase.get_acc_total`, but evaluated in light of all specified signal injection details and typically in terms of the actual simulated signal events that will be used. """ pass
[docs] class PointSourceInjector(SigInjector): """Inject one or more point-like sources. This class implements the signal injection for classic time-integrated analyses. It transparently supports stacking of multiple sources as well as per-source Gaussian spatial extension. """
[docs] def __init__(self, ana, src, flux, keep, sindec_bandwidth=np.radians(2),): """Construct a PointSourceInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type src: :class:`utis.Sources` :param src: the source list. required keys: ``ra`` and ``dec``. optional keys: ``weight``, ``extension`` :type flux: :class:`hyp.Flux` :param flux: the signal spectrum :type keep: list of str :param keep: names of event features to keep :type sindec_bandwidth: float :param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to draw simulated signal events """ from .hyp import Flux self.ana, self.sig, self.src, self.keep = ana, ana.sig, src, deepcopy(keep) if 'idx' not in keep: self.keep.append('idx') if 'inj' not in keep: self.keep.append('inj') if isinstance(flux, Iterable): self.flux = flux else: self.flux = len(src) * [flux] self.sindec_bandwidth = sindec_bandwidth self.solid_angle = 2 * pi * self.sindec_bandwidth self._set_indices() self._set_weights()
def _set_indices(self): sig, src, sindec_bandwidth = self.sig, self.src, self.sindec_bandwidth src_sindecs = np.sin(src.dec) min_sindecs = np.maximum(-1, src_sindecs - .5*sindec_bandwidth) max_sindecs = np.minimum(+1, src_sindecs + .5*sindec_bandwidth) min_sindecs[max_sindecs == +1] = +1 - sindec_bandwidth max_sindecs[min_sindecs == -1] = -1 + sindec_bandwidth min_decs, max_decs = np.arcsin(min_sindecs), np.arcsin(max_sindecs) indices = [] N = len(src) logging = N > 50 flux0 = self.flux[0] energy_min, energy_max = flux0.energy_range mask_energy = ((energy_min <= self.sig.true_energy) & (self.sig.true_energy <= energy_max)) flush = sys.stdout.flush for (i, (min_dec, max_dec, flux)) in enumerate(izip(min_decs, max_decs, self.flux)): if flux.energy_range != (energy_min, energy_max): energy_min, energy_max = flux.energy_range this_mask_energy = ((energy_min <= self.sig.true_energy) & (self.sig.true_energy <= energy_max)) else: this_mask_energy = mask_energy mask_dec = (min_dec <= sig.true_dec) & (sig.true_dec <= max_dec) mask = mask_dec & this_mask_energy indices.append(np.where(mask)[0]) if logging: print('\r{:<40s}{:4.0f}% ...'.format( self.ana.key, 100. * i / N), end=''), flush() if logging: print('\r{:<40s}{:4.0f}%. '.format(self.ana.key, 100)) self.indices = indices def _set_weights(self): weights = [] sig, src = self.sig, self.src ow, E = self.ana.livetime / self.solid_angle * sig.oneweight, sig.true_energy n_src = len(self.src) for i_src in xrange(n_src): indices = self.indices[i_src] weight = ow[indices] * self.flux[i_src](E[indices]) weights.append(weight) self.weights = weights # Can be removed to reduce memory consumption? self.src_weights = src.weight * np.array([np.sum(weight) for weight in weights]) self.probs = [weights[i] / np.sum(weights[i]) for i in xrange(n_src)] self.src_prob = self.src_weights / np.sum(self.src_weights) self._acc_total = np.sum(self.src_weights) @property def acc_total(self): return self._acc_total
[docs] def inject(self, n_inj, seed=None): random = get_random(seed) src_n_injs = random.multinomial(n_inj, self.src_prob) sig, src = self.sig, self.src n_src = len(self.src) outs = [] for i_src in xrange(n_src): src_n_inj = src_n_injs[i_src] if src_n_inj == 0: continue # collect source info indices = self.indices[i_src] probs = self.probs[i_src] extension = src.extension[i_src] src_ra, src_dec = src.ra[i_src], src.dec[i_src] # draw events idx = random.choice(indices, src_n_inj, p=probs) ev_ra, ev_dec = sig.xra[idx], sig.xdec[idx] # smear if necessary if extension: ev_ra += random.normal(0, extension, src_n_inj) ev_dec += random.normal(0, extension, src_n_inj) # rotate to source position ev_dec, ev_ra = coord.rotate_xaxis_to_source( src_dec, src_ra, ev_dec, ev_ra, latlon=True) out = sig[idx] out['ra'], out['dec'] = ev_ra, ev_dec out['idx'] = idx out['inj'] = np.repeat(self, len(out)) for key in out.keys(): if key not in self.keep: del out[key] #out.compress(self.keep + 'idx inj'.split()) outs.append(out) return outs, 0
[docs] class PointSourceTimeDepInjector(SigInjector): """Base class for for time-dependent injection from point-like sources. This is an abstract base class for time-dependent injection from point-like sources. Most of the injection work is done in terms of per-source instances of :class:`PointSourceInjector`; time-dependence is handled by derived classes' implementations of the abstract methods :meth:`PointSourceTimeDepInjector.get_time_model`, :meth:`PointSourceTimeDepInjector.get_frac_during_livetime`, and :meth:`PointSourceTimeDepInjector.sample_mjd`. """
[docs] def __init__(self, ana, src, flux, keep, sindec_bandwidth=np.radians(2), spatial_prior=False): """Construct a PointSourceTimeDepInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type src: :class:`utis.Sources` :param src: the source list. required keys: ``ra`` and ``dec``. optional keys: ``weight``, ``extension`` :type flux: :class:`hyp.Flux` :param flux: the signal spectrum :type keep: list of str :param keep: names of event features to keep :type sindec_bandwidth: float :param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to draw simulated signal events :type spatial_prior: bool :param spatial_prior: option to use a spatial prior healpix map for source localisations """ self.ana, self.src, self.keep = ana, src, deepcopy(keep) self.spatial_prior = spatial_prior #if 'azimuth' not in keep: # self.keep.append('azimuth') self.sig = ana.sig if( spatial_prior ): if isinstance(flux, Iterable): self.acc_param = [ana.custom_acc_params[f] for f in flux] else: self.acc_param = len(src) * [ana.custom_acc_params[flux]] if isinstance(flux, Iterable): self.flux = flux = flux else: self.flux = flux = len(src) * [flux] self.n_src = n_src = len(src) if spatial_prior: psis = [] nside = [prior.nside for prior in src.prior if( prior is not None )] N_nsides = len(np.unique(nside)) if( N_nsides==1 ): self.nside = nside[0] self.npix = 12 * self.nside**2 self._set_hp_idx(nside[0]) self.pix_idx = list(xrange(self.npix)) elif( N_nsides>1 ): raise Exception("Handling of maps with varying nside not implemented yet") for i in xrange(n_src): src_i = src[i:i+1] if( src_i.prior[0] is not None ): psi = SpatialPriorInjector( ana, src_i, flux[i], self.keep, self.hp_idx, sindec_bandwidth=sindec_bandwidth) else: psi = PointSourceInjector( ana, src_i, flux[i], self.keep, sindec_bandwidth=sindec_bandwidth) psis.append(psi) self.psis = psis else: self.psis = psis = [ PointSourceInjector( ana, src[i:i+1], flux[i], self.keep, sindec_bandwidth=sindec_bandwidth) for i in xrange(n_src)] self.time_model = time_model = self.get_time_model() self.base_src_weights = np.array([psi.src_weights[0] for psi in psis]) self.frac_during_livetime = self.get_frac_during_livetime() self.src_weights = self.base_src_weights * self.frac_during_livetime / ana.livetime # all sources have zero probability if none have S_time > 0 for this dataset if np.sum(self.src_weights): self.src_prob = self.src_weights / np.sum(self.src_weights) else: self.src_prob = self.src_weights self._acc_total = np.sum(self.src_weights)
[docs] @abstractmethod def get_time_model(self): """Get the time PDF ratio model. :rtype: :class:`pdf.TimePDFRatioModel` """ pass
[docs] @abstractmethod def get_frac_during_livetime(self): """Get the fraction of the total lightcurve covered by a given dataset. See :meth:`pdf.TimePDFRatioModel.get_frac_during_livetime`. """ pass
[docs] @abstractmethod def sample_mjd(self, src_n_injs, seed=None): """Get per-source ``mjd`` arrays. :type src_n_injs: ndarray of int :param src_n_injs: per-source number of events to inject :type seed: int or :class:`numpy.random.RandomState` :param seed: seed spec """ pass
@property def acc_total(self): return self._acc_total
[docs] def inject(self, n_inj, seed=None): random = get_random(seed) sig, src, ana = self.sig, self.src, self.ana if( self.spatial_prior ): base_src_weights = np.zeros(self.n_src) src_inj_ra, src_inj_dec = np.zeros(self.n_src), np.zeros(self.n_src) for i_src in xrange(self.n_src): prior = src.prior[i_src] if( prior is None ): src_inj_dec[i_src] = src.dec[i_src] #not used src_inj_ra[i_src] = src.ra[i_src] #not used base_src_weights[i_src] = self.base_src_weights[i_src] else: # Draw source positions from prior pix_src = random.choice(self.pix_idx, p=prior.map) theta_pix_center, ra_pix_center = hp.pix2ang(self.nside, pix_src) src_outside_pix = True while( src_outside_pix ): d_theta, d_ra = 2*random.uniform(0, hp.nside2resol(self.nside), 2) d_theta += np.pi/2 src_theta, src_ra = coord.rotate_xaxis_to_source(theta_pix_center, ra_pix_center, d_theta, d_ra,latlon=False) pix_shifted_src = hp.ang2pix(self.nside,src_theta,src_ra) if( pix_shifted_src==pix_src ): src_outside_pix = False src_inj_dec[i_src] = src_dec = pi/2 - src_theta src_inj_ra[i_src] = src_ra # Compute src weight for that position log_acc = self.acc_param[i_src].s(np.sin(src_dec)) base_src_weights[i_src] = np.exp(log_acc)*ana.livetime base_src_weights[i_src] *= src.weight[i_src] src_weights = base_src_weights * self.frac_during_livetime / ana.livetime if np.sum(self.src_weights): src_prob = src_weights / np.sum(src_weights) else: src_prob = src_weights else: src_prob = self.src_prob src_n_injs = random.multinomial(n_inj, src_prob) next_seed = lambda: random.randint(2**32) mjds = self.sample_mjd(src_n_injs, seed=next_seed()) n_src = len(self.src) outs = [] for i_src in xrange(n_src): src_n_inj = src_n_injs[i_src] if src_n_inj == 0: continue if( self.spatial_prior and (src.prior[i_src] is not None) ): ev = self.psis[i_src].inject(src_n_inj, seed=next_seed(), ra=[src_inj_ra[i_src]], dec=[src_inj_dec[i_src]])[0][0] else: ev = self.psis[i_src].inject(src_n_inj, seed=next_seed())[0][0] ev['mjd'] = mjds[i_src] outs.append(ev) return outs, 0
def is_during_ontime(self, mjd): grl = self.ana.grl @np.vectorize def f(t): return np.any((grl.start <= t) & (t <= grl.stop)) return f(mjd) def _set_hp_idx(self, nside): # Set indices healpix maps here, otherwise this becomes # too memory consuming when stacking several 100 sources ra = self.ana.sig.true_ra dec = self.ana.sig.true_dec hp_idx = hp.ang2pix(nside, pi/2-dec, ra) self.hp_idx = np.array(hp_idx, dtype="uint32")
[docs] class PointSourceTimeWindowInjector(PointSourceTimeDepInjector): """Inject Gaussian or time box flares. This class implements Gaussian or top-hat time window flare injection for one or more(possibly spatially-extended) sources. """
[docs] def __init__(self, ana, src, flux, keep, t0, dt, box=False, box_mode='center', endpoints=False, sindec_bandwidth=np.radians(2),): """Construct a PointSourceTimeInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type src: :class:`utis.Sources` :param src: the source list. required keys: ``ra`` and ``dec``. optional keys: ``weight``, ``extension`` :type flux: :class:`hyp.Flux` :param flux: the signal spectrum :type keep: list of str :param keep: names of event features to keep :type t0: ndarray of float :param t0: per-source anchor time-of-flare :type dt: ndarray of float :param dt: per-source width of flare(in days) :type box: bool :param box: whether to use box rather than Gaussian flare :type box_mode: str :param box_mode: one of 'center', 'pre', or 'post' :type endpoints: bool :param endpoints: injects and flare start and stop times, rather than throughout flare :type sindec_bandwidth: float :param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to draw simulated signal events """ self.t0, self.dt = np.atleast_1d(t0), np.atleast_1d(dt) self.box, self.box_mode = box, box_mode self.endpoints = endpoints super(PointSourceTimeWindowInjector, self).__init__( ana, src, flux, keep, sindec_bandwidth=sindec_bandwidth) if self.box: if self.box_mode == 'pre': t1, t2 = t0 - dt, t0 elif self.box_mode == 'post': t1, t2 = t0, t0 + dt elif self.box_mode == 'center': t1, t2 = t0 - 0.5 * dt, t0 + 0.5 * dt else: raise RuntimeError('unexpected box_mode: {}'.format(self.box_mode)) self.t1, self.t2 = t1, t2 # validate endpoints mode if endpoints: if not box: raise ValueError( 'cannot invoke "endpoints" mode') if not (self.is_during_ontime(t1) or self.is_during_ontime(t2)): raise ValueError( 'cannot inject at endpoints if neither flare bound is during detector on-time')
[docs] def get_time_model(self): from .pdf import UntriggeredTimePDFRatioModel return UntriggeredTimePDFRatioModel(self.ana, self.src, box=self.box, box_mode=self.box_mode)
[docs] def get_frac_during_livetime(self): return self.time_model.get_frac_during_livetime(self.t0, self.dt)
[docs] def sample_mjd(self, src_n_injs, seed=None): random = get_random(seed) out = [] mjd_min, mjd_max = self.ana.mjd_min, self.ana.mjd_max for (i, n) in enumerate(src_n_injs): t0, dt = self.t0[i], self.dt[i] if self.box: t1, t2 = self.t1, self.t2 if self.endpoints: if not self.is_during_ontime(t1): p = 1 elif not self.is_during_ontime(t2): p = 0 else: p = 0.5 mjd = np.where(random.binomial(1, p, n), t1, t2) if t1==t2==0: #This is a bit of a bandaid fix because the multiflare trial runner was written #before active-livetime-only event injection was implemented. This ensures the #multiflare flare injection uses the legacy behavior prior to rev 180591 mjd = random.uniform(t1, t2, n) else: mjd = [] while len(mjd) < n: mjd = np.r_[mjd, random.uniform(t1, t2, n - len(mjd))] ontime_mask = self.is_during_ontime(mjd) mjd = mjd[ontime_mask] else: mjd = [] while len(mjd) < n: mjd = np.r_[mjd, random.normal(self.t0[i], self.dt[i], n - len(mjd))] #mjds = np.r_[mjds, pdf.sample(n - len(mjds), random=random)[0]] ontime_mask = self.is_during_ontime(mjd) mjd = mjd[ontime_mask] out.append(mjd) return out
[docs] class PointSourceRedNoiseInjector(PointSourceTimeDepInjector): """Inject red noise as signal. This class implements white/pink/red/black noise injector as a signal light-curve for one source. """
[docs] def __init__(self, ana, src, flux, keep, dt, beta=2, sindec_bandwidth=np.radians(2),): """Construct a PointSourceRedNoiseInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type src: :class:`utils.Sources` :param src: the source list. required keys: ``ra`` and ``dec``. optional keys: ``weight``, ``extension`` :type flux: :class:`hyp.Flux` :param flux: the signal spectrum :type keep: list of str :param keep: names of event features to keep :type dt: float (single number) :param dt: spacing between time-steps/noise-curve in units of days (recommended < 50 days) :type beta: integer (single number) :param beta: power-law index of noise (default=2 for red noise) :type sindec_bandwidth: float :param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to draw simulated signal events """ self.dt = dt self.beta = beta self.flare_size = (ana.mjd_max - ana.mjd_min) self.flare_center = (ana.mjd_min + ana.mjd_max)/2 self.N = int(self.flare_size/dt) self.box = True self.box_mode = 'center' self._frac_during_livetime = self.time_model.get_frac_during_livetime(self.flare_center, self.flare_size) super(PointSourceRedNoiseInjector, self).__init__( ana, src, flux, keep, sindec_bandwidth=sindec_bandwidth)
[docs] def get_time_model(self): from .pdf import UntriggeredTimePDFRatioModel return UntriggeredTimePDFRatioModel(self.ana, self.src, box=self.box, box_mode=self.box_mode)
[docs] def get_frac_during_livetime(self): return self._frac_during_livetime
[docs] def sample_mjd(self, src_n_injs, seed=None): random = get_random(seed) out = [] mjd_min, mjd_max = self.ana.mjd_min, self.ana.mjd_max for (i, n) in enumerate(src_n_injs): dt, N, beta = self.dt, self.N, self.beta mjd = [] p = self.generate_rednoise(N, dt, beta, random) while len(mjd) < n: mjd = np.r_[mjd, random.choice(np.linspace(mjd_min, mjd_max, N), n - len(mjd), p=p)] ontime_mask = self.is_during_ontime(mjd) mjd = mjd[ontime_mask] out.append(mjd) return out
[docs] def generate_rednoise(self, N, dt, beta, random, generate_complex=False): """Generate a power-law lightcurve This uses the method from Timmer & Koenig [1] :type N : integer :param N : number of equal-spaced time steps to generate :type dt : float :param dt : spacing between time-steps :type beta : float :param beta : power-law index, the spectrum will be (1 / f)^beta white noise is beta=0 pink noise is beta=1 red noise is beta=2 black noise is beta>2 :type generate_complex : boolean (optional) :param generate_complex : if True, generate a complex time series rather than a real time series References ---------- [1] Timmer, J. & Koenig, M. On Generating Power Law Noise. A&A 300:707 """ dt = float(dt) N = int(N) Npos = int(N / 2) # Nneg = int((N - 1) / 2) domega = (2 * pi / dt / N) if generate_complex: omega = domega * np.fft.ifftshift(np.arange(N) - int(N / 2)) else: omega = domega * np.arange(Npos + 1) x_fft = np.zeros(len(omega), dtype=complex) x_fft.real[1:] = random.normal(0, 1, len(omega) - 1) x_fft.imag[1:] = random.normal(0, 1, len(omega) - 1) x_fft[1:] *= (1. / omega[1:]) ** (0.5 * beta) x_fft[1:] *= (1. / np.sqrt(2)) # by symmetry, the Nyquist frequency is real if x is real if (not generate_complex) and (N % 2 == 0): x_fft.imag[-1] = 0 if generate_complex: x = np.fft.ifft(x_fft) else: x = np.fft.irfft(x_fft, N) x = x - np.min(x) return x/np.sum(x)
[docs] class PointSourceBinnedTimeInjector(PointSourceTimeDepInjector): """Inject events distributed according to a binned lightcurve."""
[docs] def __init__(self, ana, src, flux, keep, lcs, threshs=None, lags=None, sindec_bandwidth=np.radians(2),): """Construct a PointSourceBinnedTimeInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type src: :class:`utis.Sources` :param src: the source list. required keys: ``ra`` and ``dec``. optional keys: ``weight``, ``extension`` :type flux: :class:`hyp.Flux` :param flux: the signal spectrum :type keep: list of str :param keep: names of event features to keep :type lcs: list of :class:`histlite.Hist` :param lcs: the per-source lightcurves(see :class:`pdf.BinnedTimePDFRatioModel`) :type threshs: ndarray of float :param threshs: per-source thresholds :type sindec_bandwidth: float :param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to draw simulated signal events """ self.lcs = np.atleast_1d(lcs) if threshs is None: self.threshs = np.zeros(len(src)) else: self.threshs = np.atleast_1d(threshs) if lags is None: self.lags = np.zeros(len(src)) else: self.lags = np.atleast_1d(lags) super(PointSourceBinnedTimeInjector, self).__init__( ana, src, flux, keep, sindec_bandwidth=sindec_bandwidth) self.pdfs = self.time_model.get_lc_pdfs_cropped( threshs_array=self.threshs, lags_array=self.lags)
[docs] def get_time_model(self): from .pdf import BinnedTimePDFRatioModel return BinnedTimePDFRatioModel(self.ana, self.src, self.lcs)
[docs] def get_frac_during_livetime(self): return self.time_model.get_frac_during_livetime( threshs_array=self.threshs, lags_array=self.lags)
[docs] def sample_mjd(self, src_n_injs, seed=None): random = get_random(seed) out = [] for (i, n) in enumerate(src_n_injs): mjds = [] pdf = self.pdfs[i] #j = 0 while len(mjds) < n: mjds = np.r_[mjds, pdf.sample(n - len(mjds), random=random)[0]] ontime_mask = self.is_during_ontime(mjds) mjds = mjds[ontime_mask] #j += 1 out.append(mjds) return out
[docs] class TransientInjector(PointSourceTimeDepInjector): """Inject events from transient sources."""
[docs] def __init__(self, ana, time_model, flux, keep, sindec_bandwidth=np.radians(2), spatial_prior=False): """Construct a TransientInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type time_model: :class:`pdf.TimePDFRatioModel` :param time_model: the time PDF ratio model :type flux: :class:`hyp.Flux` :param flux: the signal spectrum :type keep: list of str :param keep: names of event features to keep :type sindec_bandwidth: float :param sindec_bandwidth: width(**not** half-width) of the sin(dec) bands from which to draw simulated signal events :type spatial_prior: bool :param spatial_prior: option to use a spatial prior healpix map for source localisations """ self.time_model = time_model src = time_model.src super(TransientInjector, self).__init__( ana, src, flux, keep, sindec_bandwidth=sindec_bandwidth, spatial_prior=spatial_prior)
[docs] def get_time_model(self): return self.time_model
[docs] def get_frac_during_livetime(self): return self.time_model.get_frac_during_livetime()
[docs] def sample_mjd(self, src_n_injs, seed=None): random = get_random(seed) time_model = self.time_model out = [] if ( self.ana.grl is not None ): active_time_per_source = time_model.active_time_per_source for i, n in enumerate(src_n_injs): if( n>0 ): if ( len(active_time_per_source[i])==1 ): start, stop = active_time_per_source[i][0] out.append(random.uniform(start, stop, n)) else: out_i = [] probs = np.array([t_stop-t_start for t_start, t_stop in active_time_per_source[i]]) probs = probs/sum(probs) idx = random.choice(np.arange(len(probs)), n, p=probs, replace=True) for index in idx: start, stop = active_time_per_source[i][index] out_i.extend(random.uniform(start, stop, 1)) out.append(out_i) else: out.append( np.array([],dtype=float) ) else: # Just sample uniformly during t_100 start_mjds = time_model.src_mjd end_mjds = time_model.src_mjd_t_100 for (start_mjd, end_mjd, n) in izip(start_mjds, end_mjds, src_n_injs): out.append(random.uniform(start_mjd, end_mjd, n)) return out
[docs] class TemplateInjector(SigInjector): """Inject a highly-extended source according to a spatial template. This class implements injection according to an all-sky source template(such as for the Galactic "plane"). If per-pixel energy-binned spectra are available then they are used, in line with the principle that "you can do whatever you want in your likelihood but you'd better use everything you've got for signal injection", despite the fact that this is relatively slow computationally. """
[docs] def __init__(self, ana, template_model, keep, flux=None, sig_replace=True): """Construct a TemplateInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type template_model: :class:`pdf.TemplateSpacePDFRatioModel` :param template_model: the template space PDF ratio model :type keep: list of str :param keep: names of event features to keep :type flux: :class:`hyp.Flux` :param flux: if given, the signal spectrum. otherwise, inject according to per-pixel spectra if available, or use ``template_model.flux`` :type sig_replace: bool :param sig_replace: If true, sampling with replacement will be used, means an element can be reused multiple times. """ # save properties if flux is None: flux = template_model.flux self.ana, self.template_model, self.keep, self.flux = ana, template_model, keep, flux self.replace = sig_replace tm = template_model # weight as much as possible up front livetime = ana.livetime # if per-pixel flux, mask signal into allowed energy range if tm.bins_energy is None: self.sig = sig = ana.sig else: mask, i_energy = tm.get_mask_and_i_energy(ana.sig.true_energy) self.sig = sig = ana.sig[mask] # pixel indices pixels = hp.ang2pix(tm.nside, pi/2 - sig.true_dec, sig.true_ra) # build up weights from livetime, oneweight, and if possible, template self.weights_noflux = livetime * sig.oneweight if tm.bins_energy is None: # we can cache the event weights self.flux_weights = self.weights_noflux * flux(sig.true_energy) weights = self.flux_weights * tm.template[pixels] else: # we just cache the energy bin indices self.flux_weights = None mask, i_energy = tm.get_mask_and_i_energy(sig.true_energy) assert np.all(mask), 'should have restricted energy range already' self.mask, self.i_energy = mask, i_energy phi = tm.template[pixels, i_energy] weights = self.weights_noflux * phi # total acceptance from weighted events self._acc_total = np.sum(weights)
@property def acc_total(self): return self._acc_total
[docs] def inject(self, n_inj, seed=None): flush = sys.stdout.flush random = get_random(seed) tm = self.template_model sig = self.sig # scramble signal MC #print('scrambling'), flush() delta_ra = random.uniform(0, 2*pi, len(sig)) pixels = hp.ang2pix(tm.nside, pi/2 - sig.true_dec, sig.true_ra + delta_ra) # weight events #print('weighting'), flush() if self.flux_weights is None: # TODO: ideally find something faster than weighting # every MC event every trial i_energy = self.i_energy phi = tm.template[pixels, i_energy] weights = self.weights_noflux * phi else: weights = self.flux_weights * tm.template[pixels] weights_sum = np.sum(weights) probs = weights / weights_sum #print('selecting'), flush() idx = random.choice(len(weights), n_inj, replace=self.replace, p=probs) ra = sig.ra[idx] + delta_ra[idx] out = sig[idx] out['ra'] = ra for key in out.keys(): if key not in self.keep: del out[key] out['idx'], out['inj'] = idx, np.repeat(self, len(out)) #print('dunzo'), flush() return [out], 0
[docs] class TemplateBGInjector(Injector): """Inject typical types of background along with a highly-extended source according to a spatial template. This class implements addition injection to the typical background (scrambled or MC) according to an all-sky source template(such as for the Galactic "plane"). If per-pixel energy-binned spectra are available then they are used. """
[docs] def __init__(self, ana, template_model, keep, bg_weight_names, flux=None, sig_replace=True, randomizers = None, mcbg = False): """Construct a TemplateInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type template_model: :class:`pdf.TemplateSpacePDFRatioModel` :param template_model: the template space PDF ratio model :type keep: list of str :param keep: names of event features to keep :type flux: :class:`hyp.Flux` :param flux: if given, the signal spectrum. otherwise, inject according to per-pixel spectra if available, or use ``template_model.flux`` :type sig_replace: bool :param sig_replace: If true, sampling with replacement will be used, means an element can be reused multiple times. """ # save properties if flux is None: flux = template_model.flux self.ana, self.template_model, self.keep, self.flux, self.mcbg = ana, template_model, keep, flux, mcbg self.replace = sig_replace tm = template_model # weight as much as possible up front livetime = ana.livetime # if per-pixel flux, mask signal into allowed energy range if tm.bins_energy is None: self.sig = sig = ana.sig else: mask, i_energy = tm.get_mask_and_i_energy(ana.sig.true_energy) self.sig = sig = ana.sig[mask] self.bin_edges = [] if self.ana.dOmega_corr is not None: self.dOmega_corr = self.ana.dOmega_corr self.bin_edges = self.ana.bin_edges # pixel indices pixels = hp.ang2pix(tm.nside, pi/2 - sig.true_dec, sig.true_ra) # build up weights from livetime, oneweight, and if possible, template self.weights_noflux = livetime * sig.oneweight if tm.bins_energy is None: # we can cache the event weights self.flux_weights = self.weights_noflux * flux(sig.true_energy) weights = self.flux_weights * tm.template[pixels] else: # we just cache the energy bin indices self.flux_weights = None mask, i_energy = tm.get_mask_and_i_energy(sig.true_energy) assert np.all(mask), 'should have restricted energy range already' self.mask, self.i_energy = mask, i_energy phi = tm.template[pixels, i_energy] weights = self.weights_noflux * phi # total acceptance from weighted events self._acc_total = np.sum(weights) # Get MCBG if that is what wanted if self.mcbg: print("TemplateMCBGInjector has not been tested with the mcbg option") self.bg = ana.sig else: self.bg = ana.bg_data self.data=ana.data if 'azimuth' not in keep: self.keep.append('azimuth') if randomizers is None: randomizers = [RARandomizer()] self.randomizers = randomizers self.N = ana.n_data self.weight_names = bg_weight_names if 'energy' not in keep: self.keep.append('energy') if 'dec' not in keep: self.keep.append('dec') if self.mcbg: if 'idx' not in keep: self.keep.append('idx') if 'inj' not in keep: self.keep.append('inj') self._set_weights()
def _set_weights(self): weights = [] bg = self.bg weight_names = self.weight_names self.weights = [] self.n_exp = [] self.n_exp_err = [] self.n_w = len(weight_names) for weight_name in weight_names: #weights should already have flux incorperated and in units of Hz weights = bg[weight_name] * self.ana.livetime self.weights.append(weights) self.n_exp.append(sum(weights)) self.n_exp_err.append(np.sqrt(sum(weights**2))) self.probs = [self.weights[i] / np.sum(self.weights[i]) for i in xrange(self.n_w)] self.bg['idx'] = xrange(len(bg['dec'])) self.bg['inj'] = np.zeros(len(bg['dec'])) @property def acc_total(self): return self._acc_total
[docs] def inject(self, gp_norm, n_inj=None, seed=None, replace=False, truth=False, debug=False): random = get_random(seed) bg = self.bg bg_evts = utils.Events({key: bg[key] for key in self.keep}) size = len(bg_evts) bg_evts['idx'] = np.arange(size) # inject template using signal mc evts first tm = self.template_model sig = self.sig # scramble signal MC #randamize by ra delta_ra = random.uniform(0, 2*pi, len(sig)) pixels = hp.ang2pix(tm.nside, pi/2 - sig.true_dec, sig.true_ra + delta_ra) # weight events if self.flux_weights is None: # TODO: ideally find something faster than weighting # every MC event every trial i_energy = self.i_energy phi = tm.template[pixels, i_energy] weights = self.weights_noflux * phi else: weights = self.flux_weights * tm.template[pixels] weights_sum = np.sum(weights) probs = weights / weights_sum #print('selecting'), flush() gp_n_inj = int(weights_sum * gp_norm) # total num of gp evts scaled by weight if gp_norm < 1e-3: #pi0 template flux norm factor need special treatment gp_n_inj = int(round(self.flux.to_ns(flux=gp_norm, acc_total=self.acc_total, E0=100, unit=1e3, use_E2dNdE=True), 0)) idx = random.choice(len(weights), gp_n_inj, replace=self.replace, p=probs) ra = sig.ra[idx] + delta_ra[idx] tmp_out = sig[idx] tmp_out['ra'] = ra tmp_out['dec'] = sig.dec[idx] for key in tmp_out.keys(): if key not in self.keep: del tmp_out[key] tmp_out['idx'], tmp_out['inj'] = idx, np.repeat(self, len(tmp_out)) n_tmp = len(tmp_out) if len(tmp_out)>0: tmp_out = utils.Arrays.concatenate(tmp_out) if truth: return [tmp_out], [] #regular mc/data bg outs = [] if self.mcbg: for i in xrange(self.n_w): # grab a random map for each of the weight types, use the weights to get the uncertainty n_bg_mcfluct = random.normal(self.n_exp[i], scale = self.n_exp_err[i]) n_bg_mcfluct = int(n_bg_mcfluct) if n_bg_mcfluct > 0. else 0 n_bg = random.poisson(n_bg_mcfluct) probs = self.probs[i] indices = bg['idx'] # draw events n_bg =n_bg - gp_n_inj if n_bg<0: n_bg=0 idx = random.choice(indices, n_bg, p=probs, replace = replace) out = bg_evts[idx] out['inj'] = i*np.ones(len(out['dec'])) outs.append(out) outs = utils.Arrays.concatenate(outs) # scramble the background for randomize in self.randomizers: randomize(outs, seed=seed) outs = utils.Arrays.concatenate((tmp_out, outs)) return [outs], 0 else: # sample event indices with replacement as False idx = random.choice(bg_evts['idx'], size, replace=replace) outs = bg_evts[idx] outs['inj'] = np.repeat(self, size) n_selected = len(outs) # sample events from off-mask region to compensate oversamples = [] if len(self.bin_edges) > 0: for i in np.arange(len(self.bin_edges)-1): corr = self.dOmega_corr[i] dec_mask = (bg_evts['dec'] < self.bin_edges[i+1]) & (bg_evts['dec'] > self.bin_edges[i]) n_indec = sum(dec_mask) re_size = int((corr - 1) * n_indec) # allow same events re_idx = random.choice(bg_evts['idx'][dec_mask], re_size, replace=True) oversample = bg_evts[re_idx] oversample['inj'] = np.repeat(self, re_size) n_selected += len(oversample) if len(oversample) > 0: alldata = (self.data['dec'] < self.bin_edges[i+1]) & (self.data['dec'] > self.bin_edges[i]) if debug: print("in dec band: ", np.degrees(self.bin_edges[i]), np.degrees(self.bin_edges[i+1])) print("total events before masking: ", sum(alldata)) print("offmask events:", n_indec) print("correction from solid angle: ", corr) print("sample n events:", re_size) oversamples.append(oversample) oversamples = utils.Arrays.concatenate(oversamples) n_selected += n_tmp if debug: print("injected GP events: ", n_tmp) print("over sampling:", len(oversamples)) n_excluded = len(self.ana.data) - n_selected # scramble the background for randomize in self.randomizers: randomize(outs, seed=seed) if len(self.bin_edges) > 0: randomize(oversamples, seed=seed) if len(oversamples) > 0: outs = utils.Arrays.concatenate((tmp_out, outs, oversamples)) if debug: saved = [outs['energy'], outs['dec'], outs['ra'], outs['log10energy'], outs['sigma'], outs['azimuth']] np.save("oversampled_{}.npy".format(len(oversamples)), np.asarray(saved)) np.save("gp_{}.npy".format(gp_n_inj), np.asarray(saved)) print("inject gp evts: ", n_tmp) return [outs], n_excluded
[docs] class SpatialPriorInjector(SigInjector): """Injection of one or more point-like sources, whose position is not exactly known, but given in the form of a spatial prior (healpix map). """
[docs] def __init__(self, ana, src, flux, keep, hp_idx, sindec_bandwidth=np.radians(2),): """Construct a SpatialPriorInjector. :type ana: :class:`analysis.SubAnalysis` :param ana: the sub analysis :type src: :class:`utis.Sources` :param src: the source list. required keys: ``ra``, ``dec``, ``prior``. optional keys: ``weight``, ``extension`` :type flux: :class:`hyp.Flux` :param flux: the signal spectrum :type keep: list of str :param keep: names of event features to keep """ from .hyp import Flux self.ana, self.sig, self.src, self.keep = ana, ana.sig, src, deepcopy(keep) self.hp_idx = hp_idx livetime = ana.livetime if 'idx' not in keep: self.keep.append('idx') if 'inj' not in keep: self.keep.append('inj') if isinstance(flux, Iterable): self.acc_param = [ana.custom_acc_params[f] for f in flux] else: self.acc_param = len(src) * [ana.custom_acc_params[flux]] if isinstance(flux, Iterable): self.flux = flux else: self.flux = len(src) * [flux] nsides = np.array([prior.nside for prior in src.prior]) self.nside = nsides[0] if( not np.all(self.nside==nsides) ): raise Exception("Please provide a source list with where all priors\ have the same nside") self.npix = src.prior[0].npix self.sindec_bandwidth = sindec_bandwidth self._set_E_masks() self._set_weights()
def _set_E_masks(self): E_masks = [] N = len(self.src) logging = N > 50 flux0 = self.flux[0] energy_min, energy_max = flux0.energy_range mask_energy = ((energy_min <= self.sig.true_energy) & (self.sig.true_energy <= energy_max)) flush = sys.stdout.flush for i, flux in enumerate(self.flux): if flux.energy_range != (energy_min, energy_max): energy_min, energy_max = flux.energy_range this_mask_energy = ((energy_min <= self.sig.true_energy) & (self.sig.true_energy <= energy_max)) else: this_mask_energy = mask_energy if( np.sum(this_mask_energy)==len(self.sig.ra) ): # i.e. no masking # This is done to reduce memory in stacking analyses E_masks.append(True) else: E_masks.append(this_mask_energy) if logging: print('\r{:<40s}{:4.0f}% ...'.format( self.ana.key, 100. * i / N), end=''), flush() if logging: print('\r{:<40s}{:4.0f}%. '.format(self.ana.key, 100)) self.E_masks = np.array(E_masks) def _set_weights(self): weights = [] sig, src = self.sig, self.src ow, E = self.ana.livetime / (4*pi) * sig.oneweight, sig.true_energy n_src = len(self.src) for i_src in xrange(n_src): prior = src.prior[i_src] E_mask = self.E_masks[i_src] if( E_mask is True ): weight = ow*self.flux[i_src](E)*prior.map[self.hp_idx]*prior.npix else: weight = ow[E_mask] * self.flux[i_src](E[E_mask])\ * prior.map[self.hp_idx[E_mask]] * prior.npix weights.append(weight) self.src_weights = src.weight * np.array([np.sum(weight) for weight in weights]) self.src_prob = self.src_weights / np.sum(self.src_weights) self._acc_total = np.sum(self.src_weights) def _MC_event_weight(self, i_ev, i_src): ow = self.ana.livetime/(4*pi)*self.sig.oneweight[i_ev] E = self.sig.true_energy[i_ev] return ow * self.flux[i_src](E) @property def acc_total(self): return self._acc_total
[docs] def inject(self, n_inj, seed=None, ra=None, dec=None): random = get_random(seed) sig, src = self.sig, self.src n_src = len(self.src) nside = self.nside # Draw src positions if no given by ra & dec if( ra is None ): pix_idx = list(xrange(self.npix)) ra, dec = [], [] for i_src in xrange(n_src): # Sample a pixel, src location will be a random position # in that pixel prior = src.prior[i_src] if( prior is None ): src_ra, src_dec = src.ra[i_src], src.dec[i_src] else: pix_src = random.choice(pix_idx, p=prior.map) theta_pix_center, ra_pix_center = hp.pix2ang(nside, pix_src) src_outside_pix = True while( src_outside_pix ): d_theta, d_ra = 2*random.uniform(0, hp.nside2resol(nside), 2) d_theta += np.pi/2 src_theta, src_ra = coord.rotate_xaxis_to_source(theta_pix_center, ra_pix_center, d_theta, d_ra,latlon=False) pix_shifted_src = hp.ang2pix(nside, src_theta,src_ra) if( pix_shifted_src==pix_src ): src_outside_pix = False src_dec = np.pi/2 - src_theta ra.append(src_ra) dec.append(src_dec) # Split n_inj over all sources if( n_src==1 ): src_n_injs = [n_inj] else: # Set source weights & split src_n_injs base_src_weights = np.zeros(n_src) for i_src in xrange(n_src): log_acc = self.acc_param[i_src].s(np.sin(dec[i_src])) base_src_weights[i_src] = np.exp(log_acc)*src.weight[i_src] if np.sum(base_src_weights): src_prob = base_src_weights / np.sum(base_src_weights) else: src_prob = base_src_weights src_n_injs = random.multinomial(n_inj, src_prob) outs = [] for i_src in xrange(n_src): src_n_inj = src_n_injs[i_src] if src_n_inj == 0: continue # collect source info extension = src.extension[i_src] prior = src.prior[i_src] src_dec, src_ra = dec[i_src], ra[i_src] # Select MC events to be used # (pre-binning MC events can probably speed this up) src_sindec = np.sin(src_dec) min_sindec = max(-1, src_sindec - .5*self.sindec_bandwidth) max_sindec = min(+1, src_sindec + .5*self.sindec_bandwidth) if( max_sindec==1 ): min_sindec = 1 - self.sindec_bandwidth elif( min_sindec==-1 ): max_sindec = -1 + self.sindec_bandwidth min_dec, max_dec = np.arcsin(min_sindec), np.arcsin(max_sindec) mask_dec = (min_dec <= sig.true_dec) & (sig.true_dec <= max_dec) mask = mask_dec & self.E_masks[i_src] indices = np.where(mask)[0] # draw events weights = self._MC_event_weight(indices, i_src) probs = weights/np.sum(weights) idx = random.choice(indices, src_n_inj, p=probs) ev_ra, ev_dec = sig.xra[idx], sig.xdec[idx] # smear if necessary if extension: ev_ra += random.normal(0, extension, src_n_inj) ev_dec += random.normal(0, extension, src_n_inj) # rotate to source position ev_dec, ev_ra = coord.rotate_xaxis_to_source( src_dec, src_ra, ev_dec, ev_ra, latlon=True) out = sig[idx] out['ra'], out['dec'] = ev_ra, ev_dec out['idx'] = idx out['inj'] = np.repeat(self, len(out)) for key in out.keys(): if key not in self.keep: del out[key] outs.append(out) return outs, 0