Synthetic data

This notebook is to help synthetise data using a very simple model (ST- : single temperature) For more complex model, see: https://xpsi-group.github.io/xpsi/Modeling.html For synthetic data with different instruments, see: https://xpsi-group.github.io/xpsi/Instrument_synergy.html

[1]:
%matplotlib inline

import os
import numpy as np
import math

from matplotlib import pyplot as plt
from matplotlib import pyplot as plt
from matplotlib.offsetbox import AnchoredText
from matplotlib import cm

import xpsi
from xpsi import Parameter
from xpsi.global_imports import _c, _G, _dpr, gravradius, _csq, _km, _2pi
/=============================================\
| X-PSI: X-ray Pulse Simulation and Inference |
|---------------------------------------------|
|                Version: 3.2.1               |
|---------------------------------------------|
|      https://xpsi-group.github.io/xpsi      |
\=============================================/

Imported emcee version: 3.1.6
Imported PyMultiNest.
Check your UltraNest installation.
Check your installation of UltraNest if using the UltranestSampler
Warning: Cannot import torch and test SBI_wrapper.
Imported GetDist version: 1.7.5
Imported nestcheck version: 0.2.1

Instrument (fake)

We create a fake telescope response.

This fake instrument has:

  • Energy range of 0.1-15 keV

  • A perfect normalized RMF

  • An effective area of 1800 cm2 for each photon energy bin

  • Channel edges

[2]:



channel_number=np.arange(0,1501) # The channel nnumber energy_low=np.arange(0,15.01, 0.01) # Lower bounds of each channel energy_high=energy_low+0.01 # Upper bounds of each channel channel_edges=np.array([list(channel_number),list(energy_low),list(energy_high)]).T # ARF arf_energy_low=[0.1] arf_energy_high=[0.105] arf_val=[1800] counter=1 while arf_energy_low[-1]<=14.995: arf_energy_low.append(arf_energy_low[-1]+0.005) arf_energy_high.append(arf_energy_high[-1]+0.005) arf_val.append(1800) counter +=1 ARF=np.array([list(arf_energy_low), list(arf_energy_high), list(arf_val)]).T # RMF RMF=np.diag(np.full(counter,1))
[3]:
# Quick plot to show what we have created

fig,ax=plt.subplots(1,3,figsize=(17,5))
ax[0].plot(ARF[:,0],ARF[:,2])
rmf=ax[1].imshow(RMF,origin="lower")#,aspect="auto")
rsp=ax[2].imshow(RMF*ARF[:,2],origin="lower")#,extent = [0, 1500, 0, ],aspect=1)

plt.colorbar(rmf,ax=ax[1], fraction=0.046, pad=0.05)
plt.colorbar(rsp,ax=ax[2],fraction=0.046, pad=0.05, shrink=1)

ax[0].set_title('ARF')
ax[1].set_title('RMF')
ax[2].set_title('RSP')

ax[0].set_xlabel("Energy (KeV)")
ax[0].set_ylabel("Efective area (cm^2)")
[3]:
Text(0, 0.5, 'Efective area (cm^2)')
_images/Synthetic_data_6_1.png
[4]:
# Instrument Class
class CustomInstrument(xpsi.Instrument):

    """ Fake telescope response. """

    def __call__(self, signal, *args):
        """ Overwrite base just to show it is possible.

        We loaded only a submatrix of the total instrument response
        matrix into memory, so here we can simplify the method in the
        base class.

        """
        matrix = self.construct_matrix()
        self._folded_signal = np.dot(matrix, signal)
        return self._folded_signal

    @classmethod
    def from_response_files(cls, ARF, RMF, max_input, min_input=0,channel=[1,1500],
                            channel_edges=None):
        """ Constructor which converts response files into :class:`numpy.ndarray`s.
        :param str ARF: Path to ARF which is compatible with
                                :func:`numpy.loadtxt`.
        :param str RMF: Path to RMF which is compatible with
                                :func:`numpy.loadtxt`.
        :param str channel_edges: Optional path to edges which is compatible with
                                  :func:`numpy.loadtxt`.
        """

        if min_input != 0:
            min_input = int(min_input)

        max_input = int(max_input)

        matrix = np.ascontiguousarray(RMF[min_input:max_input,channel[0]:channel[1]].T, dtype=np.double)

        edges = np.zeros(ARF[min_input:max_input,2].shape[0]+1, dtype=np.double)
        edges[0] = ARF[min_input,0]; edges[1:] = ARF[min_input:max_input,1]

        for i in range(matrix.shape[0]):
            matrix[i,:] *= ARF[min_input:max_input,2]
        channels = np.arange(channel[0],channel[1])

        return cls(matrix, edges, channels, channel_edges[channel[0]:channel[1]+1,1])
[5]:
# Because this RSP is ideal and perfectly diagonal: then max_input, min_input =channel[1], channel[0]

Instrument = CustomInstrument.from_response_files(ARF =ARF,
                                             RMF = RMF,
                                             max_input = 301,
                                             min_input = 10,
                                             channel=[10,301],
                                             channel_edges =channel_edges)
Setting channels for loaded instrument response (sub)matrix...
Channels set.
An empty subspace was created. This is normal behavior - no parameters were supplied.

Signal

[6]:
# Nothing very special here
from xpsi.tools.synthesise import synthesise_exposure as _synthesise_expo

class CustomSignal(xpsi.Signal):
    """ A custom calculation of the logarithm of the NICER likelihood.
    Here we actually restrict computation to the expected counts only
    as the likelihood is not needed.

    We extend the :class:`xpsi.Signal.Signal` class to make it callable.

    We overwrite the body of the __call__ method. The docstring for the
    abstract method is copied.
    """

    def __init__(self, support = None, *args, **kwargs):
        super(CustomSignal, self).__init__(*args, **kwargs)

    @property
    def support(self):
        return self._support

    @support.setter
    def support(self, obj):
        self._support = obj

    def __call__(self, *args, **kwargs):
        self._expected_counts, _, _  = _synthesise_expo(self.exposure_time,
                                                                self._data.phases,
                                                                self._signals,
                                                                self._phases,
                                                                self._shifts,
                                                                0.0,
                                                                np.zeros((len(self._data.channels),len(self._data.phases)-1))
                                                                )

Space-time

[7]:
bounds = dict(distance = (0.1, 10.0),                       # (Earth) distance
                mass = (1.0, 3.0),                          # mass
                radius = (3.0 * gravradius(1.0), 16.0),     # equatorial radius
                cos_inclination = (0.0, 1.0))               # (Earth) inclination to rotation axis

spacetime = xpsi.Spacetime(bounds=bounds, values=dict(frequency=314.0))# Fixing the spin

Creating parameter:
    > Named "frequency" with fixed value 3.140e+02.
    > Spin frequency [Hz].
Creating parameter:
    > Named "mass" with bounds [1.000e+00, 3.000e+00].
    > Gravitational mass [solar masses].
Creating parameter:
    > Named "radius" with bounds [4.430e+00, 1.600e+01].
    > Coordinate equatorial radius [km].
Creating parameter:
    > Named "distance" with bounds [1.000e-01, 1.000e+01].
    > Earth distance [kpc].
Creating parameter:
    > Named "cos_inclination" with bounds [0.000e+00, 1.000e+00].
    > Cosine of Earth inclination to rotation axis.

Hot spot

We will use a single hot spot. For complex models, see: https://xpsi-group.github.io/xpsi

[8]:
# Using default hard-coded bounds

bounds = dict(super_colatitude = (None, None),
              super_radius = (None, None),
              phase_shift = (-0.25, 0.75),
              super_temperature = (None, None))

# a simple circular, simply-connected spot
hot_spot = xpsi.HotRegion(bounds=bounds,
                            values={}, # no initial values and no derived/fixed
                            symmetry=True,
                            omit=False,
                            cede=False,
                            concentric=False,
                            sqrt_num_cells=32,
                            min_sqrt_num_cells=10,
                            max_sqrt_num_cells=64,
                            num_leaves=100,
                            num_rays=200,
                            is_antiphased=True,
                            image_order_limit=3, # up to tertiary
                            prefix='hot') # unique prefix needed because >1 instance

Creating parameter:
    > Named "super_colatitude" with bounds [0.000e+00, 3.142e+00].
    > The colatitude of the centre of the superseding region [radians].
Creating parameter:
    > Named "super_radius" with bounds [0.000e+00, 1.571e+00].
    > The angular radius of the (circular) superseding region [radians].
Creating parameter:
    > Named "phase_shift" with bounds [-2.500e-01, 7.500e-01].
    > The phase of the hot region, a periodic parameter [cycles].
Creating parameter:
    > Named "super_temperature" with bounds [3.000e+00, 7.600e+00].
    > log10(superseding region effective temperature [K]).

Phostosphere

We will use here the default black-body emission model. No emission from the rest of the star, all the emission is from the hot spot

[9]:
class CustomPhotosphere(xpsi.Photosphere):
    """ Implement method for imaging."""

    @property
    def global_variables(self):

        return np.array([self['hot__super_colatitude'],
                          self['hot__phase_shift'] * _2pi,
                          self['hot__super_radius'],
                          self['hot__super_temperature']])

photosphere = CustomPhotosphere(hot = hot_spot, elsewhere = None,
                                values=dict(mode_frequency = spacetime['frequency']))


Creating parameter:
    > Named "mode_frequency" with fixed value 3.140e+02.
    > Coordinate frequency of the mode of radiative asymmetry in the
photosphere that is assumed to generate the pulsed signal [Hz].

Star

[10]:
# Star object
star = xpsi.Star(spacetime = spacetime, photospheres = photosphere)

Prior

Simple Prior class, nothing very fancy here

[11]:
class CustomPrior(xpsi.Prior):
    """ A custom (joint) prior distribution.

    Source: Fictitious
    Model variant: ST-

    """

    __derived_names__ = ['compactness', 'phase_separation',]

    def __init__(self):
        """ Nothing to be done.

        A direct reference to the spacetime object could be put here
        for use in __call__:

        .. code-block::

            self.spacetime = ref

        Instead we get a reference to the spacetime object through the
        a reference to a likelihood object which encapsulates a
        reference to the spacetime object.

        """
        super(CustomPrior, self).__init__() # not strictly required if no hyperparameters

    def __call__(self, p = None):
        """ Evaluate distribution at ``p``.

        :param list p: Model parameter values.

        :returns: Logarithm of the distribution evaluated at ``p``.

        """
        temp = super(CustomPrior, self).__call__(p)
        if not np.isfinite(temp):
            return temp

        # based on contemporary EOS theory
        if not self.parameters['radius'] <= 16.0:
            return -np.inf

        ref = self.parameters.star.spacetime # shortcut

        # Compactness limit
        R_p = 1.0 + ref.epsilon * (-0.788 + 1.030 * ref.zeta)
        if R_p < 1.505 / ref.R_r_s:
            return -np.inf

        mu = math.sqrt(-1.0 / (3.0 * ref.epsilon * (-0.788 + 1.030 * ref.zeta)))

        # 2-surface cross-section have a single maximum in |z|
        # i.e., an elliptical surface; minor effect on support, if any,
        # for high spin frequenies
        if mu < 1.0:
            return -np.inf

        ref = self.parameters

        return 0.0

    def inverse_sample(self, hypercube=None):
        """ Draw sample uniformly from the distribution via inverse sampling. """

        to_cache = self.parameters.vector

        if hypercube is None:
            hypercube = np.random.rand(len(self))

        # the base method is useful, so to avoid writing that code again:
        _ = super(CustomPrior, self).inverse_sample(hypercube)

        ref = self.parameters # shortcut

        # restore proper cache
        for parameter, cache in zip(ref, to_cache):
            parameter.cached = cache

        # it is important that we return the desired vector because it is
        # automatically written to disk by MultiNest and only by MultiNest
        return self.parameters.vector

    def transform(self, p, **kwargs):
        """ A transformation for post-processing. """

        p = list(p) # copy

        # used ordered names and values
        ref = dict(zip(self.parameters.names, p))

        # compactness ratio M/R_eq
        p += [gravradius(ref['mass']) / ref['radius']]

        return p

[12]:
# Prior object
prior = CustomPrior()
An empty subspace was created. This is normal behavior - no parameters were supplied.

Background

[13]:
class CustomBackground(xpsi.Background):
    """ The background injected to generate synthetic data. """

    def __init__(self, bounds=None, value=None):

        # first the parameters that are fundemental to this class
        doc = """ Powerlaw spectral index. """
        index = xpsi.Parameter('powerlaw_index',
                                strict_bounds = (-4.0, -1.01),
                                bounds = bounds,
                                doc = doc,
                                symbol = r'$\Gamma$',
                                value = value)

        super(CustomBackground, self).__init__(index)

    def __call__(self, energy_edges, phases):
        """ Evaluate the incident background field. """

        G = self['powerlaw_index']
        temp = np.zeros((energy_edges.shape[0] - 1, phases.shape[0]))
        temp[:,0] = (energy_edges[1:]**(G + 1.0) - energy_edges[:-1]**(G + 1.0)) / (G + 1.0)
        for i in range(phases.shape[0]):
            temp[:,i] = temp[:,0]

        self._incident_background= temp
[14]:
background = CustomBackground(bounds=(None, None))
Creating parameter:
    > Named "powerlaw_index" with bounds [-4.000e+00, -1.010e+00].
    > Powerlaw spectral index.

Data

[15]:
class SynthesiseData(xpsi.Data):
    """ Custom data container to enable synthesis. """

    def __init__(self, channels, phases, first, last):

        self.channels = channels
        self._phases = phases

        try:
            self._first = int(first)
            self._last = int(last)
        except TypeError:
            raise TypeError('The first and last channels must be integers.')
        if self._first >= self._last:
            raise ValueError('The first channel number must be lower than the '
                             'the last channel number.')
[16]:
301-10-1
[16]:
290
[17]:
201-20-1
[17]:
180
[18]:
_data = SynthesiseData(np.arange(10,301), np.linspace(0.0, 1.0, 33), 0, 290 )
Setting channels for event data...
Channels set.

Synthesise

[19]:
signal = CustomSignal(data = _data,
                        instrument = Instrument,
                        background = background,
                        interstellar = None,
                        cache = True,
                        prefix='Instrument')

likelihood = xpsi.Likelihood(star = star, signals = signal,
                             num_energies=384,
                             threads=1,
                             externally_updated=False,
                             prior = prior)

for h in hot_spot.objects:
    h.set_phases(num_leaves = 100)
Creating parameter:
    > Named "phase_shift" with fixed value 0.000e+00.
    > The phase shift for the signal, a periodic parameter [cycles].
[20]:
likelihood
[20]:
Free parameters
---------------
mass: Gravitational mass [solar masses].
radius: Coordinate equatorial radius [km].
distance: Earth distance [kpc].
cos_inclination: Cosine of Earth inclination to rotation axis.
hot__phase_shift: The phase of the hot region, a periodic parameter [cycles].
hot__super_colatitude: The colatitude of the centre of the superseding region [radians].
hot__super_radius: The angular radius of the (circular) superseding region [radians].
hot__super_temperature: log10(superseding region effective temperature [K]).
Instrument__powerlaw_index: Powerlaw spectral index.

Saving to TXT file

[21]:
print("Processing ...")

p_T=[1.4,                        # Mass in solar Mass
    12,                          # Equatorial radius in km
    1.,                          # Distance in kpc
    math.cos(60*np.pi/180),      # Cosine of Earth inclination to rotation axis
    0.0,                         # Phase shift
    70*np.pi/180,                # Colatitude of the centre of the superseding region
    0.75,                        # Angular radius of the (circular) superseding region
    6.7,                         # Temperature in log 10
    -2                           # Background sprectral index : gamma (E^gamma)
    ]

Instrument_kwargs = dict(exposure_time=1000.0,
                        expected_background_counts=10000.0,
                        name='new_synthetic',
                        directory='../../examples/examples_fast/Data/',
                        seed=42)    # The reference has been made with seed=42 so let's use that

likelihood.synthesise(p_T, force=True, Instrument=Instrument_kwargs)

print("Done !")
Processing ...
Using background from signal._background.
Done !

Checking

[22]:
# Loading the data that you would get if the installation went well
good_xspi_data=np.loadtxt("../../examples/examples_fast/Data/xpsi_good_realisation.dat")

# Loading the data that you got
your_data=np.loadtxt("../../examples/examples_fast/Data/new_synthetic_realisation.dat")

residual=(good_xspi_data-your_data)

fig,ax=plt.subplots(1,3,figsize=(20,5))

xpsi_d=ax[0].imshow(good_xspi_data,cmap=cm.magma,origin="lower", aspect="auto",extent=[0,1,10,300])
you_d=ax[1].imshow(your_data,cmap=cm.magma,origin="lower", aspect="auto",extent=[0,1,10,300])
res=ax[2].imshow(residual,cmap=cm.magma,origin="lower", aspect="auto",extent=[0,1,10,300])

anchored_text1 = AnchoredText("xpsi_good_installation",loc=1)
anchored_text2 = AnchoredText("Your installation",loc=1)
anchored_text3 = AnchoredText("Residual",loc=1)

ax[0].set_ylabel("Channels")
ax[0].set_xlabel("Phases")
ax[1].set_xlabel("Phases")
ax[2].set_xlabel("Phases")
ax[0].add_artist(anchored_text1)
ax[1].add_artist(anchored_text2)
ax[2].add_artist(anchored_text3)
plt.colorbar(xpsi_d,ax=ax[0])
plt.colorbar(you_d,ax=ax[1])
plt.colorbar(res,ax=ax[2])
[22]:
<matplotlib.colorbar.Colorbar at 0x37d6ceba0>
_images/Synthetic_data_40_1.png

Saving to FITS file

We can also save the data as fits files.

It is also possible to also define yourself the background, however, you need to not have a background instance linked to your likelihood.

Here is how it goes.

[23]:
# Recreating signal but without background
signal = CustomSignal(data = _data,
                        instrument = Instrument,
                        interstellar = None,
                        cache = True,
                        prefix='Instrument')

likelihood = xpsi.Likelihood(star = star, signals = signal,
                             num_energies=384,
                             threads=1,
                             externally_updated=False,
                             prior = prior)

for h in hot_spot.objects:
    h.set_phases(num_leaves = 100)
Creating parameter:
    > Named "phase_shift" with fixed value 0.000e+00.
    > The phase shift for the signal, a periodic parameter [cycles].
[24]:
print("Processing ...")

p_T=[1.4,                         # Mass in solar Mass
    12,                          # Equatorial radius in km
    1.,                          # Distance in kpc
    math.cos(60*np.pi/180),      # Cosine of Earth inclination to rotation axis
    0.0,                         # Phase shift
    70*np.pi/180,                # Colatitude of the centre of the superseding region
    0.75,                        # Angular radius of the (circular) superseding region
    6.7,                         # Temperature in log 10
    ]

background_data = 1e3*np.ones(291)
Instrument_kwargs = dict(exposure_time=1000.0,              # Exposure time in seconds
                        data_BKG=background_data,           # Background data to apply, can be 1D if phase-averaged
                                                            # Needs to be 2D otherwise
                        format='FITS',                      # Format of the data : 'FITS' or 'TXT'
                        instrument_name='FAKE INSTRUMENT',  # Name of the instrument, must match the response files
                        backscal_ratio=1.0,                 # Backscale ratio (bkg/src), 1 if the data is from the same region as the source
                                                            # Otherwise set it to the ratio to use for latter inference on synthetic data
                        name='new_synthetic',               # File name
                        directory='../../examples/examples_fast/Data',  # Directory
                        seed=42,                            # Seed for RNG
                        save_background=True,               # Save the background files as well ?
                        )

likelihood.synthesise(p_T, force=True, Instrument=Instrument_kwargs)

print("Done !")
Processing ...
Using data_BKG.
Done !
[25]:
# Loading the data that you just generated
expected_counts_realization = xpsi.Data.from_evt("../../examples/examples_fast/Data/new_synthetic_realisation.evt")
background_realization = xpsi.Data.from_evt("../../examples/examples_fast/Data/new_synthetic_bkg_realisation.evt")

fig,ax=plt.subplots(1,3,figsize=(20,5))

expec_d = ax[0].imshow(expected_counts_realization.counts,cmap=cm.magma,origin="lower", aspect="auto",extent=[0,1,10,300])
bkg_d = ax[1].imshow(background_realization.counts,cmap=cm.magma,origin="lower", aspect="auto",extent=[0,1,10,300])
ax[2].plot(expected_counts_realization.phases[:-1], expected_counts_realization.counts.sum(axis=0),label="Signal", color='k')
ax[2].plot(background_realization.phases[:-1], background_realization.counts.sum(axis=0),label="Background", color='r')

anchored_text1 = AnchoredText("Expected counts",loc=1)
anchored_text2 = AnchoredText("Background",loc=1)
anchored_text3 = AnchoredText("Bolometric pulse",loc=1)

ax[0].set_ylabel("Channels")
ax[0].set_xlabel("Phases")
ax[1].set_xlabel("Phases")
ax[2].set_xlabel("Phases")
ax[2].set_ylabel("Counts")
ax[2].legend(loc='upper left')
ax[0].add_artist(anchored_text1)
ax[1].add_artist(anchored_text2)
ax[2].add_artist(anchored_text3)
plt.colorbar(expec_d,ax=ax[0])
plt.colorbar(bkg_d,ax=ax[1])
Loading event list and phase binning...
Setting channels for event data...
Channels set.
Events loaded and binned.
Loading event list and phase binning...
Setting channels for event data...
Channels set.
Events loaded and binned.
[25]:
<matplotlib.colorbar.Colorbar at 0x37d78fdd0>
_images/Synthetic_data_45_2.png