# -*- mode: python; coding: utf-8 -*-
# Copyright 2013, 2016-2018 Peter Williams <peter@newton.cx> and collaborators
# Licensed under the MIT License.
# NB. This is super-redundant with both dftphotom and dftspect; things are
# getting a little silly here. But I think it's faster to copy/paste/hack than
# it is to merge everything into one uberprogram.
"""This module provides code to extract dynamic spectra from CASA Measurement
Sets. CASA doesn't have a task that does this.
"""
from __future__ import absolute_import, division, print_function
__all__ = 'Config Loader dftdynspec dftdynspec_cli'.split()
import io, os.path, sys
import numpy as np
import six
from six.moves import range
from ... import binary_type, text_type
# Note: zany spacing so that Sphinx can parse the file correctly.
from . ..astutil import *
from . ..cli import check_usage, die
from . ..io import get_stdout_bytes
from . ..kwargv import ParseKeywords, Custom
from . import util
from .util import sanitize_unicode as b
dftdynspec_doc = \
"""
casatask dftdynspec vis=<MS> [keywords...]
Extract a dynamic spectrum from the visibilities in a measurement set. See
below the keyword docs for some important caveats.
vis=
Path of the MeasurementSet dataset to read. Required.
out=
Path to which data will be written. If unspecified, data are written to
standard output.
rephase=RA,DEC
Phase the data to extract fluxes in the specified direction. If unspecified,
the data are not rephased, i.e. the flux at the phase center is extracted.
RA and DEC should be in sexigesimal with fields separated by colons.
array=, baseline=, field=, observation=, polarization=, scan=,
scanintent=, spw=, taql=, time=, uvdist=
MeasurementSet selectors used to filter the input data. Default polarization
is 'RR,LL'. All polarizations are averaged together, so mixing parallel- and
cross-hand pols is almost never what you want to do.
NOTE: only whole spectral windows can be selected. If you attempt to select
channels within a window, a warning will be printed and that component of
the selection will be ignored.
datacol=
Name of the column to use for visibility data. Defaults to 'data'. You might
want it to be 'corrected_data'.
believeweights=[t|f]
Defaults to false, which means that we assume that the 'weight' column in
the dataset is NOT scaled such that the variance in the visibility samples
is equal to 1/weight. In this case uncertainties are assessed from the
scatter of all the visibilities in each timeslot. If true, we trust that
variance=1/weight and propagate this in the standard way.
IMPORTANT: the fundamental assumption of this task is that the only
signal in the visibilities is from a point source at the phasing
center. We also assume that all sampled polarizations get equal
contributions from the source (though you can resample the Stokes
parameters on the fly, so this is not quite the same thing as
requiring the source be unpolarized).
The output data are stored as a file containing several serialized Numpy
arrays. The Python class `pwkit.environments.casa.dftdynspec.Loader` can be
used to load the data into a Python program.
"""
class Config(ParseKeywords):
vis = Custom(str, required=True)
datacol = 'data'
believeweights = False
@Custom(str, uiname='out')
def outstream(val):
if val is None:
return get_stdout_bytes()
try:
return open(val, 'wb')
except Exception as e:
die('cannot open path "%s" for writing', val)
@Custom([str, str], default=None)
def rephase(val):
if val is None:
return None
try:
ra = parsehours(val[0])
dec = parsedeglat(val[1])
except Exception as e:
die('cannot parse "rephase" values as RA/dec: %s', e)
return ra, dec
# MeasurementSet filters
array = str
baseline = str
field = str
observation = str
polarization = 'RR,LL'
scan = str
scanintent = str
spw = str
taql = str
time = str
uvdist = str
loglevel = 'warn'
def dftdynspec(cfg):
tb = util.tools.table()
ms = util.tools.ms()
me = util.tools.measures()
# Read stuff in. Even if the weight values don't have their
# absolute scale set correctly, we can still use them to set the
# relative weighting of the data points.
#
# datacol is (ncorr, nchan, nchunk)
# flag is (ncorr, nchan, nchunk)
# weight is (ncorr, nchunk)
# uvw is (3, nchunk)
# time is (nchunk)
# axis_info.corr_axis is (ncorr)
# axis_info.freq_axis.chan_freq is (nchan, 1) [for now?]
#
# Note that we apply msselect() again when reading the data because
# selectinit() is broken, but the invocation here is good because it
# affects the results from ms.range() and friends.
if ':' in (cfg.spw or ''):
warn('it looks like you are attempting to select channels within one or more spws')
warn('this is NOT IMPLEMENTED; I will process the whole spw instead')
ms.open(b(cfg.vis))
totrows = ms.nrow()
ms_sels = dict((n, cfg.get(n)) for n in util.msselect_keys
if cfg.get(n) is not None)
ms.msselect(b(ms_sels))
rangeinfo = ms.range(b'data_desc_id field_id'.split())
ddids = rangeinfo['data_desc_id']
fields = rangeinfo['field_id']
colnames = [cfg.datacol] + 'flag weight time axis_info'.split()
rephase = (cfg.rephase is not None)
if fields.size != 1:
# I feel comfortable making this a fatal error, even if we're
# not rephasing.
die('selected data should contain precisely one field; got %d', fields.size)
tb.open(b(os.path.join(cfg.vis, 'DATA_DESCRIPTION')))
ddspws = tb.getcol(b'SPECTRAL_WINDOW_ID')
tb.close()
# Get frequencies and precompute merged, sorted frequency array
# FIXME: below we get 'freqs' on the fly; should honor that.
# But then mapping and data storage get super inefficient.
tb.open(b(os.path.join(cfg.vis, 'SPECTRAL_WINDOW')))
nspw = tb.nrows()
spwfreqs = []
for i in range(nspw):
spwfreqs.append(tb.getcell(b'CHAN_FREQ', i) * 1e-9) # -> GHz
tb.close()
allfreqs = set()
for freqs in spwfreqs:
allfreqs.update(freqs)
allfreqs = np.asarray(sorted(allfreqs))
nfreq = allfreqs.size
freqmaps = []
for i in range(nspw):
freqmaps.append(np.searchsorted(allfreqs, spwfreqs[i]))
if rephase:
fieldid = fields[0]
tb.open(b(os.path.join(cfg.vis, 'FIELD')))
phdirinfo = tb.getcell(b'PHASE_DIR', fieldid)
tb.close()
if phdirinfo.shape[1] != 1:
die('trying to rephase but target field (#%d) has a '
'time-variable phase center, which I can\'t handle', fieldid)
ra0, dec0 = phdirinfo[:,0] # in radians.
# based on intflib/pwflux.py, which was copied from
# hex/hex-lib-calcgainerr:
dra = cfg.rephase[0] - ra0
dec = cfg.rephase[1]
l = np.sin(dra) * np.cos(dec)
m = np.sin(dec) * np.cos(dec0) - np.cos(dra) * np.cos(dec) * np.sin(dec0)
n = np.sin(dec) * np.sin(dec0) + np.cos(dra) * np.cos(dec) * np.cos(dec0)
n -= 1 # makes the work below easier
lmn = np.asarray([l, m, n])
colnames.append('uvw')
tbins = {}
colnames = b(colnames)
for ddindex, ddid in enumerate(ddids):
# Starting in CASA 4.6, selectinit(ddid) stopped actually filtering
# your data to match the specified DDID! What garbage. Work around
# with our own filtering.
ms_sels['taql'] = 'DATA_DESC_ID == %d' % ddid
ms.msselect(b(ms_sels))
ms.selectinit(ddid)
if cfg.polarization is not None:
ms.selectpolarization(b(cfg.polarization.split(',')))
ms.iterinit(maxrows=4096)
ms.iterorigin()
spwid = ddspws[ddid]
while True:
cols = ms.getdata(items=colnames)
if rephase:
# With appropriate spw/DDID selection, `freqs` has shape
# (nchan, 1). Convert to m^-1 so we can multiply against UVW
# directly.
freqs = cols['axis_info']['freq_axis']['chan_freq']
assert freqs.shape[1] == 1, 'internal inconsistency, chan_freq??'
freqs = freqs[:,0] * util.INVERSE_C_MS
for i in range(cols['time'].size): # all records
time = cols['time'][i]
# get out of UTC as fast as we can! For some reason
# giving 'unit=s' below doesn't do what one might hope it would.
# CASA can convert to a variety of timescales; TAI is probably
# the safest conversion in terms of being helpful while remaining
# close to the fundamental data, but TT is possible and should
# be perfectly precise for standard applications.
mq = me.epoch(b'utc', b({'value': time / 86400., 'unit': 'd'}))
mjdtt = me.measure(b(mq), b'tt')['m0']['value']
tdata = tbins.get(mjdtt)
if tdata is None:
tdata = tbins[mjdtt] = np.zeros((nfreq, 7))
if rephase:
uvw = cols['uvw'][:,i]
ph = np.exp((0-2j) * np.pi * np.dot(lmn, uvw) * freqs)
for j in range(cols['flag'].shape[0]): # all polns
# We just average together all polarizations right now!
# (Not actively, but passively by just iterating over them.)
data = cols[cfg.datacol][j,:,i]
flags = cols['flag'][j,:,i]
# XXXXX casacore is currently broken and returns the raw
# weights from the dataset rather than applying the
# polarization selection. Fortunately all of our weights
# are the same, and you can never fetch more pol types
# than the dataset has, so this bit works despite the bug.
w = np.where(~flags)[0]
if not w.size:
continue # all flagged
if rephase:
data *= ph
m = freqmaps[spwid][w]
d = data[w]
wt = cols['weight'][j,i]
tdata[m,0] += wt * d.real
tdata[m,1] += wt * d.imag
tdata[m,2] += wt * d.real**2
tdata[m,3] += wt * d.imag**2
tdata[m,4] += wt
tdata[m,5] += wt**2
tdata[m,6] += 1
if not ms.iternext():
break
ms.reset() # reset selection filter so we can get next DDID
ms.close()
# Could gain some efficiency by using a better data structure than a dict().
smjd = np.asarray(sorted(six.iterkeys(tbins)))
data = np.zeros((5, smjd.size, nfreq))
for tid in range(smjd.size):
mjd = smjd[tid]
wr, wi, wr2, wi2, wt, wt2, n = tbins[mjd].T
w = np.where(n > 0)[0]
if w.size < 3: # not enough data for meaningful statistics
continue
r = wr[w] / wt[w]
i = wi[w] / wt[w]
if cfg.believeweights:
ru = wt[w]**-0.5
iu = wt[w]**-0.5
else:
r2 = wr2[w] / wt[w]
i2 = wi2[w] / wt[w]
rv = r2 - r**2 # variance among real/imag msmts
iv = i2 - i**2
ru = np.sqrt(rv * wt2[w]) / wt[w] # uncert in mean real/img values
iu = np.sqrt(iv * wt2[w]) / wt[w]
data[0,tid,w] = r
data[1,tid,w] = ru
data[2,tid,w] = i
data[3,tid,w] = iu
data[4,tid,w] = n[w]
np.save(cfg.outstream, smjd)
np.save(cfg.outstream, allfreqs)
np.save(cfg.outstream, data)
def dftdynspec_cli(argv):
check_usage(dftdynspec_doc, argv, usageifnoargs=True)
cfg = Config().parse(argv[1:])
util.logger(cfg.loglevel)
dftdynspec(cfg)
[docs]class Loader(object):
"""Read in a dynamic-spectrum file produced by the `dftdynspec` task.
**Constructor arguments**
*path*
The path of the file to read.
"""
mjds = None
"A 1D sorted array of the MJDs of the data samples."
freqs = None
"A 1D sorted array of the frequencies of the data samples, measured in GHz."
reals = None
"""A 2D array of the real parts of the averaged visibilities. Shape is
`(mjds.size, freqs.size)`.
"""
u_reals = None
"""A 2D array of the estimated uncertainties on the real parts of the averaged
visibilities. Shape is `(mjds.size, freqs.size)`.
"""
imags = None
"""A 2D array of the imaginary parts of the averaged visibilities. Shape is
`(mjds.size, freqs.size)`.
"""
u_imags = None
"""A 2D array of the estimated uncertainties on the imaginary parts of the
averaged visibilities. Shape is `(mjds.size, freqs.size)`.
"""
counts = None
"""A 2D array recording the number of visibilities that went into each
average. Shape is `(mjds.size, freqs.size)`.
"""
def __init__(self, path):
with io.open(path, 'rb') as f:
self.mjds = np.load(f)
self.freqs = np.load(f)
cube = np.load(f)
self.reals, self.u_reals, self.imags, self.u_imags, self.counts = cube
@property
def n_mjds(self):
"The size of the MJD axis of the data arrays; an integer."
return self.mjds.size
@property
def n_freqs(self):
"The size of the frequency axis of the data arrays; an integer."
return self.freqs.size