# dists.py
"""
Test statistic distributions models.
"""
try:
from itertools import izip
except ImportError:
izip = zip
import numpy as np
from scipy import stats
import sys
import histlite as hl
try:
xrange
except NameError:
xrange = range
from . import utils, bk
[docs]
class TSD(object):
"""
Trials-based representation of a test statistic distribution.
Several methods are in the spirit of scipy.stats distribution objects, but
note that at least currently, the interface is not a perfect match.
"""
[docs]
def __init__(self, values, n_zero=0, **kw):
"""
Construct a Chi2TSD.
Args:
values (array of float or :class:`utis.Arrays`):the test statistic values
n_zero (int): number of trials not contained by ``values`` for which
TS=0
kw (mapping): additional arguments for ``scipy.stats.chi2.fit``. By
default, if ``loc`` and ``floc`` are not set, then ``floc=0`` will
be used; if ``scale`` and ``fscale`` are not set, then ``fscale=1``
will be used.
"""
if isinstance(values, utils.Arrays):
self.trials = values
values = values.ts
elif isinstance(values, TSD):
self.trials = values.trials
values = self.trials.ts
else:
self.trials = utils.Arrays(ts=values)
values = np.sort(values)
mask = values != 0
self.values = values[mask]
self.n_nonzero = np.sum(mask)
self.n_positive = np.sum(values > 0)
self.n_total = len(values) + n_zero
self.n_zero = self.n_total - self.n_nonzero
self.eta = 1. * self.n_positive / self.n_total
self.f_zero = 1 - self.eta
self.kw = kw
def __len__(self):
return self.n_total
def __repr__(self):
return 'TSD({} trials, eta={:.3f}, median={:.3f})'.format(
self.n_total, self.eta, float(self.median()))
@property
def description(self):
main = ['Chi2TSD from {} trials:'.format(self.n_total)]
main += [' eta = {:.3f}'.format(self.eta)]
main += [' ndof = {:.3f}'.format(self.ndof)]
main += [' loc = {:.3f}'.format(self.loc)]
main += [' scale = {:.3f}'.format(self.scale)]
main = '\n '.join(main)
direct = ['Thresholds from trials:']
direct += [' median = {:.3f}'.format(self.median())]
for nsigma in xrange(1, 6):
direct += [' {} sigma = {:.2f}'.format(nsigma, self.isf_nsigma(nsigma))]
direct = '\n '.join(direct)
return '\n'.join([main, direct])
def __add__(self, other):
"""
Obtain a new Chi2TSD by using all trials in both ``self`` and ``other``.
"""
if self.trials.keys() == other.trials.keys():
# TODO: how does this approach deal with n_zero passed aside from trials?
return TSD(self.trials + other.trials)
else:
return TSD(np.r_[self.values, other.values], self.n_zero + other.n_zero)
[docs]
def sf(self, x, fit=False):
"""
The survival function, similar to ``scipy.stats`` distributions.
"""
if fit:
raise ValueError('fit must be false')
i = self.n_nonzero - np.searchsorted(self.values, x)
return 1. * i / self.n_total
[docs]
def sf_nsigma(self, x, **kw):
"""
The survival function, but returning a value in terms of number of sigmas.
"""
return stats.norm.isf(self.sf(x, **kw))
[docs]
def cdf(self, x):
"""
The cumulative distribution function, similar to
``scipy.stats`` distributions.
"""
i = np.searchsorted(self.values, x)
return self.f_zero + 1. * i / self.n_total
[docs]
def isf(self, p, fit=False):
"""
The inverse survival function, similar to ``scipy.stats``
distributions.
"""
# TODO: better error if n_nonzero == 0
if fit:
raise ValueError('fit must be false')
i = self.n_nonzero - (p * self.n_total)
if isinstance(i, np.ndarray):
i = i.astype(int)
else:
i = int(i)
ii = np.maximum(0, np.minimum(i, self.n_nonzero - 1))
return np.where(i < self.n_nonzero,
np.where(i < 0, 0, self.values[ii]),
np.inf)
def isf_nsigma(self, nsigma, **kw):
return self.isf(stats.norm.sf(nsigma), **kw)
[docs]
def get_hist(self, **kw):
"""
Get a :class:`histlite.Hist` representation of the distribution.
"""
kw.setdefault('range', (min(self.values.min(), 0), self.values[-1]))
h = hl.hist(self.values, **kw)
h.values[h.index(0)] += self.n_zero
return h
[docs]
class Chi2TSD(TSD):
""":math:`\chi^2`-fit representation of a test statistic distribution.
Information about trials is retained, and a chi2 fit is applied to the
region TS > 0.
"""
[docs]
def __init__(self, values, n_zero=0, threshold=0, **kw):
self.sup.__init__(values, n_zero=n_zero, **kw)
self.kw.setdefault('loc', 0)
try:
self.ndof, self.loc, self.scale = self.params = \
stats.chi2.fit(self.values[self.values > threshold], **self.kw)
except:
self.ndof, self.loc, self.scale = self.params = (1, 0, 1)
sys.stderr.write('warning: Chi2TSD() could not find good chi2 fit\n')
@property
def sup(self):
return super(Chi2TSD, self)
def __repr__(self):
return 'Chi2TSD({} trials, eta={:.3f}, ' \
'ndof={:.3f}, median={:.3f} (from fit {:.3f}))'.format(
self.n_total, self.eta, self.ndof,
float(self.median()), float(self.median(True)))
@property
def description(self):
base = self.sup.description
fit = ['Thresholds from fit:']
fit += [' median = {:.3f}'.format(self.median(fit=True))]
for nsigma in xrange(1, 6):
fit += [' {} sigma = {:.2f}'.format(nsigma, self.isf_nsigma(nsigma, fit=True))]
fit = '\n '.join(fit)
return '\n'.join([base, fit])
@property
def chi2 (self):
"""
The fitted ``scipy.stats.chi2`` object.
"""
return stats.chi2 (self.ndof, self.loc, self.scale)
def __add__(self, other):
"""
Obtain a new Chi2TSD by using all trials in both ``self`` and ``other``.
"""
kw = other.kw
if self.trials.keys() == other.trials.keys():
return Chi2TSD(
self.trials + other.trials, **kw)
else:
return Chi2TSD(
np.r_[self.values, other.values], self.n_zero + other.n_zero, **kw)
[docs]
def sf(self, x, fit=True):
"""
The survival function, similar to ``scipy.stats`` distributions.
"""
if fit:
return self.eta * self.chi2.sf(x)
else:
return self.sup.sf(x)
[docs]
def cdf(self, x, fit=True):
"""
The cumulative distribution function, similar to
``scipy.stats`` distributions.
"""
if fit:
return self.f_zero + self.eta * self.chi2.cdf(x)
else:
return self.sup.cdf(x)
[docs]
def isf(self, p, fit=True):
"""
The inverse survival function, similar to ``scipy.stats``
distributions.
"""
if fit:
return np.where(p <= self.eta,
self.chi2.isf(p / self.eta),
0)
else:
return self.sup.isf(p)
def pdf(self, x, fit=True):
if not fit:
raise NotImplemented('Chi2TSD.pdf not implemented for fit=False')
return self.eta * self.chi2.pdf(x)
[docs]
class BinnedTSD(TSD):
"""Histogram-based representation of a test statistic distribution.
This class tries to reproduce the behavior of :class:`TSD` without storing information
about every single trial, thereby reducing RAM and disk space requirements.
"""
[docs]
def __init__(self, values, n_zero=0, **kw):
self.sup.__init__(values, n_zero=n_zero, **kw)
self._keep_trials = self.kw.pop('keep_trials', True)
self._cap_high = self.kw.pop('cap_high', True)
self.kw.setdefault('bins', 10000)
self.kw.setdefault('range', (min(self.values.min(), 0), self.values.max()))
self.h = h = hl.hist(self.values, **self.kw)
self.h._values[h.index(0)] += self.n_zero
self.hn = hn = h.normalize(density=True)
self.kw['bins'] = h.bins[0]
self.kw['range'] = h.range[0]
self.n_bins = h.n_bins[0]
# set up cdf and sf
b = np.r_[-1e10, h.bins[0], 1e10]
v = np.r_[0, h.values, 0]
hwide = hl.Hist([b], v).normalize(integrate=False)
self.h_cdf = h_cdf = hwide.cumsum()
h_cdf.values[[0,-1]] = 0, 1
self.h_sf = 1 - h_cdf
# cap high-TS tail if desired
if self._cap_high:
self.h_cdf.values[-1] = 1 - hwide.values[-2]
self.h_sf.values[-1] = hwide.values[-2]
# save median, drop trials if desired
self._median = self.sup.median()
self._max_value = self.values.max()
if not self._keep_trials:
self.trials = None
self.values = None
@property
def sup(self):
return super(BinnedTSD, self)
def __repr__(self):
return 'BinnedTSD({} trials, eta={:.3f}, median={:.3f} ({} bins))'.format(
self.n_total, self.eta, float(self.median()), self.n_bins)
def __add__(self, other):
"""
Obtain a new Chi2TSD by using all trials in both ``self`` and ``other``.
"""
kw = other.kw
if self.trials._names == other.trials._names:
return Chi2TSD(
self.trials + other.trials, **kw)
else:
return Chi2TSD(
np.r_[self.values, other.values], self.n_zero + other.n_zero, **kw)
[docs]
def sf(self, x, fit=True):
if fit:
return self.h_sf.get_values(x)
else:
return self.sup.sf(x)
[docs]
def cdf(self, x, fit=True):
"""
The cumulative distribution function, similar to
``scipy.stats`` distributions.
"""
if fit:
return self.h_cdf.get_values(x)
else:
return self.sup.cdf(x)
[docs]
def isf(self, p, fit=True):
"""
The inverse survival function, similar to ``scipy.stats``
distributions.
"""
if fit:
h = self.h_sf
i = np.reshape(np.where(h.values > p)[0][-1], np.shape(p))
last_mask = i == h.n_bins[0] - 1
# TODO: needs searchsorted
out = h.bins[0][1:][np.reshape(np.where(h.values > p)[0][-1], np.shape(p))]
return np.where(last_mask, self._max_value, out)
else:
return self.sup.isf(p)
[docs]
def ts_to_p(bg, key, ts, fit=True):
"""Convert TS to p-values, interpolating over the background grid.
Args:
bg (dict): the backgound distributions
key (array of float): the values that indicate where along the background
parameterization each TS value originates from
fit (bool): whether to request ``sf(ts_value, fit=True)``
"""
shape = ts.shape
assert key.shape == shape
bg_keys = np.sort(list(bg.keys()))
test_key = np.clip(key, bg_keys.min(), bg_keys.max())
p = np.ones(shape)
key_bins = np.r_[-np.inf, bg_keys, np.inf]
all_bg = {k: bg[k] for k in bg}
all_bg[-np.inf] = bg[bg_keys[0]]
all_bg[+np.inf] = bg[bg_keys[-1]]
for (key_min, key_max) in izip(key_bins[:-1], key_bins[1:]):
mask = (key_min <= test_key) & (test_key < key_max)
this_ts = ts[mask]
bg1 = all_bg[key_min]
bg2 = all_bg[key_max]
p1 = bg1.sf(this_ts, fit=fit)
p2 = bg2.sf(this_ts, fit=fit)
m = (p2 - p1) / (key_max - key_min)
this_p = m * (test_key[mask] - key_min) + p1
p[mask] = this_p
return p
if len(shape) == 1:
iterkeys = np.unique(test_keys)[::-1]
else:
iterkeys = np.unique(test_keys)
for iterkey in iterkeys:
this_ts = test_ts[test_keys == iterkey]
key1 = np.where(bg_keys <= iterkey)[0][-1]
key2 = np.where(iterkey <= bg_keys)[0][0]
key1 = bg_keys[key1]
key2 = bg_keys[key2]
if key1 == key2:
bg_both = bk.get_best(bg, key1)
this_p = bg_both.sf(this_ts, fit=fit)
else:
bg1 = bk.get_best(bg, key1)
bg2 = bk.get_best(bg, key2)
p1 = bg1.sf(this_ts, fit=fit)
p2 = bg2.sf(this_ts, fit=fit)
m = (p2 - p1) / (key2 - key1)
this_p = m * (iterkey - key1) + p1
p.append(this_p)
return np.concatenate(p).reshape(shape)