# trial.py
r"""User-level interface for csky trial operations.
This module gives the user-level interface for generating actual or randomized datasets and
evaluating and/or maximizing likelihoods. It also includes tools for performing batches of trials;
estimating threshold quantities such as sensitivities, discovery potentials, and upper limits; and
distributing trials over multiple local cores.
Trial Runner
Programmatically, a trial runner is anything which implements the interface declared in the
abstract base class, :class:`TrialRunnerBase`. Broadly speaking, trial runners are objects
which:
1. Know how to generate actual or randomized datasets, possibly including simulated signal
injections.
2. Know how to apply a :class:`csky.llh.LLHModel` to the data in order to obtain a
:class:`csky.llh.LLHEvaluator`.
3. Know how to map operations 1-2 onto multiple datasets to perform multi-dataset analysis.
4. Know how to associate fit results (e.g. [3.7, 12.36, 2.5]) with column names (e.g. ['TS,
ns, gamma'])
5. Know how to report per-dataset signal acceptance for use in counts <--> flux calculations.
Thru this interface, :class:`TrialRunnerBase` provides trial batching, sens/disc/UL
calculation, and embarassingly parallel trial multiprocessing.
This module provides the following trial runners:
* :class:`TrialRunner` -- for performing trials that involve a single maximum likelihood fit.
This includes single source analysis, multi-source stacking, and template analysis. Other trial
runners are generally implemented in terms of this one.
* :class:`SkyScanTrialRunner` -- for performing trials that search for the hottest spot over much
or all of the sky, on a grid of healpix pixels.
* :class:`SpatialPriorTrialRunner` -- for performing trials that search for the hottest spot
associated with each of one or more spatial priors, again expressed in terms of healpix maps.
* :class:`MultiTrialRunner` -- the most generic trial runner, for performing trials that search for
the maximum test statistic that can be obtained using the likelihood fits from one or more
other arbitrary trial runners.
"""
from __future__ import print_function
import abc
from abc import ABCMeta, abstractmethod, abstractproperty
import collections
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
import copy
import datetime
try:
from itertools import izip
except ImportError:
izip = zip
try:
xrange
except NameError:
xrange = range
import multiprocessing as mp
import numbers
import sys
import time
import warnings
import healpy as hp
import numpy as np
pi = np.pi
from scipy import interpolate, optimize, stats
import histlite as hl
from . import hyp, llh, utils, dists, coord, inj, pdf, inspect
from .utils import get_random
Trial = collections.namedtuple('Trial', 'evss n_exs')
SpatialPriorTrial = collections.namedtuple('SpatialPriorTrial', 'evss n_exs ra dec')
[docs]
class TrialRunnerBase(object):
"""Base class for trial runners.
This is the base class for csky's somewhat idiosyncratic concept of *trial runners*. In short,
a trial runner is an object which knows how to:
1. generate ensembles of events(optionally including signal injection)
2. find the parameters that optimize the likelihood for given event ensembles
3. repeat 1-2 *en masse*
In most work with csky, classes derived from this one do account for the vast majority of
user-library interaciton. This base class provides a number of methods for performing(1-2) or
(3) very easily; converting counts <--> flux-or-fluence; conveniently estimating threshold
quantities such as sensitivity, discovery potential, or upper limits; and otherwise inspecting
the source data. Derived classes are responsible only for implementing(1-2); providing the
the source data. Derived classes are responsible only for implementing(1-2); providing the
named array mapping for (3); and of course tracking the signal acceptance of each dataset for
use in those counts <--> flux-or-fluence calculations.
"""
__metaclass__ = ABCMeta
[docs]
def __init__(self, ana):
"""Construct a TrialRunnerBase.
Args:
ana (:class:`csky.analysis.Analysis`): the(multi-dataset) Analysis
"""
self.ana = ana
@property
def keys(self):
"""The Analysis keys."""
return self.ana.keys
@property
def n_ana(self):
"""The number of SubAnalysis datasets."""
return len(self.keys)
[docs]
def to_dict(self, x):
"""Convert an ordered per-SubAnalysis quantity into a key->value dict."""
return {self.keys[i]: x[i] for i in xrange(self.n_ana)}
@abstractproperty
def sig_inj_accs(self):
"""Per-SubAnalysis SigInjector total acceptances."""
pass
[docs]
@abstractmethod
def get_accs(self, **params):
"""Per-SubAnalysis signal acceptances given fit parameters."""
pass
[docs]
def sig_inj_acc_total(self):
"""Sum over :py:attr:`TrialRunnerBase.sig_inj_accs`."""
return np.sum(self.sig_inj_accs)
[docs]
def get_acc_total(self, **params):
"""Sum over :meth:`TrialRunnerBase.get_acc_total_accs`."""
return np.sum(self.get_accs(**params))
def _split_params_fluxargs_acc(self, flux, *a, **kw):
# TODO:
# can this be more general?
# *should* this be *even less* general?
# what is the proper treatment of unexpected kw?
params = {}
params.update(kw)
use_acc_param = True
if isinstance(flux, Iterable):
if not np.all(np.array(flux) == flux[0]):
raise ValueError('cannot convert to mixed fluxes')
flux = flux[0]
elif flux is not None:
params.update(flux.as_params)
elif 'gamma' in params:
flux = hyp.PowerLawFlux(params['gamma'])
else:
flux = np.atleast_1d(self.sig_injs[-1].flux)
if len(flux) > 1 and not np.all(flux == flux[0]):
print("WARNING: trying to use a list of custom fluxes without specifying the right type of fluxargs, make sure it is intended!")
elif len(flux) == 1:
flux = flux[0]
use_acc_param = False
fluxargs = {}
if len(a) >= 3:
raise TypeError('too many positional arguments')
if len(a) >= 2:
fluxargs['unit'] = a[1]
if len(a) >= 1:
fluxargs['E0'] = a[0]
fluxargs.setdefault('E0', params.pop('E0', 1))
fluxargs.setdefault('unit', params.pop('unit', 1))
if 'use_E2dNdE' in params:
fluxargs.setdefault('use_E2dNdE', params.pop('use_E2dNdE', True))
if use_acc_param:
acc = self.get_acc_total(**params)
else:
acc = self.sig_inj_acc_total
return flux, params, fluxargs, acc
[docs]
def to_model_norm(self, ns, flux=None, **params):
"""Get the model normalization.
See also :meth:`hyp.Flux.to_model_norm`.
Todo:
* clarify sig_inj_acc_total vs get_acc_total situation in both code and docs
"""
if isinstance(ns, dict):
ns = ns['n_sig']
if flux is None:
return ns / self.sig_inj_acc_total
else:
return flux.to_model_norm(ns, self.get_acc_total(**params))
[docs]
def to_dNdE(self, ns, flux=None, *a, **kw):
"""Get dN/dE.
See also :meth:`hyp.Flux.to_dNdE`.
Todo:
* clarify sig_inj_acc_total vs get_acc_total situation in both code and docs
"""
flux, params, fluxargs, acc = self._split_params_fluxargs_acc(flux, *a, **kw)
if isinstance(flux, np.ndarray) and len(flux)>=1 and not np.all(flux == flux[0]):
ret=0
for f in flux:
ret+=f.to_dNdE(ns, acc, **fluxargs)
return ret
else:
return flux.to_dNdE(ns, acc, **fluxargs)
[docs]
def to_E2dNdE(self, ns, flux=None, *a, **kw):
"""Get E0^2 * dN/dE.
See also :meth:`hyp.Flux.to_dNdE`.
Todo:
* clarify sig_inj_acc_total vs get_acc_total situation in both code and docs
"""
flux, params, fluxargs, acc = self._split_params_fluxargs_acc(flux, *a, **kw)
if isinstance(flux, np.ndarray) and len(flux)>=1 and not np.all(flux == flux[0]):
ret=0
for f in flux:
ret+=f.to_E2dNdE(ns, acc, **fluxargs)
return ret
else:
return flux.to_E2dNdE(ns, acc, **fluxargs)
[docs]
def to_ns(self, E2dNdE, flux=None, *a, **kw):
"""Get number of signal events corresponding to a particular flux.
See also :meth:`hyp.Flux.to_ns`.
Todo:
* clarify sig_inj_acc_total vs get_acc_total situation in both code and docs
"""
flux, params, fluxargs, acc = self._split_params_fluxargs_acc(flux, *a, **kw)
return flux.to_ns(E2dNdE, acc, **fluxargs)
[docs]
@abstractmethod
def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False):
"""Get a trial.
Args:
n_sig (int): number of signal events to inject
poisson (bool): whether n_sig is specific or just a Poissonian mean
seed (int): seed spec
TRUTH (bool): whether to use background-like("scrambled") or real data
Returns:
list of lists of :class:`csky.utils.Events`, list of int:
* events(for each SubAnalysis(for each injection origin))
* ns_excluded(number of events pre-emptively masked out for each SubAnalysis)
"""
pass
[docs]
@abstractmethod
def get_one_fit_from_trial(self, evss_ns_excluded_tuple, flat=True, **fitter_args):
"""Get a fit, given a trial.
Args:
evss_ns_excluded_tuple (list of lists of :class:`csky.utils.Events`, list of int): same
as return from :meth:`TrialRunnerBase.get_one_trial`
flat (bool): whether to flatten the fit result
fitter_args: same as ``params`` in :meth:`csky.llh.LLHEvaluatorBase.fit`
"""
pass
def get_one_fit_from_skylab(self, skylab_events, flat=True, decorrelate=True, logging=False, **fitter_args):
cdatas = []
for s in skylab_events:
cdata = utils.Events(s, convert=True)
for ki in cdata.keys():
cdata[ki] = cdata[ki].copy()
cdatas.append([cdata])
trial = Trial(cdatas, np.zeros(len(cdatas)))
result = self.get_one_fit_from_trial(trial, flat=flat, decorrelate=decorrelate, logging=logging, **fitter_args)
return result
@abstractmethod
def _fits_np_to_Arrays(self, fits):
pass
[docs]
def get_one_fit(self, n_sig=0, poisson=False, seed=None, TRUTH=False,
flat=True, **fitter_args):
"""
Get a fit for one new trial.
Args:
n_sig (int): number of signal events to inject
poisson (bool): whether n_sig is specific or just a Poissonian mean
seed (int): seed spec
TRUTH (bool): whether to use background-like("scrambled") or real data
flat (bool): whether to flatten the fit result
fitter_args: same as ``params`` in :meth:`csky.llh.LLHEvaluatorBase.fit`
"""
evss_ns_excluded_tuple = self.get_one_trial(
n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
return self.get_one_fit_from_trial(evss_ns_excluded_tuple, flat=flat, **fitter_args)
def _log_trials(self, n_complete, n_total, done=False):
print('\r {:10d}/{} trials complete.{}'.format(
n_complete, n_total, ' ' if done else '..'), end='\n' if done else '')
sys.stdout.flush()
def _get_some_fits(self, n_trials, seeds, n_sig=0, poisson=False,
queue=None, i_job=0, progress=None, logging=True, **kw):
logging = logging and (progress==None)
if progress is None:
progress = [0]
log_trials = self._log_trials if logging else (lambda *a,**kw: None)
fits = []
try:
i = 0
for i in xrange(n_trials):
if progress[i_job] < 0:
break
if i % 10 == 0:
log_trials(i, n_trials)
fit = self.get_one_fit(n_sig=n_sig, poisson=poisson, seed=seeds[i], **kw)
fit.append(seeds[i])
fits.append(fit)
progress[i_job] = i + 1
log_trials(i+1, n_trials, done=True)
except KeyboardInterrupt:
progress[i_job] = -1
fits = np.array(fits)
if queue is None:
return self._fits_np_to_Arrays(fits)
else:
queue.put((i_job, fits))
[docs]
def get_many_fits(self, n_trials, n_sig=0, poisson=False, seed=None,
mp_cpus=None, logging=True, **fitter_args):
"""
Get fits for a batch of new trials.
Args:
n_trials (int): number of trials to perform
n_sig (int): number of signal events to inject
poisson (bool): whether n_sig is specific or just a Poissonian mean
seed (int): seed spec
mp_cpus (int): number of cores to use
logging (bool): whether to print a log as trials are performed
fitter_args: same as ``params`` in :meth:`csky.llh.LLHEvaluatorBase.fit`
Returns:
:class:`csky.utils.Arrays`: fit results -- ts, ns, and any other fit parameters
"""
if mp_cpus is None:
mp_cpus = self.mp_cpus
mp_cpus = min(n_trials, mp_cpus)
random = get_random(seed)
n_trials = int(n_trials)
BIG = 2**32
seeds = random.randint(BIG, size=n_trials)
if logging:
print('Performing {} {} using {} {}:'.format(
n_trials,
'background trials'
if n_sig == 0 else
'trials with n_sig = {:.3f}{}'.format(n_sig, ' (poisson)' if poisson else ''),
mp_cpus, 'core' if mp_cpus == 1 else 'cores'))
if mp_cpus == 1:
return self._get_some_fits(
n_trials=n_trials, seeds=seeds, n_sig=n_sig, poisson=poisson,
logging=logging, **fitter_args)
else:
# set up _get_some_fits arguments
x = np.array_split(np.arange(n_trials), mp_cpus)
n_jobs = len(x)
queue = mp.Queue()
progress = mp.Array('i', np.zeros(n_jobs, dtype=int))
kwargs = [
dict(n_trials=len(x[i]), seeds=seeds[x[i]],
n_sig=n_sig, poisson=poisson,
queue=queue, i_job=i, progress=progress, **fitter_args)
for i in xrange(n_jobs)]
# start processes
procs = [
mp.Process(target=self._get_some_fits, kwargs=kwargs[i])
for i in xrange(n_jobs)]
for proc in procs:
proc.start()
# monitor progress, try to catch interrupts
try:
n_complete = np.sum(progress)
while n_complete < n_trials:
if np.any(np.asarray(progress) < 0):
raise KeyboardInterrupt()
if logging:
self._log_trials(n_complete, n_trials)
n_complete = np.sum(progress)
time.sleep(1 if logging else .25)
except KeyboardInterrupt:
# signal to jobs that it's time to stop
for i in xrange(n_jobs):
progress[i] = -2
print('\nKeyboardInterrupt: terminating early.')
# collect results and join jobs
fitss = [queue.get() for proc in procs]
[proc.join() for proc in procs]
fitss = [self._fits_np_to_Arrays(fits[1]) for fits in sorted(fitss)]
fits = utils.Arrays.concatenate(fitss)
if logging:
self._log_trials(len(fits), n_trials, done=True)
return fits
[docs]
def find_n_sig(
self, ts, beta,
tol=0.03, n_batches=6, batch_size=50, coverage=2,
n_sig_step=3, n_sig_start=None, first_batch_size=50,
min_batch_size=0, max_batch_size=np.inf,
tss=None, trials=None, n_bootstrap=100,
mp_cpus=None, seed=None, logging=True, **kw):
"""
Find the signal strength that exceeds ``ts`` in ``beta`` fraction of trials.
This method estimates sensitivity or discovery potential in terms of a Poisson
mean expected number of events ``n_sig``.
Under typical interactive usage, this method estimates sensitivity or discovery
potential in two steps. First a preliminary scan finds a very coarse estimate
used to define a grid of relevant signal strengths. Then, batches of trials are
performed for each n_inj in that grid, and the TS-passing-fraction curve (as a
function of n_inj) is interpolated to find a more precise ``n_sig`` estimate.
The trial batches are grown iteratively in chunks of size ``batch_size`` (or
``min_batch_size`` for the first iteration, if that is larger) until one of two
stopping conditions is reached: either the batches have reached
``max_batch_size``, or the estimated relative error (``n_sig_error/n_sig``
ratio) is less than ``tol``.
Pre-computed trials can be used if either ``tss`` or ``trials`` is passed.
Then, if ``tol=np.inf`` is passed, no new trials will be performed. If
``n_bootstrap=1`` is passed, the error estimate will also be effectively skipped,
saving time if these estimates are not needed.
Args:
ts (float): test statistic threshold
beta (float): fraction of signal trials that should exceed ``ts``
tol (float): stop main trial loop if relative error (``n_sig_error/n_sig``)
reaches this value or less
n_batches (int): number of n_inj grid points (columns in the printed log)
batch_size (int): number of trials per grid point per iteration
coverage (float): factor to multiply by the initial coarse scan ``n_sig``
estimate to obtain the maximum n_inj grid point
n_sig_step (float): ``n_sig`` step size for initial coarse scan
n_sig_start (float): ``n_sig`` starting point for initial coarse scan
first_batch_size (int): number of trials per iteration for initial coarse scan
min_batch_size (int): number of trials per n_inj for first iteration of main
trial batches
max_batch_size (int): stop main trial loop if batches reach this size
tss (dict(float,numpy.ndarray)): TS values from pre-computed trials, with items like ``n_inj:
ts_array``
trials (dict(float, :class:`csky.utils.Arrays` )): pre-computed trials, with
items like ``n_inj: trials``
n_bootstrap (int): number of times to resample from trial arrays to estimate
``n_sig_error``
mp_cpus (int): number of cores to use for trial batches
seed: random number generator seed
logging (bool): whether to print a log as trials are performed
**kw: additional keyword arguments are passed to
:meth:`TrialRunnerBase.get_many_fits`.
"""
def do_log(*a, **kw):
if logging:
print(*a, **kw)
sys.stdout.flush()
start = datetime.datetime.now()
do_log('Start time: {}'.format(start))
# sanity requirements
assert 0 < tol <= 1
n_batches = int(n_batches)
batch_size = int(batch_size)
ts = float(ts)
random = np.random.RandomState(seed)
n_sig_start = n_sig_start or n_sig_step
mp_cpus = self.mp_cpus if mp_cpus is None else mp_cpus
do_log('Using {} cores.'.format(mp_cpus))
def frac_above(a):
"""
Fraction of TS in `a` that are above the TS threshold.
"""
return 1.0 * float(np.sum(np.asarray(a) > ts)) / len(a) if len(a) else 0
if trials is not None and tss is None:
tss = {n_sig: trials[n_sig].ts for n_sig in trials}
if tss is None:
# find the region of interest
first_tss = []
n_sig = n_sig_start - n_sig_step
do_log('* Starting initial scan for '
'{:.0f}% of {} trials with TS >= {:.3f}...'.format(
100 * beta, first_batch_size, ts))
count_initial_trials = 0
count_initial_batches = 0
while frac_above(first_tss) < beta:
n_sig += n_sig_step
do_log('\r n_sig = {:.3f} ...'.format(n_sig), end='')
first_tss = self.get_many_fits(
first_batch_size, n_sig, poisson=True, logging=False,
seed=random.randint(2**32),
mp_cpus=min(mp_cpus, first_batch_size),**kw).ts
count_initial_batches += 1
count_initial_trials += first_batch_size
do_log(' frac = {:.5f}'.format(frac_above(first_tss)))
# define n_sig grid and report status
n_sigs = np.linspace(0, coverage * n_sig, n_batches)
do_log('* Generating batches of {} trials...'.format(batch_size))
do_log(format('n_trials | n_inj')
+ ''.join([format(n, '8.2f') for n in n_sigs])
+ format('| n_sig(relative error)', '>26s'))
n_trials_already = 0
tss = {n_sig_i: np.array([]) for n_sig_i in n_sigs}
else:
# start with existing trials
n_sigs = np.array(sorted(tss))
do_log('* Generating batches of {} trials...'.format(batch_size))
do_log(format('n_trials | n_inj')
+ ''.join([format(n, '8.2f') for n in n_sigs])
+ format('| n_sig(relative error)', '>26s'))
n_trials_already = np.min([len(tss[k]) for k in tss])
def add_batches(batch_size):
seeds = random.randint(2**32, size=len(n_sigs))
if mp_cpus > 1:
for (i_sig, n_sig) in enumerate(n_sigs):
tss.setdefault(n_sig, np.array([]))
if batch_size:
fits = self.get_many_fits(
batch_size, n_sig=n_sig, poisson=True, seed=seeds[i_sig],
mp_cpus=mp_cpus, logging=False, **kw)
tss[n_sig] = np.concatenate((tss[n_sig], fits.ts))
else:
fits = tss[n_sig]
if len(fits) < batch_size:
raise KeyboardInterrupt()
if np.sum(np.isnan(tss[n_sig])) > .5 * len(tss[n_sig]):
# not enough trials suggests a KeyboardInterrupt
raise KeyboardInterrupt()
do_log('{:7.1f}%'.format(100 * frac_above(tss[n_sig])), end='')
else:
for (i_sig, n_sig) in enumerate(n_sigs):
#do_log('.', end='')
old_tss = tss.setdefault(n_sig, [])
new_tss = []
if batch_size:
new_tss = self.get_many_fits(
batch_size, n_sig, poisson=True, seed=seeds[i_sig], logging=False,
mp_cpus=1, **kw).ts
if len(new_tss) < batch_size:
raise KeyboardInterrupt()
if np.sum(np.isnan(new_tss)) > .5 * len(new_tss):
# not enough trials suggests a KeyboardInterrupt
raise KeyboardInterrupt()
tss[n_sig] = np.r_[old_tss, new_tss]
do_log('{:7.1f}%'.format(100 * frac_above(tss[n_sig])), end='')
def get_n_sig_and_error():
info = get_n_sig_batches(0, beta, tss, ts=ts)
n_sig = info['n_sig']
mp_cpus_here = min(mp_cpus, n_bootstrap, 10)
seeds = random.randint(2**32, size=n_bootstrap)
if mp_cpus > 1:
n_batch = int(np.ceil(1.0 * n_bootstrap / mp_cpus))
n_sigs_bootstrap = []
queue = mp.Queue()
jobs = []
seedss = np.array_split(seeds, mp_cpus)
for seeds in seedss:
args = (beta, tss, ts, seeds, queue)
p = mp.Process(
target=get_n_sig_bootstraps, args=args)
jobs.append(p)
p.start()
for j in jobs:
n_sigs_bootstrap.append(queue.get())
for j in jobs:
j.join()
n_sigs_bootstrap = np.concatenate(n_sigs_bootstrap)
else:
def get_tss_bootstrap(seed):
random = np.random.RandomState(seed)
out = {n: random.choice(tss[n], len(tss[n])) for n in tss}
return out
#n_sigs_bootstrap = np.array([
# get_n_sig_batches(0, beta, get_tss_bootstrap(seed), ts=ts)['n_sig']
# for seed in seeds])
n_sigs_bootstrap = get_n_sig_bootstraps(beta, tss, ts, seeds)
mask = np.isfinite(n_sigs_bootstrap)
if mask.sum():
n_sig_interval = np.percentile(
n_sigs_bootstrap[mask], 100 * stats.norm.cdf([-1, +1]))
n_sig_error = .5 * np.diff(n_sig_interval)[0] / n_sig
else:
n_sig_error = np.inf
return n_sig, n_sig_error, info
# get some batches
n_trials = []
n_sig_error = 1.01
n_sig_history = []
n_sig_error_history = []
while not (n_sig_error <= tol) and ((not n_trials) or n_trials[-1] < max_batch_size):
try:
if len(n_trials):
this_batch_size = batch_size
else:
this_batch_size = max(batch_size, min_batch_size)
if n_trials:
n_trials.append(n_trials[-1] + this_batch_size)
elif n_trials_already:
n_trials = [n_trials_already]
else:
n_trials.append(this_batch_size)
do_log('{:<8d}{:<8}'.format(n_trials[-1], ' |'), end='')
if n_trials_already:
add_batches(0)
n_trials_already = 0
else:
add_batches(this_batch_size)
n_trials_already = 0
do_log(' | ', end='')
n_sig, n_sig_error, info = get_n_sig_and_error()
n_sig_history.append(n_sig)
n_sig_error_history.append(n_sig_error)
do_log(format(
'{:.3f} (+/-{:5.1f}%)'.format(n_sig, 100 * n_sig_error), '>22s'), end='')
do_log(' [chi2.cdf]' if info['n_sig'] == info['n_sig_chi2cdf'] else ' [spline]')
except KeyboardInterrupt:
break
end = datetime.datetime.now()
do_log('End time: {}'.format(end))
do_log('Elapsed time: {}'.format(end - start))
return dict(n_sig=n_sig, n_sig_error=n_sig_error,
n_sig_history=np.array(n_sig_history),
n_sig_error_history=np.array(n_sig_error_history),
n_trials_history=n_trials,
info=info, tss=tss)
[docs]
class TrialRunner(TrialRunnerBase):
"""Main useful trial runner.
This class performs trials that involve a single maximum likelihood fit. This includes single
source analysis, multi-source stacking, and template analysis. Other trial runners are
generally implemented in terms of this one.
"""
[docs]
def __init__(self, ana, get_llh, get_injs,
fitter_args={}, param_names=None,
mp_cpus=1, mp_nice=0,
llh_kw={}, inj_kw={},
concat_evs=False):
"""Construct a TrialRunner.
Args:
ana (:class:`analysis.SubAnalysis`): the sub analysis
get_llh (callable): ``get_llh(ana, **kwargs)`` -> :class:`LLHEvaluator`
get_injs (callable): ``get_injs(ana, **kwargs)`` -> (truth, bg, sig), each of which
must be an instance of :class:`inj.Injector`, or should be ``None``.
Specifically, as an optimization, in cases where we know we don't need
the signal injector (e.g. when a :class:`TrialRunner` is merely an intermediary
for a :class:`SkyScanTrialRunner`), the call will look like
``get_injs(..., inj=False)`, in which case ``sig`` can be ``None``.
fitter_args (dict): arguments to pass to :meth:`LLHEvaluatorBase.fit`
param_names (list of str): ordered list of parameter names. This ensures that
fit results can be represented sequentially in a reliable way.
mp_cpus (int): default number of cores to use for trial batches (can be overridden at the
specific call sites, e.g. ``tr.get_many_fits(..., mp_cpus=2)``)
mp_nice (int): OS-level niceness (TODO: actually this is currently ignored, but it should
be easy to respect if we want to)
llh_kw (dict): additional kwargs to pass to ``get_llh``
inj_kw (dict): additional kwargs to pass to ``get_injs``
Todo:
* mp_nice is currently ignored, but it should
be easy to respect if we want to
"""
super(TrialRunner, self).__init__(ana)
self.llh_kw = llh_kw
try:
self.template_ninj = inj_kw['conf']['gpbg_conf']['inj_conf'].get('nsig', None) #leave the option to inject chosen number of GP events
self.template_norm = inj_kw['conf']['gpbg_conf']['inj_conf'].get('gp_norm', None)
except:
self.template_ninj =None
llh_models, truth_injs, bg_injs, sig_injs = [], [], [], []
if not hasattr(get_llh, 'keys'):
get_llh = {key: get_llh for key in ana.keys}
if not hasattr(get_injs, 'keys'):
get_injs = {key: get_injs for key in ana.keys}
self.concat_evs = concat_evs
fits = {}
names = []
for a in ana:
l = get_llh[a.key](a, **llh_kw)
t, b, s = get_injs[a.key](a, l, **inj_kw)
llh_models.append(l)
truth_injs.append(t)
bg_injs.append(b)
sig_injs.append(s)
if fits and not fitter_args:
if not l.pdf_ratio_model.fits == fits:
raise RuntimeError('fit parameters do not match: {} vs {}'.format(
l.pdf_ratio_model.fits, fits))
if not l.pdf_ratio_model.param_names == names:
raise RuntimeError('fit parameters do not match: {} vs {}'.format(
l.pdf_ratio_model.fits, fits))
else:
fits = l.pdf_ratio_model.fits
names = l.pdf_ratio_model.param_names
fits.update(fitter_args)
if param_names is not None:
names = param_names
fits_names = [
name for name in fits
if ((name[0] != '_')
and (name not in ['prior', 'seeder'])
)]
sfits_names = np.sort(fits_names)
snames = np.unique(names)
if (len(sfits_names) != len(snames)) or np.any(np.sort(fits_names) != np.unique(names)):
raise ValueError(
'fitter_args and param_names must be consistent({} vs {})'.format(
np.sort(fits_names), np.unique(names)))
for name in sorted(names):
if name in ['prior', 'seeder']:
continue
if len(np.atleast_1d(fits[name])) == 1:
names.remove(name)
#del fits[name]
# save the Analysis and LLHModels
self.ana, self.llh_models = ana, llh_models
self.llh_models_origin = copy.deepcopy(self.llh_models)
# save the Injectors
self.truth_injs, self.bg_injs, self._sig_injs = truth_injs, bg_injs, sig_injs
# save default fitter strategy
self.fitter_args = fits
self.param_names = ['ns'] + names
self.result_names = ['TS'] + self.param_names
# multiprocessing defaults
self.mp_cpus, self.mp_nice = mp_cpus, mp_nice
@property
def sig_injs(self):
"""The signal injectors."""
if self._sig_injs[0] is not None:
return self._sig_injs
else:
raise RuntimeError('no signal injectors present')
@property
def sig_inj_accs(self):
return np.array([sig_inj.acc_total for sig_inj in self.sig_injs])
@property
def sig_inj_acc_total(self):
return np.sum(self.sig_inj_accs)
[docs]
def get_accs(self, **params):
return np.array([
llh_model.acc_model.get_acc_total(**params)
for llh_model in self.llh_models])
@property
def sig_inj_probs(self):
"""Relative weight of each signal injector (by signal acceptance)."""
weights = self.sig_inj_accs
return weights / np.sum(weights)
[docs]
def sig_n_injs(self, n_sig=0, poisson=False, seed=None):
"""Multinomial sample from :py:attr:`TrialRunner.sig_inj_probs` ."""
random = get_random(seed)
sig_inj_probs = self.sig_inj_probs
n_inj = random.poisson(n_sig) if poisson else n_sig
n_injs = random.multinomial(n_inj, sig_inj_probs)
return n_injs
[docs]
def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False, debug=False):
random = get_random(seed)
if n_sig:
n_injs = self.sig_n_injs(n_sig, poisson=poisson, seed=seed)
evss = []
ns_excluded = []
for (i, key) in enumerate(self.keys):
if TRUTH:
evs, n_excluded = self.truth_injs[i].inject(seed=seed)
# test block to save the events for skymaps
if debug:
gp_scale = 100
tmp_evs, tmp_excluded = self.bg_injs[i].inject(n_inj=self.template_ninj, gp_norm=gp_scale*self.template_norm, seed=seed, truth=TRUTH)
outs = tmp_evs+ evs
outs = utils.Arrays.concatenate(outs)
evs += tmp_evs
saved = [outs['energy'], outs['dec'], outs['ra'], outs['log10energy'], outs['sigma'], outs['azimuth']]
np.save("MC_bkg_GP_{}.npy".format(gp_scale), np.asarray(saved))
evss.append(evs)
ns_excluded.append(n_excluded)
# update bkg pdf
if isinstance(self.bg_injs[i], inj.TemplateBGInjector):
tmp_evs, tmp_excluded = self.bg_injs[i].inject(n_inj=self.template_ninj, gp_norm=self.template_norm, seed=seed, truth=TRUTH)
if self.template_norm > 0:
original_llh = copy.deepcopy(self.llh_models_origin)
if len(self.llh_models)>=2:
raise NotImplementedError('llh pdfs updates for multi-seasons not implemented')
else:
self.llh_models[0].pdf_ratio_model = original_llh[0].pdf_ratio_model.get_updated(tmp_evs[0])
self.llh_models[0].update_bg = False #mute it for unbliding
else:
if not isinstance(self.bg_injs[i], inj.TemplateBGInjector):
evs, n_excluded = self.bg_injs[i].inject(seed=random.randint(2**32))
else:
evs, n_excluded = self.bg_injs[i].inject(n_inj=self.template_ninj, gp_norm=self.template_norm, seed=seed)
# only for update bkg pdf using gp events
tmp_evs, tmp_excluded = self.bg_injs[i].inject(n_inj=self.template_ninj, gp_norm=self.template_norm, seed=seed, truth=True)
original_llh = copy.deepcopy(self.llh_models_origin)
if len(self.llh_models)>=2:
raise NotImplementedError('llh pdfs updates for multi-seasons not implemented')
else:
self.llh_models[0].pdf_ratio_model = original_llh[0].pdf_ratio_model.get_updated(tmp_evs[0])
if self.concat_evs:
evs = [utils.Arrays.concatenate(evs)]
if n_sig:
n_inj = n_injs[i]
if n_inj:
ev, nx = self.sig_injs[i].inject(n_inj, seed=random.randint(2**32))
evs += ev
n_excluded += nx
if self.concat_evs:
evs = [utils.Arrays.concatenate(evs)]
evss.append(evs)
ns_excluded.append(n_excluded)
return Trial(evss, ns_excluded)
def get_one_llh_from_trial(self, evss_ns_excluded_tuple):
evss, ns_excluded = evss_ns_excluded_tuple
llh_models = self.llh_models
if len(llh_models) >= 2:
llhs = [
llh_model(evs, n_excluded)
for (llh_model, evs, n_excluded)
in izip(llh_models, evss, ns_excluded) ]
return llh.MultiLLHEvaluator(llhs)
else:
return llh_models[0](evss[0], ns_excluded[0])
def get_one_llh(self, n_sig=0, poisson=False, seed=None, TRUTH=False):
evss, ns_excluded = self.get_one_trial(
n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
return self.get_one_llh_from_trial((evss, ns_excluded))
def _flatten_result(self, ts, fitted_params, fitter_args):
#ts, fitted_params = fit[:2]
out_params = [fitted_params[param]
for param in self.param_names
if param in fitted_params]
return [ts] + out_params
[docs]
def get_one_fit_from_trial(self, evss_ns_excluded_tuple, flat=True, **fitter_args):
evss, ns_excluded = evss_ns_excluded_tuple
llh = self.get_one_llh_from_trial((evss, ns_excluded))
kw = fitter_args
for k in self.fitter_args:
kw.setdefault(k, self.fitter_args[k])
#kw = copy.deepcopy(self.fitter_args)
#kw.update(fitter_args)
fit = llh.fit(**kw)
flat = flat and '_full_output' not in fitter_args
if flat:
ts = fit[0]
fitted_params = fit[1:][0]
for i in range(1,len(fit[1:])):
fitted_params.update(fit[1:][i])
return self._flatten_result(ts, fitted_params, fitter_args)
else:
return fit
[docs]
def modify_ra(self, ra):
"""Modify source right ascension in place.
This method is used internally for efficiently computing sky scans. Users should
not need to call this method.
"""
ra = np.atleast_1d(ra)
for llh_model in self.llh_models:
llh_model.pdf_ratio_model.set_ra(ra)
return self
def format_result(self, result, names=None, omit_names=[]):
out = []
if names is None:
names = self.result_names
names = [name for name in names if name not in omit_names]
for (name, value) in izip(names, result):
line = '{:20}{}'.format(name, value)
if 'ra' in name or 'dec' in name:
line += ' ({} deg)'.format(np.degrees(value))
out.append(line)
return '\n'.join(out)
def _fits_np_to_Arrays(self, fits):
return utils.Arrays({
param_name: fits[:,i+1]
for (i, param_name) in enumerate(self.param_names)}, ts=fits[:,0], seed=fits[:,-1])
[docs]
class MultiTrialRunner(TrialRunnerBase):
[docs]
def __init__(self, ana, tr_inj, trs, bgs=None, mp_cpus=1, mp_nice=0):
super(MultiTrialRunner, self).__init__(ana)
self.tr_inj = tr_inj
self.trs, self.bgs, = trs, bgs
self.mp_cpus, self.mp_nice = mp_cpus, mp_nice
@property
def sig_inj_accs(self):
return self.tr_inj.sig_inj_accs
[docs]
def get_accs(self, **params):
return self.tr_inj.get_accs(**params)
[docs]
def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False):
return self.tr_inj.get_one_trial(
n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
def get_all_fits_from_trial(self, evss_ns_excluded_tuple, flat=True, **fitter_args):
if not flat:
raise NotImplementedError('flat=False not implemented')
fits = [
tr.get_one_fit_from_trial(
evss_ns_excluded_tuple, flat=flat, **fitter_args)
for tr in self.trs]
if self.bgs is None:
mlog10ps = np.array([
fit[0]
for fit in fits])
else:
mlog10ps = -np.log10([
bg.sf(fit[0])
for (bg, fit) in izip(self.bgs, fits)])
fits = [
np.r_[mlog10p, fit]
for (mlog10p, fit) in izip(mlog10ps, fits)]
return fits
def get_all_fits(self, n_sig=0, poisson=False, seed=None, TRUTH=False,
flat=True, arrays=False, **fitter_args):
trial = self.get_one_trial(n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
return self.get_all_fits_from_trial(trial, flat=flat, arrays=arrays, **fitter_args)
[docs]
def get_one_fit_from_trial(self, evss_ns_excluded_tuple, flat=True, **fitter_args):
# original behavior: give hottest result
# updated behavior: give all fits
fits = self.get_all_fits_from_trial(
evss_ns_excluded_tuple, flat=flat, **fitter_args)
if flat:
return list(np.concatenate(fits))
else:
return fits
def _fits_np_to_Arrays(self, fits):
out = utils.Arrays()
x = 0
for (i_tr,tr) in enumerate(self.trs):
suffix = '_{:04d}'.format(i_tr)
param_names = tr.param_names
len_result = 2 + len(param_names)
out['mlog10p'+suffix] = fits[:,x+0]
out['ts'+suffix] = fits[:,x+1]
for (i_param, param_name) in enumerate(param_names):
out[param_name+suffix] = fits[:,x+2+i_param]
x += len_result
out['seed'] = fits[:,-1]
return out
[docs]
class SkyScanTrialRunner(TrialRunnerBase):
[docs]
def __init__(self, ana, get_tr, get_selector,
fitter_args={}, param_names=None,
nside=128, get_pix=None, pixmask=None,
scan_ra=None, scan_dec=None,
min_dec=np.radians(-90), max_dec=np.radians(+90),
refine_max=False,
ts_to_p=None,
llh_kw={}, inj_kw={}, src_kw={}, src=None, sig_src=None,
mp_cpus=1, mp_scan_cpus=1, mp_nice=0, inj=False):
super(SkyScanTrialRunner, self).__init__(ana)
#self.get_llh, self.get_injs, self.get_selector = get_llh, get_injs, get_selector
self.get_tr, self.get_selector = get_tr, get_selector
self.fitter_args, self.param_names=fitter_args, param_names
self.nside, self.get_pix, self.pixmask = nside, get_pix, pixmask
self.min_dec, self.max_dec = min_dec, max_dec
self.mp_cpus, self.mp_scan_cpus, self.mp_nice = mp_cpus, mp_scan_cpus, mp_nice
self.llh_kw, self.inj_kw, self.src_kw = llh_kw, inj_kw, src_kw
self.sig_src = sig_src or src or utils.Sources(ra=0, dec=0, **self.src_kw)
self.sig_tr = self.get_tr(self.sig_src, ana=self.ana, cut_n_sigma=np.inf, inj=inj)
self.ts_to_p = ts_to_p
if scan_ra is not None and scan_dec is not None:
self.scan_ra, self.scan_dec = scan_ra, scan_dec
else:
self.scan_ra, self.scan_dec = SkyScanner.get_healpix_grid(self.nside)
@staticmethod
def get_rectangular_grid(ra, dec, wra, dra, wdec=None, ddec=None, src_deg=False,
deg=False, dec_correct=False):
if wdec is None:
wdec = wra
if ddec is None:
ddec = dra
if src_deg:
ra, dec = np.radians([ra, dec])
if deg:
wra, dra, wdec, ddec = np.radians([wra, dra, wdec, ddec])
ra_min, ra_max = ra - .5 * wra, ra + .5 * wra + 1e-5
dec_min, dec_max = dec - .5 * wdec, dec + .5 * wdec + 1e-5
RA, DEC = np.r_[ra_min:ra_max:dra], np.r_[dec_min:dec_max:ddec]
RA, DEC = np.meshgrid(RA, DEC, indexing='ij')
if dec_correct:
RA -= ra
RA /= np.cos(DEC)
RA += ra
return RA, DEC
[docs]
def sig_inj_accs(self):
raise NotImplementedError('SkyScanTrialRunner flux conversions not yet implemented')
[docs]
def get_accs(self, **params):
raise NotImplementedError('SkyScanTrialRunner flux conversions not yet implemented')
accs = []
src = utils.Sources(ra=self.scan_ra, dec=self.scan_dec)
for i in xrange(len(self._acc_params)):
acc = 0
for p in self.llh_p:
acc += p * self._acc_params[i](src, **params) * self.ana.anas[i].livetime
# TODO: account for frac during livetime
accs.append(acc)
return np.array(accs)
def _fits_np_to_Arrays(self, fits):
tr = self.sig_tr
out = tr._fits_np_to_Arrays(fits[:,1:-2])
out['mlog10p'] = fits[:,0]
out['ra'] = fits[:,-2]
out['dec'] = fits[:,-1]
return out
def _get_scanner(self, mp_cpus=None):
mp_cpus = mp_cpus or self.mp_scan_cpus
get_tr = lambda *a, **kw: self.get_tr(*a, ana=self.ana, **kw)
return SkyScanner(
get_tr, self.get_selector,
mp_cpus=mp_cpus, mp_nice=0, sig_src=self.sig_src, src_kw=self.src_kw)
[docs]
def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False, mp_cpus=None):
scanner = self._get_scanner(mp_cpus=mp_cpus)
return scanner.get_one_trial(n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
def get_one_scan_from_trial(self, trial, logging=False, mp_cpus=None,
mask=None, **fitter_args):
scanner = self._get_scanner(mp_cpus=mp_cpus)
scan_ra, scan_dec = self.scan_ra, self.scan_dec
if mask is None:
if self.get_pix is not None:
mask = np.zeros_like(scan_ra, dtype=bool)
evss = trial.evss
flat_evss = [ev for evs in evss for ev in evs]
if flat_evss:
ev = utils.Arrays.concatenate(flat_evss)
mask[self.get_pix(self.nside, ev, 3)] = True
elif self.pixmask is not None:
mask = self.pixmask
scan = scanner.get_one_scan_from_trial(
scan_ra, scan_dec, trial[:2],
min_dec=self.min_dec, max_dec=self.max_dec, mask=mask,
logging=logging, **fitter_args)[-1]
if self.ts_to_p:
mlog10p = -np.log10(self.ts_to_p(scan_dec, scan[0]))
else:
mlog10p = scan[0]
scan = np.array([mlog10p] + [s for s in scan])
return scan
def get_one_scan(self, n_sig=0, poisson=False, seed=None,
TRUTH=False, mp_cpus=None, logging=False, **fitter_args):
trial = self.get_one_trial(
n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH, mp_cpus=mp_cpus)
return self.get_one_scan_from_trial(trial, logging=logging, mp_cpus=mp_cpus, **fitter_args)
[docs]
def get_one_fit_from_trial(self, evss_ns_excluded_tuple, flat=True, logging=False, mp_cpus=None,
**fitter_args):
if not flat:
raise NotImplementedError('flat=False not supported for SkyScanTrialRunner')
scan_ra, scan_dec = np.ravel(self.scan_ra), np.ravel(self.scan_dec)
scan = self.get_one_scan_from_trial(
evss_ns_excluded_tuple, logging=logging, mp_cpus=mp_cpus, **fitter_args)
scan = np.array(list(map(np.ravel, scan)))
i_hot = np.argmax(scan[0])
# TODO: implement refine_max
#if self.refine_max:
# src = utils.Sources(ra=scan_ra[i_hot], dec=scan_dec[i_hot], **self.src_kws[i_src])
# tr_fit = self.get_tr(src, fit_source=True)
# tr = self.get_tr(src, prior=prior)
# ts, x = tr_fit.get_one_fit_from_trial(evss_ns_excluded_tuple, flat=False)
# ra, dec = x.pop('ra_0000'), x.pop('dec_0000')
# i_pix = hp.ang2pix(self.nside, pi/2 - dec, ra)
# params = tr._flatten_result(ts, x, fitter_args)[1:]
out = np.r_[scan[:,i_hot], scan_ra[i_hot], scan_dec[i_hot]]
return out
[docs]
class SpatialPriorTrialRunner(TrialRunnerBase):
[docs]
def __init__(self, ana, get_tr, get_selector,
llh_priors, inj_priors=None,
acc_param='acc_param',
prior_scan_each=False, prior_mode='stack',
fitter_args={}, param_names=None,
get_pixmask=None,
min_dec=np.radians(-90), max_dec=np.radians(+90),
refine_max=False,
mp_cpus=1, mp_scan_cpus=1, mp_nice=0,
llh_kw={}, inj_kw={}, src_kw={}, inj_src_kw={}, sig_src=None):
super(SpatialPriorTrialRunner, self).__init__(ana)
#self.acc_param = ana.acc_param
#self.get_llh, self.get_injs, self.get_selector = get_llh, get_injs, get_selector
self.get_tr, self.get_selector = get_tr, get_selector
self.llh_priors = llh_priors = np.atleast_2d(llh_priors)
if inj_priors is None:
self.inj_priors = llh_priors
elif inj_priors == 'none':
self.inj_priors = [np.ones_like(llh_priors[0])]
else:
self.inj_priors = inj_priors
self.acc_param = acc_param
self._acc_params = [getattr(a, acc_param) for a in ana]
self.prior_scan_each, self.prior_mode = prior_scan_each, prior_mode
self.fitter_args, self.param_names=fitter_args, param_names
self.nside = hp.get_nside(llh_priors[0])
if get_pixmask is True:
self.get_pixmask = utils.get_pixmask
else:
self.get_pixmask = get_pixmask
self.npix = hp.nside2npix(self.nside)
self.min_dec, self.max_dec = min_dec, max_dec
self.refine_max = refine_max
self.llh_kw, self.inj_kw, self.src_kw = llh_kw, inj_kw, src_kw
self.mp_cpus, self.mp_scan_cpus, self.mp_nice = mp_cpus, mp_scan_cpus, mp_nice
n_src = self.llh_priors.shape[0]
self.src_kws = [{key: src_kw[key][i] for key in src_kw} for i in xrange(n_src)]
self._prep_priors()
def _prep_priors(self):
llh_p = []
llh_prior_term = []
llh_mask = []
for p in self.llh_priors:
llh_p.append(p / np.sum(p))
pt = np.where(p > 0, p, 1e-10 * np.min(p[p>0]))
pt = pt / np.sum(pt) / hp.nside2pixarea(self.nside)
pt = np.log(pt)
pt = pt - np.max(pt)
llh_prior_term.append(pt)
llh_mask.append(p > 0)
inj_p = []
for p in self.inj_priors:
inj_p.append(p / np.sum(p))
self.llh_p, self.inj_p = np.array(llh_p), np.array(inj_p)
self.llh_prior_term, self.llh_mask = np.array(llh_prior_term), np.array(llh_mask)
self.llh_mask_all = np.sum(self.llh_mask, axis=0) > 0
self.scan_ra, self.scan_dec = SkyScanner.get_healpix_grid(self.nside)
[docs]
def sig_inj_accs(self):
raise TypeError(
'SpatialPriorTrialRunner has no well-defined injector acceptance; specify the flux')
[docs]
def get_accs(self, **params):
accs = []
src = utils.Sources(ra=self.scan_ra, dec=self.scan_dec)
for i in xrange(len(self._acc_params)):
acc = 0
for p in self.llh_p:
acc += p * self._acc_params[i](src, **params) * self.ana.anas[i].livetime
# TODO: account for frac during livetime
accs.append(acc)
return np.array(accs)
[docs]
def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False, mp_cpus=None):
random = get_random(seed)
pix_src = np.array([random.choice(self.npix, p=p) for p in self.inj_p])
theta, ra = hp.pix2ang(self.nside, pix_src)
dec = pi/2 - theta
src = utils.Sources(ra=ra, dec=dec, **self.src_kw)
tr = self.get_tr(src, ana=self.ana, cut_n_sigma=np.inf, inj=n_sig > 0)
trial_base = tr.get_one_trial(n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
return SpatialPriorTrial(*(trial_base + (ra, dec)))
def _fits_np_to_Arrays(self, fits):
out = dict(ts=fits[:,0])
fits = fits[:,1:]
tr = self.get_tr(utils.Sources(ra=0, dec=0, **self.src_kw), ana=self.ana, inj=False)
param_names = tr.param_names
n_params = 3 + len(param_names)
for i in xrange(len(self.llh_p)):
k = '_{:04d}'.format(i)
f = fits[:, i*n_params:(i+1)*n_params]
#print(f)
o = tr._fits_np_to_Arrays(f)
for key in o.keys():
out[key + k] = o[key]
out['ra' + k] = f[:,-2]
out['dec' + k] = f[:,-1]
return utils.Arrays(out)
def _get_scanner(self, mp_cpus=None):
# TODO: why is this here?
mp_cpus = mp_cpus or self.mp_scan_cpus
get_tr = lambda *a, **kw: self.get_tr(*a, ana=self.ana, **kw)
return SkyScanner(
get_tr, self.get_selector,
mp_cpus=mp_cpus, mp_nice=0)
def _get_sky_scan_tr(self, mask=None, mp_cpus=None):
mp_cpus = mp_cpus or self.mp_scan_cpus
if mask is None:
get_pixmask = self.get_pixmask
elif self.get_pixmask is None:
get_pixmask = lambda *a: mask
else:
get_pixmask = lambda *a: np.intersect1d(np.where(mask)[0], self.get_pixmask(*a))
return SkyScanTrialRunner(
#self.ana, self.get_llh, self.get_injs, self.get_selector,
self.ana, self.get_tr, self.get_selector,
fitter_args=self.fitter_args, param_names=self.param_names,
nside=self.nside, get_pix=get_pixmask,
min_dec=self.min_dec, max_dec=self.max_dec,
mp_cpus=self.mp_cpus, mp_scan_cpus=mp_cpus, mp_nice=self.mp_nice,
llh_kw=self.llh_kw, inj_kw=self.inj_kw, src_kw=self.src_kw)
def get_one_scan_from_trial(self, trial, logging=False, mp_cpus=None, **fitter_args):
if self.prior_scan_each:
raise NotImplementedError('prior_scan_each not yet implemented')
else:
mask = self.llh_mask_all
scan_tr = self._get_sky_scan_tr(mask=mask, mp_cpus=mp_cpus)
scan = scan_tr.get_one_scan_from_trial(
trial, logging=logging, mp_cpus=mp_cpus, **fitter_args)
# drop p-value
scan = scan[1:]
return scan
def get_one_scan(self, n_sig=0, poisson=False, seed=None,
TRUTH=False, mp_cpus=None, logging=False, **fitter_args):
trial = self.get_one_trial(
n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH, mp_cpus=mp_cpus)
return self.get_one_scan_from_trial(trial, logging=logging, mp_cpus=mp_cpus, **fitter_args)
[docs]
def get_one_fit_from_trial(self, trial, flat=True, logging=False, mp_cpus=None,
**fitter_args):
scan = self.get_one_scan_from_trial(trial, logging=logging, mp_cpus=mp_cpus, **fitter_args)
# this effectively modulates the ts with each of the spatial prior function
scan_ts = np.array([scan[0] + 2 * pt for pt in self.llh_prior_term])
# find the index of the hottest pixel for each spatial prior
i_hots = np.array([np.argmax(sts) for sts in scan_ts])
scan_ra, scan_dec = self.scan_ra, self.scan_dec
out = []
for (i_src, i_hot) in enumerate(i_hots):
if self.refine_max:
src = utils.Sources(ra=scan_ra[i_hot], dec=scan_dec[i_hot], **self.src_kws[i_src])
tr_fit = self.get_tr(src, fit_source=True)#ana=self.ana, src=src, fit_source=True)
prior = lambda *a, **p: self.llh_prior_term[
i_src, hp.ang2pix(self.nside, pi/2-p['dec_0000'], p['ra_0000'])]
tr = self.get_tr(src, prior=prior)#ana=self.ana, src=src, prior=prior)
ts, x = tr_fit.get_one_fit_from_trial(evss_ns_excluded_tuple, flat=False)#trial[:2])
ra, dec = x.pop('ra_0000'), x.pop('dec_0000')
i_pix = hp.ang2pix(self.nside, pi/2 - dec, ra)
ts = ts + 2 * self.llh_prior_term[i_src,i_pix]
params = tr._flatten_result(ts, x, fitter_args)[1:]
else:
ts = scan_ts[i_src,i_hot]
params = scan[1:,i_hot]
ra, dec = scan_ra[i_hot], scan_dec[i_hot]
out.append(np.r_[ts, params, ra, dec])
out = np.array(out)
if self.prior_mode == 'stack':
out_ts = np.sum(out[:,0])
else:
i_hottest = np.argmax(out[:,0])
out_ts = out[i_hottest,0]
# first index of return is either 'stack'-ed or hottest ts of all the different priors
# the rest are the individual fit results for ns, gamma, ra, dec for each prior
return np.r_[out_ts, np.ravel(out)]
[docs]
class MultiflareTrialRunner(TrialRunnerBase):
[docs]
def __init__(self, ana, srclist, tr_all, conf_doc = {}, filter_srcs=True, muonflag=False, threshold=1, dt_max=np.inf, only_max=False,
mp_cpus=1, fitter_args={}):
"""Construct a MultiflareTrialRunner. """
super(MultiflareTrialRunner, self).__init__(ana)
self.muonflag = muonflag
self.srclist = srclist
self.tr_all = tr_all
self.threshold = threshold
self.max_dt = dt_max
self.only_max = only_max
self.mp_cpus = mp_cpus
self.fitter_args = copy.deepcopy(fitter_args)
self.fitter_args.setdefault('gamma', np.r_[1, 1:4.01:.5, 4])
mjd_min, mjd_max = np.inf, -np.inf
self.ana = ana
for a in ana.anas:
mjd_min = min(mjd_min, a.mjd_min)
mjd_max = max(mjd_max, a.mjd_max)
self.mjd_min, self.mjd_max = mjd_min, mjd_max
self.sample_start = ana.mjd_min
self.sample_stop = ana.mjd_max
self.conf_doc = conf_doc
from csky.conf import get_trial_runner
if filter_srcs==True:
tr_boxes = []
for src in self.srclist:
tr_boxes.append(get_trial_runner(conf_doc, src=src, ana=ana))
self.tr_boxes = tr_boxes
@property
def sig_inj_accs(self):
return np.array([sig_inj.acc_total for sig_inj in self.sig_injs])
[docs]
def get_accs(self, **params):
return np.array([
llh_model.acc_model.get_acc_total(**params)
for llh_model in self.llh_models])
[docs]
def get_one_fit(self, Am=0, Io=2.0, alpha=3.0, deltaT=100., poisson=False, seed=None, TRUTH=False,
flat=True, replace_evts=True, **fitter_args):
evss_ns_excluded_tuple = self.get_one_trial(
Am=Am, Io=Io, alpha=alpha, deltaT=deltaT, poisson=poisson, seed=seed, TRUTH=TRUTH, replace_evts=replace_evts)
return self.get_one_fit_from_trial(evss_ns_excluded_tuple, flat=flat, **fitter_args)
def howmanyflares(self, Am, alpha, Io):
Am = int(Am)
Nflares = 0
while Nflares == 0:
y = np.random.uniform(low=0., high=1., size=Am)
invcdf = Io*(1.-y)**(1./(1.-alpha))
srcevts = invcdf
fevts = []
for nevts in srcevts:
if nevts >= 1.:
lowf = np.floor(nevts)
highf = np.ceil(nevts)
pdiff = nevts-lowf
p = np.random.uniform(low=0.,high=1.)
if p <= pdiff:
fevts.append(int(highf))
if p > pdiff:
fevts.append(int(lowf))
if nevts < 1.:
p = np.random.uniform(low=0., high=1.)
if p < nevts:
fevts.append(1)
Nflares = len(fevts)
return Nflares, fevts
def howmanyflares_poisson(self, Am, alpha, Io):
Am = int(Am)
Nflares = 0
y = np.random.uniform(low=0., high=1., size=Am)
invcdf = Io*(1.-y)**(1./(1.-alpha))
srcevts = invcdf
fevts = []
nevents=np.random.poisson(srcevts,(1,Am)).flatten()
print("flares", nevents)
return len(nevents),nevents
def times_to_inject(self, Am, alpha, Io, tlow, thigh, srclist, deltaT, poisson=False):
if poisson==True:
numflares, srcevts = self.howmanyflares_poisson(Am, alpha, Io)
else:
numflares, srcevts = self.howmanyflares(Am, alpha, Io)
if deltaT==None:
logwidths = np.random.uniform(low=0., high=2., size=numflares)
srcwidths = 10.**logwidths
else:
srcwidths = deltaT*np.ones(numflares)
possibletimes = []
for k in range(0,len(srclist)):
possibletimes_src = np.arange(tlow, thigh, 0.01)
possibletimes.append(possibletimes_src)
injflares = []
for k in range(0, numflares):
whichsrc = int(np.random.uniform(low=0, high=len(srclist)))
twidth = float(srcwidths[k])
possibletimes_src = possibletimes[whichsrc]
if len(possibletimes_src)==0:
opensources = []
for i in range(0,len(srclist)):
if len(possibletimes[i])>0:
opensources.append(i)
if len(opensources)==0:
print("no open sources, too many injected flares and not enough places to put them")
sys.exit()
whichsrc = np.random.choice(opensources)
possibletimes_src = possibletimes[whichsrc]
randtime = np.random.choice(possibletimes_src)
possibletimes_src = possibletimes_src[abs(possibletimes_src-randtime)>twidth*2.]
possibletimes[whichsrc] = possibletimes_src
tcen = randtime
twidth = float(srcwidths[k])
num_evt = srcevts[k]
whichsrc = int(np.random.uniform(low=0, high=len(srclist)))
times = np.random.uniform(low=tcen-twidth, high=tcen+twidth, size=num_evt)
injflares.append([times, whichsrc])
return injflares
[docs]
def get_one_trial(self, Am=0, Io=2.0, alpha=3.0, injflares=None, deltaT=None, t0=None, n_sig=0, poisson=False, seed=None, TRUTH=False, replace_evts=False, save_evts=False):
if n_sig:
Am = n_sig['Am']
Io = n_sig['Io']
alpha = n_sig['alpha']
tr_boxes = []
if injflares!=None:
from csky.conf import get_trial_runner
filtered_srcs = self.srclist
tr_boxes = []
for src in filtered_srcs:
tr_boxes.append(get_trial_runner(self.conf_doc, src=src))
elif Am != 0:
from csky.conf import get_trial_runner
injflares = self.times_to_inject(Am, alpha, Io, self.sample_start, self.sample_stop, self.srclist, deltaT, t0=t0, poisson=poisson)
print("injflares", injflares)
filtered_injflares = [f for f in injflares if len(f[0])!=0]
src_inds = [f[1] for f in filtered_injflares]
if len(src_inds)>0:
src_inds = np.unique(src_inds)
print("src_inds", src_inds)
filtered_srcs = self.srclist[src_inds]
tr_boxes = []
for src in self.srclist:
tr_boxes.append(get_trial_runner(self.conf_doc, src=src))
n_injs = []
n_inj_evts = []
elif Am == 0:
injflares = []
n_injs=[0 for a in self.ana.anas]
evss = []
ns_excluded = []
srctimes = []
evss_modified = []
for srcind in range(0,len(self.srclist)):
srcinds = [f[1] for f in injflares]
if srcind in srcinds:
times = np.concatenate([f[0] for f in injflares if f[1]==srcind])
else:
times = []
srctimes.append(times)
for (i, key) in enumerate(self.keys):
if TRUTH:
evs, n_excluded = self.tr_all.truth_injs[i].inject()
evss.append(evs)
ns_excluded.append(n_excluded)
else:
evs_inj = []
evs_base = []
evs, n_excluded = self.tr_all.bg_injs[i].inject(seed=seed)
evs_base += evs
for srcind in range(0,len(tr_boxes)):
srcid = srcinds[srcind]
times = np.array(srctimes[srcid])
inseason = times[times>self.ana.anas[i].mjd_min]
inseason = inseason[inseason<self.ana.anas[i].mjd_max]
if len(inseason)>0:
n_inj = len(inseason)
if n_inj:
ev, nx = tr_boxes[srcind].sig_injs[i].inject(n_inj, seed=seed)
ev[0]['mjd'] = inseason
evs += ev
evs_inj += ev
n_excluded += nx
if len(evs_inj)>0:
evs_inj = utils.Arrays.concatenate(evs_inj)
if len(evs_inj)>=1 and replace_evts==True:
evs_base = evs_base[0]
evs_base_dec = evs_base[(evs_base['dec']<max(evs_inj['dec'])+0.01) & (evs_base['dec']>min(evs_inj['dec'])-0.01)]
for evt in evs_inj:
diffs = np.abs(evs_base_dec['log10energy']-evt['log10energy'])
eid = evs_base_dec[np.argmin(diffs)]['idx'][0]
evs_base = evs_base[evs_base['idx']!=eid]
evs_base_dec = evs_base_dec[evs_base_dec['idx']!=eid]
evs_base += evs_inj
evs_base = [utils.Arrays.concatenate(evs_base)]
evss.append(evs_base)
ns_excluded.append(n_excluded)
print("finished making trial")
return evss, ns_excluded
[docs]
def get_one_fit_from_trial(self, evss_ns_excluded_tuple,
flat=True,
decorrelate=True,
logging=False, **fitter_args):
# TODO: break this down!
def dist_mask(arr, src, cut_deg):
out = np.zeros(len(arr), dtype=bool)
ras = arr['ra']
decs = arr['dec']
psi = np.transpose(coord.delta_angle(decs, ras, src.dec, src.ra, latlon=True))[0]
out = psi < np.radians(cut_deg)
return out
def get_SB_mjd_evs_boxmode(L, gamma=2):
all_SB = []
all_mjd = []
evs = []
for e in L.evaluators:
print("type of evaluator is", type(e))
if type(e) == pdf.MultiPDFRatioEvaluator:
if logging:
print("This evaluator has separate space and energy components")
self.KDE = False
se = e.evaluators[0].space_evaluator
te = e.evaluators[0].time_evaluator
box_mode = te.model.box_mode
ee = e.evaluators[1]
evs.append(se.final_ev)
Ss = se()[0]
if gamma > 0:
Se = ee(gamma=gamma)[0]
else:
Se = np.max([ee(gamma=g)[0] for g in np.r_[1:4.01:.25] ], axis=0)
gamma = -gamma
SB = Ss * Se
all_SB.append(SB)
all_mjd.append(se.final_ev.mjd)
elif type(e) == pdf.SpaceTimePDFRatioEvaluator:
if logging:
print("this evaluator only has space and time components")
self.KDE = True
se = e.space_evaluator
te = e.time_evaluator
box_mode = te.model.box_mode
close_mask = dist_mask(se.ev, se.src, 15.)
evs.append(se.ev)
if gamma > 0:
SB = se(gamma=gamma, _mask=[close_mask])[0]
else:
SB = np.max([se(gamma=g, _mask=[close_mask])[0] for g in np.r_[1:4.01:.25] ], axis=0)
all_SB.append(SB)
all_mjd.append(se.ev.mjd)
else:
print("Type of evaluator is:")
print(type(e))
sys.stderr.write("MultiflareTrialRunner doesn't know what to do with this evaluator type. Exiting")
sys.exit(1)
SB = np.concatenate(all_SB)
mjd = np.concatenate(all_mjd)
order = np.argsort(mjd)
all_SB, all_mjd = SB, mjd = SB[order], mjd[order]
mask = SB > self.threshold
SB, mjd = SB[mask], mjd[mask]
#all_ev = sum(all_ev[1:], all_ev[0])
#all_ev = utils.Arrays(np.concatenate(all_ev))
#print (np.sort(mjd))
return SB, mjd, evs, box_mode
def decorrelate_windows(x):
# x: [ts, ns, gamma, t1, t2]
x = np.atleast_2d(x)
if not x.size:
return x
x = x[np.argsort(x[:,0])[::-1]]
keep_starts_ends = x[0,-2:]
keep_idx = [0]
for i in range(1, len(x)):
candidate_start, candidate_end = x[i,-2], x[i,-1]
i_start, i_end = np.searchsorted(keep_starts_ends, (candidate_start, candidate_end))
good = ((i_start % 2) == 0) and (i_start == i_end)
good2 = ((i_start % 2) == 0) and (i_start + 1 == i_end) and (i_start in keep_starts_ends)
#good_start = (i_start == i_end == 0)
#good_end = (i_start == i_end == keep_starts_ends.size)
#print(i, keep_starts_ends)
#print(i, x[i], ':', i_start, i_end)
#print(i, good_inner, good_start, good_end, '=>', good_inner or good_start or good_end)
if good or good2:
keep_starts_ends = np.sort(np.r_[keep_starts_ends, candidate_start, candidate_end])
keep_idx.append(i)
out = x[keep_idx]
return out
def decorrelate_windows_loop(x):
# no work if empty
x = np.atleast_2d(x)
if not x.size:
return x
def is_non_overlapping(testflare, goodflares):
ts, te = testflare[-2:]
for goodflare in goodflares:
gs, ge = goodflare[-2:]
if not (ts - ge >= 0 or te - gs <= 0):
return False
return True
# sort by descending TS
x = x[np.argsort(x[:,0])[::-1]]
# keep best
goodflares = [x[0]]
# try to keep others
for testflare in x:
if is_non_overlapping(testflare, goodflares):
goodflares.append(testflare)
return np.atleast_2d(goodflares)
def do_one_multiflare(L, trbox):
results = []
start_time = datetime.datetime.now()
# make trial
sb, mjd, evs, box_mode = get_SB_mjd_evs_boxmode(L, gamma=2)
mjd1, mjd2 = map(np.ravel, np.meshgrid(mjd, mjd))
sb1, sb2 = map(np.ravel, np.meshgrid(sb, sb))
sbsb = sb1 * sb2
dmjd = mjd2 - mjd1
mask = (0 < dmjd) & (dmjd < self.max_dt)
mjd1, mjd2, dmjd, sbsb = mjd1[mask], mjd2[mask], dmjd[mask], sbsb[mask]
# handle pairs
N_pair = mjd1.size
if self.KDE:
se = L.evaluators[0].space_evaluator
close_mask = dist_mask(se.ev, se.src, 15.)
else:
pass
print("There are", N_pair, "windows")
for i_pair in range(N_pair):
if logging:
if (i_pair+1) % 100 == 0 or i_pair == N_pair - 1:
print('\r{:5} / {:5} ...'.format(i_pair + 1, mjd1.size), end='')
sys.stdout.flush()
t1, t2 = mjd1[i_pair], mjd2[i_pair]
dt = t2 - t1
if box_mode == 'pre':
t0 = t2
elif box_mode == 'post':
t0 = t1
else:
t0 = t1 + 0.5 * dt
#dt += 1e-10
tw_pair_masks = [(t1 <= ev.mjd) & (ev.mjd <= t2) for ev in evs]
fa = copy.deepcopy(trbox.fitter_args)
fa.update(self.fitter_args)
fa.update(dict(t0=t0, dt=dt))
fa.update(fitter_args)
fa['seeder'] = None
fa['_full_output'] = True
if self.KDE:
fit_masks = [np.array(tw_pair_masks[0] & close_mask)]
if np.any(fit_masks[0]):
ts, params, x_fixed, x_options, other_output = L.fit(_masks=fit_masks, **fa)
ts_no_prior = other_output['ts_no_prior']
result = trbox._flatten_result(ts, params, fa)
if (result[0] <= 0) or (result[1] <= 0):
continue
result += [ts_no_prior, t1, t2]
results.append(result)
else:
ts, params, x_fixed, x_options, other_output = L.fit(_masks=tw_pair_masks, **fa)
ts_no_prior = other_output['ts_no_prior']
result = trbox._flatten_result(ts, params, fa)
if (result[0] <= 0) or (result[1] <= 0):
continue
result += [ts_no_prior, t1, t2]
results.append(result)
end_time = datetime.datetime.now()
if logging:
print ('', end_time - start_time, 'elapsed.')
results = np.atleast_2d(results)
if decorrelate:
results = decorrelate_windows_loop(results)
return results, mjd1.size
src_results = {}
for (src_index, trbox) in enumerate(self.tr_boxes):
box_src = trbox.llh_kw['src']
if 'name' in box_src:
src_id = box_src['name'][0]
else:
src_id = 'src_{:04d}'.format(src_index)
if self.muonflag:
modified_evss = copy.deepcopy(evss_ns_excluded_tuple)
for k in range(0,len(evss_ns_excluded_tuple[0])):
ev_mask = (evss_ns_excluded_tuple[0][k][0].event == src_id)
modified_evss[0][k][0] = evss_ns_excluded_tuple[0][k][0][~ev_mask]
ML = trbox.get_one_llh_from_trial(modified_evss)
else:
ML = trbox.get_one_llh_from_trial(evss_ns_excluded_tuple)
if hasattr(ML, 'llhs'):
Ls = ML.llhs
else:
Ls = [ML]
out = []
results = []
tw_count = 0
for L in Ls:
these_results, these_twcount = do_one_multiflare(L, trbox)
results.append(these_results)
tw_count += these_twcount
if len(results) == 1:
results = results[0]
else:
if np.any(list(map(np.size, results))):
results = np.concatenate ([r for r in results if r.size > 0])
else:
results = results[0]
# compile final result
if results.size:
results = results[results[:,0] > 0]
results = results[np.argsort(results[:,0])[::-1]]
if self.only_max:
results = results[-1:]
ts = np.sum(results[:,0])
ns_total = np.sum(results[:,1])
gamma_mean = np.mean(results[:,2])
ts_no_prior = np.sum(results[:,3])
n_flare = len(results)
n_flare_tested = tw_count
final_result = np.r_[ts, ns_total, gamma_mean, n_flare, n_flare_tested]
else:
ts, ns_total, gamma_mean = 0, 0, 2.5
ts_no_prior = 0
n_flare, n_flare_tested = 0, tw_count
x = results
if results.size:
flares = Flares(self.mjd_min, self.mjd_max, utils.Arrays(
ts=x[:,0], ns=x[:,1], gamma=x[:,2], ts_no_prior=x[:,3],
mjd_start=x[:,-2], mjd_end=x[:,-1], dmjd=x[:,-1]-x[:,-2],
src_index=np.repeat(src_index, len(results))))
else:
flares = Flares(self.mjd_min, self.mjd_max, utils.Arrays(
ts=[], ns=[], gamma=[], ts_no_prior=[], mjd_start=[], mjd_end=[], dmjd=[],
src_index=[]))
final_result = np.array(
[ts, ns_total, gamma_mean, ts_no_prior, n_flare, n_flare_tested, flares],
dtype=object)
src_results[src_id] = final_result
if flat:
flares = Flares.concatenate([result[-1] for result in src_results.values()])
flares.n_src = len(self.tr_boxes)
n_tested = np.sum([result[-2] for result in src_results.values()])
flares.arrays = flares.arrays[np.argsort(flares.arrays.src_index)]
final_result = np.array(
[np.sum(flares.ts), np.sum(flares.ns),
np.mean(flares.gamma) if len(flares) else 0,
np.sum(flares.ts_no_prior), len(flares), n_tested, flares]
)
return final_result
else:
return src_results
#if get_all:
# x = results
# if results.size:
# return final_result, utils.Arrays(
# ts=x[:,0], ns=x[:,1], gamma=x[:,2],
# mjd_start=x[:,3], mjd_end=x[:,4], dmjd=x[:,4]-x[:,3])
# else:
# return final_result, utils.Arrays(
# ts=[], ns=[], gamma=[], mjd_start=[], mjd_end=[])
#else:
# return final_result
def modify_ra(self, ra):
ra = np.atleast_1d(ra)
if len(self.tr_boxes) != 1:
raise TypeError('modify_ra() not intended for use with multi-source MultiflareTrialRunner')
self.tr_boxes[0].modify_ra(ra)
return self
def to_hist(self, lightcurve, key='ts'):
ana = self.ana
mjd_min, mjd_max = ana.anas[0].mjd_min, ana.anas[-1].mjd_max
lightcurve = lightcurve[np.argsort(lightcurve.mjd_start)]
bins = np.vstack((lightcurve.mjd_start, lightcurve.mjd_end)).T.ravel()
values = np.vstack((lightcurve[key], np.zeros_like(lightcurve.ts))).T.ravel()
bins = np.r_[mjd_min, bins, mjd_max]
values = np.r_[0, values]
#idx = np.where(np.diff(bins) > 0)[0]
#bins = np.r_[bins[0], bins[idx]]
#values = values[idx]
return hl.Hist(bins, values)
def _fits_np_to_Arrays(self, fits):
try:
out = utils.Arrays(
ts=np.asarray(fits[:, 0], dtype=float),
ns_total=np.asarray(fits[:, 1], dtype=float),
gamma_mean=np.asarray(fits[:, 2], dtype=float),
ts_no_prior=np.asarray(fits[:, 3], dtype=float),
n_flare=np.asarray(fits[:, -3], dtype=int),
n_flare_tested=np.asarray(fits[:, -2], dtype=int),
flares=fits[:, 5],
)
except:
print(fits)
raise
return out
def format_result(self, result, names=None, omit_names=[]):
out = []
if names is None:
names = 'ts ns_total gamma_mean ts_no_prior n_flare n_flare_tested'.split()
names = [name for name in names if name not in omit_names]
for (name, value) in izip(names, result):
line = '{:20}{}'.format(name, value)
if 'ra' in name or 'dec' in name:
line += ' ({} deg)'.format(np.degrees(value))
out.append(line)
out.append(str(result[-1]))
return '\n'.join(out)
class Flares(object):
def __init__(self, mjd_min, mjd_max, arrays, n_src=1):
"""Construct a Flares.
:type arrays: utils.Arrays
:param arrays: arrays of flare data
"""
self.mjd_min, self.mjd_max = mjd_min, mjd_max
self.n_src = n_src
self.arrays = arrays
def __repr__(self):
return 'Flares({} flares)'.format(len(self))
def __str__(self):
out = ['Flares(']
a = self.arrays
for i in xrange(len(a)):
s1 = ' src, TS, ns, gamma, TS0 = {:3d}, {:7.3f}, {:6.2f}, {:4.2f} {:6.2f}'.format(
int(a.src_index[i]), a.ts[i], a.ns[i], a.gamma[i], a.ts_no_prior[i])
s2 = ' | [{:.6f} -> {:.6f}] ({:11.6f})'.format(
a.mjd_start[i], a.mjd_end[i], a.dmjd[i])
out.append(s1 + s2)
out.append(')')
return '\n'.join(out)
def __len__(self):
return len(self.arrays)
@property
def ts(self):
return self.arrays.ts
@property
def ts_no_prior(self):
return self.arrays.ts_no_prior
@property
def ns(self):
return self.arrays.ns
@property
def gamma(self):
return self.arrays.gamma
@property
def mjd_start(self):
return self.arrays.mjd_start
@property
def mjd_end(self):
return self.arrays.mjd_end
@property
def dmjd(self):
return self.arrays.dmjd
@property
def src_index(self):
return self.arrays.src_index
def to_hist(self, key):
mjd_min, mjd_max = self.mjd_min, self.mjd_max
lightcurve = self.arrays[np.argsort(self.arrays.mjd_start)]
bins = np.vstack((lightcurve.mjd_start, lightcurve.mjd_end)).T.ravel()
values = np.vstack((lightcurve[key], np.zeros_like(lightcurve.ts))).T.ravel()
bins = np.r_[mjd_min, bins, mjd_max]
values = np.r_[0, values]
return hl.Hist([bins], values)
@property
def ts_hist(self):
return self.to_hist('ts')
@property
def ts_no_prior_hist(self):
return self.to_hist('ts_no_prior')
@property
def ns_hist(self):
return self.to_hist('ns')
@property
def gamma_hist(self):
return self.to_hist('gamma')
def per_src_dict(self, srclist=None):
flares = {
i: Flares(self.mjd_min, self.mjd_max, self.arrays[self.arrays.src_index==i])
for i in range(self.n_src)}
out = {
i: [f.ts.sum(), f.ns.sum(), f.gamma.mean(), len(f), f]
for (i, f) in flares.items()
}
if srclist is None:
return out
else:
return {srclist[i]: out[i] for i in out}
@staticmethod
def concatenate(seq):
self = seq[0]
arrayss = [flares.arrays for flares in seq]
return Flares(self.mjd_min, self.mjd_max, utils.Arrays.concatenate(arrayss))
class SkyScanner(object):
def __init__(self, get_tr, get_selector, mp_cpus=1, mp_nice=0, sig_src=None, src_kw={}):
self.get_tr, self.get_selector = get_tr, get_selector
self.src_kw = src_kw
self.mp_cpus, self.mp_nice = mp_cpus, mp_nice
if sig_src is None:
self.sig_src = utils.Sources(ra=0, dec=0, **src_kw)
else:
self.sig_src = sig_src
def get_one_trial(self, src=None, n_sig=0, poisson=False, seed=None, TRUTH=False):
if src is None:
src = self.sig_src
tr_inj = self.get_tr(src, cut_n_sigma=np.inf, inj=True if n_sig else False)
evss, ns_excluded = tr_inj.get_one_trial(
n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
return evss, ns_excluded
def select_from_trial(self, selector, evss_ns_excluded_tuple, _copy=False):
evss, ns_excluded = evss_ns_excluded_tuple
new_evss, new_ns_excluded = [], []
for (evs, n_excluded) in izip(evss, ns_excluded):
new_evs = []
new_n_excluded = n_excluded
for ev in evs:
new_ev = selector(ev)
if _copy:
new_ev['ra'] = new_ev.ra.copy()
new_ev['dec'] = new_ev.dec.copy()
new_ev['sigma'] = new_ev.sigma.copy()
new_evs.append(new_ev)
new_n_excluded += len(ev) - len(new_ev)
new_evss.append(new_evs)
new_ns_excluded.append(new_n_excluded)
return new_evss, new_ns_excluded
def _log_points(self, n_complete, n_total, done=False):
print('\r {:10d}/{} coordinates complete.{}'.format(
n_complete, n_total, ' ' if done else '..'), end='\n' if done else '')
sys.stdout.flush()
def get_partial_scan_from_trial(self, scan_ra, scan_dec, evss_ns_excluded_tuple,
n_points=None, queue=None, i_job=0, progress=None, logging=True,
**fitter_args):
orig_evss, orig_ns_excluded = evss, ns_excluded = evss_ns_excluded_tuple
unique_decs = np.unique(scan_dec)
initial_src_kw = {
key: np.repeat(value, len(unique_decs))
for (key, value) in self.src_kw.items()
}
initial_src = utils.Sources(dec=unique_decs, **initial_src_kw)
initial_selector = self.get_selector(initial_src)
evss, ns_excluded = self.select_from_trial(initial_selector, (evss, ns_excluded), _copy=True)
# TODO: why doesn't initial exclusion help more?
logging = logging and (progress==None)
log_points = self._log_points if logging else (lambda *a,**kw: None)
if n_points is None:
n_points = len(scan_dec)
if progress is None:
progress = [0]
scan_results = []
last_sdec = scan_dec[0]
src = utils.Sources(ra=0, dec=last_sdec, **self.src_kw)
tr = self.get_tr(src, inj=False)
selector = self.get_selector(src)
e, n = self.select_from_trial(selector, (evss, ns_excluded))
i_complete = 0
log_points(i_complete, n_points)
try:
for (i_point, (sra, sdec)) in enumerate(izip(scan_ra, scan_dec)):
if progress[i_job] < 0:
break
if sdec != last_sdec:
if logging:
log_points(i_complete, n_points)
src = utils.Sources(ra=0, dec=sdec, **self.src_kw)
tr = self.get_tr(src, inj=False)
selector = self.get_selector(src)
last_sdec = sdec
e, n = self.select_from_trial(selector, (evss, ns_excluded))
next_result = tr.modify_ra(sra).get_one_fit_from_trial((e, n), **fitter_args)
scan_results.append(next_result)
i_complete += 1
progress[i_job] = i_complete
except KeyboardInterrupt:
raise
progress[i_job] = -1
log_points(i_complete, n_points, done=True)
out = np.array(scan_results)
if queue is None:
return out
else:
queue.put((i_job, out))
def get_one_scan_from_trial(self, scan_ra, scan_dec, evss_ns_excluded_tuple,
min_dec=np.radians(-90), max_dec=np.radians(90),
cut_n_sigma=5,mask=None, mp_cpus=None,
logging=True, **fitter_args):
evss, ns_excluded = evss_ns_excluded_tuple
if mp_cpus is None:
mp_cpus = self.mp_cpus
if len(scan_dec.shape) == 2:
transpose = (scan_dec[0,0] == scan_dec[1,0])
else:
transpose = False
if mask is None:
flat_mask = None
if transpose:
flat_ra = np.ravel(scan_ra.T)
flat_dec = np.ravel(scan_dec.T)
if mask is not None:
flat_mask = np.ravel(mask.T)
else:
flat_ra = np.ravel(scan_ra)
flat_dec = np.ravel(scan_dec)
if mask is not None:
flat_mask = np.ravel(mask)
do_idx = (min_dec <= flat_dec) & (flat_dec <= max_dec)
if flat_mask is not None:
do_idx &= flat_mask
# If very few events, restrict do_idx further, only select pixels
# close enough to the evss
if( evss[0] ):
all_evs = evss[0][0][:0]
for ev_list in evss:
for evs in ev_list:
all_evs += evs
if( len(all_evs)<100 ): # todo: set max N_evss as function parameter
delta_angle_pix_evss = coord.delta_angle(
flat_dec, flat_ra, all_evs.dec, all_evs.ra, latlon=True)
min_sigma_per_pix = np.amin(
delta_angle_pix_evss/all_evs.sigma, axis=1)
evs_mask = min_sigma_per_pix<cut_n_sigma
do_idx &= evs_mask
if not np.sum(do_idx):
do_idx[np.where(flat_dec <= max_dec)[0][0]] = True
do_ra, do_dec = flat_ra[do_idx], flat_dec[do_idx]
n_points, n_all_points = len(do_dec), len(flat_dec)
if logging:
print('Scanning {} locations using {} cores:'.format(n_points, mp_cpus))
if mp_cpus == 1:
scan = self.get_partial_scan_from_trial(
do_ra, do_dec, (evss, ns_excluded), logging=logging, **fitter_args)
else:
# set up get_partial_scan_from_trial arguments
mp_cpus = min(mp_cpus, n_points)
x = np.array_split(np.arange(n_points), mp_cpus)
n_jobs = len(x)
queue = mp.Queue()
progress = mp.Array('i', np.zeros(n_jobs, dtype=int))
args = [
(do_ra[x[i]], do_dec[x[i]], (evss, ns_excluded))
for i in xrange(n_jobs)]
kwargs = [
dict(n_points=n_points, queue=queue, i_job=i, progress=progress, **fitter_args)
for i in xrange(n_jobs)]
# start processes
procs = [
mp.Process(target=self.get_partial_scan_from_trial,
args=args[i], kwargs=kwargs[i])
for i in xrange(n_jobs)]
for proc in procs:
proc.start()
# monitor progress, try to catch interrupts
finished = False
e = None
try:
n_complete = np.sum(progress)
while n_complete < n_points:
if np.any(np.array(progress) < 0):
raise KeyboardInterrupt()
if logging:
self._log_points(n_complete, n_points)
n_complete = np.sum(progress)
time.sleep(1 if logging else .25)
finished = True
except KeyboardInterrupt as e:
# signal to jobs that it's time to stop
for i in xrange(n_jobs):
progress[i] = -2
#print('\nKeyboardInterrupt: terminating early.')
# collect results and join jobs
if e is not None:
raise e
elif not finished:
raise RuntimeError(
'something went wrong! :( not finished but no exception found ):')
if logging:
self._log_points(n_points, n_points, done=True)
scans = [queue.get() for proc in procs]
[proc.join() for proc in procs]
scan_results = [scan[-1] for scan in sorted(scans)]
scan = np.concatenate(scan_results)
# fill calculated pixels into full-size array
done_scan = np.transpose(scan)
scan = []
for s in done_scan:
ss = np.zeros(n_all_points)
try:
ss[do_idx] = s
except:
ss = np.array(ss, dtype=object)
ss[do_idx] = s
scan.append(ss)
scan = np.array(scan)
# reshape
scan = np.array([
np.reshape(values, scan_ra.T.shape if transpose else scan_ra.shape)
for values in scan])
# transpose if necessary
if transpose:
scan = np.array([s.T for s in scan])
return scan_ra, scan_dec, scan
def get_one_scan(self, scan_ra, scan_dec,
src=None, n_sig=0, poisson=False, seed=None, TRUTH=False,
mp_cpus=None, logging=True, mask=None, **fitter_args):
trial = self.get_one_trial(
src=src, n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
return self.get_one_scan_from_trial(
scan_ra, scan_dec, trial, mp_cpus=mp_cpus, logging=logging, mask=mask, **fitter_args)
def refine_one_scan_from_trial(self, scan, evss_ns_excluded_tuple,
nside=512, ts=np.inf,
mp_cpus=None, logging=True, **fitter_args):
# determine active pixels and target coords
#orig_pix_mask = scan[0] > ts
#pix_mask = hp.ud_grade(orig_pix_mask, nside)
npix = hp.nside2npix(nside)
zeniths, ras = hp.pix2ang(nside, np.arange(npix))
decs = pi/2 - zeniths
# interpolate map
scan_refined = np.array([
hp.get_interp_val(scan[i], zeniths, ras)
for i in xrange(scan.shape[0])])
# determine active pixels and target coords
pix_mask = scan_refined[0] > ts
# perform additional llh fits if necessary
if np.any(pix_mask):
scan_ra = ras[pix_mask]
scan_dec = decs[pix_mask]
x = self.get_one_scan_from_trial(
scan_ra, scan_dec, evss_ns_excluded_tuple,
mp_cpus=mp_cpus, logging=logging, **fitter_args)[-1]
final_scan = []
for i in xrange(scan.shape[0]):
dest = scan_refined[i]
dest[pix_mask] = x[i]
final_scan.append(dest)
scan_refined = np.array(final_scan)
return ras, decs, scan_refined
@staticmethod
def get_healpix_grid(nside):
npix = hp.nside2npix(nside)
scan_zenith, scan_ra = hp.pix2ang(nside, np.r_[:npix])
scan_dec = pi/2 - scan_zenith
return scan_ra, scan_dec
@staticmethod
def get_rectangular_grid(ra, dec, wra, dra, wdec=None, ddec=None, src_deg=False,
deg=False, dec_correct=False):
if wdec is None:
wdec = wra
if ddec is None:
ddec = dra
if src_deg:
ra, dec = np.radians([ra, dec])
if deg:
wra, dra, wdec, ddec = np.radians([wra, dra, wdec, ddec])
ra_min, ra_max = ra - .5 * wra, ra + .5 * wra + 1e-5
dec_min, dec_max = dec - .5 * wdec, dec + .5 * wdec + 1e-5
RA, DEC = np.r_[ra_min:ra_max:dra], np.r_[dec_min:dec_max:ddec]
RA, DEC = np.meshgrid(RA, DEC, indexing='ij')
if dec_correct:
RA -= ra
RA /= np.cos(DEC)
RA += ra
return RA, DEC
def get_n_sig_batches(n_sigma, beta, tss_vs_nsig, bg=None,
ts=None, bg_fit=True, p0=None):
@np.vectorize
def CL(Nsig):
return 1.0 * np.sum(tss_vs_nsig[Nsig] > ts) / len(tss_vs_nsig[Nsig])
def binomial_error(N, f):
n = 1. * f * N
return np.sqrt(n * (1 - n/N)) / N
# set up bg and threshod
if ts is None:
if bg is None:
bg = dists.Chi2TSD(tss_vs_nsig[0], loc=0, scale=1)
tss_vs_nsig = copy.copy(tss_vs_nsig)
del tss_vs_nsig[0]
# if bg_fit is (implicitly or explicitly) set True, pass it thru
# otherwise rely on default of TSD|Chi2TSD|BinnedTSD
if bg_fit:
ts = bg.isf(stats.norm.sf(n_sigma), bg_fit)
else:
ts = bg.isf(stats.norm.sf(n_sigma))
# get fit points and errors
if bg is None:
n_sigs = np.sort(list(tss_vs_nsig.keys()))
CLs = CL(n_sigs)
Ns = [len(tss_vs_nsig[Nsig]) for Nsig in n_sigs]
else:
n_sigs = np.r_[0, np.sort(list(tss_vs_nsig.keys()))]
CLs = np.r_[float(bg.sf(ts, bg_fit)), CL(n_sigs[1:])]
Ns = np.r_[bg.n_total, [len(tss_vs_nsig[Nsig]) for Nsig in n_sigs[1:]]]
sigmas = binomial_error(Ns, CLs)
i = sigmas > 0
sigmas[~i] = np.min(sigmas[i])
# set seed
p0_ndof = max(10, n_sigs[np.argmin(np.abs(CLs - .5))])
if p0 is None:
p0 = [p0_ndof, 0, 1 + 2e-11]
# try chi2.cdf fit
f = lambda x, ndof, loc, scale: stats.chi2.cdf(x, ndof, loc, scale)
x, y, ey = n_sigs, CLs, sigmas
dof, loc, scale = params = p0
try:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
dof, loc, scale = params = optimize.curve_fit(
f, x, y, p0=p0, sigma=ey)[0]
chi2cdf = lambda n: stats.chi2.cdf(n, *params)
n_sig_chi2cdf = optimize.brentq(
lambda n: chi2cdf(n) - beta, 0, min(1e4, np.max(n_sigs)))
except:
n_sig_chi2cdf = np.inf
# try spline fit
#spline = interpolate.UnivariateSpline(n_sigs, CLs, w=1/sigmas)
spline = interpolate.PchipInterpolator(n_sigs, CLs)
try:
n_sig_spline = optimize.brentq(
lambda n: spline(n) - beta, np.min(n_sigs), min(1e4, np.max(n_sigs)))
except:
n_sig_spline = np.inf
# choose chi2.cdf if agreement within 50%, otherwise fall back to spline
if np.isinf(n_sig_chi2cdf) and np.isinf(n_sig_spline):
n_sig = np.inf
elif np.isinf(n_sig_chi2cdf):
n_sig = n_sig_spline
elif np.isinf(n_sig_spline):
n_sig = n_sig_chi2cdf
else:
relative_difference = np.abs(n_sig_chi2cdf - n_sig_spline) \
/ (.5 * (n_sig_chi2cdf + n_sig_spline))
n_sig = n_sig_chi2cdf if relative_difference < .5 else n_sig_spline
return dict(
n_sig=n_sig, n_sig_chi2cdf=n_sig_chi2cdf, n_sig_spline=n_sig_spline,
dof=dof, loc=loc, scale=scale, params=params, spline=spline,
n_sigs=n_sigs, CLs=CLs, sigmas=sigmas,
ts_threshold=ts, ts_from_fit=bg_fit, ts_nsigma=n_sigma, ts_beta=beta,
)
def get_n_sig_bootstraps(beta, tss, ts, seeds, queue=None):
def get_tss_bootstrap(seed):
random = np.random.RandomState(seed)
out = {n: random.choice(tss[n], len(tss[n])) for n in tss}
return out
n_sigs = np.array([get_n_sig_batches(0, beta, get_tss_bootstrap(seed), ts=ts)['n_sig']
for seed in seeds])
if queue is None:
return n_sigs
else:
queue.put(n_sigs)