__all__ = ["Data"]
from xpsi.global_imports import *
from xpsi.utils import make_verbose
from xpsi.Instrument import ChannelError
from astropy.io import fits
[docs]
class Data(object):
r""" A container for event data.
The working assumption is that the sampling distribution of this event data
can be written in terms of a set of channel-by-channel *count*\ -rate
signals. The instrument associated with this data in an instance of
:class:`~.Signal.Signal` must transform incident signals into a structure
congruent to that of the event data. The attributes and methods of this
class and any derived classes must therefore store information required for
this operation.
The initialiser assigns observation settings to attributes which are
required for the treating the incident specific flux signals using the
model instrument. The body of the initialiser may be changed, but to ensure
inter-module compatibility, only the phase handling should really be
modified, for instance if you want to implement an unbinned likelihood
function with respect to phase; the phase bins defined in this concrete
implementation are only used by a custom implementation of the
likelihood function (i.e., by a subclass of :class:`xpsi.Signal`), and
not by the other concrete classes used to construct the likelihood
callable. The initialiser can also be extended if appropriate using a call
to ``super().__init__``. Specialist constructors can be defined in a
subclass using the ``@classmethod`` decorator, for instance to load event
data from disk into a compatible data structure in memory; an example of
this may be found below.
.. note::
You can subclass in order to tailor the handling of the event data, for
instance to implement a likelihood functions for unbinned event data.
:param ndarray[n,m] counts:
A :class:`~numpy.ndarray` of count numbers. The columns must map to
the phase intervals given by :obj:`phases`. The rows of the array map
to some subset of instrument channels.
:param ndarray[n] channels:
Instrument channel numbers which must be equal in number to the first
dimension of the :attr:`matrix`: the number of channels must be
:math:`p`. These channels will correspond to the nominal response
matrix and any deviation from this matrix (see above). In common usage
patterns, the channel numbers will increase monotonically with row
number, and usually increment by one (but this is not necessary). It is
advisable that these numbers are the actual instrument channel numbers
so that plots generated by the post-processing module using these
labels are clear.
:param ndarray[m+1] phases:
A :class:`~numpy.ndarray` of phase interval edges, where events are
binned into these same intervals in each instrument channel.
:param int first:
The index of the first row of the loaded response matrix containing
events (see note below).
:param int last:
The index of the last row of the loaded response matrix containing
events (see note below).
.. note::
The :obj:`counts` matrix rows *must* span a contiguous subset of the
rows of the loaded response matrix, but in general can span an
arbitrary subset and order of instrument channels. Note that the
:obj:`first` and :obj:`last+1` numbers are used to index the loaded
instrument response matrix. Therefore, if you load only a submatrix of
the full instrument response matrix, these indices must be appropriate
for the loaded submatrix, and must not be the true channel numbers
(this information is instead loaded in the :class:`xpsi.Instrument`).
Of course, in all sensible usage patterns the order of the instrument
channels, when mapped to matrix rows, will be such that channel number
increases with matrix row number monotonically because, then the
nominal event energy increases monotonically with row number, which is
important for visualisation of data and model (because spatial order
matches energy order and correlations in space can be discerned easily).
However, there could in principle be channel cuts that mean an increment
of more than one channel between matrix adjacent rows, and the
response matrix needs to be manipulated before or during a custom
loading phase such that its rows match the channel numbers assigned to
the :obj:`counts` matrix rows.
:param float exposure_time:
The exposure time, in seconds, to acquire this set of event data.
"""
def __init__(self, counts, channels, phases, first, last, exposure_time):
if not isinstance(counts, _np.ndarray):
try:
counts = _np.array(counts)
except TypeError:
raise TypeError('Counts object is not a ``numpy.ndarray``.')
if counts.ndim not in [1,2]:
raise TypeError('Counts must be in a one- or two-dimensional '
'array.')
#if (counts < 0.0).any():
# raise ValueError('Negative count numbers are invalid.')
self.channels = channels
if not isinstance(phases, _np.ndarray):
try:
phases = _np.array(phases)
except TypeError:
raise TypeError('Phases object is not a ``numpy.ndarray``.')
if phases.ndim != 1:
raise ValueError('Phases must form a one-dimensional sequence.')
self._phases = phases
try:
self._first = int(first)
self._last = int(last)
except TypeError:
raise TypeError('The first and last channels must be integers.')
#if self._first >= self._last:
if self._first > self._last:
raise ValueError('The first channel number must be equal or lower '
'than the last channel number.')
if counts.shape[0] != self._last - self._first + 1:
raise ValueError('The number of rows must be compatible '
'with the first and last channel numbers.')
if counts.shape[1] != (phases.shape[0] - 1):
raise ValueError('The number of columns must be compatible '
'with the number of phases. Note that the phases array are '
'the phase bin edges (i.e., number of phases bins + 1)')
self._counts = counts
try:
self._exposure_time = float(exposure_time)
except TypeError:
raise TypeError('Exposure time must be a float.')
@property
def exposure_time(self):
""" Get the total exposure time in seconds. """
return self._exposure_time
@property
def counts(self):
""" Get the photon count data. """
return self._counts
@property
def phases(self):
""" Get the phases. """
return self._phases
@property
def index_range(self):
""" Get a 2-tuple of the bounding response-matrix row indices. """
return (self._first, self._last + 1) # plus one for array indexing
@property
def channel_range(self):
""" Deprecated property name. To be removed for v1.0. """
return self.index_range
@property
def channels(self):
""" Get the array of channels that the event data spans. """
return self._channels
@channels.setter
@make_verbose('Setting channels for event data',
'Channels set')
def channels(self, array):
if not isinstance(array, _np.ndarray):
try:
array = _np.array(array)
except TypeError:
raise ChannelError('Channel numbers must be in a '
'one-dimensional array, and must all be '
'positive integers including zero.')
if array.ndim != 1 or (array < 0).any():
raise ChannelError('Channel numbers must be in a '
'one-dimensional array, and must all be '
'positive integers including zero.')
try:
if array.shape[0] != array.shape[0]:
raise ChannelError('Number of channels does')
except AttributeError:
pass # if synthesising there will not be any counts loaded here
if (array[1:] - array[:-1] != 1).any():
yield ('Warning: Channel numbers do not uniformly increment by one.'
'\n Please check for correctness.')
self._channels = array
yield
@make_verbose('Trimming event data', 'Event data trimmed')
def trim_data( self ,
min_channel = 0,
max_channel = -1 ):
""" Trim the event data to the specified channel range.
:param int min_channel:
The minimum channel number to include in the trimmed data.
:param int max_channel:
The maximum channel number to include in the trimmed data.
"""
# Make the table of required channels
assert min_channel >= self.channels[0]
assert max_channel <= self.channels[-1]
new_channels = [ min_channel <= c <= max_channel for c in self.channels]
# Trim the counts and channels
self._channels = self.channels[new_channels]
self._counts = self.counts[new_channels]
# Change first and last
self._first = 0
self._last = max_channel - min_channel
[docs]
@classmethod
@make_verbose('Loading event list and phase binning',
'Events loaded and binned')
def phase_bin__event_list(cls, path, channels, phases,
channel_column,
phase_column=None,
phase_averaged=False,
phase_shift=0.0,
channel_edges=None,
skiprows=1,
eV=False,
dtype=_np.int32,
*args, **kwargs):
""" Load a phase-folded event list and bin the events in phase.
:param str path:
Path to event list file containing two columns, where the first
column contains phases on the unit interval, and the second
column contains the channel number.
:param ndarray[n] channels:
An (ordered) subset of instrument channels. It is advisable that
these channels are a contiguous subset of instrument channels, but
this not a strict requirement if you are comfortable with the
handling the instrument response matrix and count number matrix to
match in row-to-channel definitions.
:param list phases:
An ordered sequence of phase-interval edges on the unit interval.
The first and last elements will almost always be zero and unity
respectively.
:param float phase_shift:
A phase-shift in cycles to be applied when binning the events in
phase.
:param int phase_column:
The column in the loaded file containing event phases.
:param bool phase_averaged:
Is the event data phase averaged?
:param int channel_column:
The column in the loaded file containing event channels.
:param ndarray[n+1] channel_edges:
The nominal energy edges of the instrument channels, assumed to
be contiguous if binning event energies in channel number.
:param int skiprows:
The number of top rows to skip when loading the events from file.
The top row of couple of rows will typically be reserved for
column headers.
:param bool eV:
Are event energies in eV, instead of channel number?
:param type dtype:
The type of the count data. Sensible options are ``numpy.int`` (the
default) or a :mod:`numpy` floating point type. The choice here
only matters when executing custom likelihood evaluation code, which
might expect a type without checking or casting.
"""
events = _np.loadtxt(path, skiprows=skiprows)
channels = list(channels)
yield 'Total number of events: %i.' % events.shape[0]
data = _np.zeros((len(channels), len(phases)-1), dtype=dtype)
for i in range(events.shape[0]):
_channel = None
if eV:
for j in range(len(channel_edges) - 1):
if channel_edges[j] <= events[i, channel_column]/1.0e3 < channel_edges[j+1]:
_channel = channels[j]
break
else:
_channel = events[i, channel_column]
if _channel is not None and _channel in channels:
if not phase_averaged:
_temp = events[i,phase_column] + phase_shift
_temp -= _np.floor(_temp)
for j in range(phases.shape[0] - 1):
if phases[j] <= _temp <= phases[j+1]:
data[channels.index(int(events[i,channel_column])),j] += 1
break
else:
data[channels.index(int(_channel)), 0] += 1
yield 'Number of events constituting data set: %i.' % _np.sum(data)
yield cls(data, channels, _np.array(phases), *args, **kwargs)
@classmethod
def bin__event_list(cls, *args, **kwargs):
return cls.phase_bin__event_list(*args, **kwargs)
@classmethod
@make_verbose('Loading OGIP compliant file',
'Data loaded')
def load(cls, path,
n_phases=32,
channels=None,
phase_column='PULSE_PHASE',
channel_column='PI'):
""" Load an OGIP compliant event list (EVT) or spectrum (PHA) fits file by redirecting to more specific class methods.
:param str path:
Path to EVT or PHA fits file to extract the data from.
:param int n_phases:
The number of phase bins to use, if the data has phase information.
:param ndarray[n] | None channels:
The energy channels to extract the data from. Should be the same as the channels of the instrument to use.
If None, extract all channels in the event list.
:param str phase_column:
If the file is a EVT file, column containing event phases.
:param str channel_column:
If the file is a EVT file, column containing event channels.
"""
# Check whether event file or PHA
with fits.open( path ) as hdul:
HDUCLAS1 = hdul[1].header['HDUCLAS1']
# Select the case based on HDUCLAS1
if HDUCLAS1 == 'SPECTRUM':
Data = cls.from_pha(path, channels=channels )
elif HDUCLAS1 == 'EVENTS':
Data = cls.from_evt(path,
n_phases=n_phases,
channels=channels,
phase_column=phase_column,
channel_column=channel_column)
else:
raise IOError('HDUCLAS1 of Header does not match PHA or EVT files values. Could not load.')
# Write the name of the instrument
Data.instrument = hdul[1].header['INSTRUME']
return Data
@classmethod
@make_verbose('Loading event list and phase binning',
'Events loaded and binned')
def from_evt(cls, path,
n_phases=32,
channels=None,
phase_column='PULSE_PHASE',
channel_column='PI',
phase_shift=0.0):
""" Load an OGIP compliant event list EVT fits file.
:param str path:
Path to event list fits file to extract the data from.
:param int n_phases:
The number of phase bins to use.
:param ndarray[n] channels:
The energy channels to extract the data from. Should be the same as the channels of the instrument to use.
If None, extract all channels in the event list.
:param str phase_column:
The column in the OGIP EVT file containing event phases.
:param str channel_column:
The column in the OGIP EVT file containing event channels.
:param float phase_shift:
A common phase shift to apply to the event phases.
"""
# Read the fits file
with fits.open( path ) as hdul:
Header = hdul['EVENTS'].header
EvtList = hdul['EVENTS'].data
# Extract useful data
exposure = Header['EXPOSURE']
channel_data = EvtList[channel_column]
phases_data = ( EvtList[ phase_column ] + phase_shift ) % 1.0
# No channels specified, use everything
if channels is None:
min_channel = _np.min( channel_data )
max_channel = _np.max( channel_data )
channels = _np.arange(min_channel,max_channel+1)
else:
min_channel = channels[0]
max_channel = channels[-1]
channel_borders = _np.arange(min_channel, max_channel+2)-0.5
# Get intrinsinc values
first = 0
last = max_channel - min_channel
phases_borders = _np.linspace( 0.0 , 1.0 , n_phases+1 )
# Make the 2D histogram
mask = [ ch>=min_channel and ch<=max_channel for ch in channel_data ]
counts_histogram, _, _ = _np.histogram2d( channel_data[mask] , phases_data[mask],
bins=[channel_borders, phases_borders])
# Instatiate the class
return cls( counts_histogram.astype( dtype=_np.double ),
channels=channels,
phases=phases_borders,
first=first,
last=last,
exposure_time=exposure )
@classmethod
@make_verbose('Loading PHA spectrum and phase binning',
'Spectrum loaded')
def from_pha( cls, path,
channels=None ):
""" Load an OGIP compliant spectrum PHA fits file.
:param str path:
Path to PHA fits file to extract the data from.
:param ndarray[n] channels:
The energy channels to extract the data from. Should be the same as the channels of the instrument to use.
If None, extract all channels in the PHA file.
"""
# Read the fits files
with fits.open( path ) as hdul:
Header = hdul['SPECTRUM'].header
spectrum = hdul['SPECTRUM'].data
# Extract useful data
exposure = _np.double( Header['EXPOSURE'] )
channel_data = spectrum['CHANNEL']
try:
counts_data = spectrum['COUNTS']
except KeyError:
counts_data = spectrum['RATE']
exposure = _np.double(1.0)
# No channels specified, use everything
if channels is None:
min_channel = _np.min( channel_data )
max_channel = _np.max( channel_data )
channels = _np.arange(min_channel,max_channel+1)
else:
min_channel = channels[0]
max_channel = channels[-1]
# Get intrinsinc values
first = 0
last = max_channel - min_channel
phases = _np.array([0.0, 1.0])
# Match channels
channel_counts_map = dict(zip(channel_data, counts_data))
if not all(ch in channel_counts_map for ch in channels):
raise ValueError("Not all channels exist in channel_data.")
counts = _np.array( [[float(channel_counts_map[ch]) for ch in channels]] , dtype=_np.double).T
Data = cls( counts,
channels=channels,
phases=phases,
first=first,
last=last,
exposure_time=exposure )
# Add useful paths
Data.backscal = Header['BACKSCAL']
Data.ancrfile = Header['ANCRFILE']
Data.respfile = Header['RESPFILE']
try:
if Header['HDUCLAS2'] == 'TOTAL':
Data.backfile = Header['BACKFILE']
except KeyError as e:
print('Keyword BACKFILE not found in file ',path)
return Data
def spectra_support(self, n, source_backscal, channels=None):
""" Compute the spectrum support, if the data instance is background, assuming Poisson statistics.
:param int n:
The width, in sigma, to compute the support for.
:param float source_backscal:
The source scaling backscal parameter, to apply rescaling to the background spectrum so the background extraction area matches the source's.
This value can be found using Data.backscal on the source's Data instance.
"""
# Get the background counts prior
counts = self.counts.sum(axis=1)
counts_support = _np.array([counts-n*_np.sqrt(counts),counts+n*_np.sqrt(counts)]).T
counts_support[counts_support[:,0] < 0.0, 0] = 0.0
# Correct for null support upper, values which make the background marginalisation go wrong (interval of integration is then [0,0])
# Does it by getting the nearest strictly positive value of the upper support bound
for i in range(counts_support.shape[0]):
if counts_support[i,1] == 0.0:
for j in range(1, counts_support.shape[0]):
if i+j < counts_support.shape[0] and counts_support[i+j,1] > 0.0:
counts_support[i,1] = counts_support[i+j,1]
break
elif i-j >= 0 and counts_support[i-j,1] > 0.0:
counts_support[i,1] = counts_support[i-j,1]
break
# Apply the scaling
count_rate_support = counts_support / self.exposure_time
try:
count_rate_support *= ( source_backscal / self.backscal )
except:
raise IOError('No BACKSCAL was provided for source. Could not compute the support of the spectrum')
# If channels, extract the reight values
if channels is not None:
channel_indexes = [ _np.where(self.channels == ch)[0][0] for ch in channels ]
count_rate_support = count_rate_support[ channel_indexes , :]
# Clean
count_rate_support = _contig( count_rate_support, dtype=_np.double )
return count_rate_support
def plot(self, num_rot = 2, dpi=200, colormap='inferno'):
""" Plot the data in a convenient way.
:param int num_rot:
The number of rotations to plot.
:param int dpi:
The resolution of the plot.
:param str colormap:
The colormap to use.
"""
# Get the counts
counts_list = [ self.counts for i in range(num_rot) ]
phase_list = [self.phases[:-1] + i for i in range(num_rot)]
counts = _np.concatenate( (counts_list), axis=1 )
phases = _np.concatenate( (phase_list), axis=0 )
# Do the plot
fig,axs = _mpl.pyplot.subplots( 2,2 , height_ratios=[1.,1.], width_ratios=[3,1.5], sharex='col',sharey='row')
fig.subplots_adjust(wspace=0, hspace=0)
axs[0,1].axis('off')
# Plot the 2D data
ax2 = axs[1,0]
im = ax2.pcolormesh( phases, self.channels , counts, cmap=colormap)
ax2.set_xlabel(r'Phase $\phi$ [cycles]')
ax2.set_ylabel('PI channel')
ax2.set_yscale('log')
# Plot the pulse
ax1 = axs[0,0]
ax1.sharex( ax2 )
ax1.tick_params(direction='in', which='both',labelbottom=False)
ax1.errorbar( x=phases, y=counts.sum(axis=0), yerr=_np.sqrt( counts.sum(axis=0) ), ds='steps-mid', color='black' )
ax1.set_ylabel('Counts')
# Plot the spectrum
ax3 = axs[1,1]
ax3.sharey( ax2 )
ax3.set_yscale('log')
ax3.tick_params(direction='in', which='both',labelbottom=False)
ax3.step( counts.sum(axis=1)/2, self.channels , color='black')
ax3.set_xlabel('Cts/chan.')
# Add the colorbar
fig.colorbar( im , ax=ax3 , location='right', label='Counts')
fig.set_dpi(dpi)
return fig, axs