Source code for pwkit.environments.sas.data

# -*- mode: python; coding: utf-8 -*-
# Copyright 2015 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.

"""pwkit.environments.sas.data - loading up SAS data sets

"""
from __future__ import absolute_import, division, print_function, unicode_literals

__all__ = str("BaseSASData Events GTIData Lightcurve RegionData").split()

import numpy as np, pandas as pd
from astropy.time import Time
from ... import astutil, cli
from ...io import Path
from ...numutil import fits_recarray_to_data_frame


[docs] class BaseSASData(object): telescope = None "Telescope name: likely 'XMM'" instrument = None "Instrument used: likely 'EMOS1', 'EMOS2', 'EPN'" obsid = None "Observation ID as a string; resembles '0748391401'." expid = None "Exposure ID as a string; resembles '0748391401003'." revnum = None "Revolution (orbit) number as an integer." timesys = None "Time reference system used; e.g. 'TT'." mjdref = None "MJD reference value; mjd = MET / 86400 + mjdref." t0 = None """Offset Mission Elapsed Time (seconds) value ; default is the first data timestamp in the data set. """ mjd0 = None "Offset MJD; default mjd0 = floor (t0 / 86400 + mjdref)." obs_start = None "Start time of the observation as an astropy.time.Time() instance." obs_end = None "End time of the observation as an astropy.time.Time() instance." targ_name = None "Name of the observing target as a string." targ_ra = None "RA of the observing target, in radians." targ_dec = None "Dec of the observing target, in radians." proj_csys = None "Coordinate system of the Y/X projection as a string; e.g. 'FK5'." proj_equinox = None "Equinox of the Y/X projection as a float; e.g. 2000.0." proj_types = None """Axis types of the Y/X projection as a pair of strings; e.g. ['DEC--TAN', 'RA---TAN']. """ proj_crpix = None """Reference point pixel values of the Y/X projection as a 2-element ndarray. Order is [y,x]. """ proj_crval = None """Reference point coordinate values of the Y/X projection as a 2-element ndarray, in radians. Order is [dec,ra] (= [lat,lon] = [y,x]). """ proj_cdelt = None """Pixel-to-world scale factors of the Y/X projection as a 2-element ndarray, in radians per pixel. Order is [y,x]. """ calindex = None "DataFrame of calibration table info. Schema to be investigated." def __init__(self, path, mjd0=None, t0=None): self.mjd0 = mjd0 self.t0 = t0 with Path(path).read_fits() as hdulist: self._process_main(hdulist, hdulist[0].header) assert self.mjdref is not None assert self.t0 is not None assert self.mjd0 is not None for hdu in hdulist[1:]: self._process_hdu(hdu) def _process_main(self, hdulist, header): self.telescope = header.get("TELESCOP") self.instrument = header.get("INSTRUME") self.obsid = header.get("OBS_ID") self.expid = header.get("EXP_ID") self.revnum = header.get("REVOLUT") self.filter = header.get("FILTER") self.targ_name = header.get("OBJECT") if "DATE-OBS" in header: self.obs_start = Time(header["DATE-OBS"], format="isot") if "DATE-END" in header: self.obs_stop = Time(header["DATE-END"], format="isot") if "RA_OBJ" in header: self.targ_ra = header["RA_OBJ"] * astutil.D2R self.targ_dec = header["DEC_OBJ"] * astutil.D2R if "RADECSYS" in header: if header["REFYCUNI"] != "deg" or header["REFXCUNI"] != "deg": raise ValueError("expect projection to be in degree units") self.proj_csys = header["RADECSYS"] self.proj_equinox = header["EQUINOX"] self.proj_types = [header["REFYCTYP"], header["REFXCTYP"]] self.proj_crpix = np.asarray([header["REFYCRPX"], header["REFXCRPX"]]) self.proj_crval = np.asarray([header["REFYCRVL"], header["REFXCRVL"]]) self.proj_crval *= astutil.D2R self.proj_cdelt = np.asarray([header["REFYCDLT"], header["REFXCDLT"]]) self.proj_cdelt *= astutil.D2R def _process_hdu(self, hdu): if hdu.name == "CALINDEX": self.calindex = fits_recarray_to_data_frame(hdu.data) else: cli.warn("ignoring HDU named %s", hdu.name)
[docs] class GTIData(BaseSASData): gti = None """Dict mapping CCD number to DataFrames of GTI info. Index: integers. Columns: start_met - GTI start time in MET seconds stop_met - GTI stop time in MET seconds start_mjd - GTI start time as MJD stop_mjd - GTI stop time as MJD start_dks - GTI start time as delta-kiloseconds stop_dks - GTI stop time as delta-kiloseconds start_dmjd - GTI start time as `mjd - mjd0` stop_dmjd - GTI stop time as `mjd - mjd0` """ def _process_main(self, hdulist, header): super(GTIData, self)._process_main(hdulist, header) self.gti = {} def _process_hdu(self, hdu): if hdu.name.startswith("STDGTI"): ccd = int(hdu.name[6:]) gti = self.gti[ccd] = fits_recarray_to_data_frame(hdu.data) gti.rename(columns={"start": "start_met", "stop": "stop_met"}, inplace=True) gti["start_mjd"] = gti.start_met / 86400 + self.mjdref gti["start_dks"] = 1e-3 * (gti.start_met - self.t0) gti["start_dmjd"] = gti.start_mjd - self.mjd0 gti["stop_mjd"] = gti.stop_met / 86400 + self.mjdref gti["stop_dks"] = 1e-3 * (gti.stop_met - self.t0) gti["stop_dmjd"] = gti.stop_mjd - self.mjd0 else: super(GTIData, self)._process_hdu(hdu) def _plot_add_gtis(self, p, ccdnum, tunit="dmjd"): import omega as om gti = self.gti[ccdnum] ngti = gti.shape[0] gti0 = gti.at[0, "start_" + tunit] gti1 = gti.at[ngti - 1, "stop_" + tunit] smallofs = (gti1 - gti0) * 0.03 start = gti0 - smallofs for i in range(ngti): p.add( om.rect.XBand(start, gti.at[i, "start_" + tunit], keyText=None), zheight=-1, dsn=1, ) start = gti.at[i, "stop_" + tunit] p.add(om.rect.XBand(start, start + smallofs, keyText=None), zheight=-1, dsn=1) p.setBounds(gti0 - smallofs, gti1 + smallofs)
[docs] class RegionData(BaseSASData): regions = None """Dict mapping identifier to DataFrames of selection region info. Identifiers are like "00106" in SAS but I don't understand their significance. DataFrame index: integers. Columns: shape - region shape as string: 'CIRCLE', ... x - region center in X y - region center in Y r - circle radius (meaning for other shapes?) component - ? """ def _process_main(self, hdulist, header): super(RegionData, self)._process_main(hdulist, header) self.regions = {} def _process_hdu(self, hdu): if hdu.name.startswith("REG"): ident = hdu.name[3:] self.regions[ident] = fits_recarray_to_data_frame(hdu.data) else: super(RegionData, self)._process_hdu(hdu)
[docs] class Events(GTIData, RegionData): filter = None "Filter used as a string; e.g. 'Medium'." elapsed = None "Time elapsed over all events, in seconds." ccd_info = None """DataFrame of scalar CCD info. Index: CCD numbers. Columns: ontime - CCD on time (sum of GTIs) in seconds livetime - CCD live time (always less than ontime) in seconds """ events = None """DataFrame of events. Index: integers. Columns: ccdnr - CCD number of event detx - detector X; not very useful dety - detector Y flag - pat_id - pat_seq - pattern - pha - pulse height amplitude pi - pulse-independent energy rawx - raw CCD X rawy - raw CCD Y time - event time in MET seconds time_raw - ? x - corrected X y - corrected Y mjd - event time as MJD dks - event time as delta-kiloseconds dmjd - event time as `mjd - mjd0` """ offsets = None """DataFrame of "offsets". Some kind of CCD meta-info.""" # exposure = None # Commented out; exposure tables are quite large and I don't know if they're # useful for anything. badpix = None """Dict mapping CCD number to DataFrames of bad-pixel info. Index: integers. Columns: badflag, rawx, rawy, type, yextent. """ def _process_main(self, hdulist, header): super(Events, self)._process_main(hdulist, header) ccd_nums = set() for hdu in hdulist[1:]: if hdu.name.startswith("EXPOSU"): ccd_nums.add(int(hdu.name[6:])) ccd_nums = sorted(ccd_nums) # self.exposure = {} self.badpix = {} self.ccd_info = pd.DataFrame({}, index=ccd_nums) hdu = hdulist["EVENTS"] self.events = fits_recarray_to_data_frame(hdu.data) self.mjdref = hdu.header["MJDREF"] if self.t0 is None: self.t0 = self.events.time.min() if self.mjd0 is None: self.mjd0 = np.floor(self.t0 / 86400 + self.mjdref) self.events["mjd"] = self.events.time / 86400 + self.mjdref self.events["dks"] = 1e-3 * (self.events.time - self.t0) self.events["dmjd"] = self.events.mjd - self.mjd0 self.timesys = hdu.header["TIMESYS"] self.elapsed = hdu.header["TELAPSE"] self.ccd_info["ontime"] = np.nan self.ccd_info["livetime"] = np.nan for ccd in ccd_nums: self.ccd_info.at[ccd, "ontime"] = hdu.header["ONTIME%02d" % ccd] self.ccd_info.at[ccd, "livetime"] = hdu.header["LIVETI%02d" % ccd] def _process_hdu(self, hdu): if hdu.name == "EVENTS": pass elif hdu.name == "OFFSETS": self.offsets = fits_recarray_to_data_frame(hdu.data) elif hdu.name.startswith("EXPOSU"): # These data are very large, and their purpose is unclear to me. pass # ccd = int (hdu.name[6:]) # exp = self.exposure[ccd] = fits_recarray_to_data_frame (hdu.data) # exp['mjd'] = exp.time / 86400 + self.mjdref # exp['dks'] = 1e-3 * (exp.time - self.t0) elif hdu.name.startswith("BADPIX"): ccd = int(hdu.name[6:]) self.badpix[ccd] = fits_recarray_to_data_frame(hdu.data) else: super(Events, self)._process_hdu(hdu) def plot_pi_time(self, ccdnum): import omega as om p = om.quickDF( self.events[self.events.ccdnr == ccdnum][["dmjd", "pi"]], self.targ_name, lines=False, ) self._plot_add_gtis(p, ccdnum) p.setLabels("MJD - %.0f" % self.mjd0, "PI") return p def plot_lightcurve(self, ccd_id=None, bin_energies=False): # XXX CIAO COPY/PASTE DOES THIS EVEN WORK??? import omega as om from ...bblocks import tt_bblock from ..ciao.data import tight_bounds if ccd_id is None: if len(self.gti) != 1: raise Exception("must specify ccd_id") ccd_id = list(self.gti.keys())[0] kev = self.events["pi"] * 1e-3 # XXXXXXX vb = om.layout.VBox(2) if kev.size == 0: vb[0] = om.RectPlot() vb[1] = om.RectPlot() tmin = self.gti[ccd_id]["start_dmjd"].min() tmax = self.gti[ccd_id]["stop_dmjd"].max() if np.isnan(tmin): tmin, tmax = -1.0, 1.0 emin, emax = -1.0, 1.0 rmin, rmax = -1.0, 1.0 else: bbinfo = tt_bblock( self.gti[ccd_id]["start_dmjd"], self.gti[ccd_id]["stop_dmjd"], self.events["dmjd"].sort_values(), intersect_with_bins=True, ) cps = bbinfo.rates / 86400 tmin, tmax = tight_bounds(bbinfo.ledges[0], bbinfo.redges[-1]) emin, emax = tight_bounds(kev.min(), kev.max()) rmin, rmax = tight_bounds(cps.min(), cps.max()) vb[0] = om.RectPlot() csp = om.rect.ContinuousSteppedPainter( keyText="%d events" % (self.events.shape[0]) ) csp.setFloats( np.concatenate((bbinfo.ledges, bbinfo.redges[-1:])), np.concatenate((cps, [0])), ) vb[0].add(csp) if bin_energies: vb[1] = self._plot_binned_event_energies( bbinfo, energy_scale=1e-3, dsn=0 ) else: vb[1] = om.quickXY(self.events["dmjd"], kev, None, lines=0) vb[0].setBounds(tmin, tmax, rmin, rmax) vb[0].setYLabel("Count rate (ct/s)") vb[0].bpainter.paintLabels = False self._plot_add_gtis(vb[0], ccd_id) vb[1].setBounds(tmin, tmax, emin, emax) vb[1].setLabels("MJD - %d" % self.mjd0, "Energy (keV)") self._plot_add_gtis(vb[1], ccd_id) return vb
[docs] class Lightcurve(GTIData, RegionData): filter = None "Filter used as a string; e.g. 'Medium'." binsize = None "The bin size used when creating the light curve, in seconds." start_met = start_mjd = start_dks = start_dmjd = None "The binning start time, in various time units." stop_met = stop_mjd = stop_dks = stop_dmjd = None "The binning stop time, in various time units." energy_type = None "The name of the energy column used; likely 'PHA' or 'PI'." energy_min = None "The lower limit on the energy column; units depend on `energy_type`." energy_max = None "The upper limit on the energy column; units depend on `energy_type`." exposure = None "Weighted live time of all the CCDs in the extraction region." lc = None """DataFrame of light curve bins. Index: integers. Columns: time - timestamp of bin center in MET seconds mjd - timestamp of bin center as MJD dks - timestamp of bin center in delta-kiloseconds dmjd - timestamp of bin center as `mjd - mjd0` left_dmjd - timestamp of bin left edge as `mjd - mjd0` right_dmjd - timestamp of bin right edge as `mjd - mjd0` counts - number of events in this bin rate - event rate in counts per second u_rate - uncertainty on `rate`. I'm pretty sure that `u_rate` may not be just the straight Poisson noise since there may be GTI gaps. """ def _process_main(self, hdulist, header): super(Lightcurve, self)._process_main(hdulist, header) # Early loading of bulk data hdu = hdulist["RATE"] self.lc = fits_recarray_to_data_frame(hdu.data) self.lc.rename(columns={"error": "u_rate"}, inplace=True) # Straighten out timekeeping self.mjdref = float(hdu.header["MJDREF"]) self.timesys = hdu.header.get("TIMESYS") if self.t0 is None: self.t0 = self.lc.time.min() if self.mjd0 is None: self.mjd0 = np.floor(self.t0 / 86400 + self.mjdref) # Fill in the rest self.filter = hdu.header.get("FILTER") self.binsize = hdu.header.get("TIMEDEL") self.start_met = hdu.header["TSTART"] self.start_mjd = self.start_met / 86400 + self.mjdref self.start_dks = 1e-3 * (self.start_met - self.t0) self.start_dmjd = self.start_mjd - self.mjd0 self.stop_met = hdu.header["TSTOP"] self.stop_mjd = self.stop_met / 86400 + self.mjdref self.stop_dks = 1e-3 * (self.stop_met - self.t0) self.stop_dmjd = self.stop_mjd - self.mjd0 self.energy_min = hdu.header.get("CHANMIN") self.energy_max = hdu.header.get("CHANMAX") self.energy_type = hdu.header.get("CHANTYPE") self.exposure = hdu.header.get("EXPOSURE") self.lc["counts"] = self.lc.rate * self.binsize self.lc["mjd"] = self.lc.time / 86400 + self.mjdref self.lc["dks"] = 1e-3 * (self.lc.time - self.t0) self.lc["dmjd"] = self.lc.mjd - self.mjd0 self.lc["left_dmjd"] = self.lc.dmjd - 0.5 * self.binsize / 86400 self.lc["right_dmjd"] = self.lc.dmjd + 0.5 * self.binsize / 86400 def _process_hdu(self, hdu): if hdu.name == "RATE": pass else: super(Lightcurve, self)._process_hdu(hdu) def plot_curve(self, ccdnum=None): import omega as om p = om.quickDF( self.lc[["dmjd", "rate", "u_rate"]].dropna(), self.targ_name, lines=False ) if ccdnum is not None: self._plot_add_gtis(p, ccdnum) p.setLabels("MJD - %.0f" % self.mjd0, "Count rate") return p