Source code for pwkit.astimage

# -*- mode: python; coding: utf-8 -*-
# Copyright 2012-2014 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.

"""pwkit.astimage -- generic loading of (radio) astronomical images

Use `open (path, mode)` to open an astronomical image, regardless of its file
format.

The emphasis of this module is on getting 90%-good-enough semantics and a
really, genuinely, uniform interface. This can be tough to achieve.

"""
from __future__ import absolute_import, division, print_function

# NOTE: intentionally omitting unicode_literals: problematic for CASA

# Developer notes:
"""Note that CASA allegedly supports HDF5, FITS, and MIRIAD format images too.
Frankly, I don't trust it, I don't like its API, and I don't want to rely on
some variant of casacore being installed.

TODO: axis types (ugh standardizing these would be a bear)
      Some kind of way to get generic formatting of RA/Dec, glat/glon,
      etc would be nice.

TODO: image units (ie, "set units to Jy/px"; standardization also a pain)

"""
__all__ = """
UnsupportedError
AstroImage
MIRIADImage
PyrapImage
FITSImage
SimpleImage
open""".split()

import numpy as np
from numpy import pi

from . import PKError
from .astutil import D2R, R2D


[docs] class UnsupportedError(PKError): pass
def _load_fits_module(): try: from astropy.io import fits return fits except ImportError: pass try: import pyfits import warnings # YARRRRGGHH. Some versionfs of Pyfits override functions in the # warnings module for no good reason. (The motivation seems to be # compat with Python <2.6.) Part of what it does is to make warnings # be printed to stdout, rather than stderr, which breaks things if # you're printing specially-formatted output to stdout and a random # warning comes up. Hypothetically. At least the original routines are # backed up. if hasattr(pyfits.core, "_formatwarning"): warnings.formatwarning = pyfits.core._formatwarning if hasattr(pyfits.core, "_showwarning"): warnings.showwarning = pyfits.core._showwarning return pyfits except ImportError: pass raise UnsupportedError( "cannot open FITS images without either the " '"astropy.io.fits" or "pyfits" modules' ) def _load_wcs_module(): try: from astropy import wcs return wcs except ImportError: pass try: import pywcs return pywcs except ImportError: pass raise UnsupportedError( "cannot open this image without either the " '"astropy.wcs" or "pywcs" modules' ) def _create_wcs(fitsheader): """For compatibility between astropy and pywcs.""" wcsmodule = _load_wcs_module() is_pywcs = hasattr(wcsmodule, "UnitConverter") wcs = wcsmodule.WCS(fitsheader) wcs.wcs.set() wcs.wcs.fix() # I'm interested in MJD computation via datfix() if hasattr(wcs, "wcs_pix2sky"): wcs.wcs_pix2world = wcs.wcs_pix2sky wcs.wcs_world2pix = wcs.wcs_sky2pix return wcs
[docs] class AstroImage(object): """An astronomical image. path The filesystem path of the image. mode Its access mode: 'r' for read, 'rw' for read/write. shape The data shape, like numpy.ndarray.shape. bmaj If not None, the restoring beam FWHM major axis in radians. bmin If not None, the restoring beam FWHM minor axis in radians. bpa If not None, the restoring beam position angle (east from celestial north) in radians. units Lower-case string describing image units (e.g., jy/beam, jy/pixel). Not standardized between formats. pclat Latitude (usually dec) of the pointing center in radians. pclon Longitude (usually RA) of the pointing center in radians. charfreq Characteristic observing frequency of the image in GHz. mjd Mean MJD of the observations. axdescs If not None, list of strings describing the axis types. Not standardized. size The number of pixels in the image (=shape.prod ()). Methods: close Close the image. read Read all of the data. write Rewrite all of the data. toworld Convert pixel coordinates to world coordinates. topixel Convert world coordinates to pixel coordinates. simple Convert to a 2D lat/lon image. subimage Extract a sub-cube of the image. save_copy Save a copy of the image. save_as_fits Save a copy of the image in FITS format. delete Delete the on-disk image. """ path = None mode = None _handle = None _latax = None # index of the spatial latitude axis _lonax = None # ditto for longitude _specax = None # ditto for spectral axis (may be freq or velocity!) shape = None "An integer ndarray of the image shape" bmaj = None "If not None, the restoring beam FWHM major axis in radians" bmin = None "If not None, the restoring beam FWHM minor axis in radians" bpa = None """If not None, the restoring beam position angle (east from celestial north) in radians""" units = None "Lower-case string describing image units (e.g., jy/beam, jy/pixel)" pclat = None "Latitude of the pointing center in radians" pclon = None "Longitude of the pointing center in radians" charfreq = None "Characteristic observing frequency of the image in GHz" # NOTE: we get this from evaluating the spectral axis in its middle # pixel, not the reference value. mjd = None "Mean MJD of the observations" axdescs = None """If not None, list of strings describing the axis types; no standard format.""" def __init__(self, path, mode): self.path = path self.mode = mode def __del__(self): self.close() def close(self): if self._handle is not None: self._close_impl() self._handle = None def __enter__(self): return self def __exit__(self, etype, evalue, traceback): self.close() return False # raise any exception that may have happened def _checkOpen(self): if self._handle is None: raise UnsupportedError( "this operation cannot be performed on the " 'closed image at "%s"', self.path, ) def _checkWriteable(self): if self.mode == "r": raise UnsupportedError( "this operation cannot be performed on the " 'read-only image at "%s"', self.path, ) @property def size(self): return np.prod(self.shape) def read(self, squeeze=False, flip=False): raise NotImplementedError() def write(self, data): raise NotImplementedError() def toworld(self, pixel): raise NotImplementedError() def topixel(self, world): raise NotImplementedError() def simple(self): if self._latax == 0 and self._lonax == 1 and self.shape.size == 2: return self # noop return SimpleImage(self)
[docs] def subimage(self, pixofs, shape): """Extract a sub-cube of this image. Both `pixofs` and `shape` should be integer arrays with as many elements as this image has axes. Thinking of this operation as taking a Python slice of an N-dimensional cube, the i'th axis of the sub-image is slices from `pixofs[i]` to `pixofs[i] + shape[i]`. """ return NaiveSubImage(self, pixofs, shape)
def save_copy(self, path, overwrite=False, openmode=None): raise NotImplementedError() def save_as_fits(self, path, overwrite=False, openmode=None): raise NotImplementedError() def delete(self): raise NotImplementedError()
def maybescale(x, a): if x is None: return None return a * x def maybelower(x): if x is None: return None return x.lower() # We use astropy.wcs/WCSLIB/pywcs for coordinates for both FITS and MIRIAD # images. It does two things that we don't like. First of all, it stores axes # in Fortran style, with the first axis being the most rapidly varying. # Secondly, it does all of its angular work in degrees, not radians (why??). # We fix these up as best we can. def _get_wcs_scale(wcs, naxis): wcsmodule = _load_wcs_module() wcscale = np.ones(naxis) is_pywcs = hasattr(wcsmodule, "UnitConverter") if not is_pywcs: from astropy.units import UnitsError, rad for i in range(naxis): u = wcs.wcs.cunit[wcscale.size - 1 - i] if is_pywcs: try: uc = wcsmodule.UnitConverter(u.strip(), "rad") wcscale[i] = uc.scale except SyntaxError: # !! pywcs 1.10 pass # not an angle unit; don't futz. except ValueError: # pywcs 1.11 pass else: try: wcscale[i] = u.to(rad) except UnitsError: pass # not an angle unit return wcscale def _wcs_toworld(wcs, pixel, wcscale, naxis): # TODO: we don't allow the usage of "SIP" or "Paper IV" # transformations, let alone a concatenation of these, because # they're not invertible. pixel = np.asarray(pixel) if pixel.shape != (naxis,): raise ValueError("pixel coordinate must be a %d-element vector", naxis) pixel = pixel.reshape((1, naxis))[:, ::-1] world = wcs.wcs_pix2world(pixel, 0) return world[0, ::-1] * wcscale def _wcs_topixel(wcs, world, wcscale, naxis): world = np.asarray(world) if world.shape != (naxis,): raise ValueError("world coordinate must be a %d-element vector", naxis) world = (world / wcscale)[::-1].reshape((1, naxis)) pixel = wcs.wcs_world2pix(world, 0) return pixel[0, ::-1] def _wcs_axes(wcs, naxis): lat = lon = spec = None if wcs.wcs.lat >= 0: lat = naxis - 1 - wcs.wcs.lat if wcs.wcs.lng >= 0: lon = naxis - 1 - wcs.wcs.lng if wcs.wcs.spec >= 0: spec = naxis - 1 - wcs.wcs.spec return lat, lon, spec def _wcs_get_freq(wcs, specval): from mirtask._miriad_c import mirwcs_compute_freq assert wcs.wcs.spec >= 0 spectype = wcs.wcs.ctype[wcs.wcs.spec][:4] return mirwcs_compute_freq(spectype, specval, wcs.wcs.restfrq) * 1e-9
[docs] class MIRIADImage(AstroImage): """A MIRIAD format image. Requires the `mirtask` module from miriad-python.""" _modemap = {"r": "rw", "rw": "rw"} # no true read-only option def __init__(self, path, mode): try: from mirtask import XYDataSet except ImportError: raise UnsupportedError( "cannot open MIRIAD images without the " 'Python module "mirtask"' ) super(MIRIADImage, self).__init__(path, mode) self._handle = h = XYDataSet(path, self._modemap[mode]) self._wcs, warnings = h.wcs() if hasattr(self._wcs, "wcs_pix2sky"): self._wcs.wcs_pix2world = self._wcs.wcs_pix2sky self._wcs.wcs_world2pix = self._wcs.wcs_sky2pix for w in warnings: # Whatever. import sys print( 'irregularity in coordinates of "%s": %s' % (self.path, w), file=sys.stderr, ) naxis = h.getScalarItem("naxis", 0) self.shape = np.empty(naxis, dtype=int) self.axdescs = [] for i in range(naxis): q = naxis - i self.shape[i] = h.getScalarItem("naxis%d" % q, 1) self.axdescs.append(h.getScalarItem("ctype%d" % q, "???")) self.units = maybelower(h.getScalarItem("bunit")) self.bmaj = h.getScalarItem("bmaj") if self.bmaj is not None: self.bmin = h.getScalarItem("bmin", self.bmaj) self.bpa = h.getScalarItem("bpa", 0) * D2R self.pclat = h.getScalarItem("obsdec") if self.pclat is not None: self.pclon = h.getScalarItem("obsra") else: try: import mirtask.mostable except ImportError: pass else: mt = mirtask.mostable.readDataSet(h)[0] # ignore WCS warnings here if mt.radec.shape[0] == 1: self.pclat = mt.radec[0, 1] self.pclon = mt.radec[0, 0] self._wcscale = _get_wcs_scale(self._wcs, self.shape.size) self._latax, self._lonax, self._specax = _wcs_axes(self._wcs, self.shape.size) if self._specax is not None: try: from mirtask._miriad_c import mirwcs_compute_freq except ImportError: pass else: specval = self.toworld(0.5 * (self.shape - 1))[self._specax] self.charfreq = _wcs_get_freq(self._wcs, specval) jd = h.getScalarItem("obstime") if jd is not None: self.mjd = jd - 2400000.5 def _close_impl(self): self._handle.close() def read(self, squeeze=False, flip=False): self._checkOpen() nonplane = self.shape[:-2] if nonplane.size == 0: data = self._handle.readPlane([], topIsZero=flip) else: data = np.ma.empty(self.shape, dtype=np.float32) data.mask = np.zeros(self.shape, dtype=bool) n = np.prod(nonplane) fdata = data.reshape((n, self.shape[-2], self.shape[-1])) for i in range(n): # Must convert from C to Fortran indexing convention axes = np.unravel_index(i, nonplane)[::-1] self._handle.readPlane(axes, fdata[i], topIsZero=flip) if squeeze: data = data.squeeze() return data def write(self, data): data = np.ma.asarray(data) if data.shape != tuple(self.shape): raise ValueError( '"data" is wrong shape: got %s, want %s' % (data.shape, tuple(self.shape)) ) self._checkOpen() self._checkWriteable() nonplane = self.shape[:-2] if nonplane.size == 0: self._handle.writePlane(data, []) else: n = np.prod(nonplane) fdata = data.reshape((n, self.shape[-2], self.shape[-1])) for i in range(n): axes = np.unravel_index(i, nonplane) self._handle.writePlane(fdata[i], axes) return self def toworld(self, pixel): # self._wcs is still valid if we've been closed, so no need # to _checkOpen(). if self._wcs is None: raise UnsupportedError( "world coordinate information is required " 'but not present in "%s"', self.path, ) return _wcs_toworld(self._wcs, pixel, self._wcscale, self.shape.size) def topixel(self, world): if self._wcs is None: raise UnsupportedError( "world coordinate information is required " 'but not present in "%s"', self.path, ) return _wcs_topixel(self._wcs, world, self._wcscale, self.shape.size) def save_copy(self, path, overwrite=False, openmode=None): import shutil, os.path # FIXME: race conditions and such in overwrite checks. # Too lazy to do a better job. if os.path.exists(path): if overwrite: if os.path.isdir(path): shutil.rmtree(path) else: os.unlink(path) else: raise UnsupportedError( 'refusing to copy "%s" to "%s": ' "destination already exists" % (self.path, path) ) shutil.copytree(self.path, path, symlinks=False) if openmode is None: return None return open(path, openmode) def save_as_fits(self, path, overwrite=False, openmode=None): from mirexec import TaskFits import os.path if os.path.exists(path): if overwrite: os.unlink(path) else: raise UnsupportedError( 'refusing to export "%s" to "%s": ' "destination already exists" % (self.path, path) ) TaskFits(op="xyout", in_=self.path, out=path).runsilent() if openmode is None: return None return FITSImage(path, openmode) def delete(self): if self._handle is not None: raise UnsupportedError( 'cannot delete the image at "%s" without ' "first closing it", self.path ) self._checkWriteable() import shutil, os.path if os.path.isdir(self.path): shutil.rmtree(self.path) else: os.unlink(self.path) # may be a symlink; rmtree rejects this
# CASA images. We need either casac or pyrap. class _CasaUnsupportedImage(AstroImage): def __init__(self, path, mode): raise UnsupportedError("no modules are available for reading CASA images") def _pyrap_convert(d, unitstr): from pyrap.quanta import quantity return quantity(d["value"], d["unit"]).get_value(unitstr)
[docs] class PyrapImage(AstroImage): """A CASA-format image loaded with the 'pyrap' Python module.""" def __init__(self, path, mode): try: from pyrap.images import image except ImportError: raise UnsupportedError( "cannot open CASAcore images in Pyrap mode without " 'the Python module "pyrap.images"' ) super(PyrapImage, self).__init__(path, mode) # no mode specifiable self._handle = image(path) allinfo = self._handle.info() self.units = maybelower(allinfo.get("unit")) self.shape = np.asarray(self._handle.shape(), dtype=int) self.axdescs = [] if "coordinates" in allinfo: pc = allinfo["coordinates"].get("pointingcenter") # initial=True signifies that the pointing center information # hasn't actually been initialized. if pc is not None and not pc["initial"]: # This bit of info doesn't have any metadata about units or # whatever; appears to be fixed as RA/Dec in radians. self.pclat = pc["value"][1] self.pclon = pc["value"][0] ii = self._handle.imageinfo() if "restoringbeam" in ii: self.bmaj = _pyrap_convert(ii["restoringbeam"]["major"], "rad") self.bmin = _pyrap_convert(ii["restoringbeam"]["minor"], "rad") self.bpa = _pyrap_convert(ii["restoringbeam"]["positionangle"], "rad") # Make sure that angular units are always measured in radians, # because anything else is ridiculous. from pyrap.quanta import quantity self._wcscale = wcscale = np.ones(self.shape.size) c = self._handle.coordinates() radian = quantity(1.0, "rad") for item in c.get_axes(): if isinstance(item, str): self.axdescs.append(item.replace(" ", "_")) else: for subitem in item: self.axdescs.append(subitem.replace(" ", "_")) def getconversion(text): q = quantity(1.0, text) if q.conforms(radian): return q.get_value("rad") return 1 i = 0 for item in c.get_unit(): if isinstance(item, str): wcscale[i] = getconversion(item) i += 1 elif len(item) == 0: wcscale[i] = 1 # null unit i += 1 else: for subitem in item: wcscale[i] = getconversion(subitem) i += 1 # Figure out which axes are lat/long/spec. We have some # paranoia code the give up in case there are multiple axes # that appear to be of the same type. This stuff could # be cleaned up. lat = lon = spec = -1 try: logspecidx = c.get_names().index("spectral") except ValueError: specaxname = None else: specaxname = c.get_axes()[logspecidx] for i, name in enumerate(self.axdescs): # These symbolic direction names obtained from # casacore/coordinates/Coordinates/DirectionCoordinate.cc # Would be nice to have a better system for determining # this a la what wcslib provides. if name == specaxname: spec = i elif name in ("Right_Ascension", "Hour_Angle", "Longitude"): if lon == -1: lon = i else: lon = -2 elif name in ("Declination", "Latitude"): if lat == -1: lat = i else: lat = -2 if lat >= 0: self._latax = lat if lon >= 0: self._lonax = lon if spec >= 0: self._specax = spec # Phew, that was gross. if self._specax is not None: sd = c.get_coordinate("spectral").dict() wi = sd.get("wcs") if wi is not None: try: from mirtask._miriad_c import mirwcs_compute_freq except ImportError: pass else: spectype = wi["ctype"].replace("\x00", "")[:4] restfreq = sd.get("restfreq", 0.0) specval = self.toworld(0.5 * (self.shape - 1))[self._specax] self.charfreq = ( mirwcs_compute_freq(spectype, specval, restfreq) * 1e-9 ) # TODO: any unit weirdness or whatever here? self.mjd = c.get_obsdate()["m0"]["value"] def _close_impl(self): # No explicit close method provided here. Annoying. del self._handle def read(self, squeeze=False, flip=False): self._checkOpen() data = self._handle.get() if flip: data = data[..., ::-1, :] if squeeze: data = data.squeeze() return data def write(self, data): data = np.ma.asarray(data) if data.shape != tuple(self.shape): raise ValueError( "data is wrong shape: got %s, want %s" % (data.shape, tuple(self.shape)) ) self._checkOpen() self._checkWriteable() self._handle.put(data) return self def toworld(self, pixel): self._checkOpen() pixel = np.asarray(pixel) return self._wcscale * np.asarray(self._handle.toworld(pixel)) def topixel(self, world): self._checkOpen() world = np.asarray(world) return np.asarray(self._handle.topixel(world / self._wcscale)) def save_copy(self, path, overwrite=False, openmode=None): self._checkOpen() self._handle.saveas(path, overwrite=overwrite) if openmode is None: return None return open(path, openmode) def save_as_fits(self, path, overwrite=False, openmode=None): self._checkOpen() self._handle.tofits(path, overwrite=overwrite) if openmode is None: return None return FITSImage(path, openmode) def delete(self): if self._handle is not None: raise UnsupportedError( 'cannot delete the image at "%s" without ' "first closing it", self.path ) self._checkWriteable() import shutil, os.path if os.path.isdir(self.path): shutil.rmtree(self.path) else: os.unlink(self.path) # may be a symlink; rmtree rejects this
# 'casac' images. def _casac_convert(d, unitstr): from .environments.casa.util import tools, sanitize_unicode as b qa = tools.quanta() x = qa.quantity(b(d["value"]), b(d["unit"])) return qa.convert(b(x), b(unitstr))["value"] def _casac_findwcoord(cs, kind): v = cs.findcoordinate(kind) if "world" in v: return np.atleast_1d(v["world"]) return v[2] class CasaCImage(AstroImage): """A CASA-format image loaded with the 'casac' Python module.""" # casac uses Fortran-style axis ordering, with innermost first. casac # coordinate systems have two axis orderings, "world" and "pixel", and # generally default to world. We want to keep things in pixel terms so we # have to undo the mapping. def __init__(self, path, mode): super(CasaCImage, self).__init__(path, mode) from .environments.casa.util import tools self._handle = tools.image() self._handle.open(path) # no mode specifiable. self.shape = np.asarray(self._handle.shape())[::-1].copy() cs = self._handle.coordsys() naxis = self.shape.size # for world-to-pixel, we reverse the values in the array: w2p = self._wax2pax = naxis - 1 - np.asarray(cs.axesmap(toworld=False)) # for pixel-to-world, we reverse the array ordering: p2w = self._pax2wax = np.asarray(cs.axesmap(toworld=True))[::-1].copy() assert p2w.size == self.shape.size self._specax = w2p[_casac_findwcoord(cs, "spectral")[0]] # self._polax = w2p[_casac_findwcoord (cs, 'stokes')[0]] self._lonax = w2p[_casac_findwcoord(cs, "direction")[0]] self._latax = w2p[_casac_findwcoord(cs, "direction")[1]] self.axdescs = [None] * naxis names = cs.names() for i in range(naxis): self.axdescs[i] = names[p2w[i]] self.charfreq = _casac_convert( cs.referencevalue(format="m", type="spectral")["measure"]["spectral"][ "frequency" ]["m0"], "GHz", ) self.units = maybelower(self._handle.brightnessunit()) rb = self._handle.restoringbeam() if "major" in rb: self.bmaj = _casac_convert(rb["major"], "rad") self.bmin = _casac_convert(rb["minor"], "rad") self.bpa = _casac_convert(rb["positionangle"], "rad") # pclat, pclon? # this is the *start* of the observation, annoyingly. The timescale is # available but we ignore it. Generally is in UTC :-( self.mjd = cs.epoch()["m0"]["value"] def _close_impl(self): self._handle.close() def read(self, squeeze=False, flip=False): self._checkOpen() # Older casac doesn't accept ndarrays ... but casa 4.7 only works with # ndarrays! Sigh. blc = np.zeros(self.shape.size, dtype=int) trc = np.array(self.shape[::-1] - 1) data = self._handle.getchunk(blc, trc, dropdeg=squeeze, getmask=False) data = data.T # does the right thing and gives us C-contiguous data mask = self._handle.getchunk(blc, trc, dropdeg=squeeze, getmask=True) np.logical_not( mask, mask ) # CASA image masking is opposite of CASA UV flagging: T -> OK mask = mask.T data = np.ma.MaskedArray(data, mask=mask) if flip: data = data[..., ::-1, :] return data def write(self, data): self._checkOpen() self._checkWriteable() data = np.ma.asarray(data) if data.shape != tuple(self.shape): raise ValueError( "data is wrong shape: got %s, want %s" % (data.shape, tuple(self.shape)) ) data = data.T # back to CASA convention # putchunk can't do the mask: self._handle.putregion(pixels=data.data, pixelmask=data.mask, usemask=True) return self def toworld(self, pixel): # TODO: CASA quantities seem to be spat out in radians and Hz, # which work well enough for us. But this might not be # reliable. And perhaps we'll want to enforce units for # frequency and/or velocity axes. self._checkOpen() pixel = np.asarray(pixel)[::-1] # reverse to CASA's ordering qtys = self._handle.toworld(pixel, format="q")["quantity"] # Our "world" coordinates are still in what CASA would call # its "pixel" ordering. This will probably all go down in # flames if anyone ever reorders or removes axes. naxis = self.shape.size world = np.empty(naxis) for i in range(naxis): s = "*%d" % (self._pax2wax[i] + 1) world[i] = qtys[s]["value"] return world def topixel(self, world): self._checkOpen() myworld = np.asarray(world) ncwa = self._wax2pax.size # num of CASA world axes casaworld = np.zeros(ncwa) for i in range(ncwa): casaworld[self._pax2wax[i]] = myworld[i] casapixel = self._handle.topixel(casaworld)["numeric"] return casapixel[::-1].copy() def save_copy(self, path, overwrite=False, openmode=None): self._checkOpen() # In theory we could be more efficient and reuse this tool if openmode # is not None. In practice, I'd have to mess with __init__() and who # cares? from .environments.casa.util import tools ia = tools.image() ia.newimagefromimage(self.path, path, overwrite=overwrite) ia.close() if openmode is None: return None return open(path, openmode) def save_as_fits(self, path, overwrite=False, openmode=None): self._checkOpen() self._handle.tofits(path, overwrite=overwrite) if openmode is None: return None return FITSImage(path, openmode) def delete(self): if self._handle is not None: raise UnsupportedError( 'cannot delete the image at "%s" without ' "first closing it", self.path ) self._checkWriteable() import shutil, os.path if os.path.isdir(self.path): shutil.rmtree(self.path) else: os.unlink(self.path) # may be a symlink; rmtree rejects this try: import casatools # CASA 6 CASAImage = CasaCImage except ImportError: try: import casac # CASA 5 CASAImage = CasaCImage except ImportError: try: from pyrap.images import image # ancient history CASAImage = PyrapImage except ImportError: CASAImage = _CasaUnsupportedImage
[docs] class FITSImage(AstroImage): """A FITS format image.""" _modemap = {"r": "readonly", "rw": "update"} # ??? def __init__(self, path, mode): fitsmodule = _load_fits_module() wcsmodule = _load_wcs_module() super(FITSImage, self).__init__(path, mode) self._handle = fitsmodule.open(path, self._modemap[mode]) header = self._handle[0].header self._wcs = _create_wcs(header) self.units = maybelower(header.get("bunit")) naxis = header.get("naxis", 0) self.shape = np.empty(naxis, dtype=int) self.axdescs = [] for i in range(naxis): q = naxis - i self.shape[i] = header.get("naxis%d" % q, 1) self.axdescs.append(header.get("ctype%d" % q, "???")) self.bmaj = maybescale(header.get("bmaj"), D2R) if self.bmaj is None: bmindefault = None else: bmindefault = self.bmaj * R2D self.bmin = maybescale(header.get("bmin", bmindefault), D2R) self.bpa = maybescale(header.get("bpa", 0), D2R) self.pclat = maybescale(header.get("obsdec"), D2R) self.pclon = maybescale(header.get("obsra"), D2R) self._wcscale = _get_wcs_scale(self._wcs, self.shape.size) self._latax, self._lonax, self._specax = _wcs_axes(self._wcs, self.shape.size) if self._specax is not None: try: from mirtask._miriad_c import mirwcs_compute_freq except ImportError: pass else: specval = self.toworld(0.5 * (self.shape - 1))[self._specax] self.charfreq = _wcs_get_freq(self._wcs, specval) if np.isfinite(self._wcs.wcs.mjdavg): self.mjd = self._wcs.wcs.mjdavg elif np.isfinite(self._wcs.wcs.mjdobs): # close enough self.mjd = self._wcs.wcs.mjdobs def _close_impl(self): self._handle.close() def read(self, squeeze=False, flip=False): self._checkOpen() data = np.ma.asarray(self._handle[0].data) # Are there other standards for expressing masking in FITS? data.mask = ~np.isfinite(data.data) if flip: data = data[..., ::-1, :] if squeeze: data = data.squeeze() return data def write(self, data): data = np.ma.asarray(data) if data.shape != tuple(self.shape): raise ValueError( "data is wrong shape: got %s, want %s" % (data.shape, tuple(self.shape)) ) self._checkOpen() self._checkWriteable() self._handle[0].data[:] = data self._handle.flush() return self def toworld(self, pixel): if self._wcs is None: raise UnsupportedError( "world coordinate information is required " 'but not present in "%s"', self.path, ) return _wcs_toworld(self._wcs, pixel, self._wcscale, self.shape.size) def topixel(self, world): if self._wcs is None: raise UnsupportedError( "world coordinate information is required " 'but not present in "%s"', self.path, ) return _wcs_topixel(self._wcs, world, self._wcscale, self.shape.size) def save_copy(self, path, overwrite=False, openmode=None): self._checkOpen() self._handle.writeto(path, output_verify="fix", clobber=overwrite) if openmode is None: return None return open(path, openmode) def save_as_fits(self, path, overwrite=False, openmode=None): return self.save_copy(path, overwrite=overwrite, openmode=openmode) def delete(self): if self._handle is not None: raise UnsupportedError( 'cannot delete the image at "%s" without ' "first closing it", self.path ) self._checkWriteable() os.unlink(self.path)
[docs] class SimpleImage(AstroImage): """A 2D, latitude/longitude image, referenced to a parent image.""" def __init__(self, parent): platax, plonax = parent._latax, parent._lonax if platax is None or plonax is None or platax == plonax: raise UnsupportedError( 'the image "%s" does not have both latitude ' "and longitude axes", parent.path, ) self._handle = parent self._platax = platax self._plonax = plonax self._latax = 0 self._lonax = 1 checkworld1 = parent.toworld(parent.shape * 0.0) # need float! checkworld2 = parent.toworld(parent.shape - 1.0) # (for pyrap) self._topixelok = True for i in range(parent.shape.size): # Two things to check. Firstly, that all non-lat/lon axes have # only one pixel; this limitation can be relaxed if we add a # mechanism for choosing which non-spatial pixels to work with. # # Secondly, check that non-lat/lon world coordinates don't vary # over the image; otherwise topixel() will be broken. if i in (platax, plonax): continue if parent.shape[i] != 1: raise UnsupportedError( "cannot simplify an image with " "nondegenerate nonspatial axes" ) if checkworld2[i] == 0: if checkworld1[i] == 0: pass elif np.abs(checkworld1[i]) > 1e-6: self._topixelok = False elif np.abs(1 - checkworld1[i] / checkworld2[i]) > 1e-6: self._topixelok = False self.path = "<subimage of %s>" % parent.path self.shape = np.asarray([parent.shape[platax], parent.shape[plonax]]) self.axdescs = [parent.axdescs[platax], parent.axdescs[plonax]] self.bmaj = parent.bmaj self.bmin = parent.bmin self.bpa = parent.bpa self.units = parent.units self.pclat = parent.pclat self.pclon = parent.pclon self.charfreq = parent.charfreq self.mjd = parent.mjd self._pctmpl = np.zeros(parent.shape.size) self._wctmpl = parent.toworld(self._pctmpl) def _close_impl(self): pass def read(self, squeeze=False, flip=False): self._checkOpen() data = self._handle.read(flip=flip) idx = [0] * self._pctmpl.size idx[self._platax] = slice(None) idx[self._plonax] = slice(None) data = data[tuple(idx)] if self._platax > self._plonax: # Ensure that order is (lat, lon). Note that unlike the # above operations, this forces a copy of data. data = data.T if squeeze: data = data.squeeze() # could be 1-by-N ... return data def write(self, data): data = np.ma.asarray(data) if data.shape != tuple(self.shape): raise ValueError( "data is wrong shape: got %s, want %s" % (data.shape, tuple(self.shape)) ) self._checkOpen() self._checkWriteable() fulldata = np.ma.empty(self._handle.shape, dtype=data.dtype) idx = [0] * self._pctmpl.size idx[self._platax] = slice(None) idx[self._plonax] = slice(None) if self._platax > self._plonax: fulldata[tuple(idx)] = data.T else: fulldata[tuple(idx)] = data self._handle.write(fulldata) return self def toworld(self, pixel): pixel = np.asarray(pixel) if pixel.shape != (2,): raise ValueError('"pixel" must be a single pair of numbers') self._checkOpen() p = self._pctmpl.copy() p[self._platax] = pixel[0] p[self._plonax] = pixel[1] w = self._handle.toworld(p) world = np.empty(2) world[0] = w[self._platax] world[1] = w[self._plonax] return world def topixel(self, world): world = np.asarray(world) if world.shape != (2,): raise ValueError('"world" must be a single pair of numbers') self._checkOpen() if not self._topixelok: raise UnsupportedError( "mixing in the coordinate system of " "this subimage prevents mapping from " "world to pixel coordinates" ) w = self._wctmpl.copy() w[self._platax] = world[0] w[self._plonax] = world[1] p = self._handle.topixel(w) pixel = np.empty(2) pixel[0] = p[self._platax] pixel[1] = p[self._plonax] return pixel def simple(self): return self def save_copy(self, path, overwrite=False, openmode=None): raise UnsupportedError("cannot save a copy of a subimage") def save_as_fits(self, path, overwrite=False, openmode=None): raise UnsupportedError("cannot save subimage as FITS")
class NaiveSubImage(AstroImage): """A subset of a parent image, implemented naively.""" def __init__(self, parent, pixofs, shape): pixofs = np.asarray(pixofs) shape = np.asarray(shape) if pixofs.shape != parent.shape.shape: raise UnsupportedError("sub-image pixofs must match parent shape") if shape.shape != parent.shape.shape: raise UnsupportedError("sub-image shape must match parent shape") if np.any(pixofs < 0): raise UnsupportedError( "sub-image pixofs must be nonnegative; " "got %r" % pixofs ) if np.any(pixofs + shape > parent.shape): raise UnsupportedError( "sub-image may not extend past parent; " "got pixofs=%r subshape=%r vs shape=%r" % (pixofs, shape, parent.shape) ) self._handle = parent self._pixofs = pixofs self.shape = shape self.path = "<subimage of %s>" % parent.path self.axdescs = parent.axdescs self.bmaj = parent.bmaj self.bmin = parent.bmin self.bpa = parent.bpa self.units = parent.units self.pclat = parent.pclat self.pclon = parent.pclon self.charfreq = parent.charfreq self.mjd = parent.mjd def _close_impl(self): pass def read(self, squeeze=False, flip=False): self._checkOpen() data = self._handle.read(flip=flip) slices = tuple( slice(self._pixofs[i], self._pixofs[i] + self.shape[i]) for i in range(self.shape.size) ) data = data[slices] if squeeze: data = data.squeeze() return data def write(self, data): raise UnsupportedError("writing a subimage is not supported") def toworld(self, pixel): self._checkOpen() return self._handle.toworld(pixel + self._pixofs) def topixel(self, world): self._checkOpen() return self._handle.topixel(world) - self._pixofs def save_copy(self, path, overwrite=False, openmode=None): raise UnsupportedError("cannot save a copy of a subimage") def save_as_fits(self, path, overwrite=False, openmode=None): raise UnsupportedError("cannot save subimage as FITS") def open(path, mode, eat_warnings=False): import io from os.path import exists, join, isdir if eat_warnings: import warnings with warnings.catch_warnings(): warnings.filterwarnings("ignore", module="astropy.*") return open(path, mode, eat_warnings=False) if mode not in ("r", "rw"): raise ValueError('mode must be "r" or "rw"; got "%s"' % mode) if exists(join(path, "image")): return MIRIADImage(path, mode) if exists(join(path, "table.dat")): return CASAImage(path, mode) if isdir(path): raise UnsupportedError('cannot infer format of image "%s"' % path) with io.open(path, "rb") as f: sniff = f.read(9) if sniff.startswith(b"SIMPLE ="): return FITSImage(path, mode) raise UnsupportedError('cannot infer format of image "%s"' % path)