# -*- mode: python; coding: utf-8 -*-
# Copyright 2015 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.
"""pwkit.environments.sas.data - loading up SAS data sets
"""
from __future__ import absolute_import, division, print_function, unicode_literals
__all__ = str ('BaseSASData Events GTIData Lightcurve RegionData').split ()
import numpy as np, pandas as pd
from six.moves import range
from astropy.time import Time
from ... import astutil, cli
from ...io import Path
from ...numutil import fits_recarray_to_data_frame
[docs]class BaseSASData (object):
telescope = None
"Telescope name: likely 'XMM'"
instrument = None
"Instrument used: likely 'EMOS1', 'EMOS2', 'EPN'"
obsid = None
"Observation ID as a string; resembles '0748391401'."
expid = None
"Exposure ID as a string; resembles '0748391401003'."
revnum = None
"Revolution (orbit) number as an integer."
timesys = None
"Time reference system used; e.g. 'TT'."
mjdref = None
"MJD reference value; mjd = MET / 86400 + mjdref."
t0 = None
"""Offset Mission Elapsed Time (seconds) value ; default is
the first data timestamp in the data set.
"""
mjd0 = None
"Offset MJD; default mjd0 = floor (t0 / 86400 + mjdref)."
obs_start = None
"Start time of the observation as an astropy.time.Time() instance."
obs_end = None
"End time of the observation as an astropy.time.Time() instance."
targ_name = None
"Name of the observing target as a string."
targ_ra = None
"RA of the observing target, in radians."
targ_dec = None
"Dec of the observing target, in radians."
proj_csys = None
"Coordinate system of the Y/X projection as a string; e.g. 'FK5'."
proj_equinox = None
"Equinox of the Y/X projection as a float; e.g. 2000.0."
proj_types = None
"""Axis types of the Y/X projection as a pair of strings; e.g. ['DEC--TAN',
'RA---TAN'].
"""
proj_crpix = None
"""Reference point pixel values of the Y/X projection as a 2-element ndarray.
Order is [y,x].
"""
proj_crval = None
"""Reference point coordinate values of the Y/X projection as a 2-element
ndarray, in radians. Order is [dec,ra] (= [lat,lon] = [y,x]).
"""
proj_cdelt = None
"""Pixel-to-world scale factors of the Y/X projection as a 2-element ndarray,
in radians per pixel. Order is [y,x].
"""
calindex = None
"DataFrame of calibration table info. Schema to be investigated."
def __init__ (self, path, mjd0=None, t0=None):
self.mjd0 = mjd0
self.t0 = t0
with Path (path).read_fits () as hdulist:
self._process_main (hdulist, hdulist[0].header)
assert self.mjdref is not None
assert self.t0 is not None
assert self.mjd0 is not None
for hdu in hdulist[1:]:
self._process_hdu (hdu)
def _process_main (self, hdulist, header):
self.telescope = header.get ('TELESCOP')
self.instrument = header.get ('INSTRUME')
self.obsid = header.get ('OBS_ID')
self.expid = header.get ('EXP_ID')
self.revnum = header.get ('REVOLUT')
self.filter = header.get ('FILTER')
self.targ_name = header.get ('OBJECT')
if 'DATE-OBS' in header:
self.obs_start = Time (header['DATE-OBS'], format='isot')
if 'DATE-END' in header:
self.obs_stop = Time (header['DATE-END'], format='isot')
if 'RA_OBJ' in header:
self.targ_ra = header['RA_OBJ'] * astutil.D2R
self.targ_dec = header['DEC_OBJ'] * astutil.D2R
if 'RADECSYS' in header:
if header['REFYCUNI'] != 'deg' or header['REFXCUNI'] != 'deg':
raise ValueError ('expect projection to be in degree units')
self.proj_csys = header['RADECSYS']
self.proj_equinox = header['EQUINOX']
self.proj_types = [header['REFYCTYP'], header['REFXCTYP']]
self.proj_crpix = np.asarray ([header['REFYCRPX'], header['REFXCRPX']])
self.proj_crval = np.asarray ([header['REFYCRVL'], header['REFXCRVL']])
self.proj_crval *= astutil.D2R
self.proj_cdelt = np.asarray ([header['REFYCDLT'], header['REFXCDLT']])
self.proj_cdelt *= astutil.D2R
def _process_hdu (self, hdu):
if hdu.name == 'CALINDEX':
self.calindex = fits_recarray_to_data_frame (hdu.data)
else:
cli.warn ('ignoring HDU named %s', hdu.name)
[docs]class GTIData (BaseSASData):
gti = None
"""Dict mapping CCD number to DataFrames of GTI info. Index: integers.
Columns:
start_met - GTI start time in MET seconds
stop_met - GTI stop time in MET seconds
start_mjd - GTI start time as MJD
stop_mjd - GTI stop time as MJD
start_dks - GTI start time as delta-kiloseconds
stop_dks - GTI stop time as delta-kiloseconds
start_dmjd - GTI start time as `mjd - mjd0`
stop_dmjd - GTI stop time as `mjd - mjd0`
"""
def _process_main (self, hdulist, header):
super (GTIData, self)._process_main (hdulist, header)
self.gti = {}
def _process_hdu (self, hdu):
if hdu.name.startswith ('STDGTI'):
ccd = int (hdu.name[6:])
gti = self.gti[ccd] = fits_recarray_to_data_frame (hdu.data)
gti.rename (columns={'start': 'start_met', 'stop': 'stop_met'},
inplace=True)
gti['start_mjd'] = gti.start_met / 86400 + self.mjdref
gti['start_dks'] = 1e-3 * (gti.start_met - self.t0)
gti['start_dmjd'] = gti.start_mjd - self.mjd0
gti['stop_mjd'] = gti.stop_met / 86400 + self.mjdref
gti['stop_dks'] = 1e-3 * (gti.stop_met - self.t0)
gti['stop_dmjd'] = gti.stop_mjd - self.mjd0
else:
super (GTIData, self)._process_hdu (hdu)
def _plot_add_gtis (self, p, ccdnum, tunit='dmjd'):
import omega as om
gti = self.gti[ccdnum]
ngti = gti.shape[0]
gti0 = gti.at[0,'start_'+tunit]
gti1 = gti.at[ngti-1,'stop_'+tunit]
smallofs = (gti1 - gti0) * 0.03
start = gti0 - smallofs
for i in range (ngti):
p.add (om.rect.XBand (start, gti.at[i,'start_'+tunit], keyText=None), zheight=-1, dsn=1)
start = gti.at[i,'stop_'+tunit]
p.add (om.rect.XBand (start, start + smallofs, keyText=None), zheight=-1, dsn=1)
p.setBounds (gti0 - smallofs, gti1 + smallofs)
[docs]class RegionData (BaseSASData):
regions = None
"""Dict mapping identifier to DataFrames of selection region info. Identifiers
are like "00106" in SAS but I don't understand their significance.
DataFrame index: integers. Columns:
shape - region shape as string: 'CIRCLE', ...
x - region center in X
y - region center in Y
r - circle radius (meaning for other shapes?)
component - ?
"""
def _process_main (self, hdulist, header):
super (RegionData, self)._process_main (hdulist, header)
self.regions = {}
def _process_hdu (self, hdu):
if hdu.name.startswith ('REG'):
ident = hdu.name[3:]
self.regions[ident] = fits_recarray_to_data_frame (hdu.data)
else:
super (RegionData, self)._process_hdu (hdu)
[docs]class Events (GTIData, RegionData):
filter = None
"Filter used as a string; e.g. 'Medium'."
elapsed = None
"Time elapsed over all events, in seconds."
ccd_info = None
"""DataFrame of scalar CCD info. Index: CCD numbers. Columns:
ontime - CCD on time (sum of GTIs) in seconds
livetime - CCD live time (always less than ontime) in seconds
"""
events = None
"""DataFrame of events. Index: integers. Columns:
ccdnr - CCD number of event
detx - detector X; not very useful
dety - detector Y
flag -
pat_id -
pat_seq -
pattern -
pha - pulse height amplitude
pi - pulse-independent energy
rawx - raw CCD X
rawy - raw CCD Y
time - event time in MET seconds
time_raw - ?
x - corrected X
y - corrected Y
mjd - event time as MJD
dks - event time as delta-kiloseconds
dmjd - event time as `mjd - mjd0`
"""
offsets = None
"""DataFrame of "offsets". Some kind of CCD meta-info."""
#exposure = None
# Commented out; exposure tables are quite large and I don't know if they're
# useful for anything.
badpix = None
"""Dict mapping CCD number to DataFrames of bad-pixel info. Index: integers.
Columns: badflag, rawx, rawy, type, yextent.
"""
def _process_main (self, hdulist, header):
super (Events, self)._process_main (hdulist, header)
ccd_nums = set ()
for hdu in hdulist[1:]:
if hdu.name.startswith ('EXPOSU'):
ccd_nums.add (int (hdu.name[6:]))
ccd_nums = sorted (ccd_nums)
#self.exposure = {}
self.badpix = {}
self.ccd_info = pd.DataFrame ({}, index=ccd_nums)
hdu = hdulist['EVENTS']
self.events = fits_recarray_to_data_frame (hdu.data)
self.mjdref = hdu.header['MJDREF']
if self.t0 is None:
self.t0 = self.events.time.min ()
if self.mjd0 is None:
self.mjd0 = np.floor (self.t0 / 86400 + self.mjdref)
self.events['mjd'] = self.events.time / 86400 + self.mjdref
self.events['dks'] = 1e-3 * (self.events.time - self.t0)
self.events['dmjd'] = self.events.mjd - self.mjd0
self.timesys = hdu.header['TIMESYS']
self.elapsed = hdu.header['TELAPSE']
self.ccd_info['ontime'] = np.nan
self.ccd_info['livetime'] = np.nan
for ccd in ccd_nums:
self.ccd_info.at[ccd,'ontime'] = hdu.header['ONTIME%02d' % ccd]
self.ccd_info.at[ccd,'livetime'] = hdu.header['LIVETI%02d' % ccd]
def _process_hdu (self, hdu):
if hdu.name == 'EVENTS':
pass
elif hdu.name == 'OFFSETS':
self.offsets = fits_recarray_to_data_frame (hdu.data)
elif hdu.name.startswith ('EXPOSU'):
# These data are very large, and their purpose is unclear to me.
pass
#ccd = int (hdu.name[6:])
#exp = self.exposure[ccd] = fits_recarray_to_data_frame (hdu.data)
#exp['mjd'] = exp.time / 86400 + self.mjdref
#exp['dks'] = 1e-3 * (exp.time - self.t0)
elif hdu.name.startswith ('BADPIX'):
ccd = int (hdu.name[6:])
self.badpix[ccd] = fits_recarray_to_data_frame (hdu.data)
else:
super (Events, self)._process_hdu (hdu)
def plot_pi_time (self, ccdnum):
import omega as om
p = om.quickDF (self.events[self.events.ccdnr == ccdnum][['dmjd', 'pi']],
self.targ_name,
lines=False)
self._plot_add_gtis (p, ccdnum)
p.setLabels ('MJD - %.0f' % self.mjd0, 'PI')
return p
def plot_lightcurve (self, ccd_id=None, bin_energies=False):
# XXX CIAO COPY/PASTE DOES THIS EVEN WORK???
import omega as om
from ...bblocks import tt_bblock
from ..ciao.data import tight_bounds
if ccd_id is None:
if len (self.gti) != 1:
raise Exception ('must specify ccd_id')
ccd_id = list(self.gti.keys())[0]
kev = self.events['pi'] * 1e-3 # XXXXXXX
vb = om.layout.VBox (2)
if kev.size == 0:
vb[0] = om.RectPlot()
vb[1] = om.RectPlot()
tmin = self.gti[ccd_id]['start_dmjd'].min()
tmax = self.gti[ccd_id]['stop_dmjd'].max()
if np.isnan(tmin):
tmin, tmax = -1., 1.
emin, emax = -1., 1.
rmin, rmax = -1., 1.
else:
bbinfo = tt_bblock (
self.gti[ccd_id]['start_dmjd'],
self.gti[ccd_id]['stop_dmjd'],
self.events['dmjd'].sort_values(),
intersect_with_bins = True,
)
cps = bbinfo.rates / 86400
tmin, tmax = tight_bounds (bbinfo.ledges[0], bbinfo.redges[-1])
emin, emax = tight_bounds (kev.min (), kev.max ())
rmin, rmax = tight_bounds (cps.min (), cps.max ())
vb[0] = om.RectPlot ()
csp = om.rect.ContinuousSteppedPainter (keyText='%d events' % (self.events.shape[0]))
csp.setFloats (np.concatenate ((bbinfo.ledges, bbinfo.redges[-1:])),
np.concatenate ((cps, [0])))
vb[0].add (csp)
if bin_energies:
vb[1] = self._plot_binned_event_energies(
bbinfo,
energy_scale = 1e-3,
dsn = 0
)
else:
vb[1] = om.quickXY (self.events['dmjd'], kev, None, lines=0)
vb[0].setBounds (tmin, tmax, rmin, rmax)
vb[0].setYLabel ('Count rate (ct/s)')
vb[0].bpainter.paintLabels = False
self._plot_add_gtis (vb[0], ccd_id)
vb[1].setBounds (tmin, tmax, emin, emax)
vb[1].setLabels ('MJD - %d' % self.mjd0, 'Energy (keV)')
self._plot_add_gtis (vb[1], ccd_id)
return vb
[docs]class Lightcurve (GTIData, RegionData):
filter = None
"Filter used as a string; e.g. 'Medium'."
binsize = None
"The bin size used when creating the light curve, in seconds."
start_met = start_mjd = start_dks = start_dmjd = None
"The binning start time, in various time units."
stop_met = stop_mjd = stop_dks = stop_dmjd = None
"The binning stop time, in various time units."
energy_type = None
"The name of the energy column used; likely 'PHA' or 'PI'."
energy_min = None
"The lower limit on the energy column; units depend on `energy_type`."
energy_max = None
"The upper limit on the energy column; units depend on `energy_type`."
exposure = None
"Weighted live time of all the CCDs in the extraction region."
lc = None
"""DataFrame of light curve bins. Index: integers. Columns:
time - timestamp of bin center in MET seconds
mjd - timestamp of bin center as MJD
dks - timestamp of bin center in delta-kiloseconds
dmjd - timestamp of bin center as `mjd - mjd0`
left_dmjd - timestamp of bin left edge as `mjd - mjd0`
right_dmjd - timestamp of bin right edge as `mjd - mjd0`
counts - number of events in this bin
rate - event rate in counts per second
u_rate - uncertainty on `rate`.
I'm pretty sure that `u_rate` may not be just the straight Poisson noise
since there may be GTI gaps.
"""
def _process_main (self, hdulist, header):
super (Lightcurve, self)._process_main (hdulist, header)
# Early loading of bulk data
hdu = hdulist['RATE']
self.lc = fits_recarray_to_data_frame (hdu.data)
self.lc.rename (columns={'error': 'u_rate'}, inplace=True)
# Straighten out timekeeping
self.mjdref = float (hdu.header['MJDREF'])
self.timesys = hdu.header.get ('TIMESYS')
if self.t0 is None:
self.t0 = self.lc.time.min ()
if self.mjd0 is None:
self.mjd0 = np.floor (self.t0 / 86400 + self.mjdref)
# Fill in the rest
self.filter = hdu.header.get ('FILTER')
self.binsize = hdu.header.get ('TIMEDEL')
self.start_met = hdu.header['TSTART']
self.start_mjd = self.start_met / 86400 + self.mjdref
self.start_dks = 1e-3 * (self.start_met - self.t0)
self.start_dmjd = self.start_mjd - self.mjd0
self.stop_met = hdu.header['TSTOP']
self.stop_mjd = self.stop_met / 86400 + self.mjdref
self.stop_dks = 1e-3 * (self.stop_met - self.t0)
self.stop_dmjd = self.stop_mjd - self.mjd0
self.energy_min = hdu.header.get ('CHANMIN')
self.energy_max = hdu.header.get ('CHANMAX')
self.energy_type = hdu.header.get ('CHANTYPE')
self.exposure = hdu.header.get ('EXPOSURE')
self.lc['counts'] = self.lc.rate * self.binsize
self.lc['mjd'] = self.lc.time / 86400 + self.mjdref
self.lc['dks'] = 1e-3 * (self.lc.time - self.t0)
self.lc['dmjd'] = self.lc.mjd - self.mjd0
self.lc['left_dmjd'] = self.lc.dmjd - 0.5 * self.binsize / 86400
self.lc['right_dmjd'] = self.lc.dmjd + 0.5 * self.binsize / 86400
def _process_hdu (self, hdu):
if hdu.name == 'RATE':
pass
else:
super (Lightcurve, self)._process_hdu (hdu)
def plot_curve (self, ccdnum=None):
import omega as om
p = om.quickDF (self.lc[['dmjd', 'rate', 'u_rate']].dropna (),
self.targ_name,
lines=False)
if ccdnum is not None:
self._plot_add_gtis (p, ccdnum)
p.setLabels ('MJD - %.0f' % self.mjd0, 'Count rate')
return p