Source code for csky.dists

# dists.py

"""
Test statistic distributions models.
"""

try:
    from itertools import izip
except ImportError:
    izip = zip

import numpy as np
from scipy import stats
import sys

import histlite as hl

try:
    xrange
except NameError:
    xrange = range

from . import utils, bk


[docs] class TSD(object): """ Trials-based representation of a test statistic distribution. Several methods are in the spirit of scipy.stats distribution objects, but note that at least currently, the interface is not a perfect match. """
[docs] def __init__(self, values, n_zero=0, **kw): """ Construct a Chi2TSD. Args: values (array of float or :class:`utis.Arrays`):the test statistic values n_zero (int): number of trials not contained by ``values`` for which TS=0 kw (mapping): additional arguments for ``scipy.stats.chi2.fit``. By default, if ``loc`` and ``floc`` are not set, then ``floc=0`` will be used; if ``scale`` and ``fscale`` are not set, then ``fscale=1`` will be used. """ if isinstance(values, utils.Arrays): self.trials = values values = values.ts elif isinstance(values, TSD): self.trials = values.trials values = self.trials.ts else: self.trials = utils.Arrays(ts=values) values = np.sort(values) mask = values != 0 self.values = values[mask] self.n_nonzero = np.sum(mask) self.n_positive = np.sum(values > 0) self.n_total = len(values) + n_zero self.n_zero = self.n_total - self.n_nonzero self.eta = 1. * self.n_positive / self.n_total self.f_zero = 1 - self.eta self.kw = kw
def __len__(self): return self.n_total def __repr__(self): return 'TSD({} trials, eta={:.3f}, median={:.3f})'.format( self.n_total, self.eta, float(self.median())) @property def description(self): main = ['Chi2TSD from {} trials:'.format(self.n_total)] main += [' eta = {:.3f}'.format(self.eta)] main += [' ndof = {:.3f}'.format(self.ndof)] main += [' loc = {:.3f}'.format(self.loc)] main += [' scale = {:.3f}'.format(self.scale)] main = '\n '.join(main) direct = ['Thresholds from trials:'] direct += [' median = {:.3f}'.format(self.median())] for nsigma in xrange(1, 6): direct += [' {} sigma = {:.2f}'.format(nsigma, self.isf_nsigma(nsigma))] direct = '\n '.join(direct) return '\n'.join([main, direct]) def __add__(self, other): """ Obtain a new Chi2TSD by using all trials in both ``self`` and ``other``. """ if self.trials.keys() == other.trials.keys(): # TODO: how does this approach deal with n_zero passed aside from trials? return TSD(self.trials + other.trials) else: return TSD(np.r_[self.values, other.values], self.n_zero + other.n_zero)
[docs] def sf(self, x, fit=False): """ The survival function, similar to ``scipy.stats`` distributions. """ if fit: raise ValueError('fit must be false') i = self.n_nonzero - np.searchsorted(self.values, x) return 1. * i / self.n_total
[docs] def sf_nsigma(self, x, **kw): """ The survival function, but returning a value in terms of number of sigmas. """ return stats.norm.isf(self.sf(x, **kw))
[docs] def cdf(self, x): """ The cumulative distribution function, similar to ``scipy.stats`` distributions. """ i = np.searchsorted(self.values, x) return self.f_zero + 1. * i / self.n_total
[docs] def isf(self, p, fit=False): """ The inverse survival function, similar to ``scipy.stats`` distributions. """ # TODO: better error if n_nonzero == 0 if fit: raise ValueError('fit must be false') i = self.n_nonzero - (p * self.n_total) if isinstance(i, np.ndarray): i = i.astype(int) else: i = int(i) ii = np.maximum(0, np.minimum(i, self.n_nonzero - 1)) return np.where(i < self.n_nonzero, np.where(i < 0, 0, self.values[ii]), np.inf)
def isf_nsigma(self, nsigma, **kw): return self.isf(stats.norm.sf(nsigma), **kw)
[docs] def median(self): """ The median TS value. """ eps = 1e-10 return float(TSD.isf(self, .5 + eps))
[docs] def get_hist(self, **kw): """ Get a :class:`histlite.Hist` representation of the distribution. """ kw.setdefault('range', (min(self.values.min(), 0), self.values[-1])) h = hl.hist(self.values, **kw) h.values[h.index(0)] += self.n_zero return h
[docs] class Chi2TSD(TSD): """:math:`\chi^2`-fit representation of a test statistic distribution. Information about trials is retained, and a chi2 fit is applied to the region TS > 0. """
[docs] def __init__(self, values, n_zero=0, threshold=0, **kw): self.sup.__init__(values, n_zero=n_zero, **kw) self.kw.setdefault('loc', 0) try: self.ndof, self.loc, self.scale = self.params = \ stats.chi2.fit(self.values[self.values > threshold], **self.kw) except: self.ndof, self.loc, self.scale = self.params = (1, 0, 1) sys.stderr.write('warning: Chi2TSD() could not find good chi2 fit\n')
@property def sup(self): return super(Chi2TSD, self) def __repr__(self): return 'Chi2TSD({} trials, eta={:.3f}, ' \ 'ndof={:.3f}, median={:.3f} (from fit {:.3f}))'.format( self.n_total, self.eta, self.ndof, float(self.median()), float(self.median(True))) @property def description(self): base = self.sup.description fit = ['Thresholds from fit:'] fit += [' median = {:.3f}'.format(self.median(fit=True))] for nsigma in xrange(1, 6): fit += [' {} sigma = {:.2f}'.format(nsigma, self.isf_nsigma(nsigma, fit=True))] fit = '\n '.join(fit) return '\n'.join([base, fit]) @property def chi2 (self): """ The fitted ``scipy.stats.chi2`` object. """ return stats.chi2 (self.ndof, self.loc, self.scale) def __add__(self, other): """ Obtain a new Chi2TSD by using all trials in both ``self`` and ``other``. """ kw = other.kw if self.trials.keys() == other.trials.keys(): return Chi2TSD( self.trials + other.trials, **kw) else: return Chi2TSD( np.r_[self.values, other.values], self.n_zero + other.n_zero, **kw)
[docs] def sf(self, x, fit=True): """ The survival function, similar to ``scipy.stats`` distributions. """ if fit: return self.eta * self.chi2.sf(x) else: return self.sup.sf(x)
[docs] def cdf(self, x, fit=True): """ The cumulative distribution function, similar to ``scipy.stats`` distributions. """ if fit: return self.f_zero + self.eta * self.chi2.cdf(x) else: return self.sup.cdf(x)
[docs] def isf(self, p, fit=True): """ The inverse survival function, similar to ``scipy.stats`` distributions. """ if fit: return np.where(p <= self.eta, self.chi2.isf(p / self.eta), 0) else: return self.sup.isf(p)
def pdf(self, x, fit=True): if not fit: raise NotImplemented('Chi2TSD.pdf not implemented for fit=False') return self.eta * self.chi2.pdf(x)
[docs] def median(self, fit=False): """ The median TS value. """ if fit: return float(self.isf(.5, fit=True)) else: return self.sup.median()
[docs] class BinnedTSD(TSD): """Histogram-based representation of a test statistic distribution. This class tries to reproduce the behavior of :class:`TSD` without storing information about every single trial, thereby reducing RAM and disk space requirements. """
[docs] def __init__(self, values, n_zero=0, **kw): self.sup.__init__(values, n_zero=n_zero, **kw) self._keep_trials = self.kw.pop('keep_trials', True) self._cap_high = self.kw.pop('cap_high', True) self.kw.setdefault('bins', 10000) self.kw.setdefault('range', (min(self.values.min(), 0), self.values.max())) self.h = h = hl.hist(self.values, **self.kw) self.h._values[h.index(0)] += self.n_zero self.hn = hn = h.normalize(density=True) self.kw['bins'] = h.bins[0] self.kw['range'] = h.range[0] self.n_bins = h.n_bins[0] # set up cdf and sf b = np.r_[-1e10, h.bins[0], 1e10] v = np.r_[0, h.values, 0] hwide = hl.Hist([b], v).normalize(integrate=False) self.h_cdf = h_cdf = hwide.cumsum() h_cdf.values[[0,-1]] = 0, 1 self.h_sf = 1 - h_cdf # cap high-TS tail if desired if self._cap_high: self.h_cdf.values[-1] = 1 - hwide.values[-2] self.h_sf.values[-1] = hwide.values[-2] # save median, drop trials if desired self._median = self.sup.median() self._max_value = self.values.max() if not self._keep_trials: self.trials = None self.values = None
@property def sup(self): return super(BinnedTSD, self) def __repr__(self): return 'BinnedTSD({} trials, eta={:.3f}, median={:.3f} ({} bins))'.format( self.n_total, self.eta, float(self.median()), self.n_bins) def __add__(self, other): """ Obtain a new Chi2TSD by using all trials in both ``self`` and ``other``. """ kw = other.kw if self.trials._names == other.trials._names: return Chi2TSD( self.trials + other.trials, **kw) else: return Chi2TSD( np.r_[self.values, other.values], self.n_zero + other.n_zero, **kw)
[docs] def median(self): return self._median
[docs] def sf(self, x, fit=True): if fit: return self.h_sf.get_values(x) else: return self.sup.sf(x)
[docs] def cdf(self, x, fit=True): """ The cumulative distribution function, similar to ``scipy.stats`` distributions. """ if fit: return self.h_cdf.get_values(x) else: return self.sup.cdf(x)
[docs] def isf(self, p, fit=True): """ The inverse survival function, similar to ``scipy.stats`` distributions. """ if fit: h = self.h_sf i = np.reshape(np.where(h.values > p)[0][-1], np.shape(p)) last_mask = i == h.n_bins[0] - 1 # TODO: needs searchsorted out = h.bins[0][1:][np.reshape(np.where(h.values > p)[0][-1], np.shape(p))] return np.where(last_mask, self._max_value, out) else: return self.sup.isf(p)
[docs] def ts_to_p(bg, key, ts, fit=True): """Convert TS to p-values, interpolating over the background grid. Args: bg (dict): the backgound distributions key (array of float): the values that indicate where along the background parameterization each TS value originates from fit (bool): whether to request ``sf(ts_value, fit=True)`` """ shape = ts.shape assert key.shape == shape bg_keys = np.sort(list(bg.keys())) test_key = np.clip(key, bg_keys.min(), bg_keys.max()) p = np.ones(shape) key_bins = np.r_[-np.inf, bg_keys, np.inf] all_bg = {k: bg[k] for k in bg} all_bg[-np.inf] = bg[bg_keys[0]] all_bg[+np.inf] = bg[bg_keys[-1]] for (key_min, key_max) in izip(key_bins[:-1], key_bins[1:]): mask = (key_min <= test_key) & (test_key < key_max) this_ts = ts[mask] bg1 = all_bg[key_min] bg2 = all_bg[key_max] p1 = bg1.sf(this_ts, fit=fit) p2 = bg2.sf(this_ts, fit=fit) m = (p2 - p1) / (key_max - key_min) this_p = m * (test_key[mask] - key_min) + p1 p[mask] = this_p return p if len(shape) == 1: iterkeys = np.unique(test_keys)[::-1] else: iterkeys = np.unique(test_keys) for iterkey in iterkeys: this_ts = test_ts[test_keys == iterkey] key1 = np.where(bg_keys <= iterkey)[0][-1] key2 = np.where(iterkey <= bg_keys)[0][0] key1 = bg_keys[key1] key2 = bg_keys[key2] if key1 == key2: bg_both = bk.get_best(bg, key1) this_p = bg_both.sf(this_ts, fit=fit) else: bg1 = bk.get_best(bg, key1) bg2 = bk.get_best(bg, key2) p1 = bg1.sf(this_ts, fit=fit) p2 = bg2.sf(this_ts, fit=fit) m = (p2 - p1) / (key2 - key1) this_p = m * (iterkey - key1) + p1 p.append(this_p) return np.concatenate(p).reshape(shape)