Source code for pwkit.astutil

# -*- mode: python; coding: utf-8 -*-
# Copyright 2012-2016 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.

"""This module provides functions and constants for doing a variety of basic
calculations and conversions that come up in astronomy.

"""
from __future__ import absolute_import, division, print_function, unicode_literals

__all__ = str(
    """np pi twopi halfpi R2A A2R R2D D2R R2H H2R F2S S2F J2000 angcen orientcen
                  fmthours fmtdeglon fmtdeglat fmtradec parsehours parsedeglat
                  parsedeglon sphdist sphbear sphofs parang gaussian_convolve
                  gaussian_deconvolve load_skyfield_data AstrometryInfo app2abs abs2app"""
).split()

import numpy as np

from . import unicode_to_str, PKError
from .numutil import broadcastize


# Workaround for Sphinx bug 1641. Without this, the "autoattribute" directive
# does not work if the print_function future is used. Copied from
# https://github.com/sphinx-doc/sphinx/issues/1641#issuecomment-204323049.
# See commit message associated with this code for more detail.

try:
    import builtins

    print_ = getattr(builtins, "print")
except ImportError:
    import __builtin__

    print_ = getattr(__builtin__, "print")


# Constants.

pi = np.pi
twopi = 2 * pi
halfpi = 0.5 * pi

R2A = 3600 * 180 / pi
A2R = pi / (3600 * 180)
R2D = 180 / pi
D2R = pi / 180
R2H = 12 / pi
H2R = pi / 12
F2S = 1 / np.sqrt(8 * np.log(2))  # FWHM to sigma
S2F = np.sqrt(8 * np.log(2))

J2000 = 51544.5


# Angle and orientation (PA) normalization
#
# PA's seem to usually be given in the range [-90, 90]

angcen = lambda a: (((a + pi) % twopi) - pi)
orientcen = lambda a: (((a + halfpi) % pi) - halfpi)


# Formatting/parsing of lat/long/etc


def _fmtsexagesimal(base, norm, basemax, seps, precision=3):
    if norm == "none":
        pass
    elif norm == "raise":
        if base > basemax or base < 0:
            raise ValueError("illegal coordinate of %f" % base)
    elif norm == "wrap":
        base = base % basemax
    else:
        raise ValueError('unrecognized normalization type "%s"' % norm)

    if len(seps) < 2:
        # To ponder: we accept len(seps) > 3; seems OK.
        raise ValueError(
            "there must be at least two sexagesimal separators; "
            'got value "%s"' % seps
        )

    precision = max(int(precision), 0)
    if precision == 0:
        width = 2
    else:
        width = precision + 3

    basewidth = len(str(basemax))

    bs = int(np.floor(base))
    min = int(np.floor((base - bs) * 60))
    sec = round(3600 * (base - bs - min / 60.0), precision)

    if sec >= 60:
        # Can happen if we round up
        sec -= 60
        min += 1

        if min >= 60:
            min -= 60
            bs += 1

            if bs >= basemax:
                bs -= basemax

    if len(seps) > 2:
        sep2 = seps[2]
    else:
        sep2 = ""

    return "%0*d%s%02d%s%0*.*f%s" % (
        basewidth,
        bs,
        seps[0],
        min,
        seps[1],
        width,
        precision,
        sec,
        sep2,
    )


[docs] def fmthours(radians, norm="wrap", precision=3, seps="::"): """Format an angle as sexagesimal hours in a string. Arguments are: radians The angle, in radians. norm (default "wrap") The normalization mode, used for angles outside of the standard range of 0 to 2π. If "none", the value is formatted ignoring any potential problems. If "wrap", it is wrapped to lie within the standard range. If "raise", a :exc:`ValueError` is raised. precision (default 3) The number of decimal places in the "seconds" place to use in the formatted string. seps (default "::") A two- or three-item iterable, used to separate the hours, minutes, and seconds components. If a third element is present, it appears after the seconds component. Specifying "hms" yields something like "12h34m56s"; specifying ``['', '']`` yields something like "123456". Returns a string. """ return _fmtsexagesimal(radians * R2H, norm, 24, seps, precision=precision)
[docs] def fmtdeglon(radians, norm="wrap", precision=2, seps="::"): """Format a longitudinal angle as sexagesimal degrees in a string. Arguments are: radians The angle, in radians. norm (default "wrap") The normalization mode, used for angles outside of the standard range of 0 to 2π. If "none", the value is formatted ignoring any potential problems. If "wrap", it is wrapped to lie within the standard range. If "raise", a :exc:`ValueError` is raised. precision (default 2) The number of decimal places in the "arcseconds" place to use in the formatted string. seps (default "::") A two- or three-item iterable, used to separate the degrees, arcminutes, and arcseconds components. If a third element is present, it appears after the arcseconds component. Specifying "dms" yields something like "12d34m56s"; specifying ``['', '']`` yields something like "123456". Returns a string. """ return _fmtsexagesimal(radians * R2D, norm, 360, seps, precision=precision)
[docs] def fmtdeglat(radians, norm="raise", precision=2, seps="::"): """Format a latitudinal angle as sexagesimal degrees in a string. Arguments are: radians The angle, in radians. norm (default "raise") The normalization mode, used for angles outside of the standard range of -π/2 to π/2. If "none", the value is formatted ignoring any potential problems. If "wrap", it is wrapped to lie within the standard range. If "raise", a :exc:`ValueError` is raised. precision (default 2) The number of decimal places in the "arcseconds" place to use in the formatted string. seps (default "::") A two- or three-item iterable, used to separate the degrees, arcminutes, and arcseconds components. If a third element is present, it appears after the arcseconds component. Specifying "dms" yields something like "+12d34m56s"; specifying ``['', '']`` yields something like "123456". Returns a string. The return value always includes a plus or minus sign. Note that the default of *norm* is different than in :func:`fmthours` and :func:`fmtdeglon` since it's not so clear what a "latitude" of 110 degrees (e.g.) means. """ if norm == "none": pass elif norm == "raise": if radians > halfpi or radians < -halfpi: raise ValueError("illegal latitude of %f radians" % radians) elif norm == "wrap": radians = angcen(radians) if radians > halfpi: radians = pi - radians elif radians < -halfpi: radians = -pi - radians else: raise ValueError('unrecognized normalization type "%s"' % norm) if len(seps) < 2: # To ponder: we accept len(seps) > 3; seems OK. raise ValueError( "there must be at least two sexagesimal separators; " 'got value "%s"' % seps ) precision = max(int(precision), 0) if precision == 0: width = 2 else: width = precision + 3 degrees = radians * R2D if degrees >= 0: sgn = "+" else: sgn = "-" degrees = -degrees deg = int(np.floor(degrees)) amin = int(np.floor((degrees - deg) * 60)) asec = round(3600 * (degrees - deg - amin / 60.0), precision) if asec >= 60: # Can happen if we round up asec -= 60 amin += 1 if amin >= 60: amin -= 60 deg += 1 if len(seps) > 2: sep2 = seps[2] else: sep2 = "" return "%s%02d%s%02d%s%0*.*f%s" % ( sgn, deg, seps[0], amin, seps[1], width, precision, asec, sep2, )
[docs] def fmtradec(rarad, decrad, precision=2, raseps="::", decseps="::", intersep=" "): """Format equatorial coordinates in a single sexagesimal string. Returns a string of the RA/lon coordinate, formatted as sexagesimal hours, then *intersep*, then the Dec/lat coordinate, formatted as degrees. This yields something like "12:34:56.78 -01:23:45.6". Arguments are: rarad The right ascension coordinate, in radians. More generically, this is the longitudinal coordinate; note that the ordering in this function differs than the other spherical functions, which generally prefer coordinates in "lat, lon" order. decrad The declination coordinate, in radians. More generically, this is the latitudinal coordinate. precision (default 2) The number of decimal places in the "arcseconds" place of the latitudinal (declination) coordinate. The longitudinal (right ascension) coordinate gets one additional place, since hours are bigger than degrees. raseps (default "::") A two- or three-item iterable, used to separate the hours, minutes, and seconds components of the RA/lon coordinate. If a third element is present, it appears after the seconds component. Specifying "hms" yields something like "12h34m56s"; specifying ``['', '']`` yields something like "123456". decseps (default "::") A two- or three-item iterable, used to separate the degrees, arcminutes, and arcseconds components of the Dec/lat coordinate. intersep (default " ") The string separating the RA/lon and Dec/lat coordinates """ return ( fmthours(rarad, precision=precision + 1, seps=raseps) + str(intersep) + fmtdeglat(decrad, precision=precision, seps=decseps) )
# Parsing routines are currently very lame. def _parsesexagesimal(sxgstr, desc, negok): sxgstr_orig = sxgstr sgn = 1 if sxgstr[0] == "-": if negok: sgn = -1 sxgstr = sxgstr[1:] else: raise ValueError("illegal negative %s expression: %s" % (desc, sxgstr_orig)) try: # TODO: other separators ... bs, mn, sec = sxgstr.split(":") bs = int(bs) mn = int(mn) sec = float(sec) except Exception: raise ValueError("unable to parse as %s: %s" % (desc, sxgstr_orig)) if mn < 0 or mn > 59 or sec < 0 or sec >= 60.0: raise ValueError("illegal sexagesimal %s expression: %s" % (desc, sxgstr_orig)) if bs < 0: # two minus signs, or something raise ValueError("illegal negative %s expression: %s" % (desc, sxgstr_orig)) return sgn * (bs + mn / 60.0 + sec / 3600.0)
[docs] def parsehours(hrstr): """Parse a string formatted as sexagesimal hours into an angle. This function converts a textual representation of an angle, measured in hours, into a floating point value measured in radians. The format of *hrstr* is very limited: it may not have leading or trailing whitespace, and the components of the sexagesimal representation must be separated by colons. The input must therefore resemble something like ``"12:34:56.78"``. A :exc:`ValueError` will be raised if the input does not resemble this template. Hours greater than 24 are not allowed, but negative values are. """ hr = _parsesexagesimal(hrstr, "hours", False) if hr >= 24: raise ValueError("illegal hour specification: " + hrstr) return hr * H2R
[docs] def parsedeglat(latstr): """Parse a latitude formatted as sexagesimal degrees into an angle. This function converts a textual representation of a latitude, measured in degrees, into a floating point value measured in radians. The format of *latstr* is very limited: it may not have leading or trailing whitespace, and the components of the sexagesimal representation must be separated by colons. The input must therefore resemble something like ``"-00:12:34.5"``. A :exc:`ValueError` will be raised if the input does not resemble this template. Latitudes greater than 90 or less than -90 degrees are not allowed. """ deg = _parsesexagesimal(latstr, "latitude", True) if abs(deg) > 90: raise ValueError("illegal latitude specification: " + latstr) return deg * D2R
[docs] def parsedeglon(lonstr): """Parse a longitude formatted as sexagesimal degrees into an angle. This function converts a textual representation of a longitude, measured in degrees, into a floating point value measured in radians. The format of *lonstr* is very limited: it may not have leading or trailing whitespace, and the components of the sexagesimal representation must be separated by colons. The input must therefore resemble something like ``"270:12:34.5"``. A :exc:`ValueError` will be raised if the input does not resemble this template. Values of any sign and magnitude are allowed, and they are not normalized (e.g. to lie within the range [0, 2π]). """ return _parsesexagesimal(lonstr, "longitude", True) * D2R
# Spherical trig
[docs] @broadcastize(4) def sphdist(lat1, lon1, lat2, lon2): """Calculate the distance between two locations on a sphere. lat1 The latitude of the first location. lon1 The longitude of the first location. lat2 The latitude of the second location. lon2 The longitude of the second location. Returns the separation in radians. All arguments are in radians as well. The arguments may be vectors. Note that the ordering of the arguments maps to the nonstandard ordering ``(Dec, RA)`` in equatorial coordinates. In a spherical projection it maps to ``(Y, X)`` which may also be unexpected. The distance is computed with the "specialized Vincenty formula". Faster but more error-prone formulae are possible; see Wikipedia on Great-circle Distance. """ cd = np.cos(lon2 - lon1) sd = np.sin(lon2 - lon1) c2 = np.cos(lat2) c1 = np.cos(lat1) s2 = np.sin(lat2) s1 = np.sin(lat1) a = np.sqrt((c2 * sd) ** 2 + (c1 * s2 - s1 * c2 * cd) ** 2) b = s1 * s2 + c1 * c2 * cd return np.arctan2(a, b)
[docs] @broadcastize(4) def sphbear(lat1, lon1, lat2, lon2, tol=1e-15): """Calculate the bearing between two locations on a sphere. lat1 The latitude of the first location. lon1 The longitude of the first location. lat2 The latitude of the second location. lon2 The longitude of the second location. tol Tolerance for checking proximity to poles and rounding to zero. The bearing (AKA the position angle, PA) is the orientation of point 2 with regards to point 1 relative to the longitudinal axis. Returns the bearing in radians. All arguments are in radians as well. The arguments may be vectors. Note that the ordering of the arguments maps to the nonstandard ordering ``(Dec, RA)`` in equatorial coordinates. In a spherical projection it maps to ``(Y, X)`` which may also be unexpected. The sign convention is astronomical: bearings range from -π to π, with negative values if point 2 is in the western hemisphere with regards to point 1, positive if it is in the eastern. (That is, “east from north”.) If point 1 is very near the pole, the bearing is undefined and the result is NaN. The *tol* argument is used for checking proximity to the poles and for rounding the bearing to precisely zero if it's extremely small. Derived from ``bear()`` in `angles.py from Prasanth Nair <https://github.com/phn/angles>`_. His version is BSD licensed. This one is sufficiently different that I think it counts as a separate implementation. """ # cross product on outer axis: ocross = lambda a, b: np.cross(a, b, axisa=0, axisb=0, axisc=0) # if args have shape S, this has shape (3, S) v1 = np.asarray( [np.cos(lat1) * np.cos(lon1), np.cos(lat1) * np.sin(lon1), np.sin(lat1)] ) v2 = np.asarray( [np.cos(lat2) * np.cos(lon2), np.cos(lat2) * np.sin(lon2), np.sin(lat2)] ) is_bad = (v1[0] ** 2 + v1[1] ** 2) < tol p12 = ocross(v1, v2) # ~"perpendicular to great circle containing points" p1z = np.asarray([v1[1], -v1[0], np.zeros_like(lat1)]) # ~"perp to base and Z axis" cm = np.sqrt((ocross(p12, p1z) ** 2).sum(axis=0)) # ~"angle between the vectors" bearing = np.arctan2(cm, np.sum(p12 * p1z, axis=0)) bearing = np.where(p12[2] < 0, -bearing, bearing) # convert to [-pi/2, pi/2] bearing = np.where(np.abs(bearing) < tol, 0, bearing) # clamp bearing[np.where(is_bad)] = np.nan return bearing
[docs] def sphofs(lat1, lon1, r, pa, tol=1e-2, rmax=None): """Offset from one location on the sphere to another. This function is given a start location, expressed as a latitude and longitude, a distance to offset, and a direction to offset (expressed as a bearing, AKA position angle). It uses these to compute a final location. This function mirrors :func:`sphdist` and :func:`sphbear` such that:: # If: r = sphdist (lat1, lon1, lat2a, lon2a) pa = sphbear (lat1, lon1, lat2a, lon2a) lat2b, lon2b = sphofs (lat1, lon1, r, pa) # Then lat2b = lat2a and lon2b = lon2a Arguments are: lat1 The latitude of the start location. lon1 The longitude of the start location. r The distance to offset by. pa The position angle (“PA” or bearing) to offset towards. tol The tolerance for the accuracy of the calculation. rmax The maximum allowed offset distance. Returns a pair ``(lat2, lon2)``. All arguments and the return values are measured in radians. The arguments may be vectors. The PA sign convention is astronomical, measuring orientation east from north. Note that the ordering of the arguments and return values maps to the nonstandard ordering ``(Dec, RA)`` in equatorial coordinates. In a spherical projection it maps to ``(Y, X)`` which may also be unexpected. The offset is computed naively as:: lat2 = lat1 + r * cos (pa) lon2 = lon1 + r * sin (pa) / cos (lat2) This will fail for large offsets. Error checking can be done in two ways. If *tol* is not None, :func:`sphdist` is used to calculate the actual distance between the two locations, and if the magnitude of the fractional difference between that and *r* is larger than *tol*, :exc:`ValueError` is raised. This will add an overhead to the computation that may be significant if you're going to be calling this function a lot. Additionally, if *rmax* is not None, magnitudes of *r* greater than *rmax* are rejected. For reference, an *r* of 0.2 (~11 deg) gives a maximum fractional distance error of ~3%. """ if rmax is not None and np.abs(r) > rmax: raise ValueError( "sphofs radius value %f is too big for " "our approximation" % r ) lat2 = lat1 + r * np.cos(pa) lon2 = lon1 + r * np.sin(pa) / np.cos(lat2) if tol is not None: s = sphdist(lat1, lon1, lat2, lon2) if np.any(np.abs((s - r) / s) > tol): raise ValueError( "sphofs approximation broke down " "(%s %s %s %s %s %s %s)" % (lat1, lon1, lat2, lon2, r, s, pa) ) return lat2, lon2
# Spherical trig tools that are more astronomy-specific. Note that precise # positional calculations should generally use skyfield.
[docs] def parang(hourangle, declination, latitude): """Calculate the parallactic angle of a sky position. This computes the parallactic angle of a sky position expressed in terms of an hour angle and declination. Arguments: hourangle The hour angle of the location on the sky. declination The declination of the location on the sky. latitude The latitude of the observatory. Inputs and outputs are all in radians. Implementation adapted from GBTIDL ``parangle.pro``. """ return -np.arctan2( -np.sin(hourangle), np.cos(declination) * np.tan(latitude) - np.sin(declination) * np.cos(hourangle), )
# 2D Gaussian (de)convolution
[docs] def gaussian_convolve(maj1, min1, pa1, maj2, min2, pa2): """Convolve two Gaussians analytically. Given the shapes of two 2-dimensional Gaussians, this function returns the shape of their convolution. Arguments: maj1 Major axis of input Gaussian 1. min1 Minor axis of input Gaussian 1. pa1 Orientation angle of input Gaussian 1, in radians. maj2 Major axis of input Gaussian 2. min2 Minor axis of input Gaussian 2. pa2 Orientation angle of input Gaussian 2, in radians. The return value is ``(maj3, min3, pa3)``, with the same format as the input arguments. The axes can be measured in any units, so long as they're consistent. Implementation copied from MIRIAD’s ``gaufac``. """ c1 = np.cos(pa1) s1 = np.sin(pa1) c2 = np.cos(pa2) s2 = np.sin(pa2) a = (maj1 * c1) ** 2 + (min1 * s1) ** 2 + (maj2 * c2) ** 2 + (min2 * s2) ** 2 b = (maj1 * s1) ** 2 + (min1 * c1) ** 2 + (maj2 * s2) ** 2 + (min2 * c2) ** 2 g = 2 * ((min1**2 - maj1**2) * s1 * c1 + (min2**2 - maj2**2) * s2 * c2) s = a + b t = np.sqrt((a - b) ** 2 + g**2) maj3 = np.sqrt(0.5 * (s + t)) min3 = np.sqrt(0.5 * (s - t)) if abs(g) + abs(a - b) == 0: pa3 = 0.0 else: pa3 = 0.5 * np.arctan2(-g, a - b) # "Amplitude of the resulting Gaussian": # f = pi / (4 * np.log (2)) * maj1 * min1 * maj2 * min2 \ # / np.sqrt (a * b - 0.25 * g**2) return maj3, min3, pa3
[docs] def gaussian_deconvolve(smaj, smin, spa, bmaj, bmin, bpa): """Deconvolve two Gaussians analytically. Given the shapes of 2-dimensional “source” and “beam” Gaussians, this returns a deconvolved “result” Gaussian such that the convolution of “beam” and “result” is “source”. Arguments: smaj Major axis of source Gaussian. smin Minor axis of source Gaussian. spa Orientation angle of source Gaussian, in radians. bmaj Major axis of beam Gaussian. bmin Minor axis of beam Gaussian. bpa Orientation angle of beam Gaussian, in radians. The return value is ``(rmaj, rmin, rpa, status)``. The first three values have the same format as the input arguments. The *status* result is one of "ok", "pointlike", or "fail". A "pointlike" status indicates that the source and beam shapes are difficult to distinguish; a "fail" status indicates that the two shapes seem to be mutually incompatible (e.g., source and beam are very narrow and orthogonal). The axes can be measured in any units, so long as they're consistent. Ideally if:: rmaj, rmin, rpa, status = gaussian_deconvolve (smaj, smin, spa, bmaj, bmin, bpa) then:: smaj, smin, spa = gaussian_convolve (rmaj, rmin, rpa, bmaj, bmin, bpa) Implementation derived from MIRIAD’s ``gaudfac``. This function currently doesn't do a great job of dealing with pointlike sources, i.e. ones where “source” and “beam” are nearly indistinguishable. """ # I've added extra code to ensure ``smaj >= bmaj``, ``smin >= bmin``, and # increased the coefficient in front of "limit" from 0.1 to 0.5. Feel a # little wary about that first change. from numpy import cos, sin, sqrt, min, abs, arctan2 if smaj < bmaj: smaj = bmaj if smin < bmin: smin = bmin alpha = ( (smaj * cos(spa)) ** 2 + (smin * sin(spa)) ** 2 - (bmaj * cos(bpa)) ** 2 - (bmin * sin(bpa)) ** 2 ) beta = ( (smaj * sin(spa)) ** 2 + (smin * cos(spa)) ** 2 - (bmaj * sin(bpa)) ** 2 - (bmin * cos(bpa)) ** 2 ) gamma = 2 * ( (smin**2 - smaj**2) * sin(spa) * cos(spa) - (bmin**2 - bmaj**2) * sin(bpa) * cos(bpa) ) s = alpha + beta t = sqrt((alpha - beta) ** 2 + gamma**2) limit = 0.5 * min([smaj, smin, bmaj, bmin]) ** 2 status = "ok" if alpha < 0 or beta < 0 or s < t: dmaj = dmin = dpa = 0 if 0.5 * (s - t) < limit and alpha > -limit and beta > -limit: status = "pointlike" else: status = "fail" else: dmaj = sqrt(0.5 * (s + t)) dmin = sqrt(0.5 * (s - t)) if abs(gamma) + abs(alpha - beta) == 0: dpa = 0 else: dpa = 0.5 * arctan2(-gamma, alpha - beta) return dmaj, dmin, dpa, status
# Given astrometric properties of a source, predict its position *with # uncertainties* at a given date through Monte Carlo simulations with # skyfield.
[docs] def load_skyfield_data(): """Load data files used in Skyfield. This will download files from the internet if they haven't been downloaded before. Skyfield downloads files to the current directory by default, which is not ideal. Here we abuse astropy and use its cache directory to cache the data files per-user. If we start downloading files in other places in pwkit we should maybe make this system more generic. And the dep on astropy is not at all necessary. Skyfield will print out a progress bar as it downloads things. Returns ``(planets, ts)``, the standard Skyfield ephemeris and timescale data files. """ import os.path from astropy.config import paths from skyfield.api import Loader cache_dir = os.path.join(paths.get_cache_dir(), "pwkit") loader = Loader(cache_dir) planets = loader("de421.bsp") ts = loader.timescale() return planets, ts
# Hack to implement epochs-of-position. For what it's worth, Skyfield is # MIT-licensed like us. try: from skyfield.api import Star, T0 except ImportError: def PromoEpochStar(**kwargs): raise NotImplementedError( 'the "skyfield" package is required for this functionality' ) else: class PromoEpochStar(Star): """A customized version of the Skyfield Star class that accepts a new epoch-of-position parameter. Derived from the Skyfield source as of commit 49c2467b (2018 Mar 28). """ def __init__(self, jd_of_position=T0, **kwargs): super(PromoEpochStar, self).__init__(**kwargs) self.jd_of_position = jd_of_position def __repr__(self): opts = [] for ( name ) in "ra_mas_per_year dec_mas_per_year parallax_mas radial_km_per_s jd_of_position names".split(): value = getattr(self, name) if value: opts.append(", {0}={1!r}".format(name, value)) return "PromoEpochStar(ra_hours={0!r}, dec_degrees={1!r}{2})".format( self.ra.hours, self.dec.degrees, "".join(opts) ) def _observe_from_bcrs(self, observer): from numpy import outer from skyfield.constants import C_AUDAY from skyfield.functions import length_of from skyfield.relativity import light_time_difference position, velocity = self._position_au, self._velocity_au_per_d t = observer.t dt = light_time_difference(position, observer.position.au) if t.shape: position = ( outer(velocity, t.tdb + dt - self.jd_of_position).T + position ).T else: position = position + velocity * (t.tdb + dt - self.jd_of_position) vector = position - observer.position.au distance = length_of(vector) light_time = distance / C_AUDAY return vector, (observer.velocity.au_per_d.T - velocity).T, t, light_time del Star, T0 _vizurl = "http://vizier.cds.unistra.fr/viz-bin/asu-tsv"
[docs] def get_2mass_epoch(tmra, tmdec, debug=False): """Given a 2MASS position, look up the epoch when it was observed. This function uses the CDS Vizier web service to look up information in the 2MASS point source database. Arguments are: tmra The source's J2000 right ascension, in radians. tmdec The source's J2000 declination, in radians. debug If True, the web server's response will be printed to :data:`sys.stdout`. The return value is an MJD. If the lookup fails, a message will be printed to :data:`sys.stderr` (unconditionally!) and the :data:`J2000` epoch will be returned. """ from urllib.parse import urlencode from urllib.request import urlopen import codecs postdata = { "-mime": "csv", "-source": "2MASS", "-out": "_q,JD", "-c": f"{tmra * R2D:.6f} {tmdec * R2D:.6f}", "-c.eq": "J2000", } url = f"{_vizurl}?{urlencode(postdata)}" jd = None for line in codecs.getreader("utf-8")(urlopen(url)): line = line.strip() if debug: print_("D: 2M >>", line) if line.startswith("1;"): jd = float(line[2:]) if jd is None: import sys print_( "warning: 2MASS epoch lookup failed; astrometry could be very wrong!", file=sys.stderr, ) return J2000 return jd - 2400000.5
_simbadbase = "http://simbad.u-strasbg.fr/simbad/sim-script?script=" _simbaditems = ( "COO(d;A) COO(d;D) COO(E) COO(B) PM(A) PM(D) PM(E) PLX(V) PLX(E) " "RV(V) RV(E)" ).split()
[docs] def get_simbad_astrometry_info(ident, items=_simbaditems, debug=False): """Fetch astrometric information from the Simbad web service. Given the name of a source as known to the CDS Simbad service, this function looks up its positional information and returns it in a dictionary. In most cases you should use an :class:`AstrometryInfo` object and its :meth:`~AstrometryInfo.fill_from_simbad` method instead of this function. Arguments: ident The Simbad name of the source to look up. items An iterable of data items to look up. The default fetches position, proper motion, parallax, and radial velocity information. Each item name resembles the string ``COO(d;A)`` or ``PLX(E)``. The allowed formats are defined `on this CDS page <http://simbad.u-strasbg.fr/Pages/guide/sim-fscript.htx>`_. debug If true, the response from the webserver will be printed. The return value is a dictionary with a key corresponding to the textual result returned for each requested item. """ import codecs try: from urllib.parse import quote except ImportError: from urllib import quote try: from urllib.request import urlopen except ImportError: from urllib2 import urlopen s = "\\n".join("%s %%%s" % (i, i) for i in items) s = """output console=off script=off format object "%s" query id %s""" % ( s, ident, ) url = _simbadbase + quote(s) results = {} errtext = None for line in codecs.getreader("utf-8")(urlopen(url)): line = line.strip() if debug: print_("D: SA >>", line) if errtext is not None: errtext += line elif line.startswith("::error"): errtext = "" elif len(line): k, v = line.split(" ", 1) results[k] = v if errtext is not None: raise Exception("SIMBAD query error: " + errtext) return results
[docs] class AstrometryInfo(object): """Holds astrometric data and their uncertainties, and can predict positions with uncertainties. """ ra = None "The J2000 right ascension of the object, measured in radians." dec = None "The J2000 declination of the object, measured in radians." pos_u_maj = None "Major axis of the error ellipse for the object position, in radians." pos_u_min = None "Minor axis of the error ellipse for the object position, in radians." pos_u_pa = None """Position angle (really orientation) of the error ellipse for the object position, east from north, in radians. """ pos_epoch = None """The epoch of position, that is, the date when the position was measured, in MJD[TT]. """ promo_ra = None """The proper motion in right ascension, in milliarcsec per year. XXX: cos(dec) confusion! """ promo_dec = None """The object's proper motion in declination, in milliarcsec per year.""" promo_u_maj = None """Major axis of the error ellipse for the object's proper motion, in milliarcsec per year. """ promo_u_min = None """Minor axis of the error ellipse for the object's proper motion, in milliarcsec per year. """ promo_u_pa = None """Position angle (really orientation) of the error ellipse for the object proper motion, east from north, in radians. """ parallax = None "The object's parallax, in milliarcsec." u_parallax = None "Uncertainty in the object's parallax, in milliarcsec." vradial = None "The object's radial velocity, in km/s. NOTE: not relevant in our usage." u_vradial = None """The uncertainty in the object's radial velocity, in km/s. NOTE: not relevant in our usage. """ def __init__(self, simbadident=None, **kwargs): if simbadident is not None: self.fill_from_simbad(simbadident) for k, v in kwargs.items(): setattr(self, k, v) def _partial_info(self, val0, *rest): if not len(rest): return False first = val0 is None for v in rest: if (v is None) != first: return True return False
[docs] def verify(self, complain=True): """Validate that the attributes are self-consistent. This function does some basic checks of the object attributes to ensure that astrometric calculations can legally be performed. If the *complain* keyword is true, messages may be printed to :data:`sys.stderr` if non-fatal issues are encountered. Returns *self*. """ import sys if self.ra is None: raise ValueError('AstrometryInfo missing "ra"') if self.dec is None: raise ValueError('AstrometryInfo missing "dec"') if self._partial_info(self.promo_ra, self.promo_dec): raise ValueError("partial proper-motion info in AstrometryInfo") if self._partial_info(self.pos_u_maj, self.pos_u_min, self.pos_u_pa): raise ValueError("partial positional uncertainty info in AstrometryInfo") if self._partial_info(self.promo_u_maj, self.promo_u_min, self.promo_u_pa): raise ValueError("partial proper-motion uncertainty info in AstrometryInfo") if self.pos_u_maj is None: if complain: print_( "AstrometryInfo: no positional uncertainty info", file=sys.stderr ) elif self.pos_u_maj < self.pos_u_min: # Based on experience with PM, this may be possible if complain: print_( "AstrometryInfo: swapped positional uncertainty " "major/minor axes", file=sys.stderr, ) self.pos_u_maj, self.pos_u_min = self.pos_u_min, self.pos_u_maj self.pos_u_pa += 0.5 * np.pi if self.pos_epoch is None: if complain: print_( "AstrometryInfo: assuming epoch of position is J2000.0", file=sys.stderr, ) if self.promo_ra is None: if complain: print_("AstrometryInfo: assuming zero proper motion", file=sys.stderr) elif self.promo_u_maj is None: if complain: print_( "AstrometryInfo: no uncertainty on proper motion", file=sys.stderr ) elif self.promo_u_maj < self.promo_u_min: # I've seen this: V* V374 Peg if complain: print_( "AstrometryInfo: swapped proper motion uncertainty " "major/minor axes", file=sys.stderr, ) self.promo_u_maj, self.promo_u_min = self.promo_u_min, self.promo_u_maj self.promo_u_pa += 0.5 * np.pi if self.parallax is None: if complain: print_("AstrometryInfo: assuming zero parallax", file=sys.stderr) else: if self.parallax < 0.0: raise ValueError("negative parallax in AstrometryInfo") if self.u_parallax is None: if complain: print_( "AstrometryInfo: no uncertainty on parallax", file=sys.stderr ) if self.vradial is None: pass # not worth complaining elif self.u_vradial is None: if complain: print_("AstrometryInfo: no uncertainty on v_radial", file=sys.stderr) return self # chain-friendly
[docs] def predict_without_uncertainties(self, mjd, complain=True): """Predict the object position at a given MJD. The return value is a tuple ``(ra, dec)``, in radians, giving the predicted position of the object at *mjd*. Unlike :meth:`predict`, the astrometric uncertainties are ignored. This function is therefore deterministic but potentially misleading. If *complain* is True, print out warnings for incomplete information. This function relies on the external :mod:`skyfield` package. """ import sys self.verify(complain=complain) planets, ts = load_skyfield_data() # might download stuff from the internet earth = planets["earth"] t = ts.tdb(jd=mjd + 2400000.5) # "Best" position. The implementation here is a bit weird to keep # parallelism with predict(). args = { "ra_hours": self.ra * R2H, "dec_degrees": self.dec * R2D, } if self.pos_epoch is not None: args["jd_of_position"] = self.pos_epoch + 2400000.5 if self.promo_ra is not None: args["ra_mas_per_year"] = self.promo_ra args["dec_mas_per_year"] = self.promo_dec if self.parallax is not None: args["parallax_mas"] = self.parallax if self.vradial is not None: args["radial_km_per_s"] = self.vradial bestra, bestdec, _ = earth.at(t).observe(PromoEpochStar(**args)).radec() return bestra.radians, bestdec.radians
[docs] def predict(self, mjd, complain=True, n=20000): """Predict the object position at a given MJD. The return value is a tuple ``(ra, dec, major, minor, pa)``, all in radians. These are the predicted position of the object and its uncertainty at *mjd*. If *complain* is True, print out warnings for incomplete information. *n* is the number of Monte Carlo samples to draw for computing the positional uncertainty. The uncertainty ellipse parameters are sigmas, not FWHM. These may be converted with the :data:`S2F` constant. This function relies on the external :mod:`skyfield` package. """ import sys from . import ellipses self.verify(complain=complain) planets, ts = load_skyfield_data() # might download stuff from the internet earth = planets["earth"] t = ts.tdb(jd=mjd + 2400000.5) # "Best" position. args = { "ra_hours": self.ra * R2H, "dec_degrees": self.dec * R2D, } if self.pos_epoch is not None: args["jd_of_position"] = self.pos_epoch + 2400000.5 if self.promo_ra is not None: args["ra_mas_per_year"] = self.promo_ra args["dec_mas_per_year"] = self.promo_dec if self.parallax is not None: args["parallax_mas"] = self.parallax if self.vradial is not None: args["radial_km_per_s"] = self.vradial bestra, bestdec, _ = earth.at(t).observe(PromoEpochStar(**args)).radec() bestra = bestra.radians bestdec = bestdec.radians # Monte Carlo to get an uncertainty. As always, astronomy position # angle convention requires that we treat declination as X and RA as # Y. First, we check sanity and generate randomized parameters: if ( self.pos_u_maj is None and self.promo_u_maj is None and self.u_parallax is None ): if complain: print_( "AstrometryInfo.predict(): no uncertainties " "available; cannot Monte Carlo!", file=sys.stderr, ) return (bestra, bestdec, 0.0, 0.0, 0.0) if self.pos_u_maj is not None: sd, sr, cdr = ellipses.ellbiv(self.pos_u_maj, self.pos_u_min, self.pos_u_pa) decs, ras = ellipses.bivrandom(self.dec, self.ra, sd, sr, cdr, n).T else: ras = np.zeros(n) + self.ra decs = np.zeros(n) + self.dec if self.promo_ra is None: pmras = np.zeros(n) pmdecs = np.zeros(n) elif self.promo_u_maj is not None: sd, sr, cdr = ellipses.ellbiv( self.promo_u_maj, self.promo_u_min, self.promo_u_pa ) pmdecs, pmras = ellipses.bivrandom( self.promo_dec, self.promo_ra, sd, sr, cdr, n ).T else: pmras = np.zeros(n) + self.promo_ra pmdecs = np.zeros(n) + self.promo_dec if self.parallax is None: parallaxes = np.zeros(n) elif self.u_parallax is not None: parallaxes = np.random.normal(self.parallax, self.u_parallax, n) else: parallaxes = np.zeros(n) + self.parallax if self.vradial is None: vradials = np.zeros(n) elif self.u_vradial is not None: vradials = np.random.normal(self.vradial, self.u_vradial, n) else: vradials = np.zeros(n) + self.vradial # Now we compute the positions and summarize as an ellipse: results = np.empty((n, 2)) for i in range(n): args["ra_hours"] = ras[i] * R2H args["dec_degrees"] = decs[i] * R2D args["ra_mas_per_year"] = pmras[i] args["dec_mas_per_year"] = pmdecs[i] args["parallax_mas"] = parallaxes[i] args["radial_km_per_s"] = vradials[i] ara, adec, _ = earth.at(t).observe(PromoEpochStar(**args)).radec() results[i] = adec.radians, ara.radians maj, min, pa = ellipses.bivell(*ellipses.databiv(results)) # All done. return bestra, bestdec, maj, min, pa
[docs] def print_prediction(self, ptup, precision=2): """Print a summary of a predicted position. The argument *ptup* is a tuple returned by :meth:`predict`. It is printed to :data:`sys.stdout` in a reasonable format that uses Unicode characters. """ from . import ellipses bestra, bestdec, maj, min, pa = ptup f = ellipses.sigmascale(1) maj *= R2A min *= R2A pa *= R2D print_("position =", fmtradec(bestra, bestdec, precision=precision)) print_( 'err(1σ) = %.*f" × %.*f" @ %.0f°' % (precision, maj * f, precision, min * f, pa) )
[docs] def fill_from_simbad(self, ident, debug=False): """Fill in astrometric information using the Simbad web service. This uses the CDS Simbad web service to look up astrometric information for the source name *ident* and fills in attributes appropriately. Values from Simbad are not always reliable. Returns *self*. """ info = get_simbad_astrometry_info(ident, debug=debug) posref = "unknown" for k, v in info.items(): if "~" in v: continue # no info if k == "COO(d;A)": self.ra = float(v) * D2R elif k == "COO(d;D)": self.dec = float(v) * D2R elif k == "COO(E)": a = v.split() self.pos_u_maj = float(a[0]) * A2R * 1e-3 # mas -> rad self.pos_u_min = float(a[1]) * A2R * 1e-3 self.pos_u_pa = float(a[2]) * D2R elif k == "COO(B)": posref = v elif k == "PM(A)": self.promo_ra = float(v) # mas/yr elif k == "PM(D)": self.promo_dec = float(v) # mas/yr elif k == "PM(E)": a = v.split() self.promo_u_maj = float(a[0]) # mas/yr self.promo_u_min = float(a[1]) self.promo_u_pa = float(a[2]) * D2R # rad! elif k == "PLX(V)": self.parallax = float(v) # mas elif k == "PLX(E)": self.u_parallax = float(v) # mas elif k == "RV(V)": self.vradial = float(v) # km/s elif k == "RV(E)": self.u_vradial = float(v) # km/s if self.ra is None: raise Exception('no position returned by Simbad for "%s"' % ident) if self.u_parallax == 0: self.u_parallax = None if self.u_vradial == 0: self.u_vradial = None # Get the right epoch of position when possible if posref == "2003yCat.2246....0C": self.pos_epoch = get_2mass_epoch(self.ra, self.dec, debug) elif posref == "2018yCat.1345....0G": self.pos_epoch = 57205.875 # J2015.5 for Gaia DR2 return self # eases chaining
[docs] def fill_from_allwise(self, ident, catalog_ident="II/328/allwise"): """Fill in astrometric information from the AllWISE catalog using Astroquery. This uses the :mod:`astroquery` module to query the AllWISE (2013wise.rept....1C) source catalog through the Vizier (2000A&AS..143...23O) web service. It then fills in the instance with the relevant information. Arguments are: ident The AllWISE catalog identifier of the form ``"J112254.70+255021.9"``. catalog_ident The Vizier designation of the catalog to query. The default is "II/328/allwise", the current version of the AllWISE catalog. Raises :exc:`~pwkit.PKError` if something unexpected happens that doesn't itself result in an exception within :mod:`astroquery`. You should probably prefer :meth:`fill_from_simbad` for objects that are known to the CDS Simbad service, but not all objects in the AllWISE catalog are so known. If you use this function, you should `acknowledge AllWISE <https://wise2.ipac.caltech.edu/docs/release/allwise/>`_ and `Vizier <http://cds.u-strasbg.fr/vizier-org/licences_vizier.html>`_. Returns *self*. """ from astroquery.vizier import Vizier import numpy.ma.core as ma_core # We should match exactly one table and one row within that table, but # for robustness we ignore additional results if they happen to # appear. Strangely, querying for an invalid identifier yields a table # with two rows that are filled with masked out data. table_list = Vizier.query_constraints(catalog=catalog_ident, AllWISE=ident) if not len(table_list): raise PKError( "Vizier query returned no tables (catalog=%r AllWISE=%r)", catalog_ident, ident, ) table = table_list[0] if not len(table): raise PKError( "Vizier query returned empty %s table (catalog=%r AllWISE=%r)", table.meta["name"], catalog_ident, ident, ) row = table[0] if isinstance(row["_RAJ2000"], ma_core.MaskedConstant): raise PKError( "Vizier query returned flagged row in %s table; your AllWISE " "identifier likely does not exist (it should be of the form " '"J112254.70+255021.9"; catalog=%r AllWISE=%r)', table.meta["name"], catalog_ident, ident, ) # OK, we can actually do this. self.ra = row["RA_pm"] * D2R self.dec = row["DE_pm"] * D2R if row["e_RA_pm"] > row["e_DE_pm"]: self.pos_u_maj = row["e_RA_pm"] * A2R self.pos_u_min = row["e_DE_pm"] * A2R self.pos_u_pa = halfpi else: self.pos_u_maj = row["e_DE_pm"] * A2R self.pos_u_min = row["e_RA_pm"] * A2R self.pos_u_pa = 0 self.pos_epoch = 55400.0 # hardcoded in the catalog self.promo_ra = row["pmRA"] self.promo_dec = row["pmDE"] if row["e_pmRA"] > row["e_pmDE"]: self.promo_u_maj = row["e_pmRA"] * 1.0 self.promo_u_min = row["e_pmDE"] * 1.0 self.promo_u_pa = halfpi else: self.promo_u_maj = row["e_pmDE"] * 1.0 self.promo_u_min = row["e_pmRA"] * 1.0 self.promo_u_pa = 0.0 return self # eases chaining
def __unicode__(self): self.verify(complain=False) a = [] a.append("Position: " + fmtradec(self.ra, self.dec)) if self.pos_u_maj is None: a.append("No uncertainty info for position.") else: a.append( 'Pos. uncert: %.3f" × %.3f" @ %.0f°' % (self.pos_u_maj * R2A, self.pos_u_min * R2A, self.pos_u_pa * R2D) ) if self.pos_epoch is None: a.append("No epoch of position.") else: a.append("Epoch of position: MJD %.3f" % self.pos_epoch) if self.promo_ra is None: a.append("No proper motion.") else: a.append( "Proper motion: %.3f, %.3f mas/yr" % (self.promo_ra, self.promo_dec) ) if self.promo_u_maj is None: a.append("No uncertainty info for proper motion.") else: a.append( "Promo. uncert: %.1f × %.1f mas/yr @ %.0f°" % (self.promo_u_maj, self.promo_u_min, self.promo_u_pa * R2D) ) if self.parallax is None: a.append("No parallax information.") elif self.u_parallax is not None: a.append("Parallax: %.1f ± %.1f mas" % (self.parallax, self.u_parallax)) else: a.append("Parallax: %.1f mas, unknown uncert." % self.parallax) if self.vradial is None: a.append("No radial velocity information.") elif self.u_vradial is not None: a.append( "Radial velocity: %.2f ± %.2f km/s" % (self.vradial, self.u_vradial) ) else: a.append("Radial velocity: %.1f km/s, unknown uncert." % self.vradial) return "\n".join(a) __str__ = unicode_to_str
# Other astronomical calculations
[docs] def app2abs(app_mag, dist_pc): """Convert an apparent magnitude to an absolute magnitude, given a source's (luminosity) distance in parsecs. Arguments: app_mag Apparent magnitude. dist_pc Distance, in parsecs. Returns the absolute magnitude. The arguments may be vectors. """ return app_mag - 5 * (np.log10(dist_pc) - 1)
[docs] def abs2app(abs_mag, dist_pc): """Convert an absolute magnitude to an apparent magnitude, given a source's (luminosity) distance in parsecs. Arguments: abs_mag Absolute magnitude. dist_pc Distance, in parsecs. Returns the apparent magnitude. The arguments may be vectors. """ return abs_mag + 5 * (np.log10(dist_pc) - 1)