Source code for pwkit.pdm

# -*- mode: python; coding: utf-8 -*-
# Copyright 2014 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.

"""pwkit.pdm - period-finding with phase dispersion minimization

As defined in Stellingwerf (1978ApJ...224..953S). See the update in
Schwarzenberg-Czerny (1997ApJ...489..941S), however, which corrects the
significance test formally; Linnell Nemec & Nemec (1985AJ.....90.2317L)
provide a Monte Carlo approach. Also, Stellingwerf has developed "PDM2" which
attempts to improve a few aspects; see

- `Stellingwerf’s page <http://www.stellingwerf.com/rfs-bin/index.cgi?action=PageView&id=29>`_
- `The Wikipedia article <http://en.wikipedia.org/wiki/Phase_dispersion_minimization>`_

"""
from __future__ import absolute_import, division, print_function, unicode_literals

# TODO: automatic rule for nbin?
# TODO: ditto for nr periods to try?
# TODO: confidence in peak value or something

__all__ = str ('PDMResult pdm').split ()

import numpy as np
from collections import namedtuple
from six.moves import range

from .numutil import weighted_variance
from .parallel import make_parallel_helper

PDMResult = namedtuple ('PDMResult', 'thetas imin pmin mc_tmins '
                        'mc_pvalue mc_pmins mc_puncert'.split ())


def one_theta (t, x, wt, period, nbin, nshift, v_all):
    phase0 = t / period
    numer = denom = 0.

    for i in range (nshift):
        phase = (phase0 + float (i) / (nshift * nbin)) % 1.
        binloc = np.floor (phase * nbin).astype (np.int)

        for j in range (nbin):
            wh = np.where (binloc == j)[0]
            if wh.size < 3:
                continue

            numer += weighted_variance (x[wh], wt[wh]) * (wh.size - 1)
            denom += wh.size - 1

    return numer / (denom * v_all)


def _map_one_theta (args):
    """Needed for the parallel map() call in pdm() due to the gross way in which
    Python multiprocessing works.

    """
    return one_theta (*args)


[docs]def pdm (t, x, u, periods, nbin, nshift=8, nsmc=256, numc=256, weights=False, parallel=True): """Perform phase dispersion minimization. t : 1D array time coordinate x : 1D array, same size as *t* observed value u : 1D array, same size as *t* uncertainty on observed value; same units as `x` periods : 1D array set of candidate periods to sample; same units as `t` nbin : int number of phase bins to construct nshift : int=8 number of shifted binnings to sample to combact statistical flukes nsmc : int=256 number of Monte Carlo shufflings to compute, to evaluate the significance of the minimal theta value. numc : int=256 number of Monte Carlo added-noise datasets to compute, to evaluate the uncertainty in the location of the minimal theta value. weights : bool=False if True, 'u' is actually weights, not uncertainties. Usually weights = u**-2. parallel : default True Controls parallelization of the algorithm. Default uses all available cores. See `pwkit.parallel.make_parallel_helper`. Returns named tuple of: thetas : 1D array values of theta statistic, same size as `periods` imin index of smallest (best) value in `thetas` pmin the `period` value with the smallest (best) `theta` mc_tmins 1D array of size `nsmc` with Monte Carlo samplings of minimal theta values for shufflings of the data; assesses significance of the peak mc_pvalue probability (between 0 and 1) of obtaining the best theta value in a randomly-shuffled dataset mc_pmins 1D array of size `numc` with Monte Carlo samplings of best period values for noise-added data; assesses uncertainty of `pmin` mc_puncert standard deviation of `mc_pmins`; approximate uncertainty on `pmin`. We don't do anything clever, so runtime scales at least as ``t.size * periods.size * nbin * nshift * (nsmc + numc + 1)``. """ t = np.asfarray (t) x = np.asfarray (x) u = np.asfarray (u) periods = np.asfarray (periods) t, x, u, periods = np.atleast_1d (t, x, u, periods) nbin = int (nbin) nshift = int (nshift) nsmc = int (nsmc) numc = int (numc) phelp = make_parallel_helper (parallel) if t.ndim != 1: raise ValueError ('`t` must be <= 1D') if x.shape != t.shape: raise ValueError ('`t` and `x` arguments must be the same size') if u.shape != t.shape: raise ValueError ('`t` and `u` arguments must be the same size') if periods.ndim != 1: raise ValueError ('`periods` must be <= 1D') if nbin < 2: raise ValueError ('`nbin` must be at least 2') if nshift < 1: raise ValueError ('`nshift` must be at least 1') if nsmc < 0: raise ValueError ('`nsmc` must be nonnegative') if numc < 0: raise ValueError ('`numc` must be nonnegative') # We can finally get started! if weights: wt = u u = wt ** -0.5 else: wt = u ** -2 v_all = weighted_variance (x, wt) with phelp.get_map () as map: get_thetas = lambda args: np.asarray (map (_map_one_theta, args)) thetas = get_thetas ((t, x, wt, p, nbin, nshift, v_all) for p in periods) imin = thetas.argmin () pmin = periods[imin] # Now do the Monte Carlo jacknifing so that the caller can have some idea # as to the significance of the minimal value of `thetas`. mc_thetas = np.empty (periods.shape) mc_tmins = np.empty (nsmc) for i in range (nsmc): shuf = np.random.permutation (x.size) # Note that what we do here is very MapReduce-y. I'm not aware of # an easy way to implement this computation in that model, though. mc_thetas = get_thetas ((t, x[shuf], wt[shuf], p, nbin, nshift, v_all) for p in periods) mc_tmins[i] = mc_thetas.min () mc_tmins.sort () mc_pvalue = mc_tmins.searchsorted (thetas[imin]) / nsmc # Now add noise to assess the uncertainty of the period. mc_pmins = np.empty (numc) for i in range (numc): noised = np.random.normal (x, u) mc_thetas = get_thetas ((t, noised, wt, p, nbin, nshift, v_all) for p in periods) mc_pmins[i] = periods[mc_thetas.argmin ()] mc_pmins.sort () mc_puncert = mc_pmins.std () # All done. return PDMResult (thetas=thetas, imin=imin, pmin=pmin, mc_tmins=mc_tmins, mc_pvalue=mc_pvalue, mc_pmins=mc_pmins, mc_puncert=mc_puncert)