""" Class for managing likelihood function evaluation.
"""
__all__ = ["Likelihood"]
from xpsi import _rank
from xpsi.utils import make_verbose
from xpsi.global_imports import *
from xpsi.Star import Star
from xpsi.Signal import Signal, LikelihoodError, construct_energy_array
from xpsi.Prior import Prior
from xpsi.ParameterSubspace import ParameterSubspace
from xpsi import HotRegion
from xpsi import Elsewhere
[docs]class Likelihood(ParameterSubspace):
""" A container for all objects related to likelihood evaluation.
A collective for objects pertaining to a statistical analysis of
X-ray signals. These objects include X-ray data (sub)sets, the model
instruments used to acquire these data sets, and the model star and
model backgrounds.
:param obj star:
An instance of :class:`~.Star.Star`. This instance is the model star.
:param list signals:
Either:
* a single :class:`~.Signal.Signal` instance
* a list of :class:`~.Signal.Signal` instances
* a list of lists of :class:`~.Signal.Signal` instances
:param float num_energies:
Number of energies to compute specific photon flux signals at for
likelihood evaluation. These energies will be distributed linearly
in logarithmic space within the union of waveband coverages
achieved by some set of instruments. Gaps in waveband coverage will
be skipped.
:param float fast_rel_num_energies:
Fraction of the normal number of energies to use in *fast* mode.
:param int threads:
The number of ``OpenMP`` threads to spawn for integration. The default
number of threads used by low-level integration routines is ``1``. The
number can be increased if the parallelisation paradigm on a cluster
is changed such that, e.g., per node there is one thread per CPU,
instead of one ``OpenMPI`` process. It is recommended that
:obj:`threads` is ``1``; more threads are useful when performing
integrals at high resolution, but not necessarily when integrating
*many* times as when sampling, because the MPI parallelisation
paradigm is invoked.
:param float llzero:
The minimum log-likelihood setting for MultiNest. Points whose
log-likelihood is lower than this value are ignored, which is useful
for defining complicated priors.
:param bool externally_updated:
Update the parameter values upon call to the likelihood object?
If so, then pass ``False``. A parameter vector then needs to be passed
to the likelihood object. Otherwise, safely assume that the
new parameter values are set externally, e.g., in a prior object
when inverse sampling the prior for nested sampling.
:param obj prior:
Instance of subclass of :class:`~.Prior.Prior`.
:param float max_energy:
Optional maximum of energy set for signal computation. If no maximum
is requested (the default), then the maximum is equal to the maximum
energy from the loaded instrument response models.
"""
def __init__(self, star, signals,
num_energies = 128,
fast_rel_num_energies = 0.25,
threads = 1, llzero = -1.0e90,
externally_updated = False,
prior = None,
max_energy = None):
self.star = star
self.signals = signals
self._do_fast = False
self._num_energies = num_energies
self._fast_rel_num_energies = fast_rel_num_energies
for photosphere, signals in zip(star.photospheres, self._signals):
try:
for signal in signals:
assert photosphere.prefix == signal.photosphere, \
'Each signal subspace must have a photosphere \
attribute that matches the identification prefix \
of a photosphere object, by convention, and the order \
of the list of signal-object lists must match the \
order of the list of photosphere objects.'
except AttributeError:
pass # quietly assume one photosphere object
energies = construct_energy_array(num_energies,
list(signals),
max_energy)
num = int( fast_rel_num_energies * num_energies )
fast_energies = construct_energy_array(num,
list(signals),
max_energy)
for signal in signals:
signal.energies = energies
signal.phases = photosphere.hot.phases_in_cycles
if photosphere.hot.do_fast:
signal.fast_energies = fast_energies
signal.fast_phases = photosphere.hot.fast_phases_in_cycles
self._do_fast = True
self.threads = threads
self.llzero = llzero
self.externally_updated = externally_updated
if prior is not None:
self.prior = prior
# merge subspaces
super(Likelihood, self).__init__(self._star, *(self._signals + [prior]))
@property
def threads(self):
""" Get the number of threads spawned for integration. """
return self._threads
@threads.setter
def threads(self, n):
""" Set the number of threads spawned for integration. """
try:
self._threads = int(n)
except TypeError:
raise TypeError('Thread number must be an integer.')
@property
def do_fast(self):
""" Does fast mode need to be activated? """
return self._do_fast
@property
def star(self):
""" Get the instance of :class:`~.Star.Star`. """
return self._star
@star.setter
def star(self, obj):
# only one star object for this version
if not isinstance(obj, Star):
raise TypeError('Invalid type for a Star object.')
else:
self._star = obj
@property
def signals(self):
""" Get the list of :class:`~.Signal.Signal` instances. """
if len(self._signals) == 1:
if len(self._signals[0]) == 1:
return self._signals[0][0]
return self._signals
@signals.setter
def signals(self, obj):
# infer how user supplied the signal objects
if isinstance(obj, Signal):
self._signals = [[obj,],]
elif isinstance(obj, list):
if all(isinstance(o, list) for o in obj):
for l in obj:
if not all(isinstance(o, Signal) for o in l):
raise TypeError('Invalid type for a signal object.')
self._signals = obj
elif all(isinstance(o, Signal) for o in obj):
self._signals = [obj,]
else:
raise TypeError('Invalid type for a signal object.')
@property
def signal(self):
""" Get the sole signal instance or throw exception. """
if len(self._signals) > 1 or len(self._signals[0]) > 1:
raise ValueError('There is more than one signal instance.')
return self._signals[0][0]
@property
def prior(self):
return self._prior
@prior.setter
def prior(self, obj):
""" Set the prior object if useful, e.g., in nested sampling.
:param prior:
A callable instance of :class:`~.Prior.Prior` which is
useful for nested sampling if inverse prior sampling
is not possible or not straightforward. If not ``None``
(default), the prior is called. If the point lies
exterior to the prior hypervolume, ensure the callable
returns minus :obj:`numpy.inf`. The purpose is to mix the
likelihood function and the prior for definition of
the prior support. Inverse sampling is
still performed between hard parameter bounds, or on
any prior factors not handled in the call method. If the
(conditional) prior to be evaluated is not uniform,
return the logarithm of the prior density, which is
combined with the likelihood evaluation. The likelihood
is not evaluated if the log-prior is infinite.
"""
if not isinstance(obj, Prior):
raise TypeError('The prior object is not an instance of the '
'Prior class.')
self._prior = obj
try:
self.merge(obj)
except TypeError:
pass # assume no prior parameters
self._prior.parameters = self # a reference to the parameter container
@prior.deleter
def prior(self):
""" Delete the reference to the prior. """
try:
del self._prior
except AttributeError:
pass # nothing to be done
@property
def llzero(self):
""" Get the minimum log-likelihood setting passed to MultiNest. """
return self._llzero
@llzero.setter
def llzero(self, value):
""" Set the minimum log-likelihood that is viewed as zero. """
try:
self._llzero = float(value)
except TypeError:
raise TypeError('Zero log-likelihood number must be a float.')
@property
def random_near_llzero(self):
""" Get the minimum log-likelihood scaled randomly by up to an
order of magnitude. """
return self._llzero * (0.1 + 0.9*_np.random.rand(1))
@property
def less_than_llzero(self):
""" Get a number less than the minimum log-likelihood threshold. """
return 1.1 * self._llzero
@property
def externally_updated(self):
""" Safely assume parameters are already updated upon call to self? """
return self._externally_updated
@externally_updated.setter
def externally_updated(self, updated):
if not isinstance(updated, bool):
raise TypeError('Provide a boolean to define update protocol.')
self._externally_updated = updated
@staticmethod
def _divide(obj, x):
""" Helper operator to check for compatibility first.
As an example, if fast mode is activated for some hot region but not
another, :obj:`obj` would be ``None`` and thus a safeguard is needed.
"""
if isinstance(obj, _np.ndarray):
return obj / x
else:
return None
def _driver(self, fast_mode=False, synthesise=False, force_update=False, **kwargs):
""" Main likelihood evaluation driver routine. """
self._star.activate_fast_mode(fast_mode)
star_updated = False
if self._star.needs_update or force_update: # ignore fast parameters in this version
try:
if fast_mode or not self._do_fast:
fast_total_counts = None
else:
fast_total_counts = tuple(signal.fast_total_counts for\
signal in self._signals)
self._star.update(fast_total_counts, self.threads,force_update=force_update)
except xpsiError as e:
if isinstance(e, HotRegion.RayError):
print('Warning: HotRegion.RayError raised.')
elif isinstance(e, Elsewhere.RayError):
print('Warning: Elsewhere.RayError raised.')
return self.random_near_llzero
for photosphere, signals in zip(self._star.photospheres, self._signals):
try:
if fast_mode:
energies = signals[0].fast_energies
else:
energies = signals[0].energies
photosphere.integrate(energies, self.threads)
except xpsiError as e:
try:
prefix = ' prefix ' + photosphere.prefix
except AttributeError:
prefix = ''
if isinstance(e, HotRegion.PulseError):
print('Warning: HotRegion.PulseError raised for '
'photosphere%s.' % prefix)
elif isinstance(e, Elsewhere.IntegrationError):
print('Warning: Elsewhere.IntegrationError for '
'photosphere%s.' % prefix)
elif isinstance(e, HotRegion.AtmosError):
raise
elif isinstance(e, Elsewhere.AtmosError):
raise
print('Parameter vector: ', super(Likelihood,self).__call__())
return self.random_near_llzero
star_updated = True
# register the signals by operating with the instrument response
for signals, photosphere in zip(self._signals, self._star.photospheres):
for signal in signals:
if star_updated or signal.needs_update:
if signal.isI:
signal.register(tuple(
tuple(self._divide(component,
self._star.spacetime.d_sq)
for component in hot_region)
for hot_region in photosphere.signal),
fast_mode=fast_mode, threads=self.threads)
elif signal.isQ:
signal.register(tuple(
tuple(self._divide(component,
self._star.spacetime.d_sq)
for component in hot_region)
for hot_region in photosphere.signalQ),
fast_mode=fast_mode, threads=self.threads)
elif signal.isU:
signal.register(tuple(
tuple(self._divide(component,
self._star.spacetime.d_sq)
for component in hot_region)
for hot_region in photosphere.signalU),
fast_mode=fast_mode, threads=self.threads)
elif signal.isQn:
signal.register(tuple(
tuple(self._divide(component,
self._star.spacetime.d_sq)
for component in hot_region)
for hot_region in photosphere.signalQ),
fast_mode=fast_mode, threads=self.threads)
Qsignal = signal.signals
signal.register(tuple(
tuple(self._divide(component,
self._star.spacetime.d_sq)
for component in hot_region)
for hot_region in photosphere.signal),
fast_mode=fast_mode, threads=self.threads)
Isignal = signal.signals
for ihot in range(len(photosphere.signalQ)):
signal._signals[ihot]=_np.where(Isignal[ihot]==0.0, 0.0, Qsignal[ihot]/Isignal[ihot])
elif signal.isUn:
signal.register(tuple(
tuple(self._divide(component,
self._star.spacetime.d_sq)
for component in hot_region)
for hot_region in photosphere.signalU),
fast_mode=fast_mode, threads=self.threads)
Usignal = signal.signals
signal.register(tuple(
tuple(self._divide(component,
self._star.spacetime.d_sq)
for component in hot_region)
for hot_region in photosphere.signal),
fast_mode=fast_mode, threads=self.threads)
Isignal = signal.signals
for ihot in range(len(photosphere.signalU)):
signal._signals[ihot]=_np.where(Isignal[ihot]==0.0, 0.0, Usignal[ihot]/Isignal[ihot])
else:
raise TypeError('Signal type must be either I, Q, U, Qn, or Un.')
reregistered = True
else:
reregistered = False
if not fast_mode and reregistered:
if synthesise:
hot = photosphere.hot
try:
kws = kwargs.pop(signal.prefix)
except AttributeError:
kws = {}
shifts = [h['phase_shift'] for h in hot.objects]
signal.shifts = _np.array(shifts)
signal.synthesise(threads=self._threads, **kws)
else:
try:
hot = photosphere.hot
shifts = [h['phase_shift'] for h in hot.objects]
signal.shifts = _np.array(shifts)
signal(threads=self._threads, llzero=self._llzero)
except LikelihoodError:
try:
prefix = ' prefix ' + signal.prefix
except AttributeError:
prefix = ''
print('Warning: LikelihoodError raised for '
'signal%s.' % prefix)
print('Parameter vector: ', super(Likelihood,self).__call__())
return self.random_near_llzero
return star_updated
def reinitialise(self):
""" Reinitialise the likelihood object.
Useful if some resolution settings in child objects were changed
(namely, the number of signal phases) that need to be communicated
to other child objects.
"""
self.__init__(self._star,
self._signals,
self._num_energies,
self._fast_rel_num_energies,
self._threads,
self._llzero)
[docs] def __call__(self, p = None, reinitialise = False, force = False):
""" Evaluate the logarithm of the joint likelihood over all pulsations.
:param list p:
Parameter vector if parameters not updated externally.
:param optional[bool] reinitialise:
Call ``self.reinitialise()``?
:param optional[bool] force:
Force complete reevaluation even if some parameters are unchanged.
To faciliate this, all parameter caches are cleared.
:returns: The logarithm of the likelihood.
"""
if reinitialise: # for safety if settings have been changed
self.reinitialise() # do setup again given exisiting object refs
self.clear_cache() # clear cache and values
elif force: # no need to reinitialise, just clear cache and values
self.clear_cache()
if not self.externally_updated: # do not safely assume already handled
if p is None: # expected a vector of values instead of nothing
raise TypeError('Parameter values have not been updated.')
super(Likelihood, self).__call__(p) # update free parameters
if self.needs_update or force:
try:
logprior = self._prior(p) # pass vector just in case wanted
except AttributeError:
pass
else:
if not _np.isfinite(logprior):
# we need to restore due to premature return
super(Likelihood, self).__call__(self.cached)
return self.less_than_llzero
if self._do_fast:
# perform a low-resolution precomputation to direct cell
# allocation
x = self._driver(fast_mode=True,force_update=force)
if not isinstance(x, bool):
super(Likelihood, self).__call__(self.cached) # restore
return x
elif x:
x = self._driver(force_update=force)
if not isinstance(x, bool):
super(Likelihood, self).__call__(self.cached) # restore
return x
else:
x = self._driver(force_update=force)
if not isinstance(x, bool):
super(Likelihood, self).__call__(self.cached) # restore
return x
# memoization: update parameter value caches
super(Likelihood, self).__call__(self.vector)
loglikelihood = 0.0
for signals in self._signals:
for signal in signals:
try:
loglikelihood += signal.loglikelihood
except AttributeError as e:
print("ERROR: It looks like X-PSI falsely thought that the signal does not need to be updated and thus skipped an essential part of the calculation. If not sampling, please use ``force=True`` or ``force_update=True`` option for the likelihood evaluation, or if sampling please set ``likelihood.externally_updated = True``")
raise
if loglikelihood <= self.llzero:
return self.random_near_llzero
try:
return loglikelihood + logprior
except NameError:
return loglikelihood
[docs] @make_verbose('Checking likelihood and prior evaluation before '
'commencing sampling', 'Checks passed')
def check(self,
hypercube_points,
loglikelihood_call_vals,
rtol_loglike,
atol_loglike=0.0,
logprior_call_vals=None,
rtol_logprior=None,
atol_logprior=None,
physical_points=None,
force_update=False,
numpy_allclose=False):
""" Perform checks on the likelihood evaluator and the prior density.
Can be called from :func:`~.Sample.nested` to execute a check before
automatically commencing a sampling process.
:param ndarray[n,m] hypercube_points:
A set of ``n`` points in the unit hypercube, where ``m`` is
dimensionality (``self.num_params``) of the sampling space -- i.e.,
of the hypercube. If you want to pass the physical points instead,
just pass ``None`` here.
:param optional(ndarray[n,m]) physical_points:
A set of ``n`` points in the physical parameter space, where
``m`` is dimensionality (``self.num_params``) of the sampling space.
The ``hypercube_points``, if not ``None``, will be ignored.
:param optional[bool] force_update:
Force everything to be re-calculated regardless of what was computed before.
This can be used to prevent errors in cases when the automatic check
for update need is not working as intended.
:param optional[bool] numpy_allclose:
Determine whether the allclose function of numpy is used when evaluating
the closeness of the given and calculated likelihood. By default, a fallback
implementation is used, which also prints the likelihood values.
"""
if numpy_allclose:
from numpy import allclose
else:
yield 'Not using ``allclose`` function from NumPy.'
yield 'Using fallback implementation instead.'
@make_verbose('Checking closeness of likelihood arrays:',
'Closeness evaluated')
def allclose(a, b, rtol, atol, equal_nan=None):
""" Fallback based on NumPy v1.17. """
for _a, _b in zip(a, b):
yield '%.10e | %.10e .....' % (_a, _b)
yield ~((_np.abs(a - b) > atol + rtol*_np.abs(b)).any())
lls = []
lps = [] if logprior_call_vals is not None else None
if physical_points is not None:
physical_points = _np.array(physical_points)
try:
if physical_points.ndim > 2:
raise AttributeError("Dimension of physical_points is > 2")
elif physical_points.ndim == 1:
physical_points = physical_points.reshape(1,len(physical_points))
except AttributeError as e:
e.message = ('Physical points for likelihood check must form '
'a numpy.ndarray with at most two dimensions.')
raise
if physical_points.shape[1] != len(self):
raise IndexError('Vector size does not match number of '
'free parameters of likelihood function.')
_cached = self.externally_updated
self.externally_updated = False
for point in physical_points:
lls.append(self.__call__(point,force=force_update))
if lps is not None:
lps.append(self._prior(point))
self.externally_updated = _cached
else:
hypercube_points = _np.array(hypercube_points)
try:
if hypercube_points.ndim > 2:
raise AttributeError
elif hypercube_points.ndim == 1:
hypercube_points = hypercube_points.reshape(1,len(hypercube_points))
except AttributeError as e:
e.message = ('Unit hypercube points for likelihood check must '
'form a numpy.ndarray with at most two dimensions.')
raise
if hypercube_points.shape[1] != len(self):
raise IndexError('Vector size does not match number of '
'free parameters of likelihood function.')
for point in hypercube_points:
phys_point = self._prior.inverse_sample(point)
lls.append(self.__call__(phys_point,force=force_update))
if lps is not None:
lps.append(self._prior(phys_point))
try:
proceed = allclose(_np.array(lls),
_np.array(loglikelihood_call_vals),
rtol=rtol_loglike, atol=atol_loglike,
equal_nan=False)
except Exception as e:
raise Exception('Log-likelihood value checks failed on process %i '
'with exception value: %s' % (_rank, str(e)))
else:
if proceed:
yield 'Log-likelihood value checks passed on root process.'
else:
raise ValueError('Log-likelihood value checks failed on process '
'%i' % _rank)
if lps is not None:
try:
proceed = allclose(_np.array(lps),
_np.array(logprior_call_vals),
rtol=rtol_logprior, atol=atol_logprior,
equal_nan=False)
except Exception as e:
raise Exception('Log-prior value checks failed on process %i '
'with exception value: %s' % (_rank, str(e)))
else:
if proceed:
yield 'Log-prior value checks passed on root process.'
else:
raise ValueError('Log-prior value checks failed on '
'process %i' % _rank)
[docs] def synthesise(self, p, reinitialise=False, force=False, **kwargs):
""" Synthesise pulsation data.
:param list p: Parameter vector.
:param optional[bool] reinitialise: Call ``self.reinitialise()``?
:param optional[bool] force:
Force complete reevaluation even if some parameters are unchanged.
:param dict kwargs:
Keyword arguments propagated to custom signal synthesis methods.
Examples of such arguments include exposure times or
required total count numbers (see example notebooks).
"""
if reinitialise: # for safety if settings have been changed
self.reinitialise() # do setup again given exisiting object refs
self.clear_cache() # clear cache and values
elif force: # no need to reinitialise, just clear cache and values
self.clear_cache()
if p is None: # expected a vector of values instead of nothing
raise TypeError('Parameter values are None.')
super(Likelihood, self).__call__(p) # update free parameters
try:
logprior = self._prior(p) # pass vector just in case wanted
except AttributeError:
pass
else:
if not _np.isfinite(logprior):
because_of_1D_bounds = False
for param in self._prior.parameters:
if param.bounds[0] is not None:
if not param.bounds[0] <= param.value:
because_of_1D_bounds = True
if param.bounds[1] is not None:
if not param.value <= param.bounds[1]:
because_of_1D_bounds = True
if because_of_1D_bounds:
print("Warning: Prior check failed, because at least one of the parameters is not within the hard 1D-bounds. No synthetic data will be produced.")
else:
print('Warning: Prior check failed because a requirement set in CustomPrior has failed. No synthetic data will be produced.')
# we need to restore due to premature return
super(Likelihood, self).__call__(self.cached)
return None
if self._do_fast:
# perform a low-resolution precomputation to direct cell
# allocation
x = self._driver(fast_mode=True,force_update=force)
if not isinstance(x, bool):
super(Likelihood, self).__call__(self.cached) # restore
return None
elif x:
x = self._driver(synthesise=True,force_update=force, **kwargs)
if not isinstance(x, bool):
super(Likelihood, self).__call__(self.cached) # restore
return None
else:
x = self._driver(synthesise=True,force_update=force, **kwargs)
if not isinstance(x, bool):
super(Likelihood, self).__call__(self.cached) # restore
return None