__all__ = ["Instrument"]
from xpsi.global_imports import *
from xpsi.utils import make_verbose
from xpsi.ParameterSubspace import ParameterSubspace
from xpsi import Parameter
from xpsi.utils import make_verbose
from xpsi.PileupModule import XrayPileup
from astropy.io import fits
from astropy.table import Table
[docs]
class ResponseError(xpsiError):
""" Raised if there is a problem with the input response matrix. """
[docs]
class EdgesError(xpsiError):
""" Raised if there is a problem with the input energy edges. """
[docs]
class ChannelError(xpsiError):
""" Raised if there is a problem with the input channel numbers. """
[docs]
class Instrument(ParameterSubspace):
r""" Base class for astronomical X-ray instruments on-board space telescopes.
The body of the initialiser must not be changed to ensure inter-module
compatibility, but can be extended if appropriate using a call to
``super().__init__``. Specialist constructors can be defined in a subclass
using the ``@classmethod`` decorator.
:param ndarray[p,q] matrix:
A :math:`p \\times q` matrix which is the product of a redistribution
matrix and effective area vector. The input energy intervals must
increase along the columns of :attr:`matrix`, and the output channels
must increase along the rows of :attr:`matrix`. The *units* of the
elements must be that of an *effective* area (:math:`cm^2`). Generally
there will be some available calibration product, and deviations from
this nominal response model will be parametrised. So here load some
nominal response matrix.
:param ndarray[q+1] energy_edges:
Energy edges in keV of the instrument energy intervals which must be
congruent to the first dimension of the :attr:`matrix`: the number of
edges must be :math:`q + 1`. The edges must be monotonically increasing.
These edges will correspond to the nominal response matrix and any
deviation from this matrix (see above).
:param ndarray[p] channels:
Instrument channel numbers which must be equal in number to the number
of rows of the :attr:`matrix`. The number of channels must therefore be
:math:`p`. These channels will correspond to the nominal response
matrix and any deviation from this matrix (see above). In common usage
patterns, the channel numbers will increase monotonically with row
number, and usually increment by one (but this is not necessary).
.. note::
That these channel numbers are not used to index the loaded instrument
(sub)matrix. The :attr:`xpsi.Data.index_range` property returns
bounding row numbers that index the loaded instrument response
(sub)matrix in order to operate on an incident signal flux. The
channel array contained in :attr:`xpsi.Data.channels` must be a
contiguous (ordered) subset of the channel array loaded here.
.. note::
The dimensions of the response matrix need not be equal, but it is
required that the number of input intervals be greater than or equal to
the number of output channels -- i.e., :math:`p \leq q`. If :math:`p <
q` then it is implied that subsets of adjacent output channels are
effectively grouped together.
:param ndarray[p+1] channel_edges:
The channel (energy) edges of the instrument, in keV. The array must
be congruent to the zeroth dimension of the :attr:`matrix`: the number
of edges must be :math:`p + 1`. The edges must be monotonically
increasing. These edges will correspond to the nominal response matrix
and any deviation from this matrix (see above).
:param tuple args:
Container of parameter instances.
:param dict kwargs:
If you want to prefix parameters of an instance of this instrument
subspace with an identifier, pass it as keyword argument and it will
find its way to the base class.
"""
def __init__(self, matrix, energy_edges, channels, channel_edges=None,
*args, **kwargs):
self.matrix = matrix
self.energy_edges = energy_edges
self.channels = channels
if channel_edges is not None:
self.channel_edges = channel_edges
super(Instrument, self).__init__(*args, **kwargs)
@property
def matrix(self):
r""" Get the reference response matrix.
In common usage patterns there will be some fiducial or nominal
response matrix that either defines fixed instrument operation or
is a basis for parametrised deviations. This matrix is usually a
calibration product distributed by an instrument calibration team.
A matrix of dimension :math:`p \\times q`. Here :math:`p` must be the
number of input energy intervals, and :math:`q \geq p` the number of
output channels.
.. note::
The attribute :attr:`matrix` must be assigned, and it must be
a :class:`numpy.ndarray` for use with :func:`numpy.dot` (even
if the matrix is sparse to some degree).
"""
return self._matrix
@matrix.setter
def matrix(self, matrix):
""" Set the matrix. """
try:
assert isinstance(matrix, _np.ndarray)
assert matrix.ndim == 2
assert (matrix >= 0.0).all()
except AssertionError:
raise ResponseError('Input matrix must be a two-dimensional ndarray with all matrix elements that are zero or positive.')
try:
for i in range(matrix.shape[0]):
assert matrix[i,:].any()
for j in range(matrix.shape[1]):
assert matrix[:,j].any()
except AssertionError:
raise ResponseError('Each row and column of the matrix must contain at least one positive number.')
self._matrix = matrix
[docs]
def construct_matrix(self):
""" Construct the response matrix if it is parameterised.
If customising, do operations to calculate a matrix, and return it.
You can access parameters (free, fixed, and derived) via the container
access ``self[<name>]``.
If the instrument operation is fixed, you might not need to subclass,
because the default behaviour is to return the nominal response you
loaded. If for some reason the matrix you loaded is to be modified in
some fixed manner, possibly as a function of some custom fixed
parameters that you defined, you would also have to subclass and
provide the correct implementation of this method.
"""
return self.matrix
[docs]
def __call__(self, signal, irange, orange):
""" Register an incident signal.
:param ndarray[m,n] signal:
An :math:`m \\times n` matrix, where input energy interval
increments along rows, and phase increases along columns. The
number of rows, :math:`m`, must equal the number of columns of
:attr:`matrix`: :math:`m=q`.
:param array-like irange:
Indexable object with two elements respectively denoting the
indices of the first and last *input* intervals. The response
matrix :attr:`matrix` must be indexable with these numbers, i.e.,
they must satisfy :math:`indx < q`.
:param array-like orange:
Indexable object with two elements respectively denoting the
indices of the first and last *output* channels. The response
matrix :attr:`matrix` must be indexable with these numbers, i.e.,
they must satisfy :math:`indx < p`.
:return: *ndarray[p,n]* containing the registered signal.
.. note::
The product of the most recent operation is stored as the property
:attr:`cached_signal`.
"""
matrix = self.construct_matrix()
self._cached_signal = _np.dot(matrix[orange[0]:orange[1],
irange[0]:irange[1]], signal)
return self._cached_signal
@property
def cached_signal(self):
""" Get the cached registered signal. """
return self._cached_signal
@property
def energy_edges(self):
""" Get the energy edges of the instrument, in keV.
A :class:`numpy.ndarray` of edges of the input energy intervals which
map to channels defined in the data space.
"""
return self._energy_edges
@energy_edges.setter
def energy_edges(self, energy_edges):
""" Set the energy edges in keV. """
if not isinstance(energy_edges, _np.ndarray):
try:
energy_edges = _np.array(energy_edges)
except TypeError:
raise EdgesError('Energy edges must be in a one-dimensional array of positive increasing values.')
try:
assert energy_edges.ndim == 1
assert (energy_edges >= 0.0).all()
assert energy_edges.shape[0] == self._matrix.shape[1] + 1
assert not (energy_edges[1:] <= energy_edges[:-1]).any()
except AssertionError:
raise EdgesError('Energy edges must be in a one-dimensional array of positive increasing values, with a '
'length equal to number of energy intervals in the matrix + 1.')
self._energy_edges = energy_edges
@property
def channel_edges(self):
""" Get the channel (energy) edges of the instrument, in keV.
A :class:`numpy.ndarray` of edges of the registered energy intervals
labelled as channels defined in the data space. This is relevant when
there is a detector-by-detector gain scale applied to event data (such
as for NICER instrument calibration products), meaning that the
redistribution matrix is effectively shared by detectors and the
channels across detectors can share an energy scale definition.
An incident photon of given energy then has a registered-energy
distribution that generally peaks in the vicinity of the true photon
energy. The resdistribution matrix will have some energy resolution
(along with other features such as shelves). With thanks to Paul S.
Ray for explaining the choice to calibrate in this manner.
.. note::
If you made a channel cut that results in a non-contiguous subset
of channels, you will need to overwrite the setter method because
the checks will fail.
"""
return self._channel_edges
@channel_edges.setter
def channel_edges(self, channel_edges):
""" Set the channel (energy) edges in keV. """
if not isinstance(channel_edges, _np.ndarray):
try:
channel_edges = _np.array(channel_edges)
except TypeError:
raise EdgesError('Channel edges must be in a one-dimensional array of positive increasing values.')
try:
assert channel_edges.ndim == 1
assert (channel_edges >= 0.0).all()
assert channel_edges.shape[0] == self._matrix.shape[0] + 1
assert not (channel_edges[1:] <= channel_edges[:-1]).any()
except AssertionError:
raise EdgesError('Channel edges must be in a one-dimensional array of positive increasing values, with a '
'length equal to the number of channel intervals in the matrix + 1.')
self._channel_edges = channel_edges
@property
def channels(self):
""" Get the array of channels corresponding to rows of the matrix.
The matrix being the loaded instrument response (sub)matrix.
"""
return self._channels
@channels.setter
@make_verbose('Setting channels for loaded instrument response (sub)matrix',
'Channels set')
def channels(self, channel_array):
if not isinstance(channel_array, _np.ndarray):
try:
channel_array = _np.array(channel_array)
except TypeError:
raise ChannelError('Channel numbers must be in an array.')
try:
assert channel_array.ndim == 1
assert (channel_array >= 0).all()
assert channel_array.shape[0] == self._matrix.shape[0]
except AssertionError:
raise ChannelError('Channel numbers must be in a one-dimensional array of positive integers (including zero), with a '
'length equal to the number of channel in the matrix.')
if (channel_array[1:] - channel_array[:-1] != 1).any():
print('WARNING: Channel numbers do not uniformly increment by one.')
self._channels = channel_array
yield
@make_verbose('Trimming instrument response',
'Instrument response trimmed')
def trim_response(self,
min_channel=0,
max_channel=-1,
threshold=1e-5 ):
""" Trim the instrument response to the specified channel range.
:param int min_channel:
The minimum channel number to include in the trimmed response.
:param int max_channel:
The maximum channel number to include in the trimmed response.
:param float threshold:
The threshold value to use for trimming the instrument response.
Channels / inputs with a total response below this value will be removed.
"""
# Make the table of required channels
assert min_channel >= self.channels[0]
if max_channel == -1:
max_channel = self.channels[-1]
assert max_channel <= self.channels[-1]
old_channels = self.channels
new_channels_indexes = [ min_channel <= c <= max_channel for c in self.channels]
# Find empty columns and lines
new_matrix = self.matrix[new_channels_indexes]
empty_channels = _np.all( new_matrix <= threshold, axis=1)
empty_inputs = _np.all( new_matrix <= threshold, axis=0)
# Apply to matrix and channels directly
self.matrix = new_matrix[~empty_channels][:,~empty_inputs]
self.channels = self.channels[new_channels_indexes][ ~empty_channels ]
# Get the edges of energies for both input and channel
new_energy_edges = [ self.energy_edges[k] for k in range(len(empty_inputs)) if not empty_inputs[k] ]
self.energy_edges = _np.hstack( (new_energy_edges , self.energy_edges[ _np.where( self.energy_edges == new_energy_edges[-1] )[0] + 1 ] ) )
if hasattr( self , 'channel_edges' ):
new_channels_edges = [ self.channel_edges[ _np.where(old_channels==chan)[0][0]] for chan in self.channels]
self.channel_edges = _np.hstack( (new_channels_edges , self.channel_edges[_np.where( old_channels==self.channels[-1])[0] + 1]) )
# Print if any trimming happens
if empty_inputs.sum() > 0:
print(f'Triming the response matrix because it contains rows with only values <= {threshold}.\n '
f'Now min_energy={self.energy_edges[0]} and max_energy={self.energy_edges[-1]}')
if empty_channels.sum() > 0:
print(f'Triming the response matrix because it contains columns with only values <= {threshold}.\n '
f'Now min_channel={self.channels[0]} and max_channel={self.channels[-1]}')
# If ARF and RMF, trim them
if hasattr( self , 'ARF' ):
self.RMF = self.RMF[new_channels_indexes][~empty_channels][:,~empty_inputs]
self.ARF = self.ARF[~empty_inputs]
@classmethod
@make_verbose('Loading instrument response matrix from OGIP compliant files',
'Response matrix loaded')
def from_ogip_fits(cls,
RMF_path,
ARF_path=None,
min_channel=0,
max_channel=-1,
min_input=1,
max_input=-1,
datafolder=None,
**kwargs):
""" Loading method for Instrument using OGIP defined ARF/RMF or RSP.
:param str RMF_path:
The path to the RMF file which should be OGIP compliant. Path to the OGIP compliant RSP file it ARF_path is None.
:param str | None ARF_path:
The path to the ARF file which should be OGIP compliant or None if the RMF_path points to a RSP file.
:param int min_channel:
The minimum channel for which the instrument response is loaded.
:param int max_channel:
The maximum channel for which the instrument response is loaded.
:param int min_input:
The minimum input energy number for which the instrument response is loaded.
:param int max_input:
The maximum input energy number for which the instrument response is loaded.
:param str | None datafolder:
The path to the folder which contains both ARF and RMF files, if not specified in RMF_path or ARF_path.
:return:
Instrument instance with loaded instrument response matrix.
"""
if datafolder:
ARF_path = _os.path.join( datafolder, ARF_path ) if ARF_path is not None else None
RMF_path = _os.path.join( datafolder, RMF_path )
# Open useful values in ARF/RMF/RSP
with fits.open( RMF_path ) as RMF_hdul:
RMF_header = RMF_hdul['MATRIX'].header
RMF_instr = RMF_header['INSTRUME']
DETCHANS = RMF_header['DETCHANS']
NUMGRP = RMF_header['NAXIS2']
TLMIN = RMF_header['TLMIN4']
TLMAX = RMF_header['TLMAX4']
# Handle the RSP case
if ARF_path is not None:
with fits.open( ARF_path ) as ARF_hdul:
assert RMF_instr == ARF_hdul['SPECRESP'].header['INSTRUME']
# Get the values and change the -1 values if requried
if max_channel == -1:
max_channel = DETCHANS -1
if max_input == -1:
max_input = NUMGRP
channels = _np.arange( min_channel, max_channel+1 )
inputs = _np.arange( min_input, max_input+1 )
# Perform routine checks
assert min_channel >= TLMIN and max_channel <= TLMAX
assert min_input >= 0 and max_input <= NUMGRP
# If everything in order, get the data
with fits.open( RMF_path ) as RMF_hdul:
RMF_MATRIX = RMF_hdul['MATRIX'].data
RMF_EBOUNDS = RMF_hdul['EBOUNDS'].data
# Get the channels from the data
RMF = _np.zeros((DETCHANS, NUMGRP))
for i, (N_GRP, F_CHAN, N_CHAN, RMF_line) in enumerate( zip(RMF_MATRIX['N_GRP'], RMF_MATRIX['F_CHAN'], RMF_MATRIX['N_CHAN'], RMF_MATRIX['MATRIX']) ):
# Skip if needed
if N_GRP == 0:
continue
# Check the values
if not isinstance(F_CHAN, _np.ndarray ):
F_CHAN = [F_CHAN]
N_CHAN = [N_CHAN]
# Add the values to the RMF
n_skip = 0
for f_chan, n_chan in zip(F_CHAN,N_CHAN):
if n_chan == 0:
continue
RMF[f_chan:f_chan+n_chan,i] += RMF_line[n_skip:n_skip+n_chan]
n_skip += n_chan
# Make the RSP, depending on the input files
if ARF_path is None:
RSP = RMF[min_channel:max_channel+1,min_input-1:max_input]
ARF_area = RMF.sum( axis=0 )
else:
ARF = Table.read(ARF_path, 'SPECRESP')
ARF_area = ARF['SPECRESP']
RSP = RMF * ARF_area
RSP = RSP[min_channel:max_channel+1,min_input-1:max_input]
# Find empty columns and lines
empty_channels = _np.all(RSP == 0, axis=1)
empty_inputs = _np.all(RSP == 0, axis=0)
RSP = RSP[~empty_channels][:,~empty_inputs]
channels = channels[ ~empty_channels ]
inputs = inputs[ ~empty_inputs ]
if empty_inputs.sum() > 0:
print(f'Triming the response matrix because it contains rows with only 0 values.\n '
f'Now min_energy={inputs[0]} and max_energy={inputs[-1]}')
if empty_channels.sum() > 0:
print(f'Triming the response matrix because it contains columns with only 0 values.\n'
f' Now min_channel={channels[0]} and max_channel={channels[-1]}')
# Get the edges of energies for both input and channel
energy_edges = _np.append( RMF_MATRIX['ENERG_LO'][inputs-1], RMF_MATRIX['ENERG_HI'][inputs[-1]-1]).astype(dtype=_np.double)
channel_energy_edges = _np.append(RMF_EBOUNDS['E_MIN'][channels],RMF_EBOUNDS['E_MAX'][channels[-1]])
# Make the instrument
Instrument = cls(RSP,
energy_edges,
channels,
channel_energy_edges,
**kwargs)
# Add ARF and RMF for plotting
Instrument.RMF = RMF[min_channel:max_channel+1,min_input-1:max_input][~empty_channels][:,~empty_inputs]
Instrument.ARF = ARF_area[min_input-1:max_input][~empty_inputs]
Instrument.name = RMF_instr
return Instrument
class InstrumentPileup(Instrument):
""" Instrument with pile-up computation """
def __call__(self, signal, *args):
""" Overwrite. """
## Compute spectrum with pileup
piled_spectrum = self.pileup.analyze(signal,
alpha=self['grade_migration'],
psf_frac=self['psf_fraction'])
self._cached_signal = piled_spectrum
return self._cached_signal
@classmethod
@make_verbose('Loading instrument response matrix',
'Response matrix loaded')
def from_ogip_fits(cls,
Data_path,
ARF_path,
RMF_path,
min_channel=0,
max_channel=-1,
min_input=1,
max_input=-1,
bounds=dict(),
values=dict(),
datafolder=None,
**kwargs):
""" Load any instrument response matrix. """
if datafolder:
ARF_path = _os.path.join( datafolder, ARF_path )
RMF_path = _os.path.join( datafolder, RMF_path )
Data_path = _os.path.join( datafolder, Data_path )
# Open useful values in ARF/RMF
with fits.open( ARF_path ) as ARF_hdul:
ARF_header = ARF_hdul['SPECRESP'].header
ARF_instr = ARF_header['INSTRUME']
with fits.open( RMF_path ) as RMF_hdul:
RMF_header = RMF_hdul['MATRIX'].header
RMF_instr = RMF_header['INSTRUME']
DETCHANS = RMF_header['DETCHANS']
NUMGRP = RMF_header['NAXIS2']
TLMIN = RMF_header['TLMIN4']
TLMAX = RMF_header['TLMAX4']
# Get the values and change the -1 values if requried
if max_channel == -1:
max_channel = DETCHANS -1
if max_input == -1:
max_input = NUMGRP
channels = _np.arange( min_channel, max_channel+1 )
inputs = _np.arange( min_input, max_input+1 )
# Perform routine checks
assert ARF_instr == RMF_instr
assert min_channel >= TLMIN and max_channel <= TLMAX
assert min_input >= 0 and max_input <= NUMGRP
# If everything in order, get the data
with fits.open( RMF_path ) as RMF_hdul:
RMF_MATRIX = RMF_hdul['MATRIX'].data
RMF_EBOUNDS = RMF_hdul['EBOUNDS'].data
# Get the channels from the data
RMF = _np.zeros((DETCHANS, NUMGRP))
for i, (N_GRP, F_CHAN, N_CHAN, RMF_line) in enumerate( zip(RMF_MATRIX['N_GRP'], RMF_MATRIX['F_CHAN'], RMF_MATRIX['N_CHAN'], RMF_MATRIX['MATRIX']) ):
# Skip if needed
if N_GRP == 0:
continue
# Check the values
if not isinstance(F_CHAN, _np.ndarray ):
F_CHAN = [F_CHAN]
N_CHAN = [N_CHAN]
# Add the values to the RMF
n_skip = 0
for f_chan, n_chan in zip(F_CHAN,N_CHAN):
if n_chan == 0:
continue
RMF[f_chan:f_chan+n_chan,i] += RMF_line[n_skip:n_skip+n_chan]
n_skip += n_chan
# Make the RSP
ARF = Table.read(ARF_path, 'SPECRESP')
ARF_area = ARF['SPECRESP']
# Extract the required matrix
RSP = RMF * ARF_area
RSP = RSP[min_channel:max_channel+1,min_input-1:max_input]
# Find empty columns and lines
empty_channels = _np.all(RSP == 0, axis=1)
empty_inputs = _np.all(RSP == 0, axis=0)
RSP = RSP[~empty_channels][:,~empty_inputs]
channels = channels[ ~empty_channels ]
inputs = inputs[ ~empty_inputs ]
if empty_inputs.sum() > 0:
print(f'Triming the response matrix because it contains lines with only 0 values.\n Now min_input={inputs[0]} and max_input={inputs[-1]}')
if empty_channels.sum() > 0:
print(f'Triming the response matrix because it contains columns with only 0 values.\n Now min_channel={channels[0]} and max_channel={channels[-1]}')
# Get the edges of energies for both input and channel
energy_edges = _np.append( ARF['ENERG_LO'][inputs-1], ARF['ENERG_HI'][inputs[-1]-1]).astype(dtype=_np.double)
energies = (ARF['ENERG_LO']+ARF['ENERG_HI'][:])/2
channel_energy_edges = _np.append(RMF_EBOUNDS['E_MIN'][channels],RMF_EBOUNDS['E_MAX'][channels[-1]])
## -------- INITIALIZATION OF THE PILEUP ---------------
## Definition of the parameters for the pile-up
alpha_grade = Parameter('grade_migration',
strict_bounds = (0.0,1.0),
bounds = bounds.get('grade_migration', None),
doc = 'Grade migration factor : probability that the piled event is not rejected as “bad event”',
symbol = r'$G_n$',
value = values.get('grade_migration', None))
psffrac = Parameter('psf_fraction',
strict_bounds = (0.8,1.0), #min in sherpa set at 0.85
bounds = bounds.get('psf_fraction', None),
doc = 'fraction of events in the source extraction region to which pileup will be applied',
symbol = r'$PSF_frac',
value = values.get('psf_fraction', 0.9))
nregions = Parameter('nregions',
strict_bounds = (0.0,10.0),
bounds = None, ##this parameter should always be fixed - should be 1.0 for point sources
doc = 'number of regions to which pileup model will be applied independently',
symbol = r'$N_{regions}$',
value = values.get('nregions', 1.0))
g0 = Parameter('g0',
strict_bounds = (0.0,1.0),
bounds = None, ##this parameter should always be fixed
doc = 'grade correction for single photon detection',
symbol = r'$g_0$',
value = values.get('g0', 1.0))
npiled = Parameter('npiled',
strict_bounds = (0.0,100.0),
bounds = None, ##this parameter should always be fixed
doc = 'number of photons considered for pileup in a single frame',
symbol = r'$N_{phot}$',
value = values.get('npiled', 5)) #or 30 in sherpa
with fits.open( Data_path ) as hdul:
Data_header = hdul['SPECTRUM'].header
## make sure that EXPTIME and FRACEXPO exist - should be the case if it's Chandra data
frametime = Data_header['EXPTIME']
frac_expo = ARF_header['FRACEXPO']
frame = Parameter('frame_time',
strict_bounds = (0.0,10.0),
bounds = None,
doc = 'good exposure time per frame',
symbol = r'$\tau$ (s)',
value = frametime )
fracexpo = Parameter('frac_expo',
strict_bounds = (0.0,1.0),
bounds = None,
doc = 'fraction of the frame exposure time to create effective frame exposure time',
symbol = r'$f_{expo}$',
value = frac_expo)
Instrument = cls(RSP,
energy_edges,
channels,
channel_energy_edges,
alpha_grade,psffrac,nregions,g0,npiled,frame,fracexpo,
**kwargs)
# Add ARF and RMF
Instrument.RMF = RMF[min_channel:max_channel+1,min_input-1:max_input][~empty_channels][:,~empty_inputs]
Instrument.ARF = ARF_area[min_input-1:max_input][~empty_inputs]
Instrument.energies = energies[min_input-1:max_input][~empty_inputs]
##Initialization of the pileup module
pileup = XrayPileup(Instrument)
Instrument.pileup = pileup
return Instrument