Source code for csky.trial

# trial.py

r"""User-level interface for csky trial operations.

This module gives the user-level interface for generating actual or randomized datasets and
evaluating and/or maximizing likelihoods. It also includes tools for performing batches of trials;
estimating threshold quantities such as sensitivities, discovery potentials, and upper limits; and
distributing trials over multiple local cores.

Trial Runner

    Programmatically, a trial runner is anything which implements the interface declared in the
    abstract base class, :class:`TrialRunnerBase`.  Broadly speaking, trial runners are objects
    which:

    1.  Know how to generate actual or randomized datasets, possibly including simulated signal
        injections.
    2.  Know how to apply a :class:`csky.llh.LLHModel` to the data in order to obtain a
        :class:`csky.llh.LLHEvaluator`.
    3.  Know how to map operations 1-2 onto multiple datasets to perform multi-dataset analysis.
    4.  Know how to associate fit results (e.g. [3.7, 12.36, 2.5]) with column names (e.g. ['TS,
        ns, gamma'])
    5.  Know how to report per-dataset signal acceptance for use in counts <--> flux calculations.

    Thru this interface, :class:`TrialRunnerBase` provides trial batching, sens/disc/UL
    calculation, and embarassingly parallel trial multiprocessing.

This module provides the following trial runners:

*  :class:`TrialRunner` -- for performing trials that involve a single maximum likelihood fit.
   This includes single source analysis, multi-source stacking, and template analysis.  Other trial
   runners are generally implemented in terms of this one.
*  :class:`SkyScanTrialRunner` -- for performing trials that search for the hottest spot over much
   or all of the sky, on a grid of healpix pixels.
*  :class:`SpatialPriorTrialRunner` -- for performing trials that search for the hottest spot
   associated with each of one or more spatial priors, again expressed in terms of healpix maps.
*  :class:`MultiTrialRunner` -- the most generic trial runner, for performing trials that search for
   the maximum test statistic that can be obtained using the likelihood fits from one or more
   other arbitrary trial runners.


"""

from __future__ import print_function

import abc
from abc import ABCMeta, abstractmethod, abstractproperty
import collections
try:
    from collections.abc import Iterable
except ImportError:
    from collections import Iterable
import copy
import datetime

try:
    from itertools import izip
except ImportError:
    izip = zip
try:
    xrange
except NameError:
    xrange = range
import multiprocessing as mp
import numbers
import sys
import time
import warnings

import healpy as hp
import numpy as np
pi = np.pi

from scipy import interpolate, optimize, stats

import histlite as hl

from . import hyp, llh, utils, dists, coord, inj, pdf, inspect

from .utils import get_random

Trial = collections.namedtuple('Trial', 'evss n_exs')
SpatialPriorTrial = collections.namedtuple('SpatialPriorTrial', 'evss n_exs ra dec')

[docs] class TrialRunnerBase(object): """Base class for trial runners. This is the base class for csky's somewhat idiosyncratic concept of *trial runners*. In short, a trial runner is an object which knows how to: 1. generate ensembles of events(optionally including signal injection) 2. find the parameters that optimize the likelihood for given event ensembles 3. repeat 1-2 *en masse* In most work with csky, classes derived from this one do account for the vast majority of user-library interaciton. This base class provides a number of methods for performing(1-2) or (3) very easily; converting counts <--> flux-or-fluence; conveniently estimating threshold quantities such as sensitivity, discovery potential, or upper limits; and otherwise inspecting the source data. Derived classes are responsible only for implementing(1-2); providing the the source data. Derived classes are responsible only for implementing(1-2); providing the named array mapping for (3); and of course tracking the signal acceptance of each dataset for use in those counts <--> flux-or-fluence calculations. """ __metaclass__ = ABCMeta
[docs] def __init__(self, ana): """Construct a TrialRunnerBase. Args: ana (:class:`csky.analysis.Analysis`): the(multi-dataset) Analysis """ self.ana = ana
@property def keys(self): """The Analysis keys.""" return self.ana.keys @property def n_ana(self): """The number of SubAnalysis datasets.""" return len(self.keys)
[docs] def to_dict(self, x): """Convert an ordered per-SubAnalysis quantity into a key->value dict.""" return {self.keys[i]: x[i] for i in xrange(self.n_ana)}
@abstractproperty def sig_inj_accs(self): """Per-SubAnalysis SigInjector total acceptances.""" pass
[docs] @abstractmethod def get_accs(self, **params): """Per-SubAnalysis signal acceptances given fit parameters.""" pass
[docs] def sig_inj_acc_total(self): """Sum over :py:attr:`TrialRunnerBase.sig_inj_accs`.""" return np.sum(self.sig_inj_accs)
[docs] def get_acc_total(self, **params): """Sum over :meth:`TrialRunnerBase.get_acc_total_accs`.""" return np.sum(self.get_accs(**params))
def _split_params_fluxargs_acc(self, flux, *a, **kw): # TODO: # can this be more general? # *should* this be *even less* general? # what is the proper treatment of unexpected kw? params = {} params.update(kw) use_acc_param = True if isinstance(flux, Iterable): if not np.all(np.array(flux) == flux[0]): raise ValueError('cannot convert to mixed fluxes') flux = flux[0] elif flux is not None: params.update(flux.as_params) elif 'gamma' in params: flux = hyp.PowerLawFlux(params['gamma']) else: flux = np.atleast_1d(self.sig_injs[-1].flux) if len(flux) > 1 and not np.all(flux == flux[0]): print("WARNING: trying to use a list of custom fluxes without specifying the right type of fluxargs, make sure it is intended!") elif len(flux) == 1: flux = flux[0] use_acc_param = False fluxargs = {} if len(a) >= 3: raise TypeError('too many positional arguments') if len(a) >= 2: fluxargs['unit'] = a[1] if len(a) >= 1: fluxargs['E0'] = a[0] fluxargs.setdefault('E0', params.pop('E0', 1)) fluxargs.setdefault('unit', params.pop('unit', 1)) if 'use_E2dNdE' in params: fluxargs.setdefault('use_E2dNdE', params.pop('use_E2dNdE', True)) if use_acc_param: acc = self.get_acc_total(**params) else: acc = self.sig_inj_acc_total return flux, params, fluxargs, acc
[docs] def to_model_norm(self, ns, flux=None, **params): """Get the model normalization. See also :meth:`hyp.Flux.to_model_norm`. Todo: * clarify sig_inj_acc_total vs get_acc_total situation in both code and docs """ if isinstance(ns, dict): ns = ns['n_sig'] if flux is None: return ns / self.sig_inj_acc_total else: return flux.to_model_norm(ns, self.get_acc_total(**params))
[docs] def to_dNdE(self, ns, flux=None, *a, **kw): """Get dN/dE. See also :meth:`hyp.Flux.to_dNdE`. Todo: * clarify sig_inj_acc_total vs get_acc_total situation in both code and docs """ flux, params, fluxargs, acc = self._split_params_fluxargs_acc(flux, *a, **kw) if isinstance(flux, np.ndarray) and len(flux)>=1 and not np.all(flux == flux[0]): ret=0 for f in flux: ret+=f.to_dNdE(ns, acc, **fluxargs) return ret else: return flux.to_dNdE(ns, acc, **fluxargs)
[docs] def to_E2dNdE(self, ns, flux=None, *a, **kw): """Get E0^2 * dN/dE. See also :meth:`hyp.Flux.to_dNdE`. Todo: * clarify sig_inj_acc_total vs get_acc_total situation in both code and docs """ flux, params, fluxargs, acc = self._split_params_fluxargs_acc(flux, *a, **kw) if isinstance(flux, np.ndarray) and len(flux)>=1 and not np.all(flux == flux[0]): ret=0 for f in flux: ret+=f.to_E2dNdE(ns, acc, **fluxargs) return ret else: return flux.to_E2dNdE(ns, acc, **fluxargs)
[docs] def to_ns(self, E2dNdE, flux=None, *a, **kw): """Get number of signal events corresponding to a particular flux. See also :meth:`hyp.Flux.to_ns`. Todo: * clarify sig_inj_acc_total vs get_acc_total situation in both code and docs """ flux, params, fluxargs, acc = self._split_params_fluxargs_acc(flux, *a, **kw) return flux.to_ns(E2dNdE, acc, **fluxargs)
[docs] @abstractmethod def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False): """Get a trial. Args: n_sig (int): number of signal events to inject poisson (bool): whether n_sig is specific or just a Poissonian mean seed (int): seed spec TRUTH (bool): whether to use background-like("scrambled") or real data Returns: list of lists of :class:`csky.utils.Events`, list of int: * events(for each SubAnalysis(for each injection origin)) * ns_excluded(number of events pre-emptively masked out for each SubAnalysis) """ pass
[docs] @abstractmethod def get_one_fit_from_trial(self, evss_ns_excluded_tuple, flat=True, **fitter_args): """Get a fit, given a trial. Args: evss_ns_excluded_tuple (list of lists of :class:`csky.utils.Events`, list of int): same as return from :meth:`TrialRunnerBase.get_one_trial` flat (bool): whether to flatten the fit result fitter_args: same as ``params`` in :meth:`csky.llh.LLHEvaluatorBase.fit` """ pass
def get_one_fit_from_skylab(self, skylab_events, flat=True, decorrelate=True, logging=False, **fitter_args): cdatas = [] for s in skylab_events: cdata = utils.Events(s, convert=True) for ki in cdata.keys(): cdata[ki] = cdata[ki].copy() cdatas.append([cdata]) trial = Trial(cdatas, np.zeros(len(cdatas))) result = self.get_one_fit_from_trial(trial, flat=flat, decorrelate=decorrelate, logging=logging, **fitter_args) return result @abstractmethod def _fits_np_to_Arrays(self, fits): pass
[docs] def get_one_fit(self, n_sig=0, poisson=False, seed=None, TRUTH=False, flat=True, **fitter_args): """ Get a fit for one new trial. Args: n_sig (int): number of signal events to inject poisson (bool): whether n_sig is specific or just a Poissonian mean seed (int): seed spec TRUTH (bool): whether to use background-like("scrambled") or real data flat (bool): whether to flatten the fit result fitter_args: same as ``params`` in :meth:`csky.llh.LLHEvaluatorBase.fit` """ evss_ns_excluded_tuple = self.get_one_trial( n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH) return self.get_one_fit_from_trial(evss_ns_excluded_tuple, flat=flat, **fitter_args)
def _log_trials(self, n_complete, n_total, done=False): print('\r {:10d}/{} trials complete.{}'.format( n_complete, n_total, ' ' if done else '..'), end='\n' if done else '') sys.stdout.flush() def _get_some_fits(self, n_trials, seeds, n_sig=0, poisson=False, queue=None, i_job=0, progress=None, logging=True, **kw): logging = logging and (progress==None) if progress is None: progress = [0] log_trials = self._log_trials if logging else (lambda *a,**kw: None) fits = [] try: i = 0 for i in xrange(n_trials): if progress[i_job] < 0: break if i % 10 == 0: log_trials(i, n_trials) fit = self.get_one_fit(n_sig=n_sig, poisson=poisson, seed=seeds[i], **kw) fit.append(seeds[i]) fits.append(fit) progress[i_job] = i + 1 log_trials(i+1, n_trials, done=True) except KeyboardInterrupt: progress[i_job] = -1 fits = np.array(fits) if queue is None: return self._fits_np_to_Arrays(fits) else: queue.put((i_job, fits))
[docs] def get_many_fits(self, n_trials, n_sig=0, poisson=False, seed=None, mp_cpus=None, logging=True, **fitter_args): """ Get fits for a batch of new trials. Args: n_trials (int): number of trials to perform n_sig (int): number of signal events to inject poisson (bool): whether n_sig is specific or just a Poissonian mean seed (int): seed spec mp_cpus (int): number of cores to use logging (bool): whether to print a log as trials are performed fitter_args: same as ``params`` in :meth:`csky.llh.LLHEvaluatorBase.fit` Returns: :class:`csky.utils.Arrays`: fit results -- ts, ns, and any other fit parameters """ if mp_cpus is None: mp_cpus = self.mp_cpus mp_cpus = min(n_trials, mp_cpus) random = get_random(seed) n_trials = int(n_trials) BIG = 2**32 seeds = random.randint(BIG, size=n_trials) if logging: print('Performing {} {} using {} {}:'.format( n_trials, 'background trials' if n_sig == 0 else 'trials with n_sig = {:.3f}{}'.format(n_sig, ' (poisson)' if poisson else ''), mp_cpus, 'core' if mp_cpus == 1 else 'cores')) if mp_cpus == 1: return self._get_some_fits( n_trials=n_trials, seeds=seeds, n_sig=n_sig, poisson=poisson, logging=logging, **fitter_args) else: # set up _get_some_fits arguments x = np.array_split(np.arange(n_trials), mp_cpus) n_jobs = len(x) queue = mp.Queue() progress = mp.Array('i', np.zeros(n_jobs, dtype=int)) kwargs = [ dict(n_trials=len(x[i]), seeds=seeds[x[i]], n_sig=n_sig, poisson=poisson, queue=queue, i_job=i, progress=progress, **fitter_args) for i in xrange(n_jobs)] # start processes procs = [ mp.Process(target=self._get_some_fits, kwargs=kwargs[i]) for i in xrange(n_jobs)] for proc in procs: proc.start() # monitor progress, try to catch interrupts try: n_complete = np.sum(progress) while n_complete < n_trials: if np.any(np.asarray(progress) < 0): raise KeyboardInterrupt() if logging: self._log_trials(n_complete, n_trials) n_complete = np.sum(progress) time.sleep(1 if logging else .25) except KeyboardInterrupt: # signal to jobs that it's time to stop for i in xrange(n_jobs): progress[i] = -2 print('\nKeyboardInterrupt: terminating early.') # collect results and join jobs fitss = [queue.get() for proc in procs] [proc.join() for proc in procs] fitss = [self._fits_np_to_Arrays(fits[1]) for fits in sorted(fitss)] fits = utils.Arrays.concatenate(fitss) if logging: self._log_trials(len(fits), n_trials, done=True) return fits
[docs] def find_n_sig( self, ts, beta, tol=0.03, n_batches=6, batch_size=50, coverage=2, n_sig_step=3, n_sig_start=None, first_batch_size=50, min_batch_size=0, max_batch_size=np.inf, tss=None, trials=None, n_bootstrap=100, mp_cpus=None, seed=None, logging=True, **kw): """ Find the signal strength that exceeds ``ts`` in ``beta`` fraction of trials. This method estimates sensitivity or discovery potential in terms of a Poisson mean expected number of events ``n_sig``. Under typical interactive usage, this method estimates sensitivity or discovery potential in two steps. First a preliminary scan finds a very coarse estimate used to define a grid of relevant signal strengths. Then, batches of trials are performed for each n_inj in that grid, and the TS-passing-fraction curve (as a function of n_inj) is interpolated to find a more precise ``n_sig`` estimate. The trial batches are grown iteratively in chunks of size ``batch_size`` (or ``min_batch_size`` for the first iteration, if that is larger) until one of two stopping conditions is reached: either the batches have reached ``max_batch_size``, or the estimated relative error (``n_sig_error/n_sig`` ratio) is less than ``tol``. Pre-computed trials can be used if either ``tss`` or ``trials`` is passed. Then, if ``tol=np.inf`` is passed, no new trials will be performed. If ``n_bootstrap=1`` is passed, the error estimate will also be effectively skipped, saving time if these estimates are not needed. Args: ts (float): test statistic threshold beta (float): fraction of signal trials that should exceed ``ts`` tol (float): stop main trial loop if relative error (``n_sig_error/n_sig``) reaches this value or less n_batches (int): number of n_inj grid points (columns in the printed log) batch_size (int): number of trials per grid point per iteration coverage (float): factor to multiply by the initial coarse scan ``n_sig`` estimate to obtain the maximum n_inj grid point n_sig_step (float): ``n_sig`` step size for initial coarse scan n_sig_start (float): ``n_sig`` starting point for initial coarse scan first_batch_size (int): number of trials per iteration for initial coarse scan min_batch_size (int): number of trials per n_inj for first iteration of main trial batches max_batch_size (int): stop main trial loop if batches reach this size tss (dict(float,numpy.ndarray)): TS values from pre-computed trials, with items like ``n_inj: ts_array`` trials (dict(float, :class:`csky.utils.Arrays` )): pre-computed trials, with items like ``n_inj: trials`` n_bootstrap (int): number of times to resample from trial arrays to estimate ``n_sig_error`` mp_cpus (int): number of cores to use for trial batches seed: random number generator seed logging (bool): whether to print a log as trials are performed **kw: additional keyword arguments are passed to :meth:`TrialRunnerBase.get_many_fits`. """ def do_log(*a, **kw): if logging: print(*a, **kw) sys.stdout.flush() start = datetime.datetime.now() do_log('Start time: {}'.format(start)) # sanity requirements assert 0 < tol <= 1 n_batches = int(n_batches) batch_size = int(batch_size) ts = float(ts) random = np.random.RandomState(seed) n_sig_start = n_sig_start or n_sig_step mp_cpus = self.mp_cpus if mp_cpus is None else mp_cpus do_log('Using {} cores.'.format(mp_cpus)) def frac_above(a): """ Fraction of TS in `a` that are above the TS threshold. """ return 1.0 * float(np.sum(np.asarray(a) > ts)) / len(a) if len(a) else 0 if trials is not None and tss is None: tss = {n_sig: trials[n_sig].ts for n_sig in trials} if tss is None: # find the region of interest first_tss = [] n_sig = n_sig_start - n_sig_step do_log('* Starting initial scan for ' '{:.0f}% of {} trials with TS >= {:.3f}...'.format( 100 * beta, first_batch_size, ts)) count_initial_trials = 0 count_initial_batches = 0 while frac_above(first_tss) < beta: n_sig += n_sig_step do_log('\r n_sig = {:.3f} ...'.format(n_sig), end='') first_tss = self.get_many_fits( first_batch_size, n_sig, poisson=True, logging=False, seed=random.randint(2**32), mp_cpus=min(mp_cpus, first_batch_size),**kw).ts count_initial_batches += 1 count_initial_trials += first_batch_size do_log(' frac = {:.5f}'.format(frac_above(first_tss))) # define n_sig grid and report status n_sigs = np.linspace(0, coverage * n_sig, n_batches) do_log('* Generating batches of {} trials...'.format(batch_size)) do_log(format('n_trials | n_inj') + ''.join([format(n, '8.2f') for n in n_sigs]) + format('| n_sig(relative error)', '>26s')) n_trials_already = 0 tss = {n_sig_i: np.array([]) for n_sig_i in n_sigs} else: # start with existing trials n_sigs = np.array(sorted(tss)) do_log('* Generating batches of {} trials...'.format(batch_size)) do_log(format('n_trials | n_inj') + ''.join([format(n, '8.2f') for n in n_sigs]) + format('| n_sig(relative error)', '>26s')) n_trials_already = np.min([len(tss[k]) for k in tss]) def add_batches(batch_size): seeds = random.randint(2**32, size=len(n_sigs)) if mp_cpus > 1: for (i_sig, n_sig) in enumerate(n_sigs): tss.setdefault(n_sig, np.array([])) if batch_size: fits = self.get_many_fits( batch_size, n_sig=n_sig, poisson=True, seed=seeds[i_sig], mp_cpus=mp_cpus, logging=False, **kw) tss[n_sig] = np.concatenate((tss[n_sig], fits.ts)) else: fits = tss[n_sig] if len(fits) < batch_size: raise KeyboardInterrupt() if np.sum(np.isnan(tss[n_sig])) > .5 * len(tss[n_sig]): # not enough trials suggests a KeyboardInterrupt raise KeyboardInterrupt() do_log('{:7.1f}%'.format(100 * frac_above(tss[n_sig])), end='') else: for (i_sig, n_sig) in enumerate(n_sigs): #do_log('.', end='') old_tss = tss.setdefault(n_sig, []) new_tss = [] if batch_size: new_tss = self.get_many_fits( batch_size, n_sig, poisson=True, seed=seeds[i_sig], logging=False, mp_cpus=1, **kw).ts if len(new_tss) < batch_size: raise KeyboardInterrupt() if np.sum(np.isnan(new_tss)) > .5 * len(new_tss): # not enough trials suggests a KeyboardInterrupt raise KeyboardInterrupt() tss[n_sig] = np.r_[old_tss, new_tss] do_log('{:7.1f}%'.format(100 * frac_above(tss[n_sig])), end='') def get_n_sig_and_error(): info = get_n_sig_batches(0, beta, tss, ts=ts) n_sig = info['n_sig'] mp_cpus_here = min(mp_cpus, n_bootstrap, 10) seeds = random.randint(2**32, size=n_bootstrap) if mp_cpus > 1: n_batch = int(np.ceil(1.0 * n_bootstrap / mp_cpus)) n_sigs_bootstrap = [] queue = mp.Queue() jobs = [] seedss = np.array_split(seeds, mp_cpus) for seeds in seedss: args = (beta, tss, ts, seeds, queue) p = mp.Process( target=get_n_sig_bootstraps, args=args) jobs.append(p) p.start() for j in jobs: n_sigs_bootstrap.append(queue.get()) for j in jobs: j.join() n_sigs_bootstrap = np.concatenate(n_sigs_bootstrap) else: def get_tss_bootstrap(seed): random = np.random.RandomState(seed) out = {n: random.choice(tss[n], len(tss[n])) for n in tss} return out #n_sigs_bootstrap = np.array([ # get_n_sig_batches(0, beta, get_tss_bootstrap(seed), ts=ts)['n_sig'] # for seed in seeds]) n_sigs_bootstrap = get_n_sig_bootstraps(beta, tss, ts, seeds) mask = np.isfinite(n_sigs_bootstrap) if mask.sum(): n_sig_interval = np.percentile( n_sigs_bootstrap[mask], 100 * stats.norm.cdf([-1, +1])) n_sig_error = .5 * np.diff(n_sig_interval)[0] / n_sig else: n_sig_error = np.inf return n_sig, n_sig_error, info # get some batches n_trials = [] n_sig_error = 1.01 n_sig_history = [] n_sig_error_history = [] while not (n_sig_error <= tol) and ((not n_trials) or n_trials[-1] < max_batch_size): try: if len(n_trials): this_batch_size = batch_size else: this_batch_size = max(batch_size, min_batch_size) if n_trials: n_trials.append(n_trials[-1] + this_batch_size) elif n_trials_already: n_trials = [n_trials_already] else: n_trials.append(this_batch_size) do_log('{:<8d}{:<8}'.format(n_trials[-1], ' |'), end='') if n_trials_already: add_batches(0) n_trials_already = 0 else: add_batches(this_batch_size) n_trials_already = 0 do_log(' | ', end='') n_sig, n_sig_error, info = get_n_sig_and_error() n_sig_history.append(n_sig) n_sig_error_history.append(n_sig_error) do_log(format( '{:.3f} (+/-{:5.1f}%)'.format(n_sig, 100 * n_sig_error), '>22s'), end='') do_log(' [chi2.cdf]' if info['n_sig'] == info['n_sig_chi2cdf'] else ' [spline]') except KeyboardInterrupt: break end = datetime.datetime.now() do_log('End time: {}'.format(end)) do_log('Elapsed time: {}'.format(end - start)) return dict(n_sig=n_sig, n_sig_error=n_sig_error, n_sig_history=np.array(n_sig_history), n_sig_error_history=np.array(n_sig_error_history), n_trials_history=n_trials, info=info, tss=tss)
[docs] class TrialRunner(TrialRunnerBase): """Main useful trial runner. This class performs trials that involve a single maximum likelihood fit. This includes single source analysis, multi-source stacking, and template analysis. Other trial runners are generally implemented in terms of this one. """
[docs] def __init__(self, ana, get_llh, get_injs, fitter_args={}, param_names=None, mp_cpus=1, mp_nice=0, llh_kw={}, inj_kw={}, concat_evs=False): """Construct a TrialRunner. Args: ana (:class:`analysis.SubAnalysis`): the sub analysis get_llh (callable): ``get_llh(ana, **kwargs)`` -> :class:`LLHEvaluator` get_injs (callable): ``get_injs(ana, **kwargs)`` -> (truth, bg, sig), each of which must be an instance of :class:`inj.Injector`, or should be ``None``. Specifically, as an optimization, in cases where we know we don't need the signal injector (e.g. when a :class:`TrialRunner` is merely an intermediary for a :class:`SkyScanTrialRunner`), the call will look like ``get_injs(..., inj=False)`, in which case ``sig`` can be ``None``. fitter_args (dict): arguments to pass to :meth:`LLHEvaluatorBase.fit` param_names (list of str): ordered list of parameter names. This ensures that fit results can be represented sequentially in a reliable way. mp_cpus (int): default number of cores to use for trial batches (can be overridden at the specific call sites, e.g. ``tr.get_many_fits(..., mp_cpus=2)``) mp_nice (int): OS-level niceness (TODO: actually this is currently ignored, but it should be easy to respect if we want to) llh_kw (dict): additional kwargs to pass to ``get_llh`` inj_kw (dict): additional kwargs to pass to ``get_injs`` Todo: * mp_nice is currently ignored, but it should be easy to respect if we want to """ super(TrialRunner, self).__init__(ana) self.llh_kw = llh_kw try: self.template_ninj = inj_kw['conf']['gpbg_conf']['inj_conf'].get('nsig', None) #leave the option to inject chosen number of GP events self.template_norm = inj_kw['conf']['gpbg_conf']['inj_conf'].get('gp_norm', None) except: self.template_ninj =None llh_models, truth_injs, bg_injs, sig_injs = [], [], [], [] if not hasattr(get_llh, 'keys'): get_llh = {key: get_llh for key in ana.keys} if not hasattr(get_injs, 'keys'): get_injs = {key: get_injs for key in ana.keys} self.concat_evs = concat_evs fits = {} names = [] for a in ana: l = get_llh[a.key](a, **llh_kw) t, b, s = get_injs[a.key](a, l, **inj_kw) llh_models.append(l) truth_injs.append(t) bg_injs.append(b) sig_injs.append(s) if fits and not fitter_args: if not l.pdf_ratio_model.fits == fits: raise RuntimeError('fit parameters do not match: {} vs {}'.format( l.pdf_ratio_model.fits, fits)) if not l.pdf_ratio_model.param_names == names: raise RuntimeError('fit parameters do not match: {} vs {}'.format( l.pdf_ratio_model.fits, fits)) else: fits = l.pdf_ratio_model.fits names = l.pdf_ratio_model.param_names fits.update(fitter_args) if param_names is not None: names = param_names fits_names = [ name for name in fits if ((name[0] != '_') and (name not in ['prior', 'seeder']) )] sfits_names = np.sort(fits_names) snames = np.unique(names) if (len(sfits_names) != len(snames)) or np.any(np.sort(fits_names) != np.unique(names)): raise ValueError( 'fitter_args and param_names must be consistent({} vs {})'.format( np.sort(fits_names), np.unique(names))) for name in sorted(names): if name in ['prior', 'seeder']: continue if len(np.atleast_1d(fits[name])) == 1: names.remove(name) #del fits[name] # save the Analysis and LLHModels self.ana, self.llh_models = ana, llh_models self.llh_models_origin = copy.deepcopy(self.llh_models) # save the Injectors self.truth_injs, self.bg_injs, self._sig_injs = truth_injs, bg_injs, sig_injs # save default fitter strategy self.fitter_args = fits self.param_names = ['ns'] + names self.result_names = ['TS'] + self.param_names # multiprocessing defaults self.mp_cpus, self.mp_nice = mp_cpus, mp_nice
@property def sig_injs(self): """The signal injectors.""" if self._sig_injs[0] is not None: return self._sig_injs else: raise RuntimeError('no signal injectors present') @property def sig_inj_accs(self): return np.array([sig_inj.acc_total for sig_inj in self.sig_injs]) @property def sig_inj_acc_total(self): return np.sum(self.sig_inj_accs)
[docs] def get_accs(self, **params): return np.array([ llh_model.acc_model.get_acc_total(**params) for llh_model in self.llh_models])
@property def sig_inj_probs(self): """Relative weight of each signal injector (by signal acceptance).""" weights = self.sig_inj_accs return weights / np.sum(weights)
[docs] def sig_n_injs(self, n_sig=0, poisson=False, seed=None): """Multinomial sample from :py:attr:`TrialRunner.sig_inj_probs` .""" random = get_random(seed) sig_inj_probs = self.sig_inj_probs n_inj = random.poisson(n_sig) if poisson else n_sig n_injs = random.multinomial(n_inj, sig_inj_probs) return n_injs
[docs] def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False, debug=False): random = get_random(seed) if n_sig: n_injs = self.sig_n_injs(n_sig, poisson=poisson, seed=seed) evss = [] ns_excluded = [] for (i, key) in enumerate(self.keys): if TRUTH: evs, n_excluded = self.truth_injs[i].inject(seed=seed) # test block to save the events for skymaps if debug: gp_scale = 100 tmp_evs, tmp_excluded = self.bg_injs[i].inject(n_inj=self.template_ninj, gp_norm=gp_scale*self.template_norm, seed=seed, truth=TRUTH) outs = tmp_evs+ evs outs = utils.Arrays.concatenate(outs) evs += tmp_evs saved = [outs['energy'], outs['dec'], outs['ra'], outs['log10energy'], outs['sigma'], outs['azimuth']] np.save("MC_bkg_GP_{}.npy".format(gp_scale), np.asarray(saved)) evss.append(evs) ns_excluded.append(n_excluded) # update bkg pdf if isinstance(self.bg_injs[i], inj.TemplateBGInjector): tmp_evs, tmp_excluded = self.bg_injs[i].inject(n_inj=self.template_ninj, gp_norm=self.template_norm, seed=seed, truth=TRUTH) if self.template_norm > 0: original_llh = copy.deepcopy(self.llh_models_origin) if len(self.llh_models)>=2: raise NotImplementedError('llh pdfs updates for multi-seasons not implemented') else: self.llh_models[0].pdf_ratio_model = original_llh[0].pdf_ratio_model.get_updated(tmp_evs[0]) self.llh_models[0].update_bg = False #mute it for unbliding else: if not isinstance(self.bg_injs[i], inj.TemplateBGInjector): evs, n_excluded = self.bg_injs[i].inject(seed=random.randint(2**32)) else: evs, n_excluded = self.bg_injs[i].inject(n_inj=self.template_ninj, gp_norm=self.template_norm, seed=seed) # only for update bkg pdf using gp events tmp_evs, tmp_excluded = self.bg_injs[i].inject(n_inj=self.template_ninj, gp_norm=self.template_norm, seed=seed, truth=True) original_llh = copy.deepcopy(self.llh_models_origin) if len(self.llh_models)>=2: raise NotImplementedError('llh pdfs updates for multi-seasons not implemented') else: self.llh_models[0].pdf_ratio_model = original_llh[0].pdf_ratio_model.get_updated(tmp_evs[0]) if self.concat_evs: evs = [utils.Arrays.concatenate(evs)] if n_sig: n_inj = n_injs[i] if n_inj: ev, nx = self.sig_injs[i].inject(n_inj, seed=random.randint(2**32)) evs += ev n_excluded += nx if self.concat_evs: evs = [utils.Arrays.concatenate(evs)] evss.append(evs) ns_excluded.append(n_excluded) return Trial(evss, ns_excluded)
def get_one_llh_from_trial(self, evss_ns_excluded_tuple): evss, ns_excluded = evss_ns_excluded_tuple llh_models = self.llh_models if len(llh_models) >= 2: llhs = [ llh_model(evs, n_excluded) for (llh_model, evs, n_excluded) in izip(llh_models, evss, ns_excluded) ] return llh.MultiLLHEvaluator(llhs) else: return llh_models[0](evss[0], ns_excluded[0]) def get_one_llh(self, n_sig=0, poisson=False, seed=None, TRUTH=False): evss, ns_excluded = self.get_one_trial( n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH) return self.get_one_llh_from_trial((evss, ns_excluded)) def _flatten_result(self, ts, fitted_params, fitter_args): #ts, fitted_params = fit[:2] out_params = [fitted_params[param] for param in self.param_names if param in fitted_params] return [ts] + out_params
[docs] def get_one_fit_from_trial(self, evss_ns_excluded_tuple, flat=True, **fitter_args): evss, ns_excluded = evss_ns_excluded_tuple llh = self.get_one_llh_from_trial((evss, ns_excluded)) kw = fitter_args for k in self.fitter_args: kw.setdefault(k, self.fitter_args[k]) #kw = copy.deepcopy(self.fitter_args) #kw.update(fitter_args) fit = llh.fit(**kw) flat = flat and '_full_output' not in fitter_args if flat: ts = fit[0] fitted_params = fit[1:][0] for i in range(1,len(fit[1:])): fitted_params.update(fit[1:][i]) return self._flatten_result(ts, fitted_params, fitter_args) else: return fit
[docs] def modify_ra(self, ra): """Modify source right ascension in place. This method is used internally for efficiently computing sky scans. Users should not need to call this method. """ ra = np.atleast_1d(ra) for llh_model in self.llh_models: llh_model.pdf_ratio_model.set_ra(ra) return self
def format_result(self, result, names=None, omit_names=[]): out = [] if names is None: names = self.result_names names = [name for name in names if name not in omit_names] for (name, value) in izip(names, result): line = '{:20}{}'.format(name, value) if 'ra' in name or 'dec' in name: line += ' ({} deg)'.format(np.degrees(value)) out.append(line) return '\n'.join(out) def _fits_np_to_Arrays(self, fits): return utils.Arrays({ param_name: fits[:,i+1] for (i, param_name) in enumerate(self.param_names)}, ts=fits[:,0], seed=fits[:,-1])
[docs] class MultiTrialRunner(TrialRunnerBase):
[docs] def __init__(self, ana, tr_inj, trs, bgs=None, mp_cpus=1, mp_nice=0): super(MultiTrialRunner, self).__init__(ana) self.tr_inj = tr_inj self.trs, self.bgs, = trs, bgs self.mp_cpus, self.mp_nice = mp_cpus, mp_nice
@property def sig_inj_accs(self): return self.tr_inj.sig_inj_accs
[docs] def get_accs(self, **params): return self.tr_inj.get_accs(**params)
[docs] def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False): return self.tr_inj.get_one_trial( n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
def get_all_fits_from_trial(self, evss_ns_excluded_tuple, flat=True, **fitter_args): if not flat: raise NotImplementedError('flat=False not implemented') fits = [ tr.get_one_fit_from_trial( evss_ns_excluded_tuple, flat=flat, **fitter_args) for tr in self.trs] if self.bgs is None: mlog10ps = np.array([ fit[0] for fit in fits]) else: mlog10ps = -np.log10([ bg.sf(fit[0]) for (bg, fit) in izip(self.bgs, fits)]) fits = [ np.r_[mlog10p, fit] for (mlog10p, fit) in izip(mlog10ps, fits)] return fits def get_all_fits(self, n_sig=0, poisson=False, seed=None, TRUTH=False, flat=True, arrays=False, **fitter_args): trial = self.get_one_trial(n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH) return self.get_all_fits_from_trial(trial, flat=flat, arrays=arrays, **fitter_args)
[docs] def get_one_fit_from_trial(self, evss_ns_excluded_tuple, flat=True, **fitter_args): # original behavior: give hottest result # updated behavior: give all fits fits = self.get_all_fits_from_trial( evss_ns_excluded_tuple, flat=flat, **fitter_args) if flat: return list(np.concatenate(fits)) else: return fits
def _fits_np_to_Arrays(self, fits): out = utils.Arrays() x = 0 for (i_tr,tr) in enumerate(self.trs): suffix = '_{:04d}'.format(i_tr) param_names = tr.param_names len_result = 2 + len(param_names) out['mlog10p'+suffix] = fits[:,x+0] out['ts'+suffix] = fits[:,x+1] for (i_param, param_name) in enumerate(param_names): out[param_name+suffix] = fits[:,x+2+i_param] x += len_result out['seed'] = fits[:,-1] return out
[docs] class SkyScanTrialRunner(TrialRunnerBase):
[docs] def __init__(self, ana, get_tr, get_selector, fitter_args={}, param_names=None, nside=128, get_pix=None, pixmask=None, scan_ra=None, scan_dec=None, min_dec=np.radians(-90), max_dec=np.radians(+90), refine_max=False, ts_to_p=None, llh_kw={}, inj_kw={}, src_kw={}, src=None, sig_src=None, mp_cpus=1, mp_scan_cpus=1, mp_nice=0, inj=False): super(SkyScanTrialRunner, self).__init__(ana) #self.get_llh, self.get_injs, self.get_selector = get_llh, get_injs, get_selector self.get_tr, self.get_selector = get_tr, get_selector self.fitter_args, self.param_names=fitter_args, param_names self.nside, self.get_pix, self.pixmask = nside, get_pix, pixmask self.min_dec, self.max_dec = min_dec, max_dec self.mp_cpus, self.mp_scan_cpus, self.mp_nice = mp_cpus, mp_scan_cpus, mp_nice self.llh_kw, self.inj_kw, self.src_kw = llh_kw, inj_kw, src_kw self.sig_src = sig_src or src or utils.Sources(ra=0, dec=0, **self.src_kw) self.sig_tr = self.get_tr(self.sig_src, ana=self.ana, cut_n_sigma=np.inf, inj=inj) self.ts_to_p = ts_to_p if scan_ra is not None and scan_dec is not None: self.scan_ra, self.scan_dec = scan_ra, scan_dec else: self.scan_ra, self.scan_dec = SkyScanner.get_healpix_grid(self.nside)
@staticmethod def get_rectangular_grid(ra, dec, wra, dra, wdec=None, ddec=None, src_deg=False, deg=False, dec_correct=False): if wdec is None: wdec = wra if ddec is None: ddec = dra if src_deg: ra, dec = np.radians([ra, dec]) if deg: wra, dra, wdec, ddec = np.radians([wra, dra, wdec, ddec]) ra_min, ra_max = ra - .5 * wra, ra + .5 * wra + 1e-5 dec_min, dec_max = dec - .5 * wdec, dec + .5 * wdec + 1e-5 RA, DEC = np.r_[ra_min:ra_max:dra], np.r_[dec_min:dec_max:ddec] RA, DEC = np.meshgrid(RA, DEC, indexing='ij') if dec_correct: RA -= ra RA /= np.cos(DEC) RA += ra return RA, DEC
[docs] def sig_inj_accs(self): raise NotImplementedError('SkyScanTrialRunner flux conversions not yet implemented')
[docs] def get_accs(self, **params): raise NotImplementedError('SkyScanTrialRunner flux conversions not yet implemented') accs = [] src = utils.Sources(ra=self.scan_ra, dec=self.scan_dec) for i in xrange(len(self._acc_params)): acc = 0 for p in self.llh_p: acc += p * self._acc_params[i](src, **params) * self.ana.anas[i].livetime # TODO: account for frac during livetime accs.append(acc) return np.array(accs)
def _fits_np_to_Arrays(self, fits): tr = self.sig_tr out = tr._fits_np_to_Arrays(fits[:,1:-2]) out['mlog10p'] = fits[:,0] out['ra'] = fits[:,-2] out['dec'] = fits[:,-1] return out def _get_scanner(self, mp_cpus=None): mp_cpus = mp_cpus or self.mp_scan_cpus get_tr = lambda *a, **kw: self.get_tr(*a, ana=self.ana, **kw) return SkyScanner( get_tr, self.get_selector, mp_cpus=mp_cpus, mp_nice=0, sig_src=self.sig_src, src_kw=self.src_kw)
[docs] def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False, mp_cpus=None): scanner = self._get_scanner(mp_cpus=mp_cpus) return scanner.get_one_trial(n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH)
def get_one_scan_from_trial(self, trial, logging=False, mp_cpus=None, mask=None, **fitter_args): scanner = self._get_scanner(mp_cpus=mp_cpus) scan_ra, scan_dec = self.scan_ra, self.scan_dec if mask is None: if self.get_pix is not None: mask = np.zeros_like(scan_ra, dtype=bool) evss = trial.evss flat_evss = [ev for evs in evss for ev in evs] if flat_evss: ev = utils.Arrays.concatenate(flat_evss) mask[self.get_pix(self.nside, ev, 3)] = True elif self.pixmask is not None: mask = self.pixmask scan = scanner.get_one_scan_from_trial( scan_ra, scan_dec, trial[:2], min_dec=self.min_dec, max_dec=self.max_dec, mask=mask, logging=logging, **fitter_args)[-1] if self.ts_to_p: mlog10p = -np.log10(self.ts_to_p(scan_dec, scan[0])) else: mlog10p = scan[0] scan = np.array([mlog10p] + [s for s in scan]) return scan def get_one_scan(self, n_sig=0, poisson=False, seed=None, TRUTH=False, mp_cpus=None, logging=False, **fitter_args): trial = self.get_one_trial( n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH, mp_cpus=mp_cpus) return self.get_one_scan_from_trial(trial, logging=logging, mp_cpus=mp_cpus, **fitter_args)
[docs] def get_one_fit_from_trial(self, evss_ns_excluded_tuple, flat=True, logging=False, mp_cpus=None, **fitter_args): if not flat: raise NotImplementedError('flat=False not supported for SkyScanTrialRunner') scan_ra, scan_dec = np.ravel(self.scan_ra), np.ravel(self.scan_dec) scan = self.get_one_scan_from_trial( evss_ns_excluded_tuple, logging=logging, mp_cpus=mp_cpus, **fitter_args) scan = np.array(list(map(np.ravel, scan))) i_hot = np.argmax(scan[0]) # TODO: implement refine_max #if self.refine_max: # src = utils.Sources(ra=scan_ra[i_hot], dec=scan_dec[i_hot], **self.src_kws[i_src]) # tr_fit = self.get_tr(src, fit_source=True) # tr = self.get_tr(src, prior=prior) # ts, x = tr_fit.get_one_fit_from_trial(evss_ns_excluded_tuple, flat=False) # ra, dec = x.pop('ra_0000'), x.pop('dec_0000') # i_pix = hp.ang2pix(self.nside, pi/2 - dec, ra) # params = tr._flatten_result(ts, x, fitter_args)[1:] out = np.r_[scan[:,i_hot], scan_ra[i_hot], scan_dec[i_hot]] return out
[docs] class SpatialPriorTrialRunner(TrialRunnerBase):
[docs] def __init__(self, ana, get_tr, get_selector, llh_priors, inj_priors=None, acc_param='acc_param', prior_scan_each=False, prior_mode='stack', fitter_args={}, param_names=None, get_pixmask=None, min_dec=np.radians(-90), max_dec=np.radians(+90), refine_max=False, mp_cpus=1, mp_scan_cpus=1, mp_nice=0, llh_kw={}, inj_kw={}, src_kw={}, inj_src_kw={}, sig_src=None): super(SpatialPriorTrialRunner, self).__init__(ana) #self.acc_param = ana.acc_param #self.get_llh, self.get_injs, self.get_selector = get_llh, get_injs, get_selector self.get_tr, self.get_selector = get_tr, get_selector self.llh_priors = llh_priors = np.atleast_2d(llh_priors) if inj_priors is None: self.inj_priors = llh_priors elif inj_priors == 'none': self.inj_priors = [np.ones_like(llh_priors[0])] else: self.inj_priors = inj_priors self.acc_param = acc_param self._acc_params = [getattr(a, acc_param) for a in ana] self.prior_scan_each, self.prior_mode = prior_scan_each, prior_mode self.fitter_args, self.param_names=fitter_args, param_names self.nside = hp.get_nside(llh_priors[0]) if get_pixmask is True: self.get_pixmask = utils.get_pixmask else: self.get_pixmask = get_pixmask self.npix = hp.nside2npix(self.nside) self.min_dec, self.max_dec = min_dec, max_dec self.refine_max = refine_max self.llh_kw, self.inj_kw, self.src_kw = llh_kw, inj_kw, src_kw self.mp_cpus, self.mp_scan_cpus, self.mp_nice = mp_cpus, mp_scan_cpus, mp_nice n_src = self.llh_priors.shape[0] self.src_kws = [{key: src_kw[key][i] for key in src_kw} for i in xrange(n_src)] self._prep_priors()
def _prep_priors(self): llh_p = [] llh_prior_term = [] llh_mask = [] for p in self.llh_priors: llh_p.append(p / np.sum(p)) pt = np.where(p > 0, p, 1e-10 * np.min(p[p>0])) pt = pt / np.sum(pt) / hp.nside2pixarea(self.nside) pt = np.log(pt) pt = pt - np.max(pt) llh_prior_term.append(pt) llh_mask.append(p > 0) inj_p = [] for p in self.inj_priors: inj_p.append(p / np.sum(p)) self.llh_p, self.inj_p = np.array(llh_p), np.array(inj_p) self.llh_prior_term, self.llh_mask = np.array(llh_prior_term), np.array(llh_mask) self.llh_mask_all = np.sum(self.llh_mask, axis=0) > 0 self.scan_ra, self.scan_dec = SkyScanner.get_healpix_grid(self.nside)
[docs] def sig_inj_accs(self): raise TypeError( 'SpatialPriorTrialRunner has no well-defined injector acceptance; specify the flux')
[docs] def get_accs(self, **params): accs = [] src = utils.Sources(ra=self.scan_ra, dec=self.scan_dec) for i in xrange(len(self._acc_params)): acc = 0 for p in self.llh_p: acc += p * self._acc_params[i](src, **params) * self.ana.anas[i].livetime # TODO: account for frac during livetime accs.append(acc) return np.array(accs)
[docs] def get_one_trial(self, n_sig=0, poisson=False, seed=None, TRUTH=False, mp_cpus=None): random = get_random(seed) pix_src = np.array([random.choice(self.npix, p=p) for p in self.inj_p]) theta, ra = hp.pix2ang(self.nside, pix_src) dec = pi/2 - theta src = utils.Sources(ra=ra, dec=dec, **self.src_kw) tr = self.get_tr(src, ana=self.ana, cut_n_sigma=np.inf, inj=n_sig > 0) trial_base = tr.get_one_trial(n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH) return SpatialPriorTrial(*(trial_base + (ra, dec)))
def _fits_np_to_Arrays(self, fits): out = dict(ts=fits[:,0]) fits = fits[:,1:] tr = self.get_tr(utils.Sources(ra=0, dec=0, **self.src_kw), ana=self.ana, inj=False) param_names = tr.param_names n_params = 3 + len(param_names) for i in xrange(len(self.llh_p)): k = '_{:04d}'.format(i) f = fits[:, i*n_params:(i+1)*n_params] #print(f) o = tr._fits_np_to_Arrays(f) for key in o.keys(): out[key + k] = o[key] out['ra' + k] = f[:,-2] out['dec' + k] = f[:,-1] return utils.Arrays(out) def _get_scanner(self, mp_cpus=None): # TODO: why is this here? mp_cpus = mp_cpus or self.mp_scan_cpus get_tr = lambda *a, **kw: self.get_tr(*a, ana=self.ana, **kw) return SkyScanner( get_tr, self.get_selector, mp_cpus=mp_cpus, mp_nice=0) def _get_sky_scan_tr(self, mask=None, mp_cpus=None): mp_cpus = mp_cpus or self.mp_scan_cpus if mask is None: get_pixmask = self.get_pixmask elif self.get_pixmask is None: get_pixmask = lambda *a: mask else: get_pixmask = lambda *a: np.intersect1d(np.where(mask)[0], self.get_pixmask(*a)) return SkyScanTrialRunner( #self.ana, self.get_llh, self.get_injs, self.get_selector, self.ana, self.get_tr, self.get_selector, fitter_args=self.fitter_args, param_names=self.param_names, nside=self.nside, get_pix=get_pixmask, min_dec=self.min_dec, max_dec=self.max_dec, mp_cpus=self.mp_cpus, mp_scan_cpus=mp_cpus, mp_nice=self.mp_nice, llh_kw=self.llh_kw, inj_kw=self.inj_kw, src_kw=self.src_kw) def get_one_scan_from_trial(self, trial, logging=False, mp_cpus=None, **fitter_args): if self.prior_scan_each: raise NotImplementedError('prior_scan_each not yet implemented') else: mask = self.llh_mask_all scan_tr = self._get_sky_scan_tr(mask=mask, mp_cpus=mp_cpus) scan = scan_tr.get_one_scan_from_trial( trial, logging=logging, mp_cpus=mp_cpus, **fitter_args) # drop p-value scan = scan[1:] return scan def get_one_scan(self, n_sig=0, poisson=False, seed=None, TRUTH=False, mp_cpus=None, logging=False, **fitter_args): trial = self.get_one_trial( n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH, mp_cpus=mp_cpus) return self.get_one_scan_from_trial(trial, logging=logging, mp_cpus=mp_cpus, **fitter_args)
[docs] def get_one_fit_from_trial(self, trial, flat=True, logging=False, mp_cpus=None, **fitter_args): scan = self.get_one_scan_from_trial(trial, logging=logging, mp_cpus=mp_cpus, **fitter_args) # this effectively modulates the ts with each of the spatial prior function scan_ts = np.array([scan[0] + 2 * pt for pt in self.llh_prior_term]) # find the index of the hottest pixel for each spatial prior i_hots = np.array([np.argmax(sts) for sts in scan_ts]) scan_ra, scan_dec = self.scan_ra, self.scan_dec out = [] for (i_src, i_hot) in enumerate(i_hots): if self.refine_max: src = utils.Sources(ra=scan_ra[i_hot], dec=scan_dec[i_hot], **self.src_kws[i_src]) tr_fit = self.get_tr(src, fit_source=True)#ana=self.ana, src=src, fit_source=True) prior = lambda *a, **p: self.llh_prior_term[ i_src, hp.ang2pix(self.nside, pi/2-p['dec_0000'], p['ra_0000'])] tr = self.get_tr(src, prior=prior)#ana=self.ana, src=src, prior=prior) ts, x = tr_fit.get_one_fit_from_trial(evss_ns_excluded_tuple, flat=False)#trial[:2]) ra, dec = x.pop('ra_0000'), x.pop('dec_0000') i_pix = hp.ang2pix(self.nside, pi/2 - dec, ra) ts = ts + 2 * self.llh_prior_term[i_src,i_pix] params = tr._flatten_result(ts, x, fitter_args)[1:] else: ts = scan_ts[i_src,i_hot] params = scan[1:,i_hot] ra, dec = scan_ra[i_hot], scan_dec[i_hot] out.append(np.r_[ts, params, ra, dec]) out = np.array(out) if self.prior_mode == 'stack': out_ts = np.sum(out[:,0]) else: i_hottest = np.argmax(out[:,0]) out_ts = out[i_hottest,0] # first index of return is either 'stack'-ed or hottest ts of all the different priors # the rest are the individual fit results for ns, gamma, ra, dec for each prior return np.r_[out_ts, np.ravel(out)]
[docs] class MultiflareTrialRunner(TrialRunnerBase):
[docs] def __init__(self, ana, srclist, tr_all, conf_doc = {}, filter_srcs=True, muonflag=False, threshold=1, dt_max=np.inf, only_max=False, mp_cpus=1, fitter_args={}): """Construct a MultiflareTrialRunner. """ super(MultiflareTrialRunner, self).__init__(ana) self.muonflag = muonflag self.srclist = srclist self.tr_all = tr_all self.threshold = threshold self.max_dt = dt_max self.only_max = only_max self.mp_cpus = mp_cpus self.fitter_args = copy.deepcopy(fitter_args) self.fitter_args.setdefault('gamma', np.r_[1, 1:4.01:.5, 4]) mjd_min, mjd_max = np.inf, -np.inf self.ana = ana for a in ana.anas: mjd_min = min(mjd_min, a.mjd_min) mjd_max = max(mjd_max, a.mjd_max) self.mjd_min, self.mjd_max = mjd_min, mjd_max self.sample_start = ana.mjd_min self.sample_stop = ana.mjd_max self.conf_doc = conf_doc from csky.conf import get_trial_runner if filter_srcs==True: tr_boxes = [] for src in self.srclist: tr_boxes.append(get_trial_runner(conf_doc, src=src, ana=ana)) self.tr_boxes = tr_boxes
@property def sig_inj_accs(self): return np.array([sig_inj.acc_total for sig_inj in self.sig_injs])
[docs] def get_accs(self, **params): return np.array([ llh_model.acc_model.get_acc_total(**params) for llh_model in self.llh_models])
[docs] def get_one_fit(self, Am=0, Io=2.0, alpha=3.0, deltaT=100., poisson=False, seed=None, TRUTH=False, flat=True, replace_evts=True, **fitter_args): evss_ns_excluded_tuple = self.get_one_trial( Am=Am, Io=Io, alpha=alpha, deltaT=deltaT, poisson=poisson, seed=seed, TRUTH=TRUTH, replace_evts=replace_evts) return self.get_one_fit_from_trial(evss_ns_excluded_tuple, flat=flat, **fitter_args)
def howmanyflares(self, Am, alpha, Io): Am = int(Am) Nflares = 0 while Nflares == 0: y = np.random.uniform(low=0., high=1., size=Am) invcdf = Io*(1.-y)**(1./(1.-alpha)) srcevts = invcdf fevts = [] for nevts in srcevts: if nevts >= 1.: lowf = np.floor(nevts) highf = np.ceil(nevts) pdiff = nevts-lowf p = np.random.uniform(low=0.,high=1.) if p <= pdiff: fevts.append(int(highf)) if p > pdiff: fevts.append(int(lowf)) if nevts < 1.: p = np.random.uniform(low=0., high=1.) if p < nevts: fevts.append(1) Nflares = len(fevts) return Nflares, fevts def howmanyflares_poisson(self, Am, alpha, Io): Am = int(Am) Nflares = 0 y = np.random.uniform(low=0., high=1., size=Am) invcdf = Io*(1.-y)**(1./(1.-alpha)) srcevts = invcdf fevts = [] nevents=np.random.poisson(srcevts,(1,Am)).flatten() print("flares", nevents) return len(nevents),nevents def times_to_inject(self, Am, alpha, Io, tlow, thigh, srclist, deltaT, poisson=False): if poisson==True: numflares, srcevts = self.howmanyflares_poisson(Am, alpha, Io) else: numflares, srcevts = self.howmanyflares(Am, alpha, Io) if deltaT==None: logwidths = np.random.uniform(low=0., high=2., size=numflares) srcwidths = 10.**logwidths else: srcwidths = deltaT*np.ones(numflares) possibletimes = [] for k in range(0,len(srclist)): possibletimes_src = np.arange(tlow, thigh, 0.01) possibletimes.append(possibletimes_src) injflares = [] for k in range(0, numflares): whichsrc = int(np.random.uniform(low=0, high=len(srclist))) twidth = float(srcwidths[k]) possibletimes_src = possibletimes[whichsrc] if len(possibletimes_src)==0: opensources = [] for i in range(0,len(srclist)): if len(possibletimes[i])>0: opensources.append(i) if len(opensources)==0: print("no open sources, too many injected flares and not enough places to put them") sys.exit() whichsrc = np.random.choice(opensources) possibletimes_src = possibletimes[whichsrc] randtime = np.random.choice(possibletimes_src) possibletimes_src = possibletimes_src[abs(possibletimes_src-randtime)>twidth*2.] possibletimes[whichsrc] = possibletimes_src tcen = randtime twidth = float(srcwidths[k]) num_evt = srcevts[k] whichsrc = int(np.random.uniform(low=0, high=len(srclist))) times = np.random.uniform(low=tcen-twidth, high=tcen+twidth, size=num_evt) injflares.append([times, whichsrc]) return injflares
[docs] def get_one_trial(self, Am=0, Io=2.0, alpha=3.0, injflares=None, deltaT=None, t0=None, n_sig=0, poisson=False, seed=None, TRUTH=False, replace_evts=False, save_evts=False): if n_sig: Am = n_sig['Am'] Io = n_sig['Io'] alpha = n_sig['alpha'] tr_boxes = [] if injflares!=None: from csky.conf import get_trial_runner filtered_srcs = self.srclist tr_boxes = [] for src in filtered_srcs: tr_boxes.append(get_trial_runner(self.conf_doc, src=src)) elif Am != 0: from csky.conf import get_trial_runner injflares = self.times_to_inject(Am, alpha, Io, self.sample_start, self.sample_stop, self.srclist, deltaT, t0=t0, poisson=poisson) print("injflares", injflares) filtered_injflares = [f for f in injflares if len(f[0])!=0] src_inds = [f[1] for f in filtered_injflares] if len(src_inds)>0: src_inds = np.unique(src_inds) print("src_inds", src_inds) filtered_srcs = self.srclist[src_inds] tr_boxes = [] for src in self.srclist: tr_boxes.append(get_trial_runner(self.conf_doc, src=src)) n_injs = [] n_inj_evts = [] elif Am == 0: injflares = [] n_injs=[0 for a in self.ana.anas] evss = [] ns_excluded = [] srctimes = [] evss_modified = [] for srcind in range(0,len(self.srclist)): srcinds = [f[1] for f in injflares] if srcind in srcinds: times = np.concatenate([f[0] for f in injflares if f[1]==srcind]) else: times = [] srctimes.append(times) for (i, key) in enumerate(self.keys): if TRUTH: evs, n_excluded = self.tr_all.truth_injs[i].inject() evss.append(evs) ns_excluded.append(n_excluded) else: evs_inj = [] evs_base = [] evs, n_excluded = self.tr_all.bg_injs[i].inject(seed=seed) evs_base += evs for srcind in range(0,len(tr_boxes)): srcid = srcinds[srcind] times = np.array(srctimes[srcid]) inseason = times[times>self.ana.anas[i].mjd_min] inseason = inseason[inseason<self.ana.anas[i].mjd_max] if len(inseason)>0: n_inj = len(inseason) if n_inj: ev, nx = tr_boxes[srcind].sig_injs[i].inject(n_inj, seed=seed) ev[0]['mjd'] = inseason evs += ev evs_inj += ev n_excluded += nx if len(evs_inj)>0: evs_inj = utils.Arrays.concatenate(evs_inj) if len(evs_inj)>=1 and replace_evts==True: evs_base = evs_base[0] evs_base_dec = evs_base[(evs_base['dec']<max(evs_inj['dec'])+0.01) & (evs_base['dec']>min(evs_inj['dec'])-0.01)] for evt in evs_inj: diffs = np.abs(evs_base_dec['log10energy']-evt['log10energy']) eid = evs_base_dec[np.argmin(diffs)]['idx'][0] evs_base = evs_base[evs_base['idx']!=eid] evs_base_dec = evs_base_dec[evs_base_dec['idx']!=eid] evs_base += evs_inj evs_base = [utils.Arrays.concatenate(evs_base)] evss.append(evs_base) ns_excluded.append(n_excluded) print("finished making trial") return evss, ns_excluded
[docs] def get_one_fit_from_trial(self, evss_ns_excluded_tuple, flat=True, decorrelate=True, logging=False, **fitter_args): # TODO: break this down! def dist_mask(arr, src, cut_deg): out = np.zeros(len(arr), dtype=bool) ras = arr['ra'] decs = arr['dec'] psi = np.transpose(coord.delta_angle(decs, ras, src.dec, src.ra, latlon=True))[0] out = psi < np.radians(cut_deg) return out def get_SB_mjd_evs_boxmode(L, gamma=2): all_SB = [] all_mjd = [] evs = [] for e in L.evaluators: print("type of evaluator is", type(e)) if type(e) == pdf.MultiPDFRatioEvaluator: if logging: print("This evaluator has separate space and energy components") self.KDE = False se = e.evaluators[0].space_evaluator te = e.evaluators[0].time_evaluator box_mode = te.model.box_mode ee = e.evaluators[1] evs.append(se.final_ev) Ss = se()[0] if gamma > 0: Se = ee(gamma=gamma)[0] else: Se = np.max([ee(gamma=g)[0] for g in np.r_[1:4.01:.25] ], axis=0) gamma = -gamma SB = Ss * Se all_SB.append(SB) all_mjd.append(se.final_ev.mjd) elif type(e) == pdf.SpaceTimePDFRatioEvaluator: if logging: print("this evaluator only has space and time components") self.KDE = True se = e.space_evaluator te = e.time_evaluator box_mode = te.model.box_mode close_mask = dist_mask(se.ev, se.src, 15.) evs.append(se.ev) if gamma > 0: SB = se(gamma=gamma, _mask=[close_mask])[0] else: SB = np.max([se(gamma=g, _mask=[close_mask])[0] for g in np.r_[1:4.01:.25] ], axis=0) all_SB.append(SB) all_mjd.append(se.ev.mjd) else: print("Type of evaluator is:") print(type(e)) sys.stderr.write("MultiflareTrialRunner doesn't know what to do with this evaluator type. Exiting") sys.exit(1) SB = np.concatenate(all_SB) mjd = np.concatenate(all_mjd) order = np.argsort(mjd) all_SB, all_mjd = SB, mjd = SB[order], mjd[order] mask = SB > self.threshold SB, mjd = SB[mask], mjd[mask] #all_ev = sum(all_ev[1:], all_ev[0]) #all_ev = utils.Arrays(np.concatenate(all_ev)) #print (np.sort(mjd)) return SB, mjd, evs, box_mode def decorrelate_windows(x): # x: [ts, ns, gamma, t1, t2] x = np.atleast_2d(x) if not x.size: return x x = x[np.argsort(x[:,0])[::-1]] keep_starts_ends = x[0,-2:] keep_idx = [0] for i in range(1, len(x)): candidate_start, candidate_end = x[i,-2], x[i,-1] i_start, i_end = np.searchsorted(keep_starts_ends, (candidate_start, candidate_end)) good = ((i_start % 2) == 0) and (i_start == i_end) good2 = ((i_start % 2) == 0) and (i_start + 1 == i_end) and (i_start in keep_starts_ends) #good_start = (i_start == i_end == 0) #good_end = (i_start == i_end == keep_starts_ends.size) #print(i, keep_starts_ends) #print(i, x[i], ':', i_start, i_end) #print(i, good_inner, good_start, good_end, '=>', good_inner or good_start or good_end) if good or good2: keep_starts_ends = np.sort(np.r_[keep_starts_ends, candidate_start, candidate_end]) keep_idx.append(i) out = x[keep_idx] return out def decorrelate_windows_loop(x): # no work if empty x = np.atleast_2d(x) if not x.size: return x def is_non_overlapping(testflare, goodflares): ts, te = testflare[-2:] for goodflare in goodflares: gs, ge = goodflare[-2:] if not (ts - ge >= 0 or te - gs <= 0): return False return True # sort by descending TS x = x[np.argsort(x[:,0])[::-1]] # keep best goodflares = [x[0]] # try to keep others for testflare in x: if is_non_overlapping(testflare, goodflares): goodflares.append(testflare) return np.atleast_2d(goodflares) def do_one_multiflare(L, trbox): results = [] start_time = datetime.datetime.now() # make trial sb, mjd, evs, box_mode = get_SB_mjd_evs_boxmode(L, gamma=2) mjd1, mjd2 = map(np.ravel, np.meshgrid(mjd, mjd)) sb1, sb2 = map(np.ravel, np.meshgrid(sb, sb)) sbsb = sb1 * sb2 dmjd = mjd2 - mjd1 mask = (0 < dmjd) & (dmjd < self.max_dt) mjd1, mjd2, dmjd, sbsb = mjd1[mask], mjd2[mask], dmjd[mask], sbsb[mask] # handle pairs N_pair = mjd1.size if self.KDE: se = L.evaluators[0].space_evaluator close_mask = dist_mask(se.ev, se.src, 15.) else: pass print("There are", N_pair, "windows") for i_pair in range(N_pair): if logging: if (i_pair+1) % 100 == 0 or i_pair == N_pair - 1: print('\r{:5} / {:5} ...'.format(i_pair + 1, mjd1.size), end='') sys.stdout.flush() t1, t2 = mjd1[i_pair], mjd2[i_pair] dt = t2 - t1 if box_mode == 'pre': t0 = t2 elif box_mode == 'post': t0 = t1 else: t0 = t1 + 0.5 * dt #dt += 1e-10 tw_pair_masks = [(t1 <= ev.mjd) & (ev.mjd <= t2) for ev in evs] fa = copy.deepcopy(trbox.fitter_args) fa.update(self.fitter_args) fa.update(dict(t0=t0, dt=dt)) fa.update(fitter_args) fa['seeder'] = None fa['_full_output'] = True if self.KDE: fit_masks = [np.array(tw_pair_masks[0] & close_mask)] if np.any(fit_masks[0]): ts, params, x_fixed, x_options, other_output = L.fit(_masks=fit_masks, **fa) ts_no_prior = other_output['ts_no_prior'] result = trbox._flatten_result(ts, params, fa) if (result[0] <= 0) or (result[1] <= 0): continue result += [ts_no_prior, t1, t2] results.append(result) else: ts, params, x_fixed, x_options, other_output = L.fit(_masks=tw_pair_masks, **fa) ts_no_prior = other_output['ts_no_prior'] result = trbox._flatten_result(ts, params, fa) if (result[0] <= 0) or (result[1] <= 0): continue result += [ts_no_prior, t1, t2] results.append(result) end_time = datetime.datetime.now() if logging: print ('', end_time - start_time, 'elapsed.') results = np.atleast_2d(results) if decorrelate: results = decorrelate_windows_loop(results) return results, mjd1.size src_results = {} for (src_index, trbox) in enumerate(self.tr_boxes): box_src = trbox.llh_kw['src'] if 'name' in box_src: src_id = box_src['name'][0] else: src_id = 'src_{:04d}'.format(src_index) if self.muonflag: modified_evss = copy.deepcopy(evss_ns_excluded_tuple) for k in range(0,len(evss_ns_excluded_tuple[0])): ev_mask = (evss_ns_excluded_tuple[0][k][0].event == src_id) modified_evss[0][k][0] = evss_ns_excluded_tuple[0][k][0][~ev_mask] ML = trbox.get_one_llh_from_trial(modified_evss) else: ML = trbox.get_one_llh_from_trial(evss_ns_excluded_tuple) if hasattr(ML, 'llhs'): Ls = ML.llhs else: Ls = [ML] out = [] results = [] tw_count = 0 for L in Ls: these_results, these_twcount = do_one_multiflare(L, trbox) results.append(these_results) tw_count += these_twcount if len(results) == 1: results = results[0] else: if np.any(list(map(np.size, results))): results = np.concatenate ([r for r in results if r.size > 0]) else: results = results[0] # compile final result if results.size: results = results[results[:,0] > 0] results = results[np.argsort(results[:,0])[::-1]] if self.only_max: results = results[-1:] ts = np.sum(results[:,0]) ns_total = np.sum(results[:,1]) gamma_mean = np.mean(results[:,2]) ts_no_prior = np.sum(results[:,3]) n_flare = len(results) n_flare_tested = tw_count final_result = np.r_[ts, ns_total, gamma_mean, n_flare, n_flare_tested] else: ts, ns_total, gamma_mean = 0, 0, 2.5 ts_no_prior = 0 n_flare, n_flare_tested = 0, tw_count x = results if results.size: flares = Flares(self.mjd_min, self.mjd_max, utils.Arrays( ts=x[:,0], ns=x[:,1], gamma=x[:,2], ts_no_prior=x[:,3], mjd_start=x[:,-2], mjd_end=x[:,-1], dmjd=x[:,-1]-x[:,-2], src_index=np.repeat(src_index, len(results)))) else: flares = Flares(self.mjd_min, self.mjd_max, utils.Arrays( ts=[], ns=[], gamma=[], ts_no_prior=[], mjd_start=[], mjd_end=[], dmjd=[], src_index=[])) final_result = np.array( [ts, ns_total, gamma_mean, ts_no_prior, n_flare, n_flare_tested, flares], dtype=object) src_results[src_id] = final_result if flat: flares = Flares.concatenate([result[-1] for result in src_results.values()]) flares.n_src = len(self.tr_boxes) n_tested = np.sum([result[-2] for result in src_results.values()]) flares.arrays = flares.arrays[np.argsort(flares.arrays.src_index)] final_result = np.array( [np.sum(flares.ts), np.sum(flares.ns), np.mean(flares.gamma) if len(flares) else 0, np.sum(flares.ts_no_prior), len(flares), n_tested, flares] ) return final_result else: return src_results
#if get_all: # x = results # if results.size: # return final_result, utils.Arrays( # ts=x[:,0], ns=x[:,1], gamma=x[:,2], # mjd_start=x[:,3], mjd_end=x[:,4], dmjd=x[:,4]-x[:,3]) # else: # return final_result, utils.Arrays( # ts=[], ns=[], gamma=[], mjd_start=[], mjd_end=[]) #else: # return final_result def modify_ra(self, ra): ra = np.atleast_1d(ra) if len(self.tr_boxes) != 1: raise TypeError('modify_ra() not intended for use with multi-source MultiflareTrialRunner') self.tr_boxes[0].modify_ra(ra) return self def to_hist(self, lightcurve, key='ts'): ana = self.ana mjd_min, mjd_max = ana.anas[0].mjd_min, ana.anas[-1].mjd_max lightcurve = lightcurve[np.argsort(lightcurve.mjd_start)] bins = np.vstack((lightcurve.mjd_start, lightcurve.mjd_end)).T.ravel() values = np.vstack((lightcurve[key], np.zeros_like(lightcurve.ts))).T.ravel() bins = np.r_[mjd_min, bins, mjd_max] values = np.r_[0, values] #idx = np.where(np.diff(bins) > 0)[0] #bins = np.r_[bins[0], bins[idx]] #values = values[idx] return hl.Hist(bins, values) def _fits_np_to_Arrays(self, fits): try: out = utils.Arrays( ts=np.asarray(fits[:, 0], dtype=float), ns_total=np.asarray(fits[:, 1], dtype=float), gamma_mean=np.asarray(fits[:, 2], dtype=float), ts_no_prior=np.asarray(fits[:, 3], dtype=float), n_flare=np.asarray(fits[:, -3], dtype=int), n_flare_tested=np.asarray(fits[:, -2], dtype=int), flares=fits[:, 5], ) except: print(fits) raise return out def format_result(self, result, names=None, omit_names=[]): out = [] if names is None: names = 'ts ns_total gamma_mean ts_no_prior n_flare n_flare_tested'.split() names = [name for name in names if name not in omit_names] for (name, value) in izip(names, result): line = '{:20}{}'.format(name, value) if 'ra' in name or 'dec' in name: line += ' ({} deg)'.format(np.degrees(value)) out.append(line) out.append(str(result[-1])) return '\n'.join(out)
class Flares(object): def __init__(self, mjd_min, mjd_max, arrays, n_src=1): """Construct a Flares. :type arrays: utils.Arrays :param arrays: arrays of flare data """ self.mjd_min, self.mjd_max = mjd_min, mjd_max self.n_src = n_src self.arrays = arrays def __repr__(self): return 'Flares({} flares)'.format(len(self)) def __str__(self): out = ['Flares('] a = self.arrays for i in xrange(len(a)): s1 = ' src, TS, ns, gamma, TS0 = {:3d}, {:7.3f}, {:6.2f}, {:4.2f} {:6.2f}'.format( int(a.src_index[i]), a.ts[i], a.ns[i], a.gamma[i], a.ts_no_prior[i]) s2 = ' | [{:.6f} -> {:.6f}] ({:11.6f})'.format( a.mjd_start[i], a.mjd_end[i], a.dmjd[i]) out.append(s1 + s2) out.append(')') return '\n'.join(out) def __len__(self): return len(self.arrays) @property def ts(self): return self.arrays.ts @property def ts_no_prior(self): return self.arrays.ts_no_prior @property def ns(self): return self.arrays.ns @property def gamma(self): return self.arrays.gamma @property def mjd_start(self): return self.arrays.mjd_start @property def mjd_end(self): return self.arrays.mjd_end @property def dmjd(self): return self.arrays.dmjd @property def src_index(self): return self.arrays.src_index def to_hist(self, key): mjd_min, mjd_max = self.mjd_min, self.mjd_max lightcurve = self.arrays[np.argsort(self.arrays.mjd_start)] bins = np.vstack((lightcurve.mjd_start, lightcurve.mjd_end)).T.ravel() values = np.vstack((lightcurve[key], np.zeros_like(lightcurve.ts))).T.ravel() bins = np.r_[mjd_min, bins, mjd_max] values = np.r_[0, values] return hl.Hist([bins], values) @property def ts_hist(self): return self.to_hist('ts') @property def ts_no_prior_hist(self): return self.to_hist('ts_no_prior') @property def ns_hist(self): return self.to_hist('ns') @property def gamma_hist(self): return self.to_hist('gamma') def per_src_dict(self, srclist=None): flares = { i: Flares(self.mjd_min, self.mjd_max, self.arrays[self.arrays.src_index==i]) for i in range(self.n_src)} out = { i: [f.ts.sum(), f.ns.sum(), f.gamma.mean(), len(f), f] for (i, f) in flares.items() } if srclist is None: return out else: return {srclist[i]: out[i] for i in out} @staticmethod def concatenate(seq): self = seq[0] arrayss = [flares.arrays for flares in seq] return Flares(self.mjd_min, self.mjd_max, utils.Arrays.concatenate(arrayss)) class SkyScanner(object): def __init__(self, get_tr, get_selector, mp_cpus=1, mp_nice=0, sig_src=None, src_kw={}): self.get_tr, self.get_selector = get_tr, get_selector self.src_kw = src_kw self.mp_cpus, self.mp_nice = mp_cpus, mp_nice if sig_src is None: self.sig_src = utils.Sources(ra=0, dec=0, **src_kw) else: self.sig_src = sig_src def get_one_trial(self, src=None, n_sig=0, poisson=False, seed=None, TRUTH=False): if src is None: src = self.sig_src tr_inj = self.get_tr(src, cut_n_sigma=np.inf, inj=True if n_sig else False) evss, ns_excluded = tr_inj.get_one_trial( n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH) return evss, ns_excluded def select_from_trial(self, selector, evss_ns_excluded_tuple, _copy=False): evss, ns_excluded = evss_ns_excluded_tuple new_evss, new_ns_excluded = [], [] for (evs, n_excluded) in izip(evss, ns_excluded): new_evs = [] new_n_excluded = n_excluded for ev in evs: new_ev = selector(ev) if _copy: new_ev['ra'] = new_ev.ra.copy() new_ev['dec'] = new_ev.dec.copy() new_ev['sigma'] = new_ev.sigma.copy() new_evs.append(new_ev) new_n_excluded += len(ev) - len(new_ev) new_evss.append(new_evs) new_ns_excluded.append(new_n_excluded) return new_evss, new_ns_excluded def _log_points(self, n_complete, n_total, done=False): print('\r {:10d}/{} coordinates complete.{}'.format( n_complete, n_total, ' ' if done else '..'), end='\n' if done else '') sys.stdout.flush() def get_partial_scan_from_trial(self, scan_ra, scan_dec, evss_ns_excluded_tuple, n_points=None, queue=None, i_job=0, progress=None, logging=True, **fitter_args): orig_evss, orig_ns_excluded = evss, ns_excluded = evss_ns_excluded_tuple unique_decs = np.unique(scan_dec) initial_src_kw = { key: np.repeat(value, len(unique_decs)) for (key, value) in self.src_kw.items() } initial_src = utils.Sources(dec=unique_decs, **initial_src_kw) initial_selector = self.get_selector(initial_src) evss, ns_excluded = self.select_from_trial(initial_selector, (evss, ns_excluded), _copy=True) # TODO: why doesn't initial exclusion help more? logging = logging and (progress==None) log_points = self._log_points if logging else (lambda *a,**kw: None) if n_points is None: n_points = len(scan_dec) if progress is None: progress = [0] scan_results = [] last_sdec = scan_dec[0] src = utils.Sources(ra=0, dec=last_sdec, **self.src_kw) tr = self.get_tr(src, inj=False) selector = self.get_selector(src) e, n = self.select_from_trial(selector, (evss, ns_excluded)) i_complete = 0 log_points(i_complete, n_points) try: for (i_point, (sra, sdec)) in enumerate(izip(scan_ra, scan_dec)): if progress[i_job] < 0: break if sdec != last_sdec: if logging: log_points(i_complete, n_points) src = utils.Sources(ra=0, dec=sdec, **self.src_kw) tr = self.get_tr(src, inj=False) selector = self.get_selector(src) last_sdec = sdec e, n = self.select_from_trial(selector, (evss, ns_excluded)) next_result = tr.modify_ra(sra).get_one_fit_from_trial((e, n), **fitter_args) scan_results.append(next_result) i_complete += 1 progress[i_job] = i_complete except KeyboardInterrupt: raise progress[i_job] = -1 log_points(i_complete, n_points, done=True) out = np.array(scan_results) if queue is None: return out else: queue.put((i_job, out)) def get_one_scan_from_trial(self, scan_ra, scan_dec, evss_ns_excluded_tuple, min_dec=np.radians(-90), max_dec=np.radians(90), cut_n_sigma=5,mask=None, mp_cpus=None, logging=True, **fitter_args): evss, ns_excluded = evss_ns_excluded_tuple if mp_cpus is None: mp_cpus = self.mp_cpus if len(scan_dec.shape) == 2: transpose = (scan_dec[0,0] == scan_dec[1,0]) else: transpose = False if mask is None: flat_mask = None if transpose: flat_ra = np.ravel(scan_ra.T) flat_dec = np.ravel(scan_dec.T) if mask is not None: flat_mask = np.ravel(mask.T) else: flat_ra = np.ravel(scan_ra) flat_dec = np.ravel(scan_dec) if mask is not None: flat_mask = np.ravel(mask) do_idx = (min_dec <= flat_dec) & (flat_dec <= max_dec) if flat_mask is not None: do_idx &= flat_mask # If very few events, restrict do_idx further, only select pixels # close enough to the evss if( evss[0] ): all_evs = evss[0][0][:0] for ev_list in evss: for evs in ev_list: all_evs += evs if( len(all_evs)<100 ): # todo: set max N_evss as function parameter delta_angle_pix_evss = coord.delta_angle( flat_dec, flat_ra, all_evs.dec, all_evs.ra, latlon=True) min_sigma_per_pix = np.amin( delta_angle_pix_evss/all_evs.sigma, axis=1) evs_mask = min_sigma_per_pix<cut_n_sigma do_idx &= evs_mask if not np.sum(do_idx): do_idx[np.where(flat_dec <= max_dec)[0][0]] = True do_ra, do_dec = flat_ra[do_idx], flat_dec[do_idx] n_points, n_all_points = len(do_dec), len(flat_dec) if logging: print('Scanning {} locations using {} cores:'.format(n_points, mp_cpus)) if mp_cpus == 1: scan = self.get_partial_scan_from_trial( do_ra, do_dec, (evss, ns_excluded), logging=logging, **fitter_args) else: # set up get_partial_scan_from_trial arguments mp_cpus = min(mp_cpus, n_points) x = np.array_split(np.arange(n_points), mp_cpus) n_jobs = len(x) queue = mp.Queue() progress = mp.Array('i', np.zeros(n_jobs, dtype=int)) args = [ (do_ra[x[i]], do_dec[x[i]], (evss, ns_excluded)) for i in xrange(n_jobs)] kwargs = [ dict(n_points=n_points, queue=queue, i_job=i, progress=progress, **fitter_args) for i in xrange(n_jobs)] # start processes procs = [ mp.Process(target=self.get_partial_scan_from_trial, args=args[i], kwargs=kwargs[i]) for i in xrange(n_jobs)] for proc in procs: proc.start() # monitor progress, try to catch interrupts finished = False e = None try: n_complete = np.sum(progress) while n_complete < n_points: if np.any(np.array(progress) < 0): raise KeyboardInterrupt() if logging: self._log_points(n_complete, n_points) n_complete = np.sum(progress) time.sleep(1 if logging else .25) finished = True except KeyboardInterrupt as e: # signal to jobs that it's time to stop for i in xrange(n_jobs): progress[i] = -2 #print('\nKeyboardInterrupt: terminating early.') # collect results and join jobs if e is not None: raise e elif not finished: raise RuntimeError( 'something went wrong! :( not finished but no exception found ):') if logging: self._log_points(n_points, n_points, done=True) scans = [queue.get() for proc in procs] [proc.join() for proc in procs] scan_results = [scan[-1] for scan in sorted(scans)] scan = np.concatenate(scan_results) # fill calculated pixels into full-size array done_scan = np.transpose(scan) scan = [] for s in done_scan: ss = np.zeros(n_all_points) try: ss[do_idx] = s except: ss = np.array(ss, dtype=object) ss[do_idx] = s scan.append(ss) scan = np.array(scan) # reshape scan = np.array([ np.reshape(values, scan_ra.T.shape if transpose else scan_ra.shape) for values in scan]) # transpose if necessary if transpose: scan = np.array([s.T for s in scan]) return scan_ra, scan_dec, scan def get_one_scan(self, scan_ra, scan_dec, src=None, n_sig=0, poisson=False, seed=None, TRUTH=False, mp_cpus=None, logging=True, mask=None, **fitter_args): trial = self.get_one_trial( src=src, n_sig=n_sig, poisson=poisson, seed=seed, TRUTH=TRUTH) return self.get_one_scan_from_trial( scan_ra, scan_dec, trial, mp_cpus=mp_cpus, logging=logging, mask=mask, **fitter_args) def refine_one_scan_from_trial(self, scan, evss_ns_excluded_tuple, nside=512, ts=np.inf, mp_cpus=None, logging=True, **fitter_args): # determine active pixels and target coords #orig_pix_mask = scan[0] > ts #pix_mask = hp.ud_grade(orig_pix_mask, nside) npix = hp.nside2npix(nside) zeniths, ras = hp.pix2ang(nside, np.arange(npix)) decs = pi/2 - zeniths # interpolate map scan_refined = np.array([ hp.get_interp_val(scan[i], zeniths, ras) for i in xrange(scan.shape[0])]) # determine active pixels and target coords pix_mask = scan_refined[0] > ts # perform additional llh fits if necessary if np.any(pix_mask): scan_ra = ras[pix_mask] scan_dec = decs[pix_mask] x = self.get_one_scan_from_trial( scan_ra, scan_dec, evss_ns_excluded_tuple, mp_cpus=mp_cpus, logging=logging, **fitter_args)[-1] final_scan = [] for i in xrange(scan.shape[0]): dest = scan_refined[i] dest[pix_mask] = x[i] final_scan.append(dest) scan_refined = np.array(final_scan) return ras, decs, scan_refined @staticmethod def get_healpix_grid(nside): npix = hp.nside2npix(nside) scan_zenith, scan_ra = hp.pix2ang(nside, np.r_[:npix]) scan_dec = pi/2 - scan_zenith return scan_ra, scan_dec @staticmethod def get_rectangular_grid(ra, dec, wra, dra, wdec=None, ddec=None, src_deg=False, deg=False, dec_correct=False): if wdec is None: wdec = wra if ddec is None: ddec = dra if src_deg: ra, dec = np.radians([ra, dec]) if deg: wra, dra, wdec, ddec = np.radians([wra, dra, wdec, ddec]) ra_min, ra_max = ra - .5 * wra, ra + .5 * wra + 1e-5 dec_min, dec_max = dec - .5 * wdec, dec + .5 * wdec + 1e-5 RA, DEC = np.r_[ra_min:ra_max:dra], np.r_[dec_min:dec_max:ddec] RA, DEC = np.meshgrid(RA, DEC, indexing='ij') if dec_correct: RA -= ra RA /= np.cos(DEC) RA += ra return RA, DEC def get_n_sig_batches(n_sigma, beta, tss_vs_nsig, bg=None, ts=None, bg_fit=True, p0=None): @np.vectorize def CL(Nsig): return 1.0 * np.sum(tss_vs_nsig[Nsig] > ts) / len(tss_vs_nsig[Nsig]) def binomial_error(N, f): n = 1. * f * N return np.sqrt(n * (1 - n/N)) / N # set up bg and threshod if ts is None: if bg is None: bg = dists.Chi2TSD(tss_vs_nsig[0], loc=0, scale=1) tss_vs_nsig = copy.copy(tss_vs_nsig) del tss_vs_nsig[0] # if bg_fit is (implicitly or explicitly) set True, pass it thru # otherwise rely on default of TSD|Chi2TSD|BinnedTSD if bg_fit: ts = bg.isf(stats.norm.sf(n_sigma), bg_fit) else: ts = bg.isf(stats.norm.sf(n_sigma)) # get fit points and errors if bg is None: n_sigs = np.sort(list(tss_vs_nsig.keys())) CLs = CL(n_sigs) Ns = [len(tss_vs_nsig[Nsig]) for Nsig in n_sigs] else: n_sigs = np.r_[0, np.sort(list(tss_vs_nsig.keys()))] CLs = np.r_[float(bg.sf(ts, bg_fit)), CL(n_sigs[1:])] Ns = np.r_[bg.n_total, [len(tss_vs_nsig[Nsig]) for Nsig in n_sigs[1:]]] sigmas = binomial_error(Ns, CLs) i = sigmas > 0 sigmas[~i] = np.min(sigmas[i]) # set seed p0_ndof = max(10, n_sigs[np.argmin(np.abs(CLs - .5))]) if p0 is None: p0 = [p0_ndof, 0, 1 + 2e-11] # try chi2.cdf fit f = lambda x, ndof, loc, scale: stats.chi2.cdf(x, ndof, loc, scale) x, y, ey = n_sigs, CLs, sigmas dof, loc, scale = params = p0 try: with warnings.catch_warnings(): warnings.simplefilter('ignore') dof, loc, scale = params = optimize.curve_fit( f, x, y, p0=p0, sigma=ey)[0] chi2cdf = lambda n: stats.chi2.cdf(n, *params) n_sig_chi2cdf = optimize.brentq( lambda n: chi2cdf(n) - beta, 0, min(1e4, np.max(n_sigs))) except: n_sig_chi2cdf = np.inf # try spline fit #spline = interpolate.UnivariateSpline(n_sigs, CLs, w=1/sigmas) spline = interpolate.PchipInterpolator(n_sigs, CLs) try: n_sig_spline = optimize.brentq( lambda n: spline(n) - beta, np.min(n_sigs), min(1e4, np.max(n_sigs))) except: n_sig_spline = np.inf # choose chi2.cdf if agreement within 50%, otherwise fall back to spline if np.isinf(n_sig_chi2cdf) and np.isinf(n_sig_spline): n_sig = np.inf elif np.isinf(n_sig_chi2cdf): n_sig = n_sig_spline elif np.isinf(n_sig_spline): n_sig = n_sig_chi2cdf else: relative_difference = np.abs(n_sig_chi2cdf - n_sig_spline) \ / (.5 * (n_sig_chi2cdf + n_sig_spline)) n_sig = n_sig_chi2cdf if relative_difference < .5 else n_sig_spline return dict( n_sig=n_sig, n_sig_chi2cdf=n_sig_chi2cdf, n_sig_spline=n_sig_spline, dof=dof, loc=loc, scale=scale, params=params, spline=spline, n_sigs=n_sigs, CLs=CLs, sigmas=sigmas, ts_threshold=ts, ts_from_fit=bg_fit, ts_nsigma=n_sigma, ts_beta=beta, ) def get_n_sig_bootstraps(beta, tss, ts, seeds, queue=None): def get_tss_bootstrap(seed): random = np.random.RandomState(seed) out = {n: random.choice(tss[n], len(tss[n])) for n in tss} return out n_sigs = np.array([get_n_sig_batches(0, beta, get_tss_bootstrap(seed), ts=ts)['n_sig'] for seed in seeds]) if queue is None: return n_sigs else: queue.put(n_sigs)