Source code for csky.hyp

# hyp.py
"""Spectral hypothesis models."""

from __future__ import print_function

from abc import ABCMeta, abstractmethod, abstractproperty
try:
    from itertools import izip
except ImportError:
    izip = zip

import numpy as np

pi = np.pi

[docs] class Flux(object): r"""Abstract base class for fluxes. This class provides basic operations for flux functions of the form dN/dE=f(E), where E is in GeV and dN/dE is in units 1/GeV/cm2/s. The main value-added from this class is counts <--> flux calculations, which are performed in terms of total signal acceptance. Todo: * Check and then document the exact units of "total signal acceptance". """ __metaclass__ = ABCMeta
[docs] @abstractmethod def __call__(self, E): """Get dN/dE given E. Args: E (float): energy in GeV Returns: float: dN/dE in units 1/GeV/cm2/s """ pass
@abstractproperty def as_params(self): """Convert to str->value fit parameter kwargs.""" pass @abstractproperty def energy_range(self): """Get the energy range containing nonzero flux, (Emin,Emax), in GeV.""" pass def _canonical_ns(self, ns): if isinstance(ns, dict): return ns['n_sig'] else: return ns
[docs] def to_model_norm(self, ns, acc_total): """Get the model normalization resulting in ns, given the total acceptance. Args: ns (float): number of signal events acc_total (float): total signal acceptance Returns: float: model normalization Todo: * Check units of acc_total This is just a semantically meaningful way of expressing ``ns / acc_total``. """ return self._canonical_ns(ns) / acc_total
[docs] def to_dNdE(self, ns, acc_total, E0=1, unit=1): r"""Get dN/dE for particular reference energy and units. Args: ns (float): number of signal events acc_total (float): total signal acceptance E0 (float): reference energy in units ``unit`` * GeV unit (float): energy unit for ``E0`` as well as in denominator of return value Returns: float: dN/dE in units 1/(``unit`` * GeV)/cm2/s For example, to get dN/dE at 100 TeV in units 1/TeV/cm2/s, use ``to_dNdE(ns, acc_total, E0=100, unit=1000)``. Todo: * Check units of acc_total """ return (self._canonical_ns(ns) / acc_total) * self(E0 * unit) * unit
[docs] def to_E2dNdE(self, ns, acc_total, E0=1, unit=1): """Get E0^2 * dN/dE for particular reference energy and units. Just like :meth:`Flux.to_dNdE`, except multiplied by E0^2. """ return self.to_dNdE(ns, acc_total, E0=E0, unit=unit) * (E0 * unit)**2 / unit**2
[docs] def to_ns(self, flux, acc_total, E0=1, unit=1, use_E2dNdE=True): """Get number of signal events corresponding to a particular flux. Args: flux (float): dN/dE or E0^2 * dN/dE(see argument ``E2dNdE``) acc_total (float): total signal acceptance E0 (float): reference energy in units ``unit`` * GeV unit (float): energy unit for ``E0`` as well as in denominator of return value E2dNdE (bool): if true(default), ``flux`` gives E0^2 * dN/dE. otherwise, ``flux`` gives dN/dE. Returns: float: number of signal events Todo: * Check units of acc_total """ if use_E2dNdE: return acc_total * flux / self.to_E2dNdE(1, 1, E0=E0, unit=unit) else: return acc_total * flux / self.to_dNdE(1, 1, E0=E0, unit=unit)
[docs] class SplineFluxModel(Flux): """Spline flux spectrum. """
[docs] def __init__(self, norm, psp_table, crit_log_nu_energy_lower, crit_log_nu_energy_upper): super(SplineFluxModel, self).__init__() self._psp_table = psp_table self._norm = norm self._crit_log_nu_energy_lower = crit_log_nu_energy_lower self._crit_log_nu_energy_upper = crit_log_nu_energy_upper self._energy_range = tuple((self._crit_log_nu_energy_lower, self._crit_log_nu_energy_upper))
@property def psp_table(self): """The photospline.SplineTable object that describes the neutrino flux as function of neutrino energy via B-spline interpolation. """ return self._psp_table @psp_table.setter def psp_table(self, t): self._psp_table = t @property def norm(self): """The relative flux normalization. norm=1 corresponds to the nominal model flux. """ return self._norm @norm.setter def norm(self, v): try: v = float(v) except: v = (np.asarray(v)).astype(float) self.norm = v @property def crit_log_nu_energy_lower(self): """The lower bound of the support of the spline interpolator""" return self._crit_log_nu_energy_lower @crit_log_nu_energy_lower.setter def crit_log_nu_energy_lower(self, v): v = float_cast(v, 'Property crit_log_nu_energy_lower must be castable to type float!') self._crit_log_nu_energy_lower = v @property def crit_log_nu_energy_upper(self): """The upper bound of the support of the spline interpolator""" return self._crit_log_nu_energy_upper @crit_log_nu_energy_upper.setter def crit_log_nu_energy_upper(self, v): v = float_cast(v, 'Property crit_log_nu_energy_upper must be castable to type float!') self._crit_log_nu_energy_upper = v
[docs] class SeyfertCoreCoronaFlux(SplineFluxModel): """Implements the Core-Corona Seyfert Galaxy neutrino flux model of A. Kheirandish et al., Astrophys.J. 922 (2021) 45 by means of B-spline interpolation. Attributes ---------- """
[docs] def __init__(self, psp_table, log_xray_lumin, src_dist, norm, lumin_scale=1.0, crit_log_energy_flux=-50, crit_log_nu_energy_lower=2.0, crit_log_nu_energy_upper=7.0): super(SeyfertCoreCoronaFlux, self).__init__(norm, psp_table, crit_log_nu_energy_lower, crit_log_nu_energy_upper) self._lumin_scale = lumin_scale self._crit_log_energy_flux = crit_log_energy_flux self._src_dist = src_dist self._log_xray_lumin = log_xray_lumin self._energy_range = tuple((10**crit_log_nu_energy_lower, 10**crit_log_nu_energy_upper))
@property def energy_range(self): return self._energy_range @property def log_xray_lumin(self): """The log10 of the intrinsic source luminosity in 2-10keV x-ray band""" return self._log_xray_lumin @log_xray_lumin.setter def log_xray_lumin(self, v): try: v = float(v) except: v = (np.asarray(v)).astype(float) self._log_xray_lumin = v @property def lumin_scale(self): """correct normalization of model flux by relative factor""" return self._lumin_scale @lumin_scale.setter def lumin_scale(self, v): try: v = float(v) except: v = (np.asarray(v)).astype(float) self._lumin_scale = v @property def src_dist(self): """The distance to the source in units of Mpc""" return self._src_dist @src_dist.setter def src_dist(self, v): v = float_cast( v, 'Property src_dist must be castable to type float!') self._src_dist = v @property def crit_log_energy_flux(self): """defines when the flux is considered to be 0""" return self._crit_log_energy_flux @crit_log_energy_flux.setter def crit_log_energy_flux(self, v): v = float_cast( v, 'Property crit_log_energy_flux must be castable to type float!') self._crit_log_energy_flux = v @property def math_function_str(self): """We have used hash of this representation in `skyllh.core.source_hypothesis.get_fluxmodel_to_source_map()` to map fluxes with KDE PDFs. So far KDEs only depend on `log_xray_lumin` value and not on `src_dist`. """ return ( f'dN/dE = {self.norm} * 10^(log10(f(E)) - 2*log10(E) - ' f' 2*log10(src_dist) with log_xray_lumin={self.log_xray_lumin}' )
[docs] def __call__(self, E): """The flux value dN/dE at energy E. Parameters ---------- E : float | 1d numpy.ndarray of float Evaluation energy [GeV] Returns ------- flux : float | 1d ndarray of float Flux at energy E in units of GeV^-1 cm^-2 s^-1. """ #E = np.atleast_1d(E) log_enu = np.log10(E) #print("log_enu", log_enu) log_energy_flux = self.psp_table.evaluate_simple([log_enu]) #print("log_energy_flux", log_energy_flux) # convert energy flux to particle flux. account for source distance. flux = 10**(log_energy_flux - 2.0*log_enu - 2.0*np.log10(self.src_dist)) # need take care of very small fluxes (set to 0 beyond critical energy) # or below critical flux if isinstance(log_energy_flux, np.ndarray): out_of_bounds1 = log_energy_flux < self.crit_log_energy_flux out_of_bounds2 = np.logical_or(log_enu < self.crit_log_nu_energy_lower, log_enu > self.crit_log_nu_energy_upper) flux[np.logical_or(out_of_bounds1, out_of_bounds2)] = 0 return self.norm * self.lumin_scale * flux else: if (log_energy_flux < self._crit_log_energy_flux) or \ (log_enu < self._crit_log_nu_energy_lower) or \ (log_enu > self._crit_log_nu_energy_upper): return 0 else: return self.norm * self.lumin_scale * flux
def __deepcopy__(self, memo): """The photospline.SplineTable objects are strictly immutable. Hence no copy should be required, ever! """ return SeyfertCoreCoronaFlux( self.psp_table, self.log_xray_lumin, self.src_dist, self.norm, self.lumin_scale, self.crit_log_energy_flux, self.crit_log_nu_energy_lower, self.crit_log_nu_energy_upper ) @property def as_params(self): return dict(psp_table=self.psp_table, log_xray_lumin=self.log_xray_lumin, src_dist=self.src_dist)
[docs] class PowerLawFlux(Flux): """Power law spectrum. """
[docs] def __init__(self, gamma, norm=1, energy_range=(0, np.inf), energy_cutoff=np.inf): """ Args: gamma (float): spectral index norm (float): normalization factor(at 1 GeV) energy_range (tuple of (float, float)): hard energy bounds(flux is zero elsewhere) energy_cutoff (float): exponential cutoff energy(in GeV) """ self.gamma, self.norm = gamma, norm self._energy_range, self.energy_cutoff = tuple(energy_range), energy_cutoff
@property def _tuple(self): return self.gamma, self.norm, self.energy_range, self.energy_cutoff def __hash__(self): return hash(self._tuple) def __eq__(self, other): return self._tuple == other._tuple
[docs] def __call__(self, E): out = np.where( (self._energy_range[0] <= E) & (E <= self._energy_range[-1]), self.norm * E**-self.gamma, 0) if self.energy_cutoff < np.inf: out *= np.exp(-E / self.energy_cutoff) return out
@property def as_params(self): return dict(gamma=self.gamma, energy_cutoff=self.energy_cutoff) @property def energy_range(self): return self._energy_range def __repr__(self): params = ['gamma={}'.format(self.gamma)] if self.norm != 1: params.append('norm={}'.format(self.norm)) if self.energy_range != (0, np.inf): params.append('energy_range={}'.format(self.energy_range)) if self.energy_cutoff != np.inf: params.append('energy_cutoff={}'.format(self.energy_cutoff)) return 'PowerLawFlux(' + ','.join(params) + ')'
[docs] class BinnedFlux(Flux): """Energy-binned spectrum."""
[docs] def __init__(self, bins_energy, flux, max_energy_approx_gamma=1e6): """Construct a BinnedFlux. Args: bins_energy (ndarray of float): energy bin edges(up to *or* including Emax of highest energy bin) flux (ndarray of float): average dN/dE, in 1/GeV/cm2/s, for each energy bin max_energy_approx_gamma (float): maximum energy(in GeV) to consider when computing approximate average spectral index(used by :meth:BinnedFlux.as_params) """ if len(flux) == len(bins_energy) - 1: flux = np.r_[flux, 0] self.bins_energy, self.flux = bins_energy, flux self._energy_range = tuple(bins_energy[[0, -1]]) self.max_energy_approx_gamma = max_energy_approx_gamma # calculate approximate power law gamma_mask = ((bins_energy < max_energy_approx_gamma) & (flux > 0)) dlogflux = np.diff(np.log(flux[gamma_mask])) dlogenergy = np.diff(np.log(bins_energy[gamma_mask])) self.gamma = -np.mean((dlogflux / dlogenergy))
@property def _tuple(self): return tuple(self.bins_energy), tuple(self.flux), self.max_energy_approx_gamma def __hash__(self): return hash(self._tuple) def __eq__(self, other): return self._tuple == other._tuple
[docs] def __call__(self, E): bins_energy, flux = self.bins_energy, self.flux Emin, Emax = bins_energy[[0, -1]] single = np.isscalar(E) E = np.atleast_1d(E) out = np.empty_like(E) mask = (Emin <= E) & (E <= Emax) i_energy = np.searchsorted(bins_energy, E, side='right') - 1 out[mask] = flux[i_energy[mask]] out[~mask]= 0 if np.isscalar(E): out = out[0] return out
@property def as_params(self): return dict(gamma=self.gamma) @property def energy_range(self): return self._energy_range