# -*- mode: python; coding: utf-8 -*-
# Copyright 2015-2017 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.
"""The way that the official CASA 5 ``casapy`` code is written, it's basically
impossible to import its tasks into a straight-Python environment. (Trust me,
I've tried.) So, this module more-or-less duplicates lots of CASA code. But
this module also tries to provide to provide saner semantics and interfaces.
Fortunately, CASA 6 is way more reasonable.
The goal is to make task-like functionality available in a real Python
library, with no side effects, so that data processing can be scripted
tractably. These tasks are also accessible through the ``casatask`` command
line program provided with :mod:`pwkit`.
"""
import os.path, sys
import numpy as np
from . import util
from ... import reraise_context, PKError
from ...cli import check_usage, wrong_usage, warn, die
from ...kwargv import ParseKeywords, Custom
# Keep the tasks alphabetized!
__all__ = str(
"""
applycal applycal_cli ApplycalConfig
bpplot bpplot_cli BpplotConfig
clearcal clearcal_cli
closures_cli
concat concat_cli
delcal delcal_cli
delmod_cli
dftdynspec_cli
dftphotom_cli
dftspect_cli
elplot elplot_cli ElplotConfig
extractbpflags extractbpflags_cli
flagcmd flagcmd_cli FlagcmdConfig
flaglist flaglist_cli FlaglistConfig
flagmanager_cli
flagzeros flagzeros_cli FlagzerosConfig
fluxscale fluxscale_cli FluxscaleConfig
ft ft_cli FtConfig
gaincal gaincal_cli GaincalConfig
gencal gencal_cli GencalConfig
getopacities getopacities_cli
gpdetrend gpdetrend_cli GpdetrendConfig
gpdiagnostics_cli
gpplot gpplot_cli GpplotConfig
image2fits image2fits_cli
importalma importalma_cli
importasdm importasdm_cli
importevla importevla_cli
listobs listobs_cli
listsdm listsdm_cli
mfsclean mfsclean_cli MfscleanConfig
mjd2date mjd2date_cli
mstransform mstransform_cli MstransformConfig
plotants plotants_cli
plotcal plotcal_cli PlotcalConfig
polmodel_cli
setjy setjy_cli SetjyConfig
split split_cli SplitConfig
spwglue_cli
tsysplot tsysplot_cli TsysplotConfig
uvsub uvsub_cli UvsubConfig
xyphplot xyphplot_cli XyphplotConfig
commandline
"""
).split()
# Some utilities
precal_doc = """
**Pre-applied calibrations**
gaintable=
Comma-separated list of calibration tables to apply on-the-fly
before solving
gainfield=
SEMICOLON-separated list of field selections to apply for each gain table.
If there are fewer items than there are gaintable items, the list is
padded with blank items, implying no selection by field.
interp=
COMMA-separated list of interpolation types to use for each gain
table. If there are fewer items, the list is padded with 'linear'
entries. Allowed values: nearest linear cubic spline
spwmap=
SEMICOLON-separated list of spectral window mappings for each
existing gain table; each record is a COMMA-separated list of
integers. For the i'th spw in the dataset, spwmap[i] specifies
the record in the gain table to use. For instance [0, 0, 1, 1]
maps four spws in the UV data to just two spectral windows in
the preexisting gain table.
opacity=
Comma-separated list of opacities in nepers. One for each spw; if
there are more spws than entries, the last entry is used for the
remaining spws.
gaincurve=
Whether to apply VLA-specific built in gain curve correction
(default: false)
parang=
Whether to apply parallactic angle rotation correction
(default: false)
"""
stdsel_doc = """
Standard data selection keywords
This task can filter input data using any of the following keywords,
specified as in the standard CASA interface: ``antenna``, ``array``,
``correlation``, ``field``, ``intent``, ``observation``, ``scan``, ``spw``,
``taql``, ``timerange``, ``uvrange``.
"""
loglevel_doc = """
loglevel=
Level of detail from CASA logging system. Default value is ``warn``. Allowed
values are: ``severe``, ``warn``, ``info``, ``info1``, ``info2``, ``info3``,
``info4``, ``info5``, ``debug1``, ``debug2``, ``debugging``.
"""
def extractmsselect(
cfg,
havearray=False,
havecorr=False,
haveintent=True,
intenttoscanintent=False,
taqltomsselect=True,
observationtoobs=False,
):
# expects cfg to have:
# antenna [correlation] field intent observation scan spw taql timerange uvrange
# fills a dict with:
# baseline [correlation] field intent (msselect|taql) observation scan spw time uvrange
selkws = {}
direct = "field scan spw uvrange".split()
indirect = "antenna:baseline timerange:time".split()
if havearray:
indirect.append("array:subarray")
if havecorr:
direct.append("correlation")
if haveintent:
if intenttoscanintent:
indirect.append("intent:scanintent")
else:
direct.append("intent")
if observationtoobs:
indirect.append("observation:obs")
else:
direct.append("observation")
if taqltomsselect:
indirect.append("taql:msselect")
else:
direct.append("taql")
for k in direct:
selkws[k] = getattr(cfg, k) or ""
for p in indirect:
ck, sk = p.split(":")
selkws[sk] = getattr(cfg, ck) or ""
return selkws
def applyonthefly(cb, cfg):
# expects cfg to have:
# gaintable gainfield interp spwmap opacity gaincurve parang
n = len(cfg.gaintable)
# fill in missing values, taking care not to mutate cfg.
gainfields = list(cfg.gainfield)
interps = list(cfg.interp)
spwmaps = list(cfg.spwmap)
if len(gainfields) < n:
gainfields += [""] * (n - len(gainfields))
elif len(gainfields) > n:
raise ValueError('more "gainfield" entries than "gaintable" entries')
if len(interps) < n:
interps += ["linear"] * (n - len(interps))
elif len(interps) > n:
raise ValueError('more "interp" entries than "gaintable" entries')
if len(spwmaps) < n:
spwmaps += [[-1]] * (n - len(spwmaps))
elif len(spwmaps) > n:
raise ValueError('more "spwmap" entries than "gaintable" entries')
for table, field, interp, spwmap in zip(
cfg.gaintable, gainfields, interps, spwmaps
):
cb.setapply(
table=table,
field=field,
interp=interp,
spwmap=spwmap,
t=0.0,
calwt=True,
)
if len(cfg.opacity):
cb.setapply(type="TOPAC", opacity=cfg.opacity, t=-1, calwt=True)
if cfg.gaincurve:
cb.setapply(type="GAINCURVE", t=-1, calwt=True)
if cfg.parang:
cb.setapply(type="P")
_kwcli_cfg_class_doc_template = """\
This is a configuration object for the ``%(taskname)s`` task. This object
contains no methods. Rather it contains placeholders (and default values) for
parameters that can be passed to :func:`%(taskname)s`.
The following documentation is written to target the **command-line** version
of this task, which may be invoked as ``casatask %(taskname)s``. “Keywords”
refer attributes of this structure, “comma-separated lists” should become
Python lists, and so on.
%(bulk)s
"""
def makecfgdoc(taskname, doc):
"""In Python 2.x you can't alter the __doc__ of a class after you define it,
so we need to provide a function that does the munging when we define each
class. This is that function.
"""
doc_args = dict(
bulk="\n".join(l for l in doc.splitlines() if not l.startswith("casatask ")),
taskname=taskname,
)
return _kwcli_cfg_class_doc_template % doc_args
_kwcli_impl_doc_template = """\
The ``%(taskname)s`` task.
cfg
A :class:`%(cfgname)s` object.
This function runs the ``%(taskname)s`` task. *For documentation of the
general functionality of this task and the parameters it takes*, see the
documentation for the :class:`%(cfgname)s` object below. Example::
from pwkit.environments.casa import tasks
cfg = tasks.%(cfgname)s()
cfg.vis = 'mydataset.ms'
# ... set other parameters ...
tasks.%(taskname)s(cfg)
This task may also be invoked through the command line, as ``casatask
%(taskname)s``. Run ``casatask %(taskname)s --help`` to see another version of
the documentation provided below.
"""
def makekwcli(doc, cfgclass, impl):
def kwclifunc(argv):
check_usage(doc, argv, usageifnoargs=True)
cfg = cfgclass().parse(argv[1:])
util.logger(cfg.loglevel)
impl(cfg)
# Doc magic
doc_args = dict(
bulk="\n".join(l for l in doc.splitlines() if not l.startswith("casatask ")),
cfgname=cfgclass.__name__,
taskname=impl.__name__,
)
impl.__doc__ = _kwcli_impl_doc_template % doc_args
return kwclifunc
# applycal
applycal_doc = (
"""
casatask applycal vis=<MS> [keywords]
Fill in the CORRECTED_DATA column of a visibility dataset using
the raw data and a set of calibration tables.
vis=
The MS to modify
calwt=
Write out calibrated weights as well as calibrated visibilities.
Default: false
"""
+ precal_doc
+ stdsel_doc
+ loglevel_doc
)
[docs]
class ApplycalConfig(ParseKeywords):
__doc__ = makecfgdoc("applycal", applycal_doc)
vis = Custom(str, required=True)
calwt = False
# skipping: flagbackup
gaintable = [str]
gainfield = Custom([str], sep=";")
interp = [str]
@Custom([str], sep=";")
def spwmap(v):
if v is None:
return None
return [list(map(int, e.split(","))) for e in v]
opacity = [float]
gaincurve = False
parang = False
antenna = str
field = str
intent = str
observation = str
scan = str
spw = str
taql = str
timerange = str
uvrange = str
applymode = "calflag" # almost never want to change this
loglevel = "warn"
[docs]
def applycal(cfg):
cb = util.tools.calibrater()
cb.open(filename=cfg.vis, compress=False, addcorr=True, addmodel=False)
selkws = extractmsselect(cfg)
selkws["chanmode"] = "none" # ?
cb.selectvis(**selkws)
applyonthefly(cb, cfg)
cb.correct(cfg.applymode)
cb.close()
applycal_cli = makekwcli(applycal_doc, ApplycalConfig, applycal)
# bpplot
#
# plotcal() can do this, but it is slow, ugly, and has not-very-informative
# output. Yes, CASA is so crappy that I can even speed up bandpass plotting
# by an order of magnitude.
bpplot_doc = (
"""
casatask bpplot caltable= dest=
Plot a bandpass calibration table. Currently, the supported format is a series
of pages showing amplitude and phase against normalized channel number, with
each page showing a particular antenna and polarization. Polarizations are
always reported as "R" and "L" since the relevant information is not stored
within the bandpass data set.
This task also works well to plot frequency-dependent polarimetric leakage
calibration tables.
caltable=MS
The input calibration Measurement Set
dest=PATH
If specified, plots are saved to this file -- the format is inferred
from the extension, which must allow multiple pages to be saved. If
unspecified, the plots are displayed using a Gtk3 backend.
dims=WIDTH,HEIGHT
If saving to a file, the dimensions of a each page. These are in points
for vector formats (PDF, PS) and pixels for bitmaps (PNG). Defaults to
1000, 600.
margins=TOP,RIGHT,LEFT,BOTTOM
If saving to a file, the plot margins in the same units as the dims.
The default is 4 on every side.
"""
+ loglevel_doc
)
[docs]
class BpplotConfig(ParseKeywords):
__doc__ = makecfgdoc("bpplot", bpplot_doc)
caltable = Custom(str, required=True)
dest = str
dims = [1000, 600]
margins = [4, 4, 4, 4]
loglevel = "warn"
[docs]
def bpplot(cfg):
import omega as om, omega.render
from ... import numutil
if isinstance(cfg.dest, omega.render.Pager):
# This is for non-CLI invocation.
pager = cfg.dest
elif cfg.dest is None:
import omega.gtk3
pager = om.makeDisplayPager()
else:
pager = om.makePager(
cfg.dest,
dims=cfg.dims,
margins=cfg.margins,
style=om.styles.ColorOnWhiteVector(),
)
tb = util.tools.table()
# Initial recon
tb.open(cfg.caltable, nomodify=True)
spws = tb.getcol("SPECTRAL_WINDOW_ID")
ants = tb.getcol("ANTENNA1")
tb.close()
tb.open(os.path.join(cfg.caltable, "ANTENNA"), nomodify=True)
names = tb.getcol("NAME")
tb.close()
nsoln = ants.size
# Load up and organize main data. We can't load CPARAM and FLAG all at
# once since different spws might have different nchans and/or npols.
antpols = {}
seenspws = set()
vals = [None] * nsoln
flags = [None] * nsoln
npol = {}
nchan = {}
tb.open(cfg.caltable, nomodify=True)
for isoln in range(nsoln):
vals[isoln] = this_vals = tb.getcell("CPARAM", isoln)
flags[isoln] = this_flags = tb.getcell("FLAG", isoln)
this_spw = spws[isoln]
this_npol, this_nchan = this_vals.shape
prev_npol = npol.get(this_spw)
if prev_npol is None:
npol[this_spw] = this_npol
elif this_npol != prev_npol:
raise Exception(
"assumptions violated: npol varies by solution for spw %s" % this_spw
)
prev_nchan = nchan.get(this_spw)
if prev_nchan is None:
nchan[this_spw] = this_nchan
elif this_nchan != prev_nchan:
raise Exception(
"assumptions violated: nchan varies by solution for spw %s" % this_spw
)
for ipol in range(this_npol):
if not this_flags[
ipol
].all(): # make sure to ignore completely-flagged records
k = (ants[isoln], ipol)
byspw = antpols.get(k)
if byspw is None:
antpols[k] = byspw = []
byspw.append((spws[isoln], isoln))
seenspws.add(spws[isoln])
tb.close()
seenspws = sorted(seenspws)
spw_to_offset = {}
tot_seen_nchan = 0
for ispw in seenspws:
spw_to_offset[ispw] = tot_seen_nchan
tot_seen_nchan += nchan[ispw]
# normalize phases to avoid distracting wraps
for iant, ipol in sorted(antpols.keys()):
for ispw, isoln in antpols[iant, ipol]:
f = flags[isoln][ipol]
meanph = np.angle(vals[isoln][ipol, ~f].mean())
vals[isoln][ipol] *= np.exp(-1j * meanph)
# find plot limits
max_am = None
for isoln in range(nsoln):
this_vals = vals[isoln]
this_flags = flags[isoln]
okvals = this_vals[np.where(~this_flags)]
if not okvals.size:
continue
this_max_am = np.abs(okvals).max()
this_min_am = np.abs(okvals).min()
this_max_ph = np.angle(okvals, deg=True).max()
this_min_ph = np.angle(okvals, deg=True).min()
if max_am is None:
max_am = this_max_am
min_am = this_min_am
max_ph = this_max_ph
min_ph = this_min_ph
else:
max_am = max(max_am, this_max_am)
min_am = min(min_am, this_min_am)
max_ph = max(max_ph, this_max_ph)
min_ph = min(min_ph, this_min_ph)
span = max_am - min_am
if span == 0:
span = 0.1 * max_am
if span == 0:
span = 1
max_am += 0.05 * span
min_am -= 0.05 * span
span = max_ph - min_ph
if span == 0:
span = 60
max_ph += 0.05 * span
min_ph -= 0.05 * span
if max_ph > 160:
max_ph = 180
if min_ph < -160:
min_ph = -180
polnames = "RL" # XXX: identification doesn't seem to be stored in cal table
# plot away
for iant, ipol in sorted(antpols.keys()):
p_am = om.RectPlot()
p_ph = om.RectPlot()
for ispw, isoln in antpols[iant, ipol]:
f = flags[isoln][ipol]
a = np.abs(vals[isoln][ipol])
p = np.angle(vals[isoln][ipol], deg=True)
w = np.where(~f)[0]
for s in numutil.slice_around_gaps(w, 1):
wsub = w[s]
if wsub.size == 0:
continue # Should never happen, but eh.
else:
# It'd also be pretty weird to have a spectral window
# containing just one(valid) channel, but it could
# happen.
lines = wsub.size > 1
p_am.addXY(
wsub + spw_to_offset[ispw], a[wsub], None, lines=lines, dsn=ispw
)
p_ph.addXY(
wsub + spw_to_offset[ispw], p[wsub], None, lines=lines, dsn=ispw
)
p_am.setBounds(xmin=0, xmax=tot_seen_nchan, ymin=min_am, ymax=max_am)
p_ph.setBounds(xmin=0, xmax=tot_seen_nchan, ymin=min_ph, ymax=max_ph)
p_am.addKeyItem("%s %s" % (names[iant], polnames[ipol]))
p_am.bpainter.paintLabels = False
p_am.setYLabel("Amplitude")
p_ph.setLabels("Normalized channel", "De-meaned Phase(deg)")
vb = om.layout.VBox(2)
vb[0] = p_am
vb[1] = p_ph
vb.setWeight(0, 2.5)
pager.send(vb)
bpplot_cli = makekwcli(bpplot_doc, BpplotConfig, bpplot)
# clearcal
clearcal_doc = """
casatask clearcal [-w] <vis1> [more vises...]
Fill the imaging and calibration columns (MODEL_DATA, CORRECTED_DATA,
IMAGING_WEIGHT) of each measurement set with default values, creating
the columns if necessary.
If you want to reset calibration models, these days you probably want
delmod. If you want to quickly make the columns go away, you probably
want delcal.
-w - only create and fill the IMAGING_WEIGHT column
"""
clearcal_imaging_col_tmpl = {
"_c_order": True,
"comment": "",
"dataManagerGroup": "",
"dataManagerType": "",
"keywords": {},
"maxlen": 0,
"ndim": 1,
"option": 0,
"shape": None,
"valueType": "float",
}
clearcal_imaging_dminfo_tmpl = {
"TYPE": "TiledShapeStMan",
"SPEC": {"DEFAULTTILESHAPE": [32, 128]},
"NAME": "imagingweight",
}
[docs]
def clearcal(vis, weightonly=False):
"""Fill the imaging and calibration columns (``MODEL_DATA``,
``CORRECTED_DATA``, ``IMAGING_WEIGHT``) of each measurement set with
default values, creating the columns if necessary.
vis (string)
Path to the input measurement set
weightonly (boolean)
If true, just create the ``IMAGING_WEIGHT`` column; do not fill
in the visibility data columns.
If you want to reset calibration models, these days you probably want
:func:`delmod_cli`. If you want to quickly make the columns go away, you
probably want :func:`delcal`.
Example::
from pwkit.environments.casa import tasks
tasks.clearcal('myvis.ms')
"""
tb = util.tools.table()
cb = util.tools.calibrater()
# cb.open() will create the tables if they're not present, so
# if that's the case, we don't actually need to run initcalset()
tb.open(vis, nomodify=False)
colnames = tb.colnames()
needinit = ("MODEL_DATA" in colnames) or ("CORRECTED_DATA" in colnames)
if "IMAGING_WEIGHT" not in colnames:
c = dict(clearcal_imaging_col_tmpl)
c["shape"] = tb.getcell("DATA", 0).shape[-1:]
tb.addcols({"IMAGING_WEIGHT": c}, clearcal_imaging_dminfo_tmpl)
tb.close()
if not weightonly:
import casadef
if casadef.casa_version.startswith("5."):
cb.setvi(old=True, quiet=False)
cb.open(vis)
if needinit:
cb.initcalset()
cb.close()
def clearcal_cli(argv):
check_usage(clearcal_doc, argv, usageifnoargs=True)
argv = list(argv)
weightonly = "-w" in argv
if weightonly:
sys.argv.remove("-w")
if len(argv) < 2:
wrong_usage(clearcal_doc, "need at least one argument")
util.logger()
for vis in argv[1:]:
clearcal(vis, weightonly=weightonly)
# closures
#
# Shim for a separate module
def closures_cli(argv):
from .closures import closures_cli
closures_cli(argv)
# concat
concat_doc = """
casatask concat [-s] <input vises ...> <output vis>
Concatenate the visibility measurement sets.
-s - sort the output in time
"""
concat_freqtol = 1e-5
concat_dirtol = 1e-5
[docs]
def concat(invises, outvis, timesort=False):
"""Concatenate visibility measurement sets.
invises (list of str)
Paths to the input measurement sets
outvis (str)
Path to the output measurement set.
timesort (boolean)
If true, sort the output in time after concatenation.
Example::
from pwkit.environments.casa import tasks
tasks.concat(['epoch1.ms', 'epoch2.ms'], 'combined.ms')
"""
tb = util.tools.table()
ms = util.tools.ms()
if os.path.exists(outvis):
raise RuntimeError('output "%s" already exists' % outvis)
for invis in invises:
if not os.path.isdir(invis):
raise RuntimeError('input "%s" does not exist' % invis)
tb.open(invises[0])
tb.copy(outvis, deep=True, valuecopy=True)
tb.close()
ms.open(outvis, nomodify=False)
for invis in invises[1:]:
ms.concatenate(msfile=invis, freqtol=concat_freqtol, dirtol=concat_dirtol)
ms.writehistory(message="taskname=tasklib.concat", origin="tasklib.concat")
ms.writehistory(message="vis = " + ", ".join(invises), origin="tasklib.concat")
ms.writehistory(
message="timesort = " + "FT"[int(timesort)], origin="tasklib.concat"
)
if timesort:
ms.timesort()
ms.close()
def concat_cli(argv):
check_usage(concat_doc, argv, usageifnoargs=True)
argv = list(argv)
timesort = "-s" in argv
if timesort:
sys.argv.remove("-s")
if len(argv) < 3:
wrong_usage(concat_doc, "need at least two arguments")
util.logger()
concat(argv[1:-1], argv[-1], timesort)
# delcal
#
# Not a CASA task. Delmod on steroids, at least when delmod
# is operating on scratch columns and not OTF models.
delcal_doc = """
casatask delcal <ms> [mses...]
Delete the MODEL_DATA and CORRECTED_DATA columns from MSes.
"""
[docs]
def delcal(mspath):
"""Delete the ``MODEL_DATA`` and ``CORRECTED_DATA`` columns from a measurement set.
mspath (str)
The path to the MS to modify
Example::
from pwkit.environments.casa import tasks
tasks.delcal('dataset.ms')
"""
wantremove = "MODEL_DATA CORRECTED_DATA".split()
tb = util.tools.table()
tb.open(mspath, nomodify=False)
cols = frozenset(tb.colnames())
toremove = [c for c in wantremove if c in cols]
if len(toremove):
tb.removecols(toremove)
tb.close()
return toremove
def delcal_cli(argv):
check_usage(delcal_doc, argv, usageifnoargs=True)
util.logger()
for mspath in argv[1:]:
removed = delcal(mspath)
if len(removed):
print("%s: removed %s" % (mspath, ", ".join(removed)))
else:
print("%s: nothing to remove" % mspath)
# delmod
delmod_doc = """
casatask delmod <MS...>
Delete the "on-the-fly" model information from the specified
Measurement Set(s).
If you want to delete the scratch columns, use delcal. If you
want to clear the scratch columns, use clearcal. I'm torn
between wanting better terminology and not wanting to be
gratuitously different from CASA.
"""
[docs]
def delmod_cli(argv, alter_logger=True):
"""Command-line access to ``delmod`` functionality.
The ``delmod`` task deletes "on-the-fly" model information from a
Measurement Set. It is so easy to implement that a standalone
function is essentially unnecessary. Just write::
from pwkit.environments.casa import util
cb = util.tools.calibrater()
cb.open('datasaet.ms', addcorr=False, addmodel=False)
cb.delmod(otf=True, scr=False)
cb.close()
If you want to delete the scratch columns, use :func:`delcal`. If you want
to clear the scratch columns, use :func:`clearcal`.
"""
check_usage(delmod_doc, argv, usageifnoargs=True)
if alter_logger:
util.logger()
cb = util.tools.calibrater()
for mspath in argv[1:]:
cb.open(mspath, addcorr=False, addmodel=False)
cb.delmod(otf=True, scr=False)
cb.close()
# dftdynspec
#
# Shim for a separate module
def dftdynspec_cli(argv):
from .dftdynspec import dftdynspec_cli
dftdynspec_cli(argv)
# dftphotom
#
# Shim for a separate module
def dftphotom_cli(argv):
from .dftphotom import dftphotom_cli
dftphotom_cli(argv)
# dftspect
#
# Shim for a separate module
def dftspect_cli(argv):
from .dftspect import dftspect_cli
dftspect_cli(argv)
# elplot
#
# See bpplot() -- CASA plotcal can do this in a certain sense, but it's slow
# and ugly.
elplot_doc = (
"""
casatask elplot vis= dest=
Plot elevations of fields observed in a MeasurementSet.
vis=MS
The input Measurement Set.
dest=PATH
If specified, plots are saved to this file -- the format is inferred
from the extension, which must allow multiple pages to be saved. If
unspecified, the plots are displayed using a Gtk3 backend.
dims=WIDTH,HEIGHT
If saving to a file, the dimensions of a each page. These are in points
for vector formats(PDF, PS) and pixels for bitmaps(PNG). Defaults to
1000, 600.
margins=TOP,RIGHT,LEFT,BOTTOM
If saving to a file, the plot margins in the same units as the dims.
The default is 4 on every side.
"""
+ loglevel_doc
)
[docs]
class ElplotConfig(ParseKeywords):
__doc__ = makecfgdoc("elplot", elplot_doc)
vis = Custom(str, required=True)
dest = str
dims = [1000, 600]
margins = [4, 4, 4, 4]
loglevel = "warn"
[docs]
def elplot(cfg):
import omega as om, omega.render
if isinstance(cfg.dest, omega.render.Pager):
# This is for non-CLI invocation.
pager = cfg.dest
elif cfg.dest is None:
import omega.gtk3
pager = om.makeDisplayPager()
else:
pager = om.makePager(
cfg.dest,
dims=cfg.dims,
margins=cfg.margins,
style=om.styles.ColorOnWhiteVector(),
)
ms = util.tools.ms()
me = util.tools.measures()
ms.open(cfg.vis, nomodify=True)
scans = ms.range(["scan_number"])["scan_number"]
md = ms.metadata()
field_names = md.namesforfields()
obs = md.observatoryposition()
me.doframe(obs)
timetmpl = md.timerangeforobs(0)["begin"]
mjd0 = int(np.floor(md.timesforscan(scans[0]).min() / 86400))
field_dsns = {}
p = om.RectPlot()
for scan in scans:
mjds = md.timesforscan(scan=scan) / 86400
fields = md.fieldsforscan(scan=scan)
if fields.size != 1:
import sys
print("warning: scan %d does not contain one field: %r" % (scan, fields))
field = fields[0]
fdir = ms.getfielddirmeas(fieldid=field)
els = np.empty(mjds.size)
for i in range(mjds.size):
timetmpl["m0"]["value"] = mjds[i]
me.doframe(timetmpl)
els[i] = me.measure(fdir, "AZEL")["m1"]["value"] * 180 / np.pi
dsn = field_dsns.get(field)
kt = None
if dsn is None:
dsn = len(field_dsns)
field_dsns[field] = dsn
kt = field_names[field]
p.addXY(mjds - mjd0, els, kt, dsn=dsn)
p.setLabels("MJD - %d(day)" % mjd0, "Elevation(deg)")
pager.send(p)
pager.done()
elplot_cli = makekwcli(elplot_doc, ElplotConfig, elplot)
# extractbpflags
#
# Not a CASA task, but I've found this to be very useful.
extractbpflags_doc = """
When CASA encounters flagged channels in bandpass calibration tables, it
interpolates over them as best it can -- even if interp='<any>,nearest'. This
means that if certain channels are unflagged in some target data but entirely
flagged in your BP cal, they'll get multiplied by some number that may or may
not be reasonable, not flagged. This is scary if, for instance, you're using
an automated system to find RFI, or you flag edge channels in some uneven way.
This script writes out a list of flagging commands corresponding to the
flagged channels in the bandpass table to ensure that the data without
bandpass solutions are flagged.
Note that, because we can't select by antpol, we can't express a situation in
which the R and L bandpass solutions have different flags. But in CASA the
flags seem to always be the same.
We're assuming that the channelization of the bandpass solution and the data
are the same.
"""
extractbpflags.__doc__ += extractbpflags_doc
def extractbpflags_cli(argv):
check_usage(extractbpflags_doc, argv, usageifnoargs="long")
if len(argv) != 2:
wrong_usage(extractbpflags_doc, "expect one MS name as an argument")
extractbpflags(argv[1], sys.stdout)
# flagcmd
flagcmd_doc = """
casatask flagcmd vis= [keywords..]
Flag data using auto-generated lists of flagging commands.
"""
[docs]
class FlagcmdConfig(ParseKeywords):
__doc__ = makecfgdoc("flagcmd", flagcmd_doc)
vis = Custom(str, required=True)
inpmode = "table"
useapplied = False
action = "apply"
flagbackup = True
loglevel = "warn"
[docs]
def flagcmd(cfg):
from .scripting import CasapyScript
script = os.path.join(os.path.dirname(__file__), "cscript_flagcmd.py")
args = dict(
vis=cfg.vis,
inpmode=cfg.inpmode,
useapplied=cfg.useapplied,
action=cfg.action,
flagbackup=cfg.flagbackup,
)
with CasapyScript(script, **args):
pass
flagcmd_cli = makekwcli(flagcmd_doc, FlagcmdConfig, flagcmd)
# flaglist. Not quite a CASA task; something like
# flagcmd(vis=, inpmode='list', inpfile=, flagbackup=False)
#
# We have to reproduce a lot of the dumb logic from the flaghelper.py module
# because we can't import it because it drags in the whole casapy pile of
# stuff.
flaglist_doc = """
casatask flaglist vis= inpfile= [datacol=]
Flag data using a list of flagging commands stored in a text file. This
is approximately equivalent to 'flagcmd(vis=, inpfile=, inpmode='list',
flagbackup=False)'.
This implementation must emulate the CASA modules that load up the
flagging commands and may not be precisely compatible with the CASA
version.
"""
[docs]
class FlaglistConfig(ParseKeywords):
__doc__ = makecfgdoc("flaglist", flaglist_doc)
vis = Custom(str, required=True)
inpfile = Custom(str, required=True)
datacol = "data"
loglevel = "warn"
[docs]
def flaglist(cfg):
from ast import literal_eval
try:
factory = util.tools.agentflagger
except AttributeError:
factory = util.tools.testflagger
af = factory()
af.open(cfg.vis, 0.0)
af.selectdata()
for row, origline in enumerate(open(cfg.inpfile)):
origline = origline.rstrip()
if not len(origline):
continue
if origline[0] == "#":
continue
# emulating flaghelper.py here and elsewhere ...
bits = origline.replace("true", "True").replace("false", "False").split(" ")
params = {}
lastkey = None
for bit in bits:
subbits = bit.split("=", 1)
if len(subbits) == 1:
assert lastkey is not None, "illegal flag list syntax"
params[lastkey] += " " + bit
else:
params[subbits[0]] = subbits[1]
lastkey = subbits[0]
assert "ntime" not in params, 'cannot handle "ntime" flag key'
for key in list(params.keys()):
val = params[key]
try:
val = literal_eval(val)
except ValueError:
val = val.strip("'\"")
params[key] = val
params["name"] = "agent_%d" % row
params["datacolumn"] = cfg.datacol.upper()
params["apply"] = True
params.setdefault("mode", "manual")
if not af.parseagentparameters(params):
raise Exception("cannot parse flag line: %s" % origline)
af.init()
# A summary report would be nice. run() should return
# info but I can't get it to do so.(I'm just trying to
# copy the task_flagdata.py implementation.)
af.run(True, True)
af.done()
flaglist_cli = makekwcli(flaglist_doc, FlaglistConfig, flaglist)
# flagmanager. Not really complicated enough to make it worth making a
# modular implementation to be driven from the CLI.
flagmanager_doc = """
casatask flagmanager list <ms>
casatask flagmanager save <ms> <name>
casatask flagmanager restore <ms> <name>
casatask flagmanager delete <ms> <name>
"""
[docs]
def flagmanager_cli(argv, alter_logger=True):
"""Command-line access to ``flagmanager`` functionality.
The ``flagmanager`` task manages tables of flags associated with
measurement sets. Its features are easy to implement that a standalone
library function is essentially unnecessary. See the source code to this
function for the tool calls that implement different parts of the
``flagmanager`` functionality.
"""
check_usage(flagmanager_doc, argv, usageifnoargs=True)
if len(argv) < 3:
wrong_usage(flagmanager_doc, "expect at least a mode and an MS name")
mode = argv[1]
ms = argv[2]
if alter_logger:
if mode == "list":
util.logger("info")
elif mode == "delete":
# it WARNs 'deleting version xxx' ... yargh
util.logger("severe")
else:
util.logger()
try:
factory = util.tools.agentflagger
except AttributeError:
factory = util.tools.testflagger
af = factory()
af.open(ms)
if mode == "list":
if len(argv) != 3:
wrong_usage(flagmanager_doc, "expect exactly one argument in list mode")
af.getflagversionlist()
elif mode == "save":
if len(argv) != 4:
wrong_usage(flagmanager_doc, "expect exactly two arguments in save mode")
from time import strftime
name = argv[3]
af.saveflagversion(
versionname=name,
merge="replace",
comment="created %s(casatask flagmanager)" % strftime("%Y-%m-%dT%H:%M:%SZ"),
)
elif mode == "restore":
if len(argv) != 4:
wrong_usage(flagmanager_doc, "expect exactly two arguments in restore mode")
name = argv[3]
af.restoreflagversion(versionname=name, merge="replace")
elif mode == "delete":
if len(argv) != 4:
wrong_usage(flagmanager_doc, "expect exactly two arguments in delete mode")
name = argv[3]
if not os.path.isdir(os.path.join(ms + ".flagversions", "flags." + name)):
# This condition only results in a WARN from deleteflagversion()!
raise RuntimeError(
'version "%s" doesn\'t exist in "%s.flagversions"' % (name, ms)
)
af.deleteflagversion(versionname=name)
else:
wrong_usage(flagmanager_doc, 'unknown flagmanager mode "%s"' % mode)
af.done()
# flagzeros. Not quite a CASA task; something like
# flagdata(vis=, mode='clip', clipzeros=True, flagbackup=False)
flagzeros_doc = """
casatask flagzeros vis= [datacol=]
Flag zero data points in the specified data column.
"""
[docs]
class FlagzerosConfig(ParseKeywords):
__doc__ = makecfgdoc("flagzeros", flagzeros_doc)
vis = Custom(str, required=True)
datacol = "data"
loglevel = "warn"
[docs]
def flagzeros(cfg):
try:
factory = util.tools.agentflagger
except AttributeError:
factory = util.tools.testflagger
af = factory()
af.open(cfg.vis, 0.0)
af.selectdata()
pars = dict(
datacolumn=cfg.datacol.upper(),
clipzeros=True,
name="CLIP",
mode="clip",
apply=True,
)
af.parseagentparameters(pars)
af.init()
# A summary report would be nice. run() should return
# info but I can't get it to do so.(I'm just trying to
# copy the task_flagdata.py implementation.)
af.run(True, True)
af.done()
flagzeros_cli = makekwcli(flagzeros_doc, FlagzerosConfig, flagzeros)
# fluxscale
fluxscale_doc = (
"""
casatask fluxscale vis=<MS> caltable=<MS> fluxtable=<new MS> reference=<fields> transfer=<fields> [keywords]
Write a new calibation table determining the fluxes for intermediate calibrators
from known reference sources
vis=
The visibility dataset.(Shouldn't be needed, but ...)
caltable=
The preexisting calibration table with gains associated with more than one source.
fluxtable=
The path of a new calibration table to create
reference=
Comma-separated names of sources whose model fluxes are assumed to be well-known.
transfer=
Comma-separated names of sources whose fluxes should be computed from the gains.
listfile=
If specified, write out flux information to this path.
append=
Boolean, default false. If true, append to existing cal table rather than
overwriting.
refspwmap=
Comma-separated list of integers. If gains are only available for some spws,
map from the data to the gains. For instance, refspwmap=1,1,3,3 means that spw 0
will have its flux calculated using the gains for spw 1.
"""
+ loglevel_doc
)
# Not supported in CASA 3.4:
# incremental=
# Boolean, default false. If true, create an "incremental" table where the amplitudes
# are correction factors, not absolute gains.(I.e., for the reference sources,
# the amplitudes will be unity.)
[docs]
class FluxscaleConfig(ParseKeywords):
__doc__ = makecfgdoc("fluxscale", fluxscale_doc)
vis = Custom(str, required=True)
caltable = Custom(str, required=True)
fluxtable = Custom(str, required=True)
reference = Custom([str], required=True)
transfer = Custom([str], required=True)
listfile = str
append = False
refspwmap = [int]
# incremental = False
loglevel = "warn"
[docs]
def fluxscale(cfg):
cb = util.tools.calibrater()
reference = cfg.reference
if isinstance(reference, (list, tuple)):
reference = ",".join(reference)
transfer = cfg.transfer
if isinstance(transfer, (list, tuple)):
transfer = ",".join(transfer)
refspwmap = cfg.refspwmap
if not len(refspwmap):
refspwmap = [-1]
cb.open(cfg.vis, compress=False, addcorr=False, addmodel=False)
result = cb.fluxscale(
tablein=cfg.caltable,
tableout=cfg.fluxtable,
reference=reference,
transfer=transfer,
listfile=cfg.listfile or "",
append=cfg.append,
refspwmap=refspwmap,
)
# incremental=cfg.incremental)
cb.close()
return result
fluxscale_cli = makekwcli(fluxscale_doc, FluxscaleConfig, fluxscale)
# ft
#
# We derive 'nterms' from len(model), and always derive reffreq
# from the model images. These seem like safe constraints?
ft_doc = (
"""
casatask ft vis=<MS> [keywords]
Fill in(or update) the MODEL_DATA column of a Measurement Set with
visibilities computed from an image or list of components.
vis=
The path to the measurement set
model=
Comma-separated list of model images, each giving successive
Taylor terms of a spectral model for each source.(It's fine
to have just one model, and this will do what you want.) The
reference frequency for the Taylor expansion is taken from
the first image.
complist=
Path to a CASA ComponentList Measurement Set to use in the modeling.
I don't know what happens if you specify both this and "model".
They might both get applied?
incremental=
Bool, default false, meaning that the MODEL_DATA column will be
replaced with the new values computed here. If true, the new values
are added to whatever's already in MODEL_DATA.
wprojplanes=
Optional integer. If provided, W-projection will be used in the computation
of the model visibilities, using the specified number of planes. Note that
this *does* make a difference even now that this task only embeds information
in a MS to enable later on-the-fly computation of the UV model.
usescratch=
Foo.
"""
+ stdsel_doc
+ loglevel_doc
)
[docs]
class FtConfig(ParseKeywords):
__doc__ = makecfgdoc("ft", ft_doc)
vis = Custom(str, required=True)
model = [str]
complist = str
incremental = False
wprojplanes = int
usescratch = False
antenna = str
field = str
observation = str
scan = str
spw = str
taql = str
timerange = str
uvrange = str
loglevel = "warn"
[docs]
def ft(cfg):
im = util.tools.imager()
im.open(cfg.vis, usescratch=cfg.usescratch)
im.selectvis(**extractmsselect(cfg, haveintent=False, taqltomsselect=False))
nmodel = len(cfg.model)
if nmodel > 1:
ia = util.tools.image()
ia.open(cfg.model[0])
# This gives Hz:
reffreq = ia.coordsys().referencevalue(type="spectral")["numeric"][0]
ia.close()
im.settaylorterms(ntaylorterms=nmodel, reffreq=reffreq)
if cfg.wprojplanes is not None:
im.defineimage()
im.setoptions(ftmachine="wproject", wprojplanes=cfg.wprojplanes)
im.ft(model=cfg.model, complist=cfg.complist or "", incremental=cfg.incremental)
im.close()
ft_cli = makekwcli(ft_doc, FtConfig, ft)
# gaincal
#
# I've folded in the bandpass functionality since there's
# so much overlap. Some limitations that this leads to:
#
# - bandpass solint has a frequency component that we don't support
# - bandpass combine defaults to 'scan'
gaincal_doc = (
"""
casatask gaincal vis=<MS> caltable=<TBL> [keywords]
Compute calibration parameters from data. Encompasses the functionality
of CASA tasks 'gaincal' *and* 'bandpass'.
vis=
Input dataset
caltable=
Output calibration table (can exist if append=True)
gaintype=
Kind of gain solution:
G - gains per poln and spw(default)
T - like G, but one value for all polns
GSPLINE - like G, with a spline fit. "Experimental"
B - bandpass
BPOLY - bandpass with polynomial fit. "Somewhat experimental"
K - antenna-based delays
KCROSS - global cross-hand delay ; use parang=True
D - solve for instrumental leakage
Df - above with per-channel leakage terms
D+QU - solve for leakage and apparent source polarization
Df+QU - above with per-channel leakage terms
X - solve for absolute position angle phase term
Xf - above with per-channel phase terms
D+X - D and X. "Not normally done"
Df+X - Df and X. Presumably also not normally done.
XY+QU - ?
XYf+QU - ?
calmode=
What parameters to solve for: amplitude("a"), phase("p"), or both
("ap"). Default is "ap". Not used in bandpass solutions.
solint=
Solution interval; this is an upper bound, but solutions
will be broken across certain boundaries according to "combine".
'inf' - solutions as long as possible(the default)
'int' - one solution per integration
(str) - a specific time with units(e.g. '5min')
(number) - a specific time in seconds
combine=
Comma-separated list of boundary types; solutions will NOT be broken across
these boundaries. Types are: ``field``, ``scan``, ``spw``.
refant=
Comma-separated list of reference antennas in decreasing
priority order.
solnorm=
Normalize solution amplitudes to 1 after solving (only applies
to gaintypes G, T, B). Also normalizes bandpass phases to zero
when solving for bandpasses. Default: false.
append=
Whether to append solutions to an existing table. If the table
exists and append=False, the table is overwritten! (Default: false)
"""
+ precal_doc
+ """
**Low-level parameters**
minblperant=
Number of baselines for each ant in order to solve (default: 4)
minsnr=
Min. SNR for acceptable solutions (default: 3.0)
preavg=
Interval for pre-averaging data within each solution interval,
in seconds. Default is -1, meaning not to pre-average.
smodel=I,Q,U,V
Full-stokes point source model to use, if none is embedded in the vis file.
"""
+ stdsel_doc
+ loglevel_doc
)
[docs]
class GaincalConfig(ParseKeywords):
__doc__ = makecfgdoc("gaincal", gaincal_doc)
vis = Custom(str, required=True)
caltable = Custom(str, required=True)
gaintype = "G"
calmode = "ap"
solint = "inf"
combine = [str]
refant = [str]
solnorm = False
append = False
minblperant = 4
minsnr = 3.0
preavg = -1.0
gaintable = [str]
gainfield = Custom([str], sep=";")
interp = [str]
opacity = [float]
gaincurve = False
parang = False
smodel = [int]
@Custom([str], sep=";")
def spwmap(v):
return [list(map(int, e.split(","))) for e in v]
# gaincal keywords: splinetime npointaver phasewrap
# bandpass keywords: fillgaps degamp degphase visnorm maskcenter
# maskedge
antenna = str
field = str
intent = str
observation = str
scan = str
spw = str
taql = str # msselect
timerange = str
uvrange = str
loglevel = "warn" # teeny hack for CLI
[docs]
def gaincal(cfg):
cb = util.tools.calibrater()
cb.open(filename=cfg.vis, compress=False, addcorr=False, addmodel=False)
selkws = extractmsselect(cfg)
selkws["chanmode"] = "none" # ?
cb.selectvis(**selkws)
applyonthefly(cb, cfg)
if cfg.smodel is not None and len(cfg.smodel):
cb.setptmodel(cfg.smodel)
# Solve
solkws = {}
for k in "append preavg minblperant minsnr solnorm".split():
solkws[k] = getattr(cfg, k)
for p in "caltable:table calmode:apmode".split():
ck, sk = p.split(":")
solkws[sk] = getattr(cfg, ck)
if isinstance(cfg.solint, (int, float)):
solkws["t"] = "%fs" % cfg.solint # sugar
else:
solkws["t"] = str(cfg.solint)
if isinstance(cfg.refant, str):
solkws["refant"] = cfg.refant
else:
solkws["refant"] = ",".join(cfg.refant)
solkws["combine"] = ",".join(cfg.combine)
solkws["phaseonly"] = False
solkws["type"] = cfg.gaintype.upper()
if solkws["type"] == "GSPLINE":
cb.setsolvegainspline(**solkws)
elif solkws["type"] == "BPOLY":
cb.setsolvebandpoly(**solkws)
else:
cb.setsolve(**solkws)
cb.solve()
cb.close()
gaincal_cli = makekwcli(gaincal_doc, GaincalConfig, gaincal)
# gencal
gencal_doc = (
"""
casatask gencal vis=<MS> caltable=<TBL> caltype=<TYPE> [keywords...]
Generate certain calibration tables that don't need to be solved for from
the actual data.
If you want to generate antenna position corrections for Jansky VLA data, you
can just specify `caltype=antpos` and leave off the "parameter" keyword. This
will cause the task will talk to an NRAO server and automatically download the
correct position corrections. Other telescopes do not support this
functionality, but if you can obtain the position corrections, you can use the
"antenna" and "parameter" keywords to build the desired calibration table
manually.
vis=
Input dataset
caltable=
Output calibration table (appended to if preexisting)
caltype=
The kind of table to generate:
amp - generic amplitude correction; needs parameter(s)
ph - generic phase correction; needs parameter(s)
sbd - single-band delay: phase slope for each SPW; needs parameter(s)
mbd - multi-band delay: phase slope for all SPWs; needs parameter(s)
antpos - antenna position corrections in ITRF; what you want; accepts parameter(s)
antposvla - antenna position corrections in VLA frame; **not what you want**; accepts parameter(s)
tsys - tsys from ALMA syscal table
swpow - EVLA switched-power and requantizer gains("experimental")
opac - tropospheric opacity; needs parameter
gc - (E)VLA elevation-dependent gain curve
eff - (E)VLA antenna efficiency correction
gceff - combination of "gc" and "eff"
rq - EVLA requantizer gains; not what you want
swp/rq - EVLA switched-power gains divided by "rq"; not what you want
parameter=
Custom parameters for various caltypes. Dimensionality depends on selections applied.
amp - gain; dimensionless
ph - phase; degrees
sbd - delay; nanosec
mbd - delay; nanosec
antpos - position offsets; ITRF meters(or look up automatically for EVLA if unspecified)
antposvla - position offsets; meters in VLA reference frame
opac - opacity; dimensionless(nepers?)
antenna=
Selection keyword, governing which solutions to generate and controlling shape
of "parameter" keyword.
pol=
Analogous to "antenna"
spw=
Analogous to "antenna"
"""
+ loglevel_doc
)
[docs]
class GencalConfig(ParseKeywords):
__doc__ = makecfgdoc("gencal", gencal_doc)
vis = Custom(str, required=True)
caltable = Custom(str, required=True)
caltype = Custom(str, required=True)
parameter = [float]
antenna = str
pol = str
spw = str
loglevel = "warn"
[docs]
def gencal(cfg):
cb = util.tools.calibrater()
cb.open(filename=cfg.vis, compress=False, addcorr=False, addmodel=False)
antenna = cfg.antenna or ""
parameter = cfg.parameter
if cfg.caltype == "antpos" and cfg.antenna is None:
# There's a Python module in casapy that implements this; I don't want
# to shadow it entirely ...
from .scripting import CasapyScript
script = os.path.join(os.path.dirname(__file__), "cscript_genantpos.py")
with CasapyScript(script, vis=cfg.vis) as cs:
try:
with open(os.path.join(cs.workdir, "info.npy"), "rb") as f:
antenna = np.load(f)
parameter = np.load(f)
except Exception:
reraise_context("interal casapy script seemingly failed; no info.npy")
antenna = antenna.tostring()
cb.specifycal(
caltable=cfg.caltable,
time="",
spw=cfg.spw or "",
antenna=antenna,
pol=cfg.pol or "",
caltype=cfg.caltype,
parameter=parameter,
)
cb.close()
gencal_cli = makekwcli(gencal_doc, GencalConfig, gencal)
# getopacities
#
# This is a casapy script.
getopacities_doc = """
casatask getopacities <MS> <plotdest> [spwwidth1,spwwidth2]
Calculate atmospheric opacities in the MS's spectral windows from its weather
data. Output the opacities and save a plot of the weather conditions. Optionally
output opacities averaged over spectral windows; for instance, in a standard
VLA wideband setup, in which the data come in 16 spectral windows,
casatask getopacities unglued.ms weather.png 8,8
will print 2 values, averaged over 8 spectral windows each.
"""
[docs]
def getopacities(ms, plotdest):
try:
import casatasks
except ImportError:
# CASA 5: have to script it
from .scripting import CasapyScript
script = os.path.join(os.path.dirname(__file__), "cscript_getopacities.py")
with CasapyScript(script, ms=ms, plotdest=plotdest) as cs:
try:
with open(os.path.join(cs.workdir, "opac.npy"), "rb") as f:
opac = np.load(f)
except Exception:
reraise_context("interal casapy script seemingly failed; no opac.npy")
else:
# CASA 6+: can invoke the code directly
opac = np.asarray(casatasks.plotweather(vis=ms, plotName=plotdest))
return opac
def getopacities_cli(argv):
check_usage(getopacities_doc, argv, usageifnoargs=True)
if len(argv) not in (3, 4):
wrong_usage(getopacities_doc, "expected 2 or 3 arguments")
opac = getopacities(argv[1], argv[2])
if len(argv) > 3:
spwwidths = [int(x) for x in argv[3].split(",")]
averaged = []
idx = 0
for width in spwwidths:
averaged.append(opac[idx : idx + width].mean())
idx += width
opac = averaged
print("opacity = [%s]" % (", ".join("%.5f" % q for q in opac)))
# gpdetrend
gpdetrend_doc = (
"""
casatask gpdetrend caltable=
Remove long-term phase trends from a complex-gain calibration table. For each
antenna/spw/pol, the complex gains are divided into separate chunks(e.g., the
intention is for one chunk for each visit to the complex-gain calibrator). The
mean phase within each chunk is divided out. The effect is to remove long-term
phase trends from the calibration table, but preserve short-term ones.
caltable=MS
The input calibration Measurement Set
maxtimegap=int
Measured in minutes. Gaps between solutions of this duration or longer will
lead to a new segment being considered. Default is four times the smallest
time gap seen in the data set.
"""
+ loglevel_doc
)
[docs]
class GpdetrendConfig(ParseKeywords):
__doc__ = makecfgdoc("gpdetrend", gpdetrend_doc)
caltable = Custom(str, required=True)
maxtimegap = int
loglevel = "warn"
[docs]
def gpdetrend(cfg):
from ... import numutil
tb = util.tools.table()
tb.open(cfg.caltable, nomodify=False)
# fields = tb.getcol(b'FIELD_ID')
spws = tb.getcol("SPECTRAL_WINDOW_ID")
ants = tb.getcol("ANTENNA1")
vals = tb.getcol("CPARAM")
flags = tb.getcol("FLAG")
times = tb.getcol("TIME")
npol, unused, nsoln = vals.shape
assert unused == 1, "unexpected gain table structure!"
vals = vals[:, 0, :]
flags = flags[:, 0, :]
# see what we've got
def seen_values(data):
return [idx for idx, count in enumerate(np.bincount(data)) if count]
any_ok = ~(
np.all(flags, axis=0)
) # mask for solns where at least one pol is unflagged
# seenfields = seen_values(fields[any_ok])
seenspws = seen_values(spws[any_ok])
seenants = seen_values(ants[any_ok])
# time gap?
if cfg.maxtimegap is not None:
maxtimegap = cfg.maxtimegap * 60 # min => seconds
else:
stimes = np.sort(times)
dt = np.diff(stimes)
dt = dt[dt > 0]
maxtimegap = 4 * dt.min()
# Remove average phase of each chunk
for iant in seenants:
for ipol in range(npol):
filter = (ants == iant) & ~flags[ipol]
for ispw in seenspws:
# XXX: do something by field?
sfilter = filter & (spws == ispw)
t = times[sfilter]
if not t.size:
continue
for s in numutil.slice_around_gaps(t, maxtimegap):
w = np.where(sfilter)[0][s]
meanph = np.angle(vals[ipol, w].mean())
vals[ipol, w] *= np.exp(-1j * meanph)
# Rewrite and we're done.
tb.putcol("CPARAM", vals.reshape((npol, 1, nsoln)))
tb.close()
gpdetrend_cli = makekwcli(gpdetrend_doc, GpdetrendConfig, gpdetrend)
# gpdiagnostics
#
# Shim for a separate module
def gpdiagnostics_cli(argv):
from .gpdiagnostics import gpdiagnostics_cli
gpdiagnostics_cli(argv)
# gpplot
#
# See bpplot() -- CASA plotcal can do this in a certain sense, but it's slow
# and ugly.
gpplot_doc = (
"""
casatask gpplot caltable= dest=
Plot a gain calibration table. Currently, the supported format is a series of
pages showing amplitude and phase against time, with each page showing a
particular antenna and polarization. Polarizations are always reported as "R"
and "L" since the relevant information is not stored within the bandpass data
set.
caltable=MS
The input calibration Measurement Set
dest=PATH
If specified, plots are saved to this file -- the format is inferred
from the extension, which must allow multiple pages to be saved. If
unspecified, the plots are displayed using a Gtk3 backend.
dims=WIDTH,HEIGHT
If saving to a file, the dimensions of a each page. These are in points
for vector formats(PDF, PS) and pixels for bitmaps(PNG). Defaults to
1000, 600.
margins=TOP,RIGHT,LEFT,BOTTOM
If saving to a file, the plot margins in the same units as the dims.
The default is 4 on every side.
maxtimegap=10
Solutions separated by more than this number of minutes will be drawn
with separate lines for clarity.
mjdrange=START,STOP
If specified, gain solutions outside of the MJDs STOP and START will be
ignored.
phaseonly=false
If True, plot only phases, and not amplitudes.
"""
+ loglevel_doc
)
[docs]
class GpplotConfig(ParseKeywords):
__doc__ = makecfgdoc("gpplot", gpplot_doc)
caltable = Custom(str, required=True)
dest = str
dims = [1000, 600]
margins = [4, 4, 4, 4]
maxtimegap = 10.0 # minutes
mjdrange = [float]
phaseonly = False
loglevel = "warn"
[docs]
def gpplot(cfg):
import omega as om, omega.render
from ... import numutil
if len(cfg.mjdrange) not in (0, 2):
raise Exception('"mjdrange" parameter must provide exactly 0 or 2 numbers')
if isinstance(cfg.dest, omega.render.Pager):
# This is for non-CLI invocation.
pager = cfg.dest
elif cfg.dest is None:
import omega.gtk3
pager = om.makeDisplayPager()
else:
pager = om.makePager(
cfg.dest,
dims=cfg.dims,
margins=cfg.margins,
style=om.styles.ColorOnWhiteVector(),
)
tb = util.tools.table()
tb.open(cfg.caltable, nomodify=True)
fields = tb.getcol("FIELD_ID")
spws = tb.getcol("SPECTRAL_WINDOW_ID")
ants = tb.getcol("ANTENNA1")
vals = tb.getcol("CPARAM")
flags = tb.getcol("FLAG")
times = tb.getcol("TIME")
tb.close()
tb.open(os.path.join(cfg.caltable, "ANTENNA"), nomodify=True)
names = tb.getcol("NAME")
tb.close()
npol, unused, nsoln = vals.shape
assert unused == 1, "unexpected gain table structure!"
vals = vals[:, 0, :]
flags = flags[:, 0, :]
# Apply date filter by futzing with flags
times /= 86400.0 # convert to MJD
if len(cfg.mjdrange):
flags |= (times < cfg.mjdrange[0]) | (times > cfg.mjdrange[1])
# see what we've got
def seen_values(data):
return [idx for idx, count in enumerate(np.bincount(data)) if count]
any_ok = ~(
np.all(flags, axis=0)
) # mask for solns where at least one pol is unflagged
seenfields = seen_values(fields[any_ok])
seenspws = seen_values(spws[any_ok])
seenants = seen_values(ants[any_ok])
field_offsets = dict((fieldid, idx) for idx, fieldid in enumerate(seenfields))
spw_offsets = dict((spwid, idx) for idx, spwid in enumerate(seenspws))
ant_offsets = dict((antid, idx) for idx, antid in enumerate(seenants))
# normalize phases to avoid distracting wraps
mean_amps = np.ones((len(seenants), npol, len(seenspws)))
for iant in seenants:
for ipol in range(npol):
filter = (ants == iant) & ~flags[ipol]
for ispw in seenspws:
sfilter = filter & (spws == ispw)
if not sfilter.any():
continue
meanph = np.angle(vals[ipol, sfilter].mean())
vals[ipol, sfilter] *= np.exp(-1j * meanph)
meanam = np.abs(vals[ipol, sfilter]).mean()
mean_amps[ant_offsets[iant], ipol, spw_offsets[ispw]] = meanam
# find plot limits
min_time = times[any_ok].min()
max_time = times[any_ok].max()
mjdref = int(np.floor(min_time))
times -= mjdref # convert to delta-MJD
min_time = min_time - mjdref
max_time = max_time - mjdref
span = max_time - min_time
if span <= 0:
if max_time == 0:
span = 1.0
else:
span = 0.05 * max_time
max_time += 0.05 * span
min_time -= 0.05 * span
okvals = vals[np.where(~flags)]
max_am = np.abs(okvals).max()
min_am = np.abs(okvals).min()
span = max_am - min_am
if span <= 0:
if max_am == 0:
span = 1.0
else:
span = 0.05 * max_am
max_am += 0.05 * span
min_am -= 0.05 * span
max_ph = np.angle(okvals, deg=True).max()
min_ph = np.angle(okvals, deg=True).min()
span = max_ph - min_ph
if span <= 0:
if max_ph == 0:
span = 1.0
else:
span = 0.05 * max_ph
max_ph += 0.05 * span
min_ph -= 0.05 * span
if max_ph > 160:
max_ph = 180
if min_ph < -160:
min_ph = -180
polnames = "RL" # XXX: identification doesn't seem to be stored in cal table
maxtimegap = cfg.maxtimegap / 1440 # => units of days
for iant in seenants:
for ipol in range(npol):
filter = (ants == iant) & ~flags[ipol]
p_am = om.RectPlot()
p_ph = om.RectPlot()
anyseen = False
for ispw in seenspws:
# XXX: do something by field
sfilter = filter & (spws == ispw)
t = times[sfilter]
if not t.size:
continue
v = vals[ipol, sfilter]
a = np.abs(v)
p = np.angle(v, deg=True)
kt = "%s %s spw#%d" % (names[iant], polnames[ipol], ispw)
for s in numutil.slice_around_gaps(t, maxtimegap):
tsub, asub, psub = t[s], a[s], p[s]
if tsub.size == 0:
continue # Should never happen, but eh.
else:
lines = tsub.size > 1
if cfg.phaseonly:
p_ph.addXY(tsub, psub, kt, lines=lines, dsn=spw_offsets[ispw])
else:
p_am.addXY(tsub, asub, kt, lines=lines, dsn=spw_offsets[ispw])
p_ph.addXY(tsub, psub, None, lines=lines, dsn=spw_offsets[ispw])
anyseen = True
if kt is not None: # hack for per-spw "anyseen"-type checking
p_am.addHLine(
mean_amps[ant_offsets[iant], ipol, spw_offsets[ispw]],
None,
zheight=-1,
lineStyle={"color": "faint"},
)
kt = None
if not anyseen:
continue
p_ph.addHLine(0, None, zheight=-1, lineStyle={"color": "faint"})
p_am.setBounds(xmin=min_time, xmax=max_time, ymin=min_am, ymax=max_am)
p_ph.setBounds(xmin=min_time, xmax=max_time, ymin=min_ph, ymax=max_ph)
p_am.bpainter.paintLabels = False
p_am.setYLabel("Amplitude")
p_ph.setLabels("Time(MJD - %d)" % mjdref, "De-meaned Phase(deg)")
if cfg.phaseonly:
pager.send(p_ph)
else:
vb = om.layout.VBox(2)
vb[0] = p_am
vb[1] = p_ph
vb.setWeight(0, 2.5)
pager.send(vb)
gpplot_cli = makekwcli(gpplot_doc, GpplotConfig, gpplot)
# image2fits
#
# This is basically the "exportfits" task with fewer options and a slightly
# clearer name.
image2fits_doc = """
casatask image2fits <input MS image> <output FITS image>
Convert an image in MS format to FITS format.
"""
[docs]
def image2fits(
mspath,
fitspath,
velocity=False,
optical=False,
bitpix=-32,
minpix=0,
maxpix=-1,
overwrite=False,
dropstokes=False,
stokeslast=True,
history=True,
**kwargs
):
"""Convert an image in MS format to FITS format.
mspath (str)
The path to the input MS.
fitspath (str)
The path to the output FITS file.
velocity (boolean)
(To be documented.)
optical (boolean)
(To be documented.)
bitpix (integer)
(To be documented.)
minpix (integer)
(To be documented.)
maxpix (integer)
(To be documented.)
overwrite (boolean)
Whether the task is allowed to overwrite an existing destination file.
dropstokes (boolean)
Whether the "Stokes" (polarization) axis of the image should be dropped.
stokeslast (boolean)
Whether the "Stokes" (polarization) axis of the image should be placed as the last
(innermost?) axis of the image cube.
history (boolean)
(To be documented.)
``**kwargs``
Forwarded on to the ``tofits`` function of the CASA ``image`` tool.
"""
ia = util.tools.image()
ia.open(mspath)
ia.tofits(
outfile=fitspath,
velocity=velocity,
optical=optical,
bitpix=bitpix,
minpix=minpix,
maxpix=maxpix,
overwrite=overwrite,
dropstokes=dropstokes,
stokeslast=stokeslast,
history=history,
**kwargs
)
ia.close()
def image2fits_cli(argv):
check_usage(image2fits_doc, argv, usageifnoargs=True)
util.logger()
if len(argv) != 3:
wrong_usage(image2fits_doc, "expected exactly 2 arguments")
image2fits(argv[1], argv[2])
# importalma
#
# This is a casapy script. We don't reeeallly need to be, but there's enough
# logic in CASA's task_importasdm.py that I'm not thrilled to copy/paste it
# all.
importalma_doc = """
casatask importalma <ASDM> <MS>
Convert an ALMA low-level ASDM dataset to Measurement Set format. This
implementation automatically infers the value of the "tbuff" parameter.
"""
[docs]
def importalma(asdm, ms):
"""Convert an ALMA low-level ASDM dataset to Measurement Set format.
asdm (str)
The path to the input ASDM dataset.
ms (str)
The path to the output MS dataset.
This implementation automatically infers the value of the "tbuff"
parameter.
Example::
from pwkit.environments.casa import tasks
tasks.importalma('myalma.asdm', 'myalma.ms')
"""
from .scripting import CasapyScript
script = os.path.join(os.path.dirname(__file__), "cscript_importalma.py")
with CasapyScript(script, asdm=asdm, ms=ms) as cs:
pass
def importalma_cli(argv):
check_usage(importalma_doc, argv, usageifnoargs=True)
if len(argv) != 3:
wrong_usage(importalma_doc, "expected exactly 2 arguments")
importalma(argv[1], argv[2])
# importasdm
#
# CASA 6 only
importasdm_doc = """
casatask importasdm <ASDM> <MS>
Convert a low-level ASDM dataset to Measurement Set format. This implementation
automatically infers the value of the "tbuff" parameter.
"""
def importasdm(asdm, ms):
"""Convert a low-level ASDM dataset to Measurement Set format.
asdm (str)
The path to the input ASDM dataset.
ms (str)
The path to the output MS dataset.
This implementation automatically infers the value of the "tbuff"
parameter.
Example::
from pwkit.environments.casa import tasks
tasks.importasdm('myvla.sdm', 'myvla.ms')
"""
import casatasks
# Here's the best way I can figure to find the recommended value of tbuff
# (= 1.5 * integration time). Obviously you might have different
# integration times in the dataset and such, and we're just going to
# ignore that possibility.
bdfstem = os.listdir(os.path.join(asdm, "ASDMBinary"))[0]
bdf = os.path.join(asdm, "ASDMBinary", bdfstem)
tbuff = None
with open(bdf, "rb") as f:
for linenum, line in enumerate(f):
if linenum > 60:
raise PKError("cannot find integration time info in %s", bdf)
if not line.startswith(b"<sdmDataSubsetHeader"):
continue
try:
i1 = line.index(b"<interval>") + len(b"<interval>")
i2 = line.index(b"</interval>")
if i2 <= i1:
raise ValueError()
except ValueError:
raise PKError("cannot parse integration time info in %s", bdf)
tbuff = float(line[i1:i2]) * 1.5e-9 # nanosecs, and want 1.5x
break
if tbuff is None:
raise PKError("found no integration time info")
print("importasdm: %s -> %s with tbuff=%.2f" % (asdm, ms, tbuff))
casatasks.importasdm(
asdm=asdm,
vis=ms,
ocorr_mode="co",
tbuff=tbuff,
applyflags=True,
flagbackup=False,
process_pointing=False,
with_pointing_correction=True,
)
def importasdm_cli(argv):
check_usage(importasdm_doc, argv, usageifnoargs=True)
if len(argv) != 3:
wrong_usage(importasdm_doc, "expected exactly 2 arguments")
importasdm(argv[1], argv[2])
# importevla
#
# Superseded by importasdm.
#
# This is a casapy script. We don't reeeallly need to be, but there's enough
# logic in CASA's task_importevla.py that I'm not thrilled to copy/paste it
# all.
importevla_doc = """
casatask importevla <ASDM> <MS>
Convert an EVLA low-level ASDM dataset to Measurement Set format. This
implementation automatically infers the value of the "tbuff" parameter.
"""
[docs]
def importevla(asdm, ms):
"""Convert an EVLA low-level SDM dataset to Measurement Set format.
asdm (str)
The path to the input ASDM dataset.
ms (str)
The path to the output MS dataset.
This implementation automatically infers the value of the "tbuff"
parameter.
Example::
from pwkit.environments.casa import tasks
tasks.importevla('myvla.sdm', 'myvla.ms')
"""
from .scripting import CasapyScript
# Here's the best way I can figure to find the recommended value of tbuff
# (= 1.5 * integration time). Obviously you might have different
# integration times in the dataset and such, and we're just going to
# ignore that possibility.
bdfstem = os.listdir(os.path.join(asdm, "ASDMBinary"))[0]
bdf = os.path.join(asdm, "ASDMBinary", bdfstem)
tbuff = None
with open(bdf, "rb") as f:
for linenum, line in enumerate(f):
if linenum > 60:
raise PKError("cannot find integration time info in %s", bdf)
if not line.startswith(b"<sdmDataSubsetHeader"):
continue
try:
i1 = line.index(b"<interval>") + len(b"<interval>")
i2 = line.index(b"</interval>")
if i2 <= i1:
raise ValueError()
except ValueError:
raise PKError("cannot parse integration time info in %s", bdf)
tbuff = float(line[i1:i2]) * 1.5e-9 # nanosecs, and want 1.5x
break
if tbuff is None:
raise PKError("found no integration time info")
print("importevla: %s -> %s with tbuff=%.2f" % (asdm, ms, tbuff))
script = os.path.join(os.path.dirname(__file__), "cscript_importevla.py")
with CasapyScript(script, asdm=asdm, ms=ms, tbuff=tbuff) as cs:
pass
def importevla_cli(argv):
check_usage(importevla_doc, argv, usageifnoargs=True)
if len(argv) != 3:
wrong_usage(importevla_doc, "expected exactly 2 arguments")
importevla(argv[1], argv[2])
# listobs
#
# This one is mostly about the CLI.
listobs_doc = """
casatask listobs <MS>
Generate a standard "listobs" listing of visibility MS contents. If
standard output is a TTY, the listing will be paged.
"""
[docs]
def listobs(vis):
"""Textually describe the contents of a measurement set.
vis (str)
The path to the dataset.
Returns
A generator of lines of human-readable output
Errors can only be detected by looking at the output. Example::
from pwkit.environments.casa import tasks
for line in tasks.listobs('mydataset.ms'):
print(line)
"""
def inner_list(sink):
try:
ms = util.tools.ms()
ms.open(vis)
ms.summary(verbose=True)
ms.close()
except Exception as e:
sink.post("listobs failed: %s" % e, priority="SEVERE")
for line in util.forkandlog(inner_list):
info = line.rstrip().split("\t", 3) # date, priority, origin, message
if len(info) > 3:
yield info[3]
else:
yield ""
def listobs_cli(argv):
check_usage(listobs_doc, argv, usageifnoargs=True)
if len(argv) != 2:
wrong_usage(listobs_doc, "expect exactly one argument, the MS path")
vis = argv[1]
proc = None
out = sys.stdout
if sys.stdout.isatty() or (
hasattr(sys.stdout, "stream") and sys.stdout.stream.isatty()
):
# Send our output to a pager!
import subprocess
pager = os.environ.get("PAGER") or "less -SRFX"
try:
proc = subprocess.Popen(
pager, stdin=subprocess.PIPE, close_fds=True, shell=True
)
except Exception as e:
warn("couldn't start pager %r: %s", pager, e)
else:
import codecs
out = proc.stdin
out = codecs.getwriter("utf8")(out)
for line in listobs(vis):
print(line, file=out)
if proc is not None:
proc.stdin.close()
proc.wait() # ignore return code
# listsdm
#
# This one is mostly about the CLI.
listsdm_doc = """
casatask listsdm <MS>
Generate a standard "listsdm" listing of(A)SDM dataset contents. If standard
output is a TTY, the listing will be paged.
The CASA "listsdm" functionality is implemented in pure Python, so unlike the
"listobs" case, here we alter the standard format to be more to our liking.
"""
[docs]
def listsdm(sdm, file=None):
"""Generate a standard "listsdm" listing of(A)SDM dataset contents.
sdm (str)
The path to the (A)SDM dataset to parse
file (stream-like object, such as an opened file)
Where to print the human-readable listing. If unspecified, results
go to :data:`sys.stdout`.
Returns
A dictionary of information about the dataset. Contents not yet
documented.
Example::
from pwkit.environments.casa import tasks
tasks.listsdm('myalmaa.asdm')
This code based on CASA's `task_listsdm.py`, with this version info::
# v1.0: 2010.12.07, M. Krauss
# v1.1: 2011.02.23, M. Krauss: added functionality for ALMA data
#
# Original code based on readscans.py, courtesy S. Meyers
"""
from xml.dom import minidom
import string
def printf(fmt, *args):
if len(args):
s = fmt % args
else:
s = str(fmt)
print(s, file=file)
qa = util.tools.quanta()
me = util.tools.measures()
list_scans = True
list_antennas = False
list_fields = True
list_spws = False
# read Scan.xml
xmlscans = minidom.parse(sdm + "/Scan.xml")
scandict = {}
startTimeShort = []
endTimeShort = []
rowlist = xmlscans.getElementsByTagName("row")
for rownode in rowlist:
rowfid = rownode.getElementsByTagName("scanNumber")
fid = int(rowfid[0].childNodes[0].nodeValue)
scandict[fid] = {}
# number of subscans
rowsubs = rownode.getElementsByTagName("numSubscan")
if len(rowsubs) == 0:
# EVLA and old ALMA data
rowsubs = rownode.getElementsByTagName("numSubScan")
nsubs = int(rowsubs[0].childNodes[0].nodeValue)
# intents
rownint = rownode.getElementsByTagName("numIntent")
nint = int(rownint[0].childNodes[0].nodeValue)
rowintents = rownode.getElementsByTagName("scanIntent")
sint = str(rowintents[0].childNodes[0].nodeValue)
sints = sint.split()
rint = ""
for r in range(nint):
intent = sints[2 + r]
if rint == "":
rint = intent
else:
rint += " " + intent
# start and end times in mjd ns
rowstart = rownode.getElementsByTagName("startTime")
start = int(rowstart[0].childNodes[0].nodeValue)
startmjd = float(start) * 1.0e-9 / 86400.0
t = qa.quantity(startmjd, "d")
starttime = qa.time(t, form="ymd", prec=8)[0]
startTimeShort.append(qa.time(t, prec=8)[0])
rowend = rownode.getElementsByTagName("endTime")
end = int(rowend[0].childNodes[0].nodeValue)
endmjd = float(end) * 1.0e-9 / 86400.0
t = qa.quantity(endmjd, "d")
endtime = qa.time(t, form="ymd", prec=8)[0]
endTimeShort.append(qa.time(t, prec=8)[0])
# source name
rowsrc = rownode.getElementsByTagName("sourceName")
try:
src = str(rowsrc[0].childNodes[0].nodeValue)
except:
src = "???" # PKGW
scandict[fid]["start"] = starttime
scandict[fid]["end"] = endtime
timestr = starttime + "~" + endtime
scandict[fid]["timerange"] = timestr
scandict[fid]["source"] = src
scandict[fid]["intent"] = rint
scandict[fid]["nsubs"] = nsubs
# read Main.xml
xmlmain = minidom.parse(sdm + "/Main.xml")
rowlist = xmlmain.getElementsByTagName("row")
mainScanList = []
mainConfigList = []
fieldIdList = []
for rownode in rowlist:
# get the scan numbers
rowfid = rownode.getElementsByTagName("scanNumber")
fid = int(rowfid[0].childNodes[0].nodeValue)
mainScanList.append(fid)
# get the configuration description
rowconfig = rownode.getElementsByTagName("configDescriptionId")
config = str(rowconfig[0].childNodes[0].nodeValue)
mainConfigList.append(config)
# get the field ID
rowfieldid = rownode.getElementsByTagName("fieldId")
fieldid = string.split(str(rowfieldid[0].childNodes[0].nodeValue), "_")[1]
fieldIdList.append(fieldid)
# read ConfigDescription.xml to relate the configuration
# description to a(set) of data description IDs
xmlconfig = minidom.parse(sdm + "/ConfigDescription.xml")
rowlist = xmlconfig.getElementsByTagName("row")
configDescList = []
dataDescList = []
for rownode in rowlist:
# get the configuration description
rowConfigDesc = rownode.getElementsByTagName("configDescriptionId")
configDesc = str(rowConfigDesc[0].childNodes[0].nodeValue)
configDescList.append(configDesc)
# make a list of the data description IDs:
rowNumDataDesc = rownode.getElementsByTagName("numDataDescription")
numDataDesc = int(rowNumDataDesc[0].childNodes[0].nodeValue)
rowDataDesc = rownode.getElementsByTagName("dataDescriptionId")
dataDescStr = str(rowDataDesc[0].childNodes[0].nodeValue)
dataDescSplit = dataDescStr.split()
dataDesc = []
for i in range(numDataDesc):
dataDesc.append(dataDescSplit[i + 2])
dataDescList.append(dataDesc)
# read DataDescription.xml to relate the data description IDs to
# spectral window IDs
xmlDataDesc = minidom.parse(sdm + "/DataDescription.xml")
rowlist = xmlDataDesc.getElementsByTagName("row")
dataDescElList = []
spwIdDataDescList = []
for rownode in rowlist:
# get the data description ID, make another list:
rowDataDescEl = rownode.getElementsByTagName("dataDescriptionId")
dataDescEl = str(rowDataDescEl[0].childNodes[0].nodeValue)
dataDescElList.append(dataDescEl)
# get the related spectral window ID:
rowSpwIdDataDesc = rownode.getElementsByTagName("spectralWindowId")
spwIdDataDesc = str(rowSpwIdDataDesc[0].childNodes[0].nodeValue)
spwIdDataDescList.append(spwIdDataDesc)
# read SpectralWindow.xml, get information about number of
# channels, reference frequency, baseband name, channel width.
# Interesting that there seem to be multiple fields that give the
# same information: chanFreqStart=reFreq,
# chanFreqStep=chanWidth=resolution. Why?(Note: all units are Hz)
# Note: this is where the script breaks for ALMA data, since there
# are different tags in SpectraWindow.xml(for varying channel widths).
xmlSpecWin = minidom.parse(sdm + "/SpectralWindow.xml")
rowlist = xmlSpecWin.getElementsByTagName("row")
spwIdList = []
nChanList = []
refFreqList = []
chanWidthList = []
basebandList = []
for rownode in rowlist:
# get the various row values:
rowSpwId = rownode.getElementsByTagName("spectralWindowId")
rowNChan = rownode.getElementsByTagName("numChan")
rowRefFreq = rownode.getElementsByTagName("refFreq")
# For EVLA
rowChanWidth = rownode.getElementsByTagName("chanWidth")
# For ALMA
rowChanWidthArr = rownode.getElementsByTagName("chanWidthArray")
rowBaseband = rownode.getElementsByTagName("basebandName")
# convert to values or strings and append to the relevant lists:
spwId = str(rowSpwId[0].childNodes[0].nodeValue)
spwIdList.append(spwId)
nChan = int(rowNChan[0].childNodes[0].nodeValue)
nChanList.append(nChan)
refFreq = float(rowRefFreq[0].childNodes[0].nodeValue)
refFreqList.append(refFreq)
if rowChanWidth:
chanWidth = float(rowChanWidth[0].childNodes[0].nodeValue)
chanWidthList.append(chanWidth)
if rowChanWidthArr:
tmpArr = str(rowChanWidthArr[0].childNodes[0].nodeValue).split(" ")
tmpWidth = []
for cw in range(2, len(tmpArr)):
thisWidth = float(tmpArr[cw])
tmpWidth.append(thisWidth)
chanWidthList.append(tmpWidth)
baseband = str(rowBaseband[0].childNodes[0].nodeValue)
basebandList.append(baseband)
# read Field.xml
xmlField = minidom.parse(sdm + "/Field.xml")
rowlist = xmlField.getElementsByTagName("row")
fieldList = []
fieldNameList = []
fieldCodeList = []
fieldRAList = []
fieldDecList = []
fieldSrcIDList = []
for rownode in rowlist:
rowField = rownode.getElementsByTagName("fieldId")
rowName = rownode.getElementsByTagName("fieldName")
rowCode = rownode.getElementsByTagName("code")
rowCoords = rownode.getElementsByTagName("referenceDir")
rowSrcId = rownode.getElementsByTagName("sourceId")
# convert to values or strings and append to relevent lists:
fieldList.append(
int(string.split(str(rowField[0].childNodes[0].nodeValue), "_")[1])
)
fieldNameList.append(str(rowName[0].childNodes[0].nodeValue))
fieldCodeList.append(str(rowCode[0].childNodes[0].nodeValue))
coordInfo = rowCoords[0].childNodes[0].nodeValue.split()
RADeg = float(coordInfo[3]) * (180.0 / np.pi)
DecDeg = float(coordInfo[4]) * (180.0 / np.pi)
RAInp = {"unit": "deg", "value": RADeg}
DecInp = {"unit": "deg", "value": DecDeg}
RAHMS = qa.formxxx(RAInp, format="hms")
DecDMS = qa.formxxx(DecInp, format="dms")
fieldRAList.append(RAHMS)
fieldDecList.append(DecDMS)
fieldSrcIDList.append(int(rowSrcId[0].childNodes[0].nodeValue))
# read Antenna.xml
xmlAnt = minidom.parse(sdm + "/Antenna.xml")
rowlist = xmlAnt.getElementsByTagName("row")
antList = []
antNameList = []
dishDiamList = []
stationList = []
for rownode in rowlist:
rowAnt = rownode.getElementsByTagName("antennaId")
rowAntName = rownode.getElementsByTagName("name")
rowDishDiam = rownode.getElementsByTagName("dishDiameter")
rowStation = rownode.getElementsByTagName("stationId")
# convert and append
antList.append(
int(string.split(str(rowAnt[0].childNodes[0].nodeValue), "_")[1])
)
antNameList.append(str(rowAntName[0].childNodes[0].nodeValue))
dishDiamList.append(float(rowDishDiam[0].childNodes[0].nodeValue))
stationList.append(str(rowStation[0].childNodes[0].nodeValue))
# read Station.xml
xmlStation = minidom.parse(sdm + "/Station.xml")
rowlist = xmlStation.getElementsByTagName("row")
statIdList = []
statNameList = []
statLatList = []
statLonList = []
for rownode in rowlist:
rowStatId = rownode.getElementsByTagName("stationId")
rowStatName = rownode.getElementsByTagName("name")
rowStatPos = rownode.getElementsByTagName("position")
# convert and append
statIdList.append(str(rowStatId[0].childNodes[0].nodeValue))
statNameList.append(str(rowStatName[0].childNodes[0].nodeValue))
posInfo = string.split(str(rowStatPos[0].childNodes[0].nodeValue))
x = qa.quantity([float(posInfo[2])], "m")
y = qa.quantity([float(posInfo[3])], "m")
z = qa.quantity([float(posInfo[4])], "m")
pos = me.position("ITRF", x, y, z)
qLon = pos["m0"]
qLat = pos["m1"]
statLatList.append(qa.formxxx(qLat, "dms", prec=0))
statLonList.append(qa.formxxx(qLon, "dms", prec=0))
# associate antennas with stations:
assocStatList = []
for station in stationList:
i = np.where(np.array(statIdList) == station)[0][0]
assocStatList.append(statNameList[i])
# read ExecBlock.xml
xmlExecBlock = minidom.parse(sdm + "/ExecBlock.xml")
rowlist = xmlExecBlock.getElementsByTagName("row")
sTime = (
float(rowlist[0].getElementsByTagName("startTime")[0].childNodes[0].nodeValue)
* 1.0e-9
)
eTime = (
float(rowlist[0].getElementsByTagName("endTime")[0].childNodes[0].nodeValue)
* 1.0e-9
)
# integration time in seconds, start and end times:
intTime = eTime - sTime
t = qa.quantity(sTime / 86400.0, "d")
obsStart = qa.time(t, form="ymd", prec=8)[0]
t = qa.quantity(eTime / 86400.0, "d")
obsEnd = qa.time(t, form="ymd", prec=8)[0]
# observer name and obs. info:
observerName = str(
rowlist[0].getElementsByTagName("observerName")[0].childNodes[0].nodeValue
)
configName = str(
rowlist[0].getElementsByTagName("configName")[0].childNodes[0].nodeValue
)
telescopeName = str(
rowlist[0].getElementsByTagName("telescopeName")[0].childNodes[0].nodeValue
)
numAntenna = int(
rowlist[0].getElementsByTagName("numAntenna")[0].childNodes[0].nodeValue
)
# make lists like the dataDescList for spectral windows & related info:
spwOrd = []
nChanOrd = []
rFreqOrd = []
cWidthOrd = []
bbandOrd = []
for i in range(0, len(configDescList)):
spwTempList = []
nChanTempList = []
rFreqTempList = []
cWidthTempList = []
bbandTempList = []
for dDesc in dataDescList[i]:
el = np.where(np.array(dataDescElList) == dDesc)[0][0]
spwIdN = spwIdDataDescList[el]
spwEl = np.where(np.array(spwIdList) == spwIdN)[0][0]
spwTempList.append(int(string.split(spwIdList[spwEl], "_")[1]))
nChanTempList.append(nChanList[spwEl])
rFreqTempList.append(refFreqList[spwEl])
cWidthTempList.append(chanWidthList[spwEl])
bbandTempList.append(basebandList[spwEl])
spwOrd.append(spwTempList)
nChanOrd.append(nChanTempList)
rFreqOrd.append(rFreqTempList)
cWidthOrd.append(cWidthTempList)
bbandOrd.append(bbandTempList)
# add this info to the scan dictionary:
for scanNum in scandict:
spwOrdList = []
nChanOrdList = []
rFreqOrdList = []
cWidthOrdList = []
bbandOrdList = []
# scanEl could have multiple elements if subscans are present,
# or for ALMA data:
scanEl = np.where(np.array(mainScanList) == scanNum)[0]
for thisEl in scanEl:
configEl = mainConfigList[thisEl]
listEl = np.where(np.array(configDescList) == configEl)[0][0]
spwOrdList.append(spwOrd[listEl])
nChanOrdList.append(nChanOrd[listEl])
rFreqOrdList.append(rFreqOrd[listEl])
cWidthOrdList.append(cWidthOrd[listEl])
bbandOrdList.append(bbandOrd[listEl])
try:
scandict[scanNum]["field"] = int(fieldIdList[scanEl[0]])
except:
scandict[scanNum]["field"] = -1 # PKGW
scandict[scanNum]["spws"] = spwOrdList
scandict[scanNum]["nchan"] = nChanOrdList
scandict[scanNum]["reffreq"] = rFreqOrdList
scandict[scanNum]["chanwidth"] = cWidthOrdList
scandict[scanNum]["baseband"] = bbandOrdList
# report information to the logger
printf(
"================================================================================"
)
printf(" SDM File: %s", sdm)
printf(
"================================================================================"
)
printf(" Observer: %s", observerName)
printf(" Facility: %s, %s-configuration", telescopeName, configName)
printf(" Observed from %s to %s(UTC)", obsStart, obsEnd)
printf(
" Total integration time = %.2f seconds(%.2f hours)",
intTime,
intTime / 3600,
)
if list_scans:
printf(" ")
printf("Scan listing:")
maxspwlen = 0
for scaninfo in scandict.values():
SPWs = []
for spw in scaninfo["spws"]:
SPWs += spw
scaninfo["spwstr"] = str(list(set(SPWs)))
maxspwlen = max(maxspwlen, len(scaninfo["spwstr"]))
fmt = " %-25s %-4s %-5s %-15s %-*s %s"
printf(
fmt,
"Timerange(UTC)",
"Scan",
"FldID",
"FieldName",
maxspwlen,
"SpwIDs",
"Intent(s)",
)
for i, (scanid, scaninfo) in enumerate(scandict.items()):
printf(
fmt,
startTimeShort[i] + " - " + endTimeShort[i],
scanid,
scaninfo["field"],
scaninfo["source"],
maxspwlen,
scaninfo["spwstr"],
scaninfo["intent"],
)
if list_spws:
printf(" ")
printf("Spectral window information:")
printf(" SpwID #Chans Ch0(MHz) ChWidth(kHz) TotBW(MHz) Baseband")
for i in range(0, len(spwIdList)):
printf(
" %s %s %s %s %s %s",
string.split(spwIdList[i], "_")[1].ljust(4),
str(nChanList[i]).ljust(4),
str(refFreqList[i] / 1e6).ljust(8),
str(np.array(chanWidthList[i]) / 1e3).ljust(8),
str(np.array(chanWidthList[i]) * nChanList[i] / 1e6).ljust(8),
basebandList[i].ljust(8),
)
if list_fields:
printf(" ")
printf("Field information:")
printf(" FldID Code Name RA Dec SrcID")
for i in range(0, len(fieldList)):
printf(
" %-6d %-6s %-15s %-13s %-15s %-5d",
fieldList[i],
fieldCodeList[i],
fieldNameList[i],
fieldRAList[i],
fieldDecList[i],
fieldSrcIDList[i],
)
if list_antennas:
printf(" ")
printf("Antennas(%i):" % len(antList))
printf(" ID Name Station Diam.(m) Lat. Long.")
for i in range(0, len(antList)):
printf(
" %s %s %s %s %s %s ",
str(antList[i]).ljust(5),
antNameList[i].ljust(6),
assocStatList[i].ljust(5),
str(dishDiamList[i]).ljust(5),
statLatList[i].ljust(12),
statLonList[i].ljust(12),
)
# return the scan dictionary
return scandict
def listsdm_cli(argv):
check_usage(listsdm_doc, argv, usageifnoargs=True)
if len(argv) != 2:
wrong_usage(listsdm_doc, "expect exactly one argument, the SDM path")
sdm = argv[1]
proc = None
out = sys.stdout
if sys.stdout.isatty() or (
hasattr(sys.stdout, "stream") and sys.stdout.stream.isatty()
):
# Send our output to a pager!
import subprocess
pager = os.environ.get("PAGER") or "less -SRFX"
try:
proc = subprocess.Popen(
pager, stdin=subprocess.PIPE, close_fds=True, shell=True
)
except Exception as e:
warn("couldn't start pager %r: %s", pager, e)
else:
out = proc.stdin
listsdm(sdm, file=out)
if proc is not None:
proc.stdin.close()
proc.wait() # ignore return code
# mfsclean
#
# This isn't a CASA task, but we're pulling out a subset of the functionality
# of clean, which has a bajillion options and has a really gross implementation
# in the library.
mfsclean_doc = (
"""
casatask mfsclean vis=[] [keywords]
Drive the CASA imager with a very restricted set of options.
For W-projection, set ftmachine='wproject' and wprojplanes=64(or so).
vis=
Input visibility MS
imbase=
Base name of output files. We create files named "imbaseEXT"
where EXT is all of "mask", "modelTT", "imageTT", "residualTT",
and "psfTT", and TT is empty if nterms = 1, and "ttN." otherwise.
cell = 1 [arcsec]
ftmachine = 'ft' or 'wproject'
gain = 0.1
imsize = 256,256
mask = (blank) or path of CASA-format region text file
niter = 500
nterms = 1
phasecenter = (blank) or 'J2000 12h34m56.7 -12d34m56.7'
reffreq = 0 [GHz]
stokes = I
threshold = 0 [mJy]
weighting = 'briggs'(robust=0.5) or 'natural'
wprojplanes = 1
"""
+ stdsel_doc
+ loglevel_doc.replace("warn;", "info;")
)
[docs]
class MfscleanConfig(ParseKeywords):
__doc__ = makecfgdoc("mfsclean", mfsclean_doc)
vis = Custom(str, required=True)
imbase = Custom(str, required=True)
cell = 1.0 # arcsec
ftmachine = "ft"
gain = 0.1
imsize = [256, 256]
mask = str
minpb = 0.2
niter = 500
nterms = 1
phasecenter = str
reffreq = 0.0 # GHz; 0 -> be sensible
stokes = "I"
threshold = 0.0 # mJy
weighting = "briggs"
wprojplanes = 1
# allowchunk = False
# cyclefactor = 1.5
# cyclespeedup = -1
# imagermode = csclean
# interactive = False
# gridmode = '' -- related to ftmachine keyword
# mode = mfs
# modelimage = []
# multiscale = []
# nchan = -1
# npixels = 0
# pbcor = False
# psfmode = clark
# restoringbeam = []
# robust = 0.5
# smallscalebias = 0.6
# usescratch = False
# uvtaper = False
# veltype = radio
# width = 1
antenna = str
field = str
observation = str
scan = str
spw = str
timerange = str
uvrange = str
taql = str
loglevel = "info"
specframenames = "REST LSRK LSRD BARY GEO TOPO GALACTO LGROUP CMB".split()
[docs]
def mfsclean(cfg):
ms = util.tools.ms()
im = util.tools.imager()
tb = util.tools.table()
qa = util.tools.quanta()
ia = util.tools.image()
# Filenames. TODO: make sure nothing exists
mask = cfg.imbase + "mask"
pb = cfg.imbase + "flux"
if cfg.nterms == 1:
models = [cfg.imbase + "model"]
restoreds = [cfg.imbase + "image"]
resids = [cfg.imbase + "residual"]
psfs = [cfg.imbase + "psf"]
else:
# Note: the names for the 'alpha' and 'alpha.error' images are
# generated automatically inside the C++ stuff by looking for image
# names ending in 'tt0', so we're limited in our naming flexibility
# here.
models, restoreds, resids, psfs = [], [], [], []
for i in range(cfg.nterms):
models.append(cfg.imbase + "model.tt%d" % i)
restoreds.append(cfg.imbase + "image.tt%d" % i)
resids.append(cfg.imbase + "residual.tt%d" % i)
psfs.append(cfg.imbase + "psf.tt%d" % i)
# Get info on our selected data for various things we need to figure
# out later.
selkws = extractmsselect(
cfg, havearray=False, haveintent=False, taqltomsselect=False
)
ms.open(cfg.vis)
ms.msselect(selkws)
rangeinfo = ms.range("data_desc_id field_id".split())
ddids = rangeinfo["data_desc_id"]
fields = rangeinfo["field_id"]
# Get the spectral frame from the first spw of the selected data
tb.open(os.path.join(cfg.vis, "DATA_DESCRIPTION"))
ddspws = tb.getcol("SPECTRAL_WINDOW_ID")
tb.close()
spw0 = ddspws[ddids[0]]
tb.open(os.path.join(cfg.vis, "SPECTRAL_WINDOW"))
specframe = specframenames[tb.getcell("MEAS_FREQ_REF", spw0)]
tb.close()
# Choose phase center
if cfg.phasecenter is None:
phasecenter = int(fields[0])
else:
phasecenter = cfg.phasecenter
if ":" in phasecenter:
# phasecenter='J2000 19:06:48.568 40:11:06.68'
# parses to "07:06:48.57 297.13.19.80"
# Using hm/dm instead of ::/:: as separators makes it parse
# correctly. No idea how the colons manage to parse both
# without warning and totally wrongly.
warn("you moron, who uses colons in sexagesimal?")
tmp1 = phasecenter.split()
twiddled = False
if len(tmp1) == 3:
tmp2 = tmp1[1].split(":")
tmp3 = tmp1[2].split(":")
if len(tmp2) == 3 and len(tmp3) == 3:
tmp2 = tmp2[0] + "h" + tmp2[1] + "m" + tmp2[2]
tmp3 = tmp3[0] + "d" + tmp3[1] + "m" + tmp3[2]
phasecenter = " ".join([tmp1[0], tmp2, tmp3])
twiddled = True
if twiddled:
warn('attempted to fix it up: "%s"\n\n', phasecenter)
else:
warn("couldn't parse, left as-is; good luck\n\n")
# Set up all of this junk
im.open(cfg.vis, usescratch=False)
im.selectvis(
nchan=-1, start=0, step=1, usescratch=False, writeaccess=False, **selkws
)
im.defineimage(
nx=cfg.imsize[0],
ny=cfg.imsize[1],
cellx=qa.quantity(cfg.cell, "arcsec"),
celly=qa.quantity(cfg.cell, "arcsec"),
outframe=specframe,
phasecenter=phasecenter,
stokes=cfg.stokes,
spw=-1, # to verify: selectvis(spw=) good enough?
restfreq="",
mode="mfs",
veltype="radio",
nchan=-1,
start=0,
step=1,
facets=1,
)
if cfg.weighting == "briggs":
im.weight(type="briggs", rmode="norm", robust=0.5, npixels=0) # noise=, mosaic=
elif cfg.weighting == "natural":
im.weight(type="natural", rmode="none")
else:
raise ValueError('unknown weighting type "%s"' % cfg.weighting)
# im.filter(...)
im.setscales(scalemethod="uservector", uservector=[0])
im.setsmallscalebias(0.6)
im.setmfcontrol()
im.setvp(dovp=True)
im.makeimage(type="pb", image=pb, compleximage="", verbose=False)
im.setvp(dovp=False, verbose=False)
im.setoptions(
ftmachine=cfg.ftmachine,
wprojplanes=cfg.wprojplanes,
freqinterp="linear",
padding=1.2,
pastep=360.0,
pblimit=cfg.minpb,
applypointingoffsets=False,
dopbgriddingcorrections=True,
)
if cfg.nterms > 1:
im.settaylorterms(ntaylorterms=cfg.nterms, reffreq=cfg.reffreq * 1e9)
im.setmfcontrol(
stoplargenegatives=-1, cyclefactor=1.5, cyclespeedup=-1, minpb=cfg.minpb
)
# Create the mask
if cfg.mask is None:
maskstr = ""
else:
maskstr = mask
im.make(mask)
ia.open(mask)
maskcs = ia.coordsys()
maskcs.setreferencecode(specframe, "spectral", True)
ia.setcoordsys(maskcs.torecord())
if cfg.mask is not None:
rg = util.tools.regionmanager()
regions = rg.fromtextfile(
filename=cfg.mask, shape=ia.shape(), csys=maskcs.torecord()
)
im.regiontoimagemask(mask=mask, region=regions)
ia.close()
# Create blank model(s). Diverging from task_clean even more
# significantly than usual here.
for model in models:
im.make(model)
# Go!
im.clean(
algorithm="msmfs",
niter=cfg.niter,
gain=cfg.gain,
threshold=qa.quantity(cfg.threshold, "mJy"),
model=models,
residual=resids,
image=restoreds,
psfimage=psfs,
mask=maskstr,
interactive=False,
)
im.close()
mfsclean_cli = makekwcli(mfsclean_doc, MfscleanConfig, mfsclean)
# mjd2date
mjd2date_doc = """
casatask mjd2date <date>
Convert an MJD to a date in the format used by CASA.
"""
[docs]
def mjd2date(mjd, precision=3):
"""Convert an MJD to a data string in the format used by CASA.
mjd (numeric)
An MJD value in the UTC timescale.
precision (integer, default 3)
The number of digits of decimal precision in the seconds portion of
the returned string
Returns
A string representing the input argument in CASA format:
``YYYY/MM/DD/HH:MM:SS.SSS``.
Example::
from pwkit.environment.casa import tasks
print(tasks.mjd2date(55555))
# yields '2010/12/25/00:00:00.000'
"""
from astropy.time import Time
dt = Time(mjd, format="mjd", scale="utc").to_datetime()
fracsec = ("%.*f" % (precision, 1e-6 * dt.microsecond)).split(".")[1]
return "%04d/%02d/%02d/%02d:%02d:%02d.%s" % (
dt.year,
dt.month,
dt.day,
dt.hour,
dt.minute,
dt.second,
fracsec,
)
def mjd2date_cli(argv):
check_usage(mjd2date_doc, argv, usageifnoargs=True)
if len(argv) != 2:
wrong_usage(mjd2date_doc, "expect exactly one argument")
print(mjd2date(float(argv[1])))
# mstransform
mstransform_doc = (
"""
casatask mstransform vis=[] [keywords]
vis=
Input visibility MS
out=
Output visibility MS
datacolumn=corrected
The data column on which to operate. Comma-separated list of: ``data``,
``model``, ``corrected``, ``float_data``, ``lag_data``, ``all``
realmodelcol=False
If true, turn a virtual model column into a real one.
keepflags=True
If false, discard completely-flagged rows.
usewtspectrum=False
If true, fill in a WEIGHT_SPECTRUM column in the output data set.
combinespws=False
If true, combine spectral windows
chanaverage=False
If true, average the data in frequency. NOT WIRED UP.
hanning=False
If true, Hanning smooth the data spectrally to remove Gibbs ringing.
regridms=False
If true, put the data on a new spectral window structure or reference frame.
timebin=<seconds>
If specified, time-average the visibilities with the specified binning.
timespan=<undefined>
Allow averaging to span over potential discontinuities in the data set.
Comma-separated list of options; allowed values are: ``scan``, ``state``
"""
+ stdsel_doc
+ loglevel_doc
)
# not implemented: updating FLAG_CMD table
# not implemented: updating history
mstransform_cli = makekwcli(mstransform_doc, MstransformConfig, mstransform)
# plotants
plotants_doc = """
casatask plotants <MS> <figfile>
Plot the physical layout of the antennas described in the MS.
"""
[docs]
def plotants(vis, figfile):
"""Plot the physical layout of the antennas described in the MS.
vis (str)
Path to the input dataset
figfile (str)
Path to the output image file.
The output image format will be inferred from the extension of *figfile*.
Example::
from pwkit.environments.casa import tasks
tasks.plotants('dataset.ms', 'antennas.png')
"""
try:
import casatasks
except ImportError:
# CASA 5: need to run plotants as a script
from .scripting import CasapyScript
script = os.path.join(os.path.dirname(__file__), "cscript_plotants.py")
with CasapyScript(script, vis=vis, figfile=figfile) as cs:
pass
else:
# CASA 6+: can run plotants directly
casatasks.plotants(vis=vis, figfile=figfile)
def plotants_cli(argv):
check_usage(plotants_doc, argv, usageifnoargs=True)
if len(argv) != 3:
wrong_usage(plotants_doc, "expect exactly two arguments")
plotants(argv[1], argv[2])
# plotcal
plotcal_doc = (
"""
casatask plotcal caltable=<MS> [keywords]
Plot values from a calibration dataset in any of a variety of ways.
caltable=
The calibration MS to plot
xaxis=
amp antenna chan freq imag phase real snr time
yaxis=
amp antenna imag phase real snr
iteration=
antenna field spw time
**Supported data selection keywords**
Limited data selection is supported. Allowed keywords are ``antenna``,
``field``, ``poln``, ``spw``, and ``timerange``. The ``poln`` keyword may take
on the values ``RL``, ``R``, ``L``, ``XY``, ``X``, ``Y``, and ``/``.
**Plot appearance options**
To be documented. These keywords control the plot appearance: ``plotsymbol``,
``plotcolor``, ``fontsize``, ``figfile``.
"""
+ loglevel_doc
)
[docs]
class PlotcalConfig(ParseKeywords):
__doc__ = makecfgdoc("plotcal", plotcal_doc)
caltable = Custom(str, required=True)
xaxis = "time"
yaxis = "amp"
iteration = ""
# not implemented: subplot, overplot, clearpanel, plotrange,
# showflags, showgui
plotsymbol = "."
plotcolor = "blue"
markersize = 5.0
fontsize = 10.0
figfile = str
antenna = ""
field = ""
poln = "RL"
spw = ""
timerange = ""
loglevel = "warn"
[docs]
def plotcal(cfg):
# casa-tools plotting relies on invoking matplotlib in an internal Python
# interpreter(!), and it uses a very old version of matplotlib that's
# essentially incompatible with what's available in any up-to-date Python
# environment(e.g. Anaconda). Therefore we have to launch any plotcal
# operations as casapy scripts to ensure compatibility.
from .scripting import CasapyScript
script = os.path.join(os.path.dirname(__file__), "cscript_plotcal.py")
selectcals = dict(
antenna=cfg.antenna,
field=cfg.field,
poln=cfg.poln.upper(),
spw=cfg.spw,
time=cfg.timerange,
)
plotoptions = dict(
iteration=cfg.iteration,
plotrange=[0.0] * 4,
plotsymbol=cfg.plotsymbol,
plotcolor=cfg.plotcolor,
markersize=cfg.markersize,
fontsize=cfg.fontsize,
)
with CasapyScript(
script,
caltable=cfg.caltable,
selectcals=selectcals,
plotoptions=plotoptions,
xaxis=cfg.xaxis.upper(),
yaxis=cfg.yaxis.upper(),
figfile=cfg.figfile,
) as cs:
pass
plotcal_cli = makekwcli(plotcal_doc, PlotcalConfig, plotcal)
# polmodel
#
# Shim for a separate module
def polmodel_cli(argv):
from .polmodel import polmodel_cli
polmodel_cli(argv)
# setjy
setjy_doc = (
"""
casatask setjy vis= [keywords]
Insert model data into a measurement set. We force usescratch=False
and scalebychan=True. You probably want to specify "field".
fluxdensity=
Up to four comma-separated numbers giving Stokes IQUV intensities in
Jy. Default values are [-1, 0, 0, 0]. If the Stokes I intensity is
negative (i.e., the default), a "sensible default" will be used:
detailed spectral models if the source is known (see "standard"), or
1 otherwise. If it is zero and "modimage" is used, the flux density
of the model image is used. The built-in standards do NOT have
polarimetric information, so for pol cal you do need to manually
specify the flux density information -- or see the program
"casatask polmodel".
modimage=
An image to use as the basis for the source's spatial structure and,
potentialy, flux density (if fluxdensity=0). Only usable for Stokes
I. If the verbatim value of "modimage" can't be opened as a path,
it is assumed to be relative to the CASA data directory; a typical
value might be "nrao/VLA/CalModels/3C286_C.im".
spindex=
If using ``fluxdensity``, these specify the spectral dependence of the values,
such that ``S = fluxdensity * (freq/reffreq)**spindex``. Reffreq is in GHz.
Default values are 0 and 1, giving no spectral dependence.
reffreq=
See ``spindex``.
standard='Perley-Butler 2013'
Acceptable values are: Baars, Perley 90, Perley-Taylor 95,
Perley-Taylor 99, Perley-Butler 2010, Perley-Butler 2013. You can
specify the solar-system standard "Butler-JPL-Horizons 2012", but
doing so farms out the work to a stock CASA installation.
**Supported data selection keywords**
Only a subset of the standard data selection keywords are supported:
``field``, ``observation``, ``scan``, ``spw``, ``timerange``..
"""
+ loglevel_doc
)
[docs]
class SetjyConfig(ParseKeywords):
__doc__ = makecfgdoc("setjy", setjy_doc)
vis = Custom(str, required=True)
modimage = str
fluxdensity = [-1.0, 0.0, 0.0, 0.0]
spindex = 0.0
reffreq = 1.0 # GHz
standard = "Perley-Butler 2013"
field = str
observation = str
scan = str
spw = str
timerange = str
loglevel = "warn"
[docs]
def setjy(cfg):
if cfg.standard == "Butler-JPL-Horizons 2012":
# The CASA C++ code has stuff that fakes you into thinking that the
# solar system flux density cal implementation is all in C++, but
# actually the current implementation is all in Python in the core
# CASA distribution. It'd be a real pain to duplicate so we farm it
# out to a CASA distribution.
from .scripting import CasapyScript
script = os.path.join(os.path.dirname(__file__), "cscript_setjy.py")
args = dict(
vis=cfg.vis,
standard=cfg.standard,
field=cfg.field,
observation=cfg.observation,
scan=cfg.scan,
spw=cfg.spw,
timerange=cfg.timerange,
scalebychan=True, # see below
)
print("Farming out to CASA ...")
with CasapyScript(script, **args) as cs:
with open(os.path.join(cs.workdir, "casa_stderr"), "r") as f:
stderr = f.read()
print(stderr)
return
kws = {}
for kw in "field fluxdensity observation scan spw standard".split():
kws[kw] = getattr(cfg, kw) or ""
kws["time"] = cfg.timerange or ""
kws["reffreq"] = str(cfg.reffreq) + "GHz"
kws["spix"] = cfg.spindex
kws["scalebychan"] = True # don't think you'd ever want false??
if cfg.modimage is None:
kws["modimage"] = ""
else:
if os.path.isdir(cfg.modimage):
mi = cfg.modimage
else:
mi = util.datadir(cfg.modimage)
if not os.path.isdir(mi):
raise RuntimeError('no model image "%s" or "%s"' % (cfg.modimage, mi))
kws["modimage"] = mi
im = util.tools.imager()
im.open(cfg.vis, usescratch=False) # don't think you'll ever want True?
im.setjy(**kws)
im.close()
setjy_cli = makekwcli(setjy_doc, SetjyConfig, setjy)
# split
#
# note: spw=999 -> exception; scan=999 -> no output, or error, generated
split_doc = (
"""
casatask split vis=<MS> out=<MS> [keywords...]
timebin=
Time-average data into bins of "timebin" seconds; defaults to no averaging
step=
Frequency-average data in bins of "step" channels; defaults to no averaging
col=all
Extract the column "col" as the DATA column. If "all", copy all available
columns without renaming. Possible values: ``all``, ``DATA``, ``MODEL_DATA``,
``CORRECTED_DATA``, ``FLOAT_DATA``, ``LAG_DATA``.
combine=[col1,col2,...]
When time-averaging, don't start a new bin when the specified columns change.
Acceptable column names: ``scan``, ``state``.
"""
+ stdsel_doc
+ loglevel_doc
)
[docs]
class SplitConfig(ParseKeywords):
__doc__ = makecfgdoc("split", split_doc)
vis = Custom(str, required=True)
out = Custom(str, required=True)
timebin = float # seconds
step = 1
col = "all"
combine = [str]
antenna = str
array = str
correlation = str
field = str
intent = str
observation = str # renamed from obs for consistency
scan = str
spw = str
taql = str
timerange = str
uvrange = str
loglevel = "warn"
[docs]
def split(cfg):
import tempfile, shutil
kws = extractmsselect(
cfg, havearray=True, havecorr=True, observationtoobs=True, taqltomsselect=False
)
kws["whichcol"] = cfg.col
kws["combine"] = ",".join(cfg.combine)
kws["step"] = [cfg.step] # can be done on per-spw basis; we skip that
if cfg.timebin is None:
kws["timebin"] = "-1s"
else:
kws["timebin"] = str(cfg.timebin) + "s"
ms = util.tools.ms()
ms.open(cfg.vis)
# split() will merrily overwrite an existing MS, which I think is
# very bad behavior. We try to prevent this in two steps: 1) claim
# the desired output name in a way that will error out if it
# already exists; 2) tell split() to create its outputs in an
# empty temporary directory, to minimize the chances of blowing
# away anything preexisting. In the pathological case, there's a
# chance for someone with our UID to move something into the
# temporary directory with our target name and have us delete
# it. There's nothing we can do about that so long as split() is
# happy to overwrite existing data.
#
# It's also possible for someone with our UID to spoil our rename
# by changing the permissions on our placeholder output directory
# and stuffing something in it, but this failure mode doesn't
# involve data loss.
#
# We put the temporary working directory adjacent to the destination
# to make sure it's on the same device.
didntmakeit = True
renamed = False
workdir = None
try:
didntmakeit = os.mkdir(cfg.out, 0) # error raised if already exists.
try:
workdir = tempfile.mkdtemp(
dir=os.path.dirname(cfg.out), prefix=os.path.basename(cfg.out) + "_"
)
kws["outputms"] = os.path.join(workdir, os.path.basename(cfg.out))
ms.split(**kws)
os.rename(kws["outputms"], cfg.out)
renamed = True
finally:
if workdir is not None:
shutil.rmtree(workdir, ignore_errors=True)
finally:
if not didntmakeit and not renamed:
try:
os.rmdir(cfg.out)
except:
pass
ms.close()
split_cli = makekwcli(split_doc, SplitConfig, split)
# spwglue
#
# Shim for a separate module
def spwglue_cli(argv):
from .spwglue import spwglue_cli
spwglue_cli(argv)
# tsysplot
#
# See bpplot() -- CASA plotcal can do this in a certain sense, but it's slow
# and ugly.
tsysplot_doc = (
"""
casatask tsysplot caltable= dest=
Plot a system temperature(Tsys) calibration table.
caltable=MS
The input calibration Measurement Set
dest=PATH
If specified, plots are saved to this file -- the format is inferred
from the extension, which must allow multiple pages to be saved. If
unspecified, the plots are displayed using a Gtk3 backend.
dims=WIDTH,HEIGHT
If saving to a file, the dimensions of a each page. These are in points
for vector formats(PDF, PS) and pixels for bitmaps(PNG). Defaults to
1000, 600.
margins=TOP,RIGHT,LEFT,BOTTOM
If saving to a file, the plot margins in the same units as the dims.
The default is 4 on every side.
"""
+ loglevel_doc
)
[docs]
class TsysplotConfig(ParseKeywords):
__doc__ = makecfgdoc("tsysplot", tsysplot_doc)
caltable = Custom(str, required=True)
dest = str
dims = [1000, 600]
margins = [4, 4, 4, 4]
loglevel = "warn"
[docs]
def tsysplot(cfg):
import omega as om, omega.render
from ... import numutil
if isinstance(cfg.dest, omega.render.Pager):
# This is for non-CLI invocation.
pager = cfg.dest
elif cfg.dest is None:
import omega.gtk3
pager = om.makeDisplayPager()
else:
pager = om.makePager(
cfg.dest,
dims=cfg.dims,
margins=cfg.margins,
style=om.styles.ColorOnWhiteVector(),
)
tb = util.tools.table()
tb.open(cfg.caltable, nomodify=True)
fields = tb.getcol("FIELD_ID")
spws = tb.getcol("SPECTRAL_WINDOW_ID")
ants = tb.getcol("ANTENNA1")
vals = tb.getcol("FPARAM")
flags = tb.getcol("FLAG")
times = tb.getcol("TIME")
tb.close()
tb.open(os.path.join(cfg.caltable, "ANTENNA"), nomodify=True)
antnames = tb.getcol("NAME")
tb.close()
tb.open(os.path.join(cfg.caltable, "FIELD"), nomodify=True)
fieldnames = tb.getcol("NAME")
tb.close()
npol, nchan, nsoln = vals.shape
# see what we've got
def seen_values(data):
return [idx for idx, count in enumerate(np.bincount(data)) if count]
any_ok = ~(np.all(flags, axis=(0, 1)))
seenfields = seen_values(fields[any_ok])
field_offsets = dict((fieldid, idx) for idx, fieldid in enumerate(seenfields))
seenants = seen_values(ants[any_ok])
ant_offsets = dict((antid, idx) for idx, antid in enumerate(seenants))
apbyfield = {}
seenspws = set()
for ifield in seenfields:
antpols = apbyfield[ifield] = {}
for ipol in range(npol):
for isoln in np.where(fields == ifield)[0]:
if not flags[ipol, :, isoln].all():
k = (ants[isoln], ipol)
byspw = antpols.get(k)
if byspw is None:
antpols[k] = byspw = []
byspw.append((spws[isoln], isoln))
seenspws.add(spws[isoln])
seenspws = sorted(seenspws)
spw_to_offset = dict(
(spwid, spwofs * nchan) for spwofs, spwid in enumerate(seenspws)
)
# find plot limits
min_time = times[any_ok].min()
max_time = times[any_ok].max()
mjdref = int(np.floor(min_time))
times -= mjdref # convert to delta-MJD
min_time = min_time - mjdref
max_time = max_time - mjdref
span = max_time - min_time
if span <= 0:
if max_time == 0:
span = 1.0
else:
span = 0.05 * max_time
max_time += 0.05 * span
min_time -= 0.05 * span
okvals = vals[np.where(~flags)]
max_val = okvals.max()
min_val = okvals.min()
span = max_val - min_val
if span <= 0:
if max_val == 0:
span = 1.0
else:
span = 0.05 * max_val
max_val += 0.05 * span
min_val -= 0.05 * span
polnames = "XY" # XXX: identification doesn't seem to be stored in cal table
# plot away
for iant, ipol in sorted(antpols.keys()):
p = om.RectPlot()
p.addKeyItem("%s %s" % (antnames[iant], polnames[ipol]))
for ifield in seenfields:
antpols = apbyfield[ifield]
kt = fieldnames[ifield]
for ispw, isoln in antpols.get((iant, ipol), []):
f = flags[ipol, :, isoln]
v = vals[ipol, :, isoln]
w = np.where(~f)[0]
for s in numutil.slice_around_gaps(w, 1):
wsub = w[s]
if wsub.size == 0:
continue # Should never happen, but eh.
else:
# It'd also be pretty weird to have a spectral window
# containing just one(valid) channel, but it could
# happen.
lines = wsub.size > 1
p.addXY(
wsub + spw_to_offset[ispw], v[wsub], kt, lines=lines, dsn=ifield
)
kt = None
p.setBounds(xmin=0, xmax=len(seenspws) * nchan, ymin=min_val, ymax=max_val)
p.setLabels("Normalized channel", "System temperature(K)")
pager.send(p)
tsysplot_cli = makekwcli(tsysplot_doc, TsysplotConfig, tsysplot)
# uvsub
#
# We add UV selection keywords not supported by the CASA task.
# I assume that they're honored ...
uvsub_doc = (
"""
casatask uvsub vis= [keywords]
Subtract MODEL_DATA from CORRECTED_DATA. If CORRECTED_DATA doesn't already
exist, it defaults to DATA, so that the effect is to set CORRECTED = DATA - MODEL.
vis=
The input data set.
reverse=
Boolean, default false, indicating whether we should add rather than subtract
MODEL.
"""
+ stdsel_doc
+ loglevel_doc
)
[docs]
class UvsubConfig(ParseKeywords):
__doc__ = makecfgdoc("uvsub", uvsub_doc)
vis = Custom(str, required=True)
reverse = False
antenna = str
array = str
field = str
intent = str
observation = str
scan = str
spw = str
timerange = str
uvrange = str
taql = str
loglevel = "warn"
[docs]
def uvsub(cfg):
ms = util.tools.ms()
ms.open(cfg.vis, nomodify=False)
ms.msselect(
extractmsselect(
cfg, havearray=True, intenttoscanintent=True, taqltomsselect=False
)
)
ms.uvsub(reverse=cfg.reverse)
ms.close()
uvsub_cli = makekwcli(uvsub_doc, UvsubConfig, uvsub)
# xyphplot
#
# This is nearly the same as bpplot.
xyphplot_doc = (
"""
casatask xyphplot caltable= dest=
Plot a frequency-dependent X/Y phase calibration table.
caltable=MS
The input calibration Measurement Set
dest=PATH
If specified, plots are saved to this file -- the format is inferred
from the extension, which must allow multiple pages to be saved. If
unspecified, the plots are displayed using a Gtk3 backend.
dims=WIDTH,HEIGHT
If saving to a file, the dimensions of a each page. These are in points
for vector formats(PDF, PS) and pixels for bitmaps(PNG). Defaults to
1000, 600.
margins=TOP,RIGHT,LEFT,BOTTOM
If saving to a file, the plot margins in the same units as the dims.
The default is 4 on every side.
"""
+ loglevel_doc
)
[docs]
class XyphplotConfig(ParseKeywords):
__doc__ = makecfgdoc("xyphplot", xyphplot_doc)
caltable = Custom(str, required=True)
dest = str
dims = [1000, 600]
margins = [4, 4, 4, 4]
loglevel = "warn"
[docs]
def xyphplot(cfg):
import omega as om, omega.render
from ... import numutil
if isinstance(cfg.dest, omega.render.Pager):
# This is for non-CLI invocation.
pager = cfg.dest
elif cfg.dest is None:
import omega.gtk3
pager = om.makeDisplayPager()
else:
pager = om.makePager(
cfg.dest,
dims=cfg.dims,
margins=cfg.margins,
style=om.styles.ColorOnWhiteVector(),
)
tb = util.tools.table()
# Every antenna has the same solution, and only the first of two
# polarizations is not just unity. And the solution is phase only. So this
# is prett simple to plot!
tb.open(cfg.caltable, nomodify=True)
spws = tb.getcol("SPECTRAL_WINDOW_ID")
vals = tb.getcol("CPARAM")
flags = tb.getcol("FLAG")
tb.close()
npol, nchan, nsoln = vals.shape
seenspws = np.unique(spws) # assume all available
spw_to_offset = dict(
(spwid, spwofs * nchan) for spwofs, spwid in enumerate(seenspws)
)
# find plot limits
okvals = vals[np.where(~flags)]
max_ph = np.angle(okvals, deg=True).max()
min_ph = np.angle(okvals, deg=True).min()
span = max_ph - min_ph
max_ph += 0.05 * span
min_ph -= 0.05 * span
if max_ph > 160:
max_ph = 180
if min_ph < -160:
min_ph = -180
polnames = "XY" # XXX: identification doesn't seem to be stored in cal table
# plot away
p = om.RectPlot()
for ispw in seenspws:
ipol = 0
isoln = np.where((spws == ispw) & ~np.all(flags, axis=(0, 1)))[0][0]
f = flags[ipol, :, isoln]
ph = np.angle(vals[ipol, :, isoln], deg=True)
w = np.where(~f)[0]
for s in numutil.slice_around_gaps(w, 1):
wsub = w[s]
if wsub.size == 0:
continue # Should never happen, but eh.
else:
# It'd also be pretty weird to have a spectral window
# containing just one(valid) channel, but it could
# happen.
lines = wsub.size > 1
p.addXY(wsub + spw_to_offset[ispw], ph[wsub], None, lines=lines, dsn=ispw)
p.setBounds(xmin=0, xmax=len(seenspws) * nchan, ymin=min_ph, ymax=max_ph)
p.setLabels("Normalized channel", "Phase(deg)")
pager.send(p)
xyphplot_cli = makekwcli(xyphplot_doc, XyphplotConfig, xyphplot)
# Driver for command-line access. I wrote this before multitool, and it
# doesn't seem particularly valuable to convert them to Multitool since the
# current system works fine.
def cmdline_usage(stream, exitcode):
print("usage: casatask <task> [task-specific arguments]", file=stream)
print(file=stream)
print("Supported tasks:", file=stream)
print(file=stream)
for name in sorted(globals().keys()):
if name.endswith("_cli"):
print(name[:-4], file=stream)
raise SystemExit(exitcode)
def commandline(argv=None):
if argv is None:
argv = sys.argv
from ... import cli
cli.propagate_sigint()
cli.backtrace_on_usr1()
cli.unicode_stdio()
if len(argv) < 2 or argv[1] == "--help":
cmdline_usage(sys.stdout, 0)
driver = globals().get(argv[1] + "_cli")
if driver is None:
die('unknown task "%s"; run with no arguments for a list', argv[1])
subargv = [" ".join(argv[:2])] + argv[2:]
driver(subargv)