# hyp.py
"""Spectral hypothesis models."""
from __future__ import print_function
from abc import ABCMeta, abstractmethod, abstractproperty
try:
from itertools import izip
except ImportError:
izip = zip
import numpy as np
pi = np.pi
[docs]
class Flux(object):
r"""Abstract base class for fluxes.
This class provides basic operations for flux functions of the form dN/dE=f(E), where E is in
GeV and dN/dE is in units 1/GeV/cm2/s. The main value-added from this class is counts <--> flux
calculations, which are performed in terms of total signal acceptance.
Todo:
* Check and then document the exact units of "total signal acceptance".
"""
__metaclass__ = ABCMeta
[docs]
@abstractmethod
def __call__(self, E):
"""Get dN/dE given E.
Args:
E (float): energy in GeV
Returns:
float: dN/dE in units 1/GeV/cm2/s
"""
pass
@abstractproperty
def as_params(self):
"""Convert to str->value fit parameter kwargs."""
pass
@abstractproperty
def energy_range(self):
"""Get the energy range containing nonzero flux, (Emin,Emax), in GeV."""
pass
def _canonical_ns(self, ns):
if isinstance(ns, dict):
return ns['n_sig']
else:
return ns
[docs]
def to_model_norm(self, ns, acc_total):
"""Get the model normalization resulting in ns, given the total acceptance.
Args:
ns (float): number of signal events
acc_total (float): total signal acceptance
Returns:
float: model normalization
Todo:
* Check units of acc_total
This is just a semantically meaningful way of expressing ``ns / acc_total``.
"""
return self._canonical_ns(ns) / acc_total
[docs]
def to_dNdE(self, ns, acc_total, E0=1, unit=1):
r"""Get dN/dE for particular reference energy and units.
Args:
ns (float): number of signal events
acc_total (float): total signal acceptance
E0 (float): reference energy in units ``unit`` * GeV
unit (float): energy unit for ``E0`` as well as in denominator of return value
Returns:
float: dN/dE in units 1/(``unit`` * GeV)/cm2/s
For example, to get dN/dE at 100 TeV in units 1/TeV/cm2/s, use ``to_dNdE(ns, acc_total,
E0=100, unit=1000)``.
Todo:
* Check units of acc_total
"""
return (self._canonical_ns(ns) / acc_total) * self(E0 * unit) * unit
[docs]
def to_E2dNdE(self, ns, acc_total, E0=1, unit=1):
"""Get E0^2 * dN/dE for particular reference energy and units.
Just like :meth:`Flux.to_dNdE`, except multiplied by E0^2.
"""
return self.to_dNdE(ns, acc_total, E0=E0, unit=unit) * (E0 * unit)**2 / unit**2
[docs]
def to_ns(self, flux, acc_total, E0=1, unit=1, use_E2dNdE=True):
"""Get number of signal events corresponding to a particular flux.
Args:
flux (float): dN/dE or E0^2 * dN/dE(see argument ``E2dNdE``)
acc_total (float): total signal acceptance
E0 (float): reference energy in units ``unit`` * GeV
unit (float): energy unit for ``E0`` as well as in denominator of return value
E2dNdE (bool): if true(default), ``flux`` gives E0^2 * dN/dE. otherwise, ``flux`` gives
dN/dE.
Returns:
float: number of signal events
Todo:
* Check units of acc_total
"""
if use_E2dNdE:
return acc_total * flux / self.to_E2dNdE(1, 1, E0=E0, unit=unit)
else:
return acc_total * flux / self.to_dNdE(1, 1, E0=E0, unit=unit)
[docs]
class SplineFluxModel(Flux):
"""Spline flux spectrum. """
[docs]
def __init__(self, norm, psp_table, crit_log_nu_energy_lower, crit_log_nu_energy_upper):
super(SplineFluxModel, self).__init__()
self._psp_table = psp_table
self._norm = norm
self._crit_log_nu_energy_lower = crit_log_nu_energy_lower
self._crit_log_nu_energy_upper = crit_log_nu_energy_upper
self._energy_range = tuple((self._crit_log_nu_energy_lower, self._crit_log_nu_energy_upper))
@property
def psp_table(self):
"""The photospline.SplineTable object that describes
the neutrino flux as function of neutrino energy via
B-spline interpolation.
"""
return self._psp_table
@psp_table.setter
def psp_table(self, t):
self._psp_table = t
@property
def norm(self):
"""The relative flux normalization. norm=1 corresponds
to the nominal model flux.
"""
return self._norm
@norm.setter
def norm(self, v):
try: v = float(v)
except: v = (np.asarray(v)).astype(float)
self.norm = v
@property
def crit_log_nu_energy_lower(self):
"""The lower bound of the support of the spline interpolator"""
return self._crit_log_nu_energy_lower
@crit_log_nu_energy_lower.setter
def crit_log_nu_energy_lower(self, v):
v = float_cast(v, 'Property crit_log_nu_energy_lower must be castable to type float!')
self._crit_log_nu_energy_lower = v
@property
def crit_log_nu_energy_upper(self):
"""The upper bound of the support of the spline interpolator"""
return self._crit_log_nu_energy_upper
@crit_log_nu_energy_upper.setter
def crit_log_nu_energy_upper(self, v):
v = float_cast(v, 'Property crit_log_nu_energy_upper must be castable to type float!')
self._crit_log_nu_energy_upper = v
[docs]
class SeyfertCoreCoronaFlux(SplineFluxModel):
"""Implements the Core-Corona Seyfert Galaxy neutrino flux model of
A. Kheirandish et al., Astrophys.J. 922 (2021) 45 by means of
B-spline interpolation.
Attributes
----------
"""
[docs]
def __init__(self, psp_table, log_xray_lumin, src_dist, norm,
lumin_scale=1.0,
crit_log_energy_flux=-50,
crit_log_nu_energy_lower=2.0, crit_log_nu_energy_upper=7.0):
super(SeyfertCoreCoronaFlux, self).__init__(norm, psp_table,
crit_log_nu_energy_lower,
crit_log_nu_energy_upper)
self._lumin_scale = lumin_scale
self._crit_log_energy_flux = crit_log_energy_flux
self._src_dist = src_dist
self._log_xray_lumin = log_xray_lumin
self._energy_range = tuple((10**crit_log_nu_energy_lower, 10**crit_log_nu_energy_upper))
@property
def energy_range(self):
return self._energy_range
@property
def log_xray_lumin(self):
"""The log10 of the intrinsic source luminosity in 2-10keV x-ray band"""
return self._log_xray_lumin
@log_xray_lumin.setter
def log_xray_lumin(self, v):
try: v = float(v)
except: v = (np.asarray(v)).astype(float)
self._log_xray_lumin = v
@property
def lumin_scale(self):
"""correct normalization of model flux by relative factor"""
return self._lumin_scale
@lumin_scale.setter
def lumin_scale(self, v):
try: v = float(v)
except: v = (np.asarray(v)).astype(float)
self._lumin_scale = v
@property
def src_dist(self):
"""The distance to the source in units of Mpc"""
return self._src_dist
@src_dist.setter
def src_dist(self, v):
v = float_cast(
v, 'Property src_dist must be castable to type float!')
self._src_dist = v
@property
def crit_log_energy_flux(self):
"""defines when the flux is considered to be 0"""
return self._crit_log_energy_flux
@crit_log_energy_flux.setter
def crit_log_energy_flux(self, v):
v = float_cast(
v, 'Property crit_log_energy_flux must be castable to type float!')
self._crit_log_energy_flux = v
@property
def math_function_str(self):
"""We have used hash of this representation in
`skyllh.core.source_hypothesis.get_fluxmodel_to_source_map()`
to map fluxes with KDE PDFs. So far KDEs only depend on `log_xray_lumin`
value and not on `src_dist`.
"""
return (
f'dN/dE = {self.norm} * 10^(log10(f(E)) - 2*log10(E) - '
f' 2*log10(src_dist) with log_xray_lumin={self.log_xray_lumin}'
)
[docs]
def __call__(self, E):
"""The flux value dN/dE at energy E.
Parameters
----------
E : float | 1d numpy.ndarray of float
Evaluation energy [GeV]
Returns
-------
flux : float | 1d ndarray of float
Flux at energy E in units of GeV^-1 cm^-2 s^-1.
"""
#E = np.atleast_1d(E)
log_enu = np.log10(E)
#print("log_enu", log_enu)
log_energy_flux = self.psp_table.evaluate_simple([log_enu])
#print("log_energy_flux", log_energy_flux)
# convert energy flux to particle flux. account for source distance.
flux = 10**(log_energy_flux - 2.0*log_enu - 2.0*np.log10(self.src_dist))
# need take care of very small fluxes (set to 0 beyond critical energy)
# or below critical flux
if isinstance(log_energy_flux, np.ndarray):
out_of_bounds1 = log_energy_flux < self.crit_log_energy_flux
out_of_bounds2 = np.logical_or(log_enu < self.crit_log_nu_energy_lower,
log_enu > self.crit_log_nu_energy_upper)
flux[np.logical_or(out_of_bounds1, out_of_bounds2)] = 0
return self.norm * self.lumin_scale * flux
else:
if (log_energy_flux < self._crit_log_energy_flux) or \
(log_enu < self._crit_log_nu_energy_lower) or \
(log_enu > self._crit_log_nu_energy_upper):
return 0
else:
return self.norm * self.lumin_scale * flux
def __deepcopy__(self, memo):
"""The photospline.SplineTable objects are strictly immutable.
Hence no copy should be required, ever!
"""
return SeyfertCoreCoronaFlux(
self.psp_table, self.log_xray_lumin, self.src_dist, self.norm,
self.lumin_scale, self.crit_log_energy_flux,
self.crit_log_nu_energy_lower, self.crit_log_nu_energy_upper
)
@property
def as_params(self):
return dict(psp_table=self.psp_table, log_xray_lumin=self.log_xray_lumin, src_dist=self.src_dist)
[docs]
class PowerLawFlux(Flux):
"""Power law spectrum. """
[docs]
def __init__(self, gamma, norm=1, energy_range=(0, np.inf), energy_cutoff=np.inf):
"""
Args:
gamma (float): spectral index
norm (float): normalization factor(at 1 GeV)
energy_range (tuple of (float, float)): hard energy bounds(flux is zero elsewhere)
energy_cutoff (float): exponential cutoff energy(in GeV)
"""
self.gamma, self.norm = gamma, norm
self._energy_range, self.energy_cutoff = tuple(energy_range), energy_cutoff
@property
def _tuple(self):
return self.gamma, self.norm, self.energy_range, self.energy_cutoff
def __hash__(self):
return hash(self._tuple)
def __eq__(self, other):
return self._tuple == other._tuple
[docs]
def __call__(self, E):
out = np.where(
(self._energy_range[0] <= E) & (E <= self._energy_range[-1]),
self.norm * E**-self.gamma, 0)
if self.energy_cutoff < np.inf:
out *= np.exp(-E / self.energy_cutoff)
return out
@property
def as_params(self):
return dict(gamma=self.gamma, energy_cutoff=self.energy_cutoff)
@property
def energy_range(self):
return self._energy_range
def __repr__(self):
params = ['gamma={}'.format(self.gamma)]
if self.norm != 1:
params.append('norm={}'.format(self.norm))
if self.energy_range != (0, np.inf):
params.append('energy_range={}'.format(self.energy_range))
if self.energy_cutoff != np.inf:
params.append('energy_cutoff={}'.format(self.energy_cutoff))
return 'PowerLawFlux(' + ','.join(params) + ')'
[docs]
class BinnedFlux(Flux):
"""Energy-binned spectrum."""
[docs]
def __init__(self, bins_energy, flux, max_energy_approx_gamma=1e6):
"""Construct a BinnedFlux.
Args:
bins_energy (ndarray of float): energy bin edges(up to *or* including
Emax of highest energy bin)
flux (ndarray of float): average dN/dE, in 1/GeV/cm2/s, for each energy bin
max_energy_approx_gamma (float): maximum energy(in GeV) to consider when computing
approximate average spectral index(used by :meth:BinnedFlux.as_params)
"""
if len(flux) == len(bins_energy) - 1:
flux = np.r_[flux, 0]
self.bins_energy, self.flux = bins_energy, flux
self._energy_range = tuple(bins_energy[[0, -1]])
self.max_energy_approx_gamma = max_energy_approx_gamma
# calculate approximate power law
gamma_mask = ((bins_energy < max_energy_approx_gamma) & (flux > 0))
dlogflux = np.diff(np.log(flux[gamma_mask]))
dlogenergy = np.diff(np.log(bins_energy[gamma_mask]))
self.gamma = -np.mean((dlogflux / dlogenergy))
@property
def _tuple(self):
return tuple(self.bins_energy), tuple(self.flux), self.max_energy_approx_gamma
def __hash__(self):
return hash(self._tuple)
def __eq__(self, other):
return self._tuple == other._tuple
[docs]
def __call__(self, E):
bins_energy, flux = self.bins_energy, self.flux
Emin, Emax = bins_energy[[0, -1]]
single = np.isscalar(E)
E = np.atleast_1d(E)
out = np.empty_like(E)
mask = (Emin <= E) & (E <= Emax)
i_energy = np.searchsorted(bins_energy, E, side='right') - 1
out[mask] = flux[i_energy[mask]]
out[~mask]= 0
if np.isscalar(E):
out = out[0]
return out
@property
def as_params(self):
return dict(gamma=self.gamma)
@property
def energy_range(self):
return self._energy_range