Source code for pwkit.pdm

# -*- mode: python; coding: utf-8 -*-
# Copyright 2014 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.

"""pwkit.pdm - period-finding with phase dispersion minimization

As defined in Stellingwerf (1978ApJ...224..953S). See the update in
Schwarzenberg-Czerny (1997ApJ...489..941S), however, which corrects the
significance test formally; Linnell Nemec & Nemec (1985AJ.....90.2317L)
provide a Monte Carlo approach. Also, Stellingwerf has developed "PDM2" which
attempts to improve a few aspects; see

- `Stellingwerf’s page <http://www.stellingwerf.com/rfs-bin/index.cgi?action=PageView&id=29>`_
- `The Wikipedia article <http://en.wikipedia.org/wiki/Phase_dispersion_minimization>`_

"""
from __future__ import absolute_import, division, print_function, unicode_literals

# TODO: automatic rule for nbin?
# TODO: ditto for nr periods to try?
# TODO: confidence in peak value or something

__all__ = str("PDMResult pdm").split()

import numpy as np
from collections import namedtuple

from .numutil import weighted_variance
from .parallel import make_parallel_helper

PDMResult = namedtuple(
    "PDMResult", "thetas imin pmin mc_tmins " "mc_pvalue mc_pmins mc_puncert".split()
)


def one_theta(t, x, wt, period, nbin, nshift, v_all):
    phase0 = t / period
    numer = denom = 0.0

    for i in range(nshift):
        phase = (phase0 + float(i) / (nshift * nbin)) % 1.0
        binloc = np.floor(phase * nbin).astype(int)

        for j in range(nbin):
            wh = np.where(binloc == j)[0]
            if wh.size < 3:
                continue

            numer += weighted_variance(x[wh], wt[wh]) * (wh.size - 1)
            denom += wh.size - 1

    return numer / (denom * v_all)


def _map_one_theta(args):
    """Needed for the parallel map() call in pdm() due to the gross way in which
    Python multiprocessing works.

    """
    return one_theta(*args)


[docs] def pdm( t, x, u, periods, nbin, nshift=8, nsmc=256, numc=256, weights=False, parallel=True ): """Perform phase dispersion minimization. t : 1D array time coordinate x : 1D array, same size as *t* observed value u : 1D array, same size as *t* uncertainty on observed value; same units as `x` periods : 1D array set of candidate periods to sample; same units as `t` nbin : int number of phase bins to construct nshift : int=8 number of shifted binnings to sample to combact statistical flukes nsmc : int=256 number of Monte Carlo shufflings to compute, to evaluate the significance of the minimal theta value. numc : int=256 number of Monte Carlo added-noise datasets to compute, to evaluate the uncertainty in the location of the minimal theta value. weights : bool=False if True, 'u' is actually weights, not uncertainties. Usually weights = u**-2. parallel : default True Controls parallelization of the algorithm. Default uses all available cores. See `pwkit.parallel.make_parallel_helper`. Returns named tuple of: thetas : 1D array values of theta statistic, same size as `periods` imin index of smallest (best) value in `thetas` pmin the `period` value with the smallest (best) `theta` mc_tmins 1D array of size `nsmc` with Monte Carlo samplings of minimal theta values for shufflings of the data; assesses significance of the peak mc_pvalue probability (between 0 and 1) of obtaining the best theta value in a randomly-shuffled dataset mc_pmins 1D array of size `numc` with Monte Carlo samplings of best period values for noise-added data; assesses uncertainty of `pmin` mc_puncert standard deviation of `mc_pmins`; approximate uncertainty on `pmin`. We don't do anything clever, so runtime scales at least as ``t.size * periods.size * nbin * nshift * (nsmc + numc + 1)``. """ t = np.asarray(t, dtype=float) x = np.asarray(x, dtype=float) u = np.asarray(u, dtype=float) periods = np.asarray(periods, dtype=float) t, x, u, periods = np.atleast_1d(t, x, u, periods) nbin = int(nbin) nshift = int(nshift) nsmc = int(nsmc) numc = int(numc) phelp = make_parallel_helper(parallel) if t.ndim != 1: raise ValueError("`t` must be <= 1D") if x.shape != t.shape: raise ValueError("`t` and `x` arguments must be the same size") if u.shape != t.shape: raise ValueError("`t` and `u` arguments must be the same size") if periods.ndim != 1: raise ValueError("`periods` must be <= 1D") if nbin < 2: raise ValueError("`nbin` must be at least 2") if nshift < 1: raise ValueError("`nshift` must be at least 1") if nsmc < 0: raise ValueError("`nsmc` must be nonnegative") if numc < 0: raise ValueError("`numc` must be nonnegative") # We can finally get started! if weights: wt = u u = wt**-0.5 else: wt = u**-2 v_all = weighted_variance(x, wt) with phelp.get_map() as map: get_thetas = lambda args: np.asarray(map(_map_one_theta, args)) thetas = get_thetas((t, x, wt, p, nbin, nshift, v_all) for p in periods) imin = thetas.argmin() pmin = periods[imin] # Now do the Monte Carlo jacknifing so that the caller can have some idea # as to the significance of the minimal value of `thetas`. mc_thetas = np.empty(periods.shape) mc_tmins = np.empty(nsmc) for i in range(nsmc): shuf = np.random.permutation(x.size) # Note that what we do here is very MapReduce-y. I'm not aware of # an easy way to implement this computation in that model, though. mc_thetas = get_thetas( (t, x[shuf], wt[shuf], p, nbin, nshift, v_all) for p in periods ) mc_tmins[i] = mc_thetas.min() mc_tmins.sort() mc_pvalue = mc_tmins.searchsorted(thetas[imin]) / nsmc # Now add noise to assess the uncertainty of the period. mc_pmins = np.empty(numc) for i in range(numc): noised = np.random.normal(x, u) mc_thetas = get_thetas( (t, noised, wt, p, nbin, nshift, v_all) for p in periods ) mc_pmins[i] = periods[mc_thetas.argmin()] mc_pmins.sort() mc_puncert = mc_pmins.std() # All done. return PDMResult( thetas=thetas, imin=imin, pmin=pmin, mc_tmins=mc_tmins, mc_pvalue=mc_pvalue, mc_pmins=mc_pmins, mc_puncert=mc_puncert, )