Source code for xpsi.Data


__all__ = ["Data"]

from xpsi.global_imports import *
from xpsi.utils import make_verbose

from xpsi.Instrument import ChannelError

from astropy.io import fits

[docs] class Data(object): r""" A container for event data. The working assumption is that the sampling distribution of this event data can be written in terms of a set of channel-by-channel *count*\ -rate signals. The instrument associated with this data in an instance of :class:`~.Signal.Signal` must transform incident signals into a structure congruent to that of the event data. The attributes and methods of this class and any derived classes must therefore store information required for this operation. The initialiser assigns observation settings to attributes which are required for the treating the incident specific flux signals using the model instrument. The body of the initialiser may be changed, but to ensure inter-module compatibility, only the phase handling should really be modified, for instance if you want to implement an unbinned likelihood function with respect to phase; the phase bins defined in this concrete implementation are only used by a custom implementation of the likelihood function (i.e., by a subclass of :class:`xpsi.Signal`), and not by the other concrete classes used to construct the likelihood callable. The initialiser can also be extended if appropriate using a call to ``super().__init__``. Specialist constructors can be defined in a subclass using the ``@classmethod`` decorator, for instance to load event data from disk into a compatible data structure in memory; an example of this may be found below. .. note:: You can subclass in order to tailor the handling of the event data, for instance to implement a likelihood functions for unbinned event data. :param ndarray[n,m] counts: A :class:`~numpy.ndarray` of count numbers. The columns must map to the phase intervals given by :obj:`phases`. The rows of the array map to some subset of instrument channels. :param ndarray[n] channels: Instrument channel numbers which must be equal in number to the first dimension of the :attr:`matrix`: the number of channels must be :math:`p`. These channels will correspond to the nominal response matrix and any deviation from this matrix (see above). In common usage patterns, the channel numbers will increase monotonically with row number, and usually increment by one (but this is not necessary). It is advisable that these numbers are the actual instrument channel numbers so that plots generated by the post-processing module using these labels are clear. :param ndarray[m+1] phases: A :class:`~numpy.ndarray` of phase interval edges, where events are binned into these same intervals in each instrument channel. :param int first: The index of the first row of the loaded response matrix containing events (see note below). :param int last: The index of the last row of the loaded response matrix containing events (see note below). .. note:: The :obj:`counts` matrix rows *must* span a contiguous subset of the rows of the loaded response matrix, but in general can span an arbitrary subset and order of instrument channels. Note that the :obj:`first` and :obj:`last+1` numbers are used to index the loaded instrument response matrix. Therefore, if you load only a submatrix of the full instrument response matrix, these indices must be appropriate for the loaded submatrix, and must not be the true channel numbers (this information is instead loaded in the :class:`xpsi.Instrument`). Of course, in all sensible usage patterns the order of the instrument channels, when mapped to matrix rows, will be such that channel number increases with matrix row number monotonically because, then the nominal event energy increases monotonically with row number, which is important for visualisation of data and model (because spatial order matches energy order and correlations in space can be discerned easily). However, there could in principle be channel cuts that mean an increment of more than one channel between matrix adjacent rows, and the response matrix needs to be manipulated before or during a custom loading phase such that its rows match the channel numbers assigned to the :obj:`counts` matrix rows. :param float exposure_time: The exposure time, in seconds, to acquire this set of event data. """ def __init__(self, counts, channels, phases, first, last, exposure_time): if not isinstance(counts, _np.ndarray): try: counts = _np.array(counts) except TypeError: raise TypeError('Counts object is not a ``numpy.ndarray``.') if counts.ndim not in [1,2]: raise TypeError('Counts must be in a one- or two-dimensional ' 'array.') #if (counts < 0.0).any(): # raise ValueError('Negative count numbers are invalid.') self.channels = channels if not isinstance(phases, _np.ndarray): try: phases = _np.array(phases) except TypeError: raise TypeError('Phases object is not a ``numpy.ndarray``.') if phases.ndim != 1: raise ValueError('Phases must form a one-dimensional sequence.') self._phases = phases try: self._first = int(first) self._last = int(last) except TypeError: raise TypeError('The first and last channels must be integers.') #if self._first >= self._last: if self._first > self._last: raise ValueError('The first channel number must be equal or lower ' 'than the last channel number.') if counts.shape[0] != self._last - self._first + 1: raise ValueError('The number of rows must be compatible ' 'with the first and last channel numbers.') if counts.shape[1] != (phases.shape[0] - 1): raise ValueError('The number of columns must be compatible ' 'with the number of phases. Note that the phases array are ' 'the phase bin edges (i.e., number of phases bins + 1)') self._counts = counts try: self._exposure_time = float(exposure_time) except TypeError: raise TypeError('Exposure time must be a float.') @property def exposure_time(self): """ Get the total exposure time in seconds. """ return self._exposure_time @property def counts(self): """ Get the photon count data. """ return self._counts @property def phases(self): """ Get the phases. """ return self._phases @property def index_range(self): """ Get a 2-tuple of the bounding response-matrix row indices. """ return (self._first, self._last + 1) # plus one for array indexing @property def channel_range(self): """ Deprecated property name. To be removed for v1.0. """ return self.index_range @property def channels(self): """ Get the array of channels that the event data spans. """ return self._channels @channels.setter @make_verbose('Setting channels for event data', 'Channels set') def channels(self, array): if not isinstance(array, _np.ndarray): try: array = _np.array(array) except TypeError: raise ChannelError('Channel numbers must be in a ' 'one-dimensional array, and must all be ' 'positive integers including zero.') if array.ndim != 1 or (array < 0).any(): raise ChannelError('Channel numbers must be in a ' 'one-dimensional array, and must all be ' 'positive integers including zero.') try: if array.shape[0] != array.shape[0]: raise ChannelError('Number of channels does') except AttributeError: pass # if synthesising there will not be any counts loaded here if (array[1:] - array[:-1] != 1).any(): yield ('Warning: Channel numbers do not uniformly increment by one.' '\n Please check for correctness.') self._channels = array yield @make_verbose('Trimming event data', 'Event data trimmed') def trim_data( self , min_channel = 0, max_channel = -1 ): """ Trim the event data to the specified channel range. :param int min_channel: The minimum channel number to include in the trimmed data. :param int max_channel: The maximum channel number to include in the trimmed data. """ # Make the table of required channels assert min_channel >= self.channels[0] assert max_channel <= self.channels[-1] new_channels = [ min_channel <= c <= max_channel for c in self.channels] # Trim the counts and channels self._channels = self.channels[new_channels] self._counts = self.counts[new_channels] # Change first and last self._first = 0 self._last = max_channel - min_channel
[docs] @classmethod @make_verbose('Loading event list and phase binning', 'Events loaded and binned') def phase_bin__event_list(cls, path, channels, phases, channel_column, phase_column=None, phase_averaged=False, phase_shift=0.0, channel_edges=None, skiprows=1, eV=False, dtype=_np.int32, *args, **kwargs): """ Load a phase-folded event list and bin the events in phase. :param str path: Path to event list file containing two columns, where the first column contains phases on the unit interval, and the second column contains the channel number. :param ndarray[n] channels: An (ordered) subset of instrument channels. It is advisable that these channels are a contiguous subset of instrument channels, but this not a strict requirement if you are comfortable with the handling the instrument response matrix and count number matrix to match in row-to-channel definitions. :param list phases: An ordered sequence of phase-interval edges on the unit interval. The first and last elements will almost always be zero and unity respectively. :param float phase_shift: A phase-shift in cycles to be applied when binning the events in phase. :param int phase_column: The column in the loaded file containing event phases. :param bool phase_averaged: Is the event data phase averaged? :param int channel_column: The column in the loaded file containing event channels. :param ndarray[n+1] channel_edges: The nominal energy edges of the instrument channels, assumed to be contiguous if binning event energies in channel number. :param int skiprows: The number of top rows to skip when loading the events from file. The top row of couple of rows will typically be reserved for column headers. :param bool eV: Are event energies in eV, instead of channel number? :param type dtype: The type of the count data. Sensible options are ``numpy.int`` (the default) or a :mod:`numpy` floating point type. The choice here only matters when executing custom likelihood evaluation code, which might expect a type without checking or casting. """ events = _np.loadtxt(path, skiprows=skiprows) channels = list(channels) yield 'Total number of events: %i.' % events.shape[0] data = _np.zeros((len(channels), len(phases)-1), dtype=dtype) for i in range(events.shape[0]): _channel = None if eV: for j in range(len(channel_edges) - 1): if channel_edges[j] <= events[i, channel_column]/1.0e3 < channel_edges[j+1]: _channel = channels[j] break else: _channel = events[i, channel_column] if _channel is not None and _channel in channels: if not phase_averaged: _temp = events[i,phase_column] + phase_shift _temp -= _np.floor(_temp) for j in range(phases.shape[0] - 1): if phases[j] <= _temp <= phases[j+1]: data[channels.index(int(events[i,channel_column])),j] += 1 break else: data[channels.index(int(_channel)), 0] += 1 yield 'Number of events constituting data set: %i.' % _np.sum(data) yield cls(data, channels, _np.array(phases), *args, **kwargs)
@classmethod def bin__event_list(cls, *args, **kwargs): return cls.phase_bin__event_list(*args, **kwargs) @classmethod @make_verbose('Loading OGIP compliant file', 'Data loaded') def load(cls, path, n_phases=32, channels=None, phase_column='PULSE_PHASE', channel_column='PI'): """ Load an OGIP compliant event list (EVT) or spectrum (PHA) fits file by redirecting to more specific class methods. :param str path: Path to EVT or PHA fits file to extract the data from. :param int n_phases: The number of phase bins to use, if the data has phase information. :param ndarray[n] | None channels: The energy channels to extract the data from. Should be the same as the channels of the instrument to use. If None, extract all channels in the event list. :param str phase_column: If the file is a EVT file, column containing event phases. :param str channel_column: If the file is a EVT file, column containing event channels. """ # Check whether event file or PHA with fits.open( path ) as hdul: HDUCLAS1 = hdul[1].header['HDUCLAS1'] # Select the case based on HDUCLAS1 if HDUCLAS1 == 'SPECTRUM': Data = cls.from_pha(path, channels=channels ) elif HDUCLAS1 == 'EVENTS': Data = cls.from_evt(path, n_phases=n_phases, channels=channels, phase_column=phase_column, channel_column=channel_column) else: raise IOError('HDUCLAS1 of Header does not match PHA or EVT files values. Could not load.') # Write the name of the instrument Data.instrument = hdul[1].header['INSTRUME'] return Data @classmethod @make_verbose('Loading event list and phase binning', 'Events loaded and binned') def from_evt(cls, path, n_phases=32, channels=None, phase_column='PULSE_PHASE', channel_column='PI', phase_shift=0.0): """ Load an OGIP compliant event list EVT fits file. :param str path: Path to event list fits file to extract the data from. :param int n_phases: The number of phase bins to use. :param ndarray[n] channels: The energy channels to extract the data from. Should be the same as the channels of the instrument to use. If None, extract all channels in the event list. :param str phase_column: The column in the OGIP EVT file containing event phases. :param str channel_column: The column in the OGIP EVT file containing event channels. :param float phase_shift: A common phase shift to apply to the event phases. """ # Read the fits file with fits.open( path ) as hdul: Header = hdul['EVENTS'].header EvtList = hdul['EVENTS'].data # Extract useful data exposure = Header['EXPOSURE'] channel_data = EvtList[channel_column] phases_data = ( EvtList[ phase_column ] + phase_shift ) % 1.0 # No channels specified, use everything if channels is None: min_channel = _np.min( channel_data ) max_channel = _np.max( channel_data ) channels = _np.arange(min_channel,max_channel+1) else: min_channel = channels[0] max_channel = channels[-1] channel_borders = _np.arange(min_channel, max_channel+2)-0.5 # Get intrinsinc values first = 0 last = max_channel - min_channel phases_borders = _np.linspace( 0.0 , 1.0 , n_phases+1 ) # Make the 2D histogram mask = [ ch>=min_channel and ch<=max_channel for ch in channel_data ] counts_histogram, _, _ = _np.histogram2d( channel_data[mask] , phases_data[mask], bins=[channel_borders, phases_borders]) # Instatiate the class return cls( counts_histogram.astype( dtype=_np.double ), channels=channels, phases=phases_borders, first=first, last=last, exposure_time=exposure ) @classmethod @make_verbose('Loading PHA spectrum and phase binning', 'Spectrum loaded') def from_pha( cls, path, channels=None ): """ Load an OGIP compliant spectrum PHA fits file. :param str path: Path to PHA fits file to extract the data from. :param ndarray[n] channels: The energy channels to extract the data from. Should be the same as the channels of the instrument to use. If None, extract all channels in the PHA file. """ # Read the fits files with fits.open( path ) as hdul: Header = hdul['SPECTRUM'].header spectrum = hdul['SPECTRUM'].data # Extract useful data exposure = _np.double( Header['EXPOSURE'] ) channel_data = spectrum['CHANNEL'] try: counts_data = spectrum['COUNTS'] except KeyError: counts_data = spectrum['RATE'] exposure = _np.double(1.0) # No channels specified, use everything if channels is None: min_channel = _np.min( channel_data ) max_channel = _np.max( channel_data ) channels = _np.arange(min_channel,max_channel+1) else: min_channel = channels[0] max_channel = channels[-1] # Get intrinsinc values first = 0 last = max_channel - min_channel phases = _np.array([0.0, 1.0]) # Match channels channel_counts_map = dict(zip(channel_data, counts_data)) if not all(ch in channel_counts_map for ch in channels): raise ValueError("Not all channels exist in channel_data.") counts = _np.array( [[float(channel_counts_map[ch]) for ch in channels]] , dtype=_np.double).T Data = cls( counts, channels=channels, phases=phases, first=first, last=last, exposure_time=exposure ) # Add useful paths Data.backscal = Header['BACKSCAL'] Data.ancrfile = Header['ANCRFILE'] Data.respfile = Header['RESPFILE'] try: if Header['HDUCLAS2'] == 'TOTAL': Data.backfile = Header['BACKFILE'] except KeyError as e: print('Keyword BACKFILE not found in file ',path) return Data def spectra_support(self, n, source_backscal, channels=None): """ Compute the spectrum support, if the data instance is background, assuming Poisson statistics. :param int n: The width, in sigma, to compute the support for. :param float source_backscal: The source scaling backscal parameter, to apply rescaling to the background spectrum so the background extraction area matches the source's. This value can be found using Data.backscal on the source's Data instance. """ # Get the background counts prior counts = self.counts.sum(axis=1) counts_support = _np.array([counts-n*_np.sqrt(counts),counts+n*_np.sqrt(counts)]).T counts_support[counts_support[:,0] < 0.0, 0] = 0.0 # Correct for null support upper, values which make the background marginalisation go wrong (interval of integration is then [0,0]) # Does it by getting the nearest strictly positive value of the upper support bound for i in range(counts_support.shape[0]): if counts_support[i,1] == 0.0: for j in range(1, counts_support.shape[0]): if i+j < counts_support.shape[0] and counts_support[i+j,1] > 0.0: counts_support[i,1] = counts_support[i+j,1] break elif i-j >= 0 and counts_support[i-j,1] > 0.0: counts_support[i,1] = counts_support[i-j,1] break # Apply the scaling count_rate_support = counts_support / self.exposure_time try: count_rate_support *= ( source_backscal / self.backscal ) except: raise IOError('No BACKSCAL was provided for source. Could not compute the support of the spectrum') # If channels, extract the reight values if channels is not None: channel_indexes = [ _np.where(self.channels == ch)[0][0] for ch in channels ] count_rate_support = count_rate_support[ channel_indexes , :] # Clean count_rate_support = _contig( count_rate_support, dtype=_np.double ) return count_rate_support def plot(self, num_rot = 2, dpi=200, colormap='inferno'): """ Plot the data in a convenient way. :param int num_rot: The number of rotations to plot. :param int dpi: The resolution of the plot. :param str colormap: The colormap to use. """ # Get the counts counts_list = [ self.counts for i in range(num_rot) ] phase_list = [self.phases[:-1] + i for i in range(num_rot)] counts = _np.concatenate( (counts_list), axis=1 ) phases = _np.concatenate( (phase_list), axis=0 ) # Do the plot fig,axs = _mpl.pyplot.subplots( 2,2 , height_ratios=[1.,1.], width_ratios=[3,1.5], sharex='col',sharey='row') fig.subplots_adjust(wspace=0, hspace=0) axs[0,1].axis('off') # Plot the 2D data ax2 = axs[1,0] im = ax2.pcolormesh( phases, self.channels , counts, cmap=colormap) ax2.set_xlabel(r'Phase $\phi$ [cycles]') ax2.set_ylabel('PI channel') ax2.set_yscale('log') # Plot the pulse ax1 = axs[0,0] ax1.sharex( ax2 ) ax1.tick_params(direction='in', which='both',labelbottom=False) ax1.errorbar( x=phases, y=counts.sum(axis=0), yerr=_np.sqrt( counts.sum(axis=0) ), ds='steps-mid', color='black' ) ax1.set_ylabel('Counts') # Plot the spectrum ax3 = axs[1,1] ax3.sharey( ax2 ) ax3.set_yscale('log') ax3.tick_params(direction='in', which='both',labelbottom=False) ax3.step( counts.sum(axis=1)/2, self.channels , color='black') ax3.set_xlabel('Cts/chan.') # Add the colorbar fig.colorbar( im , ax=ax3 , location='right', label='Counts') fig.set_dpi(dpi) return fig, axs