Source code for pwkit.bblocks

# -*- mode: python; coding: utf-8 -*-
# Copyright 2014 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.

"""pwkit.bblocks - Bayesian Blocks analysis, with a few extensions.

Bayesian Blocks analysis for the "time tagged" case described by Scargle+
2013. Inspired by the bayesian_blocks implementation by Jake Vanderplas in the
AstroML package, but that turned out to have some limitations.

We have iterative determination of the best number of blocks (using an ad-hoc
routine described in Scargle+ 2013) and bootstrap-based determination of
uncertainties on the block heights (ditto).

Functions are:

:func:`bin_bblock`
  Bayesian Blocks analysis with counts and bins.
:func:`tt_bblock`
  BB analysis of time-tagged events.
:func:`bs_tt_bblock`
  Like :func:`tt_bblock` with bootstrap-based uncertainty assessment. NOTE:
  the uncertainties are not very reliable!

"""
from __future__ import absolute_import, division, print_function, unicode_literals

__all__ = str ('nlogn bin_bblock tt_bblock bs_tt_bblock').split ()


from six.moves import range
import numpy as np

from . import Holder


def nlogn (n, dt):
    # I really feel like there must be a cleverer way to do this
    # scalar-or-vector possible-bad-value masking.

    if np.isscalar (n):
        if n == 0:
            return 0.
        return n * (np.log (n) - np.log (dt))

    n = np.asarray (n)
    mask = (n == 0)
    r = n * (np.log (np.where (mask, 1, n)) - np.log (dt))
    return np.where (mask, 0, r)


[docs]def bin_bblock (widths, counts, p0=0.05): """Fundamental Bayesian Blocks algorithm. Arguments: widths - Array of consecutive cell widths. counts - Array of numbers of counts in each cell. p0=0.05 - Probability of preferring solutions with additional bins. Returns a Holder with: blockstarts - Start times of output blocks. counts - Number of events in each output block. finalp0 - Final value of p0, after iteration to minimize `nblocks`. nblocks - Number of output blocks. ncells - Number of input cells/bins. origp0 - Original value of p0. rates - Event rate associated with each block. widths - Width of each output block. """ widths = np.asarray (widths) counts = np.asarray (counts) ncells = widths.size origp0 = p0 if np.any (widths <= 0): raise ValueError ('bin widths must be positive') if widths.size != counts.size: raise ValueError ('widths and counts must have same size') if p0 < 0 or p0 >= 1.: raise ValueError ('p0 must lie within [0, 1)') vedges = np.cumsum (np.concatenate (([0], widths))) # size: ncells + 1 block_remainders = vedges[-1] - vedges # size: nedges = ncells + 1 ccounts = np.cumsum (np.concatenate (([0], counts))) count_remainders = ccounts[-1] - ccounts prev_blockstarts = None best = np.zeros (ncells, dtype=np.float) last = np.zeros (ncells, dtype=np.int) for _ in range (10): # Pluggable num-change-points prior-weight expression: ncp_prior = 4 - np.log (p0 / (0.0136 * ncells**0.478)) for r in range (ncells): tk = block_remainders[:r+1] - block_remainders[r+1] nk = count_remainders[:r+1] - count_remainders[r+1] # Pluggable fitness expression: fit_vec = nlogn (nk, tk) # This incrementally penalizes partitions with more blocks: tmp = fit_vec - ncp_prior tmp[1:] += best[:r] imax = np.argmax (tmp) last[r] = imax best[r] = tmp[imax] # different semantics than Scargle impl: our blockstarts is similar to # their changepoints, but we always finish with blockstarts[0] = 0. work = np.zeros (ncells, dtype=int) workidx = 0 ind = last[-1] while True: work[workidx] = ind workidx += 1 if ind == 0: break ind = last[ind - 1] blockstarts = work[:workidx][::-1] if prev_blockstarts is not None: if (blockstarts.size == prev_blockstarts.size and (blockstarts == prev_blockstarts).all ()): break # converged if blockstarts.size == 1: break # can't shrink any farther # Recommended ad-hoc iteration to favor fewer blocks above and beyond # the value of p0: p0 = 1. - (1. - p0)**(1. / (blockstarts.size - 1)) prev_blockstarts = blockstarts assert blockstarts[0] == 0 nblocks = blockstarts.size info = Holder () info.ncells = ncells info.nblocks = nblocks info.origp0 = origp0 info.finalp0 = p0 info.blockstarts = blockstarts info.counts = np.empty (nblocks, dtype=np.int) info.widths = np.empty (nblocks) for iblk in range (nblocks): cellstart = blockstarts[iblk] if iblk == nblocks - 1: cellend = ncells - 1 else: cellend = blockstarts[iblk+1] - 1 info.widths[iblk] = widths[cellstart:cellend+1].sum () info.counts[iblk] = counts[cellstart:cellend+1].sum () info.rates = info.counts / info.widths return info
[docs]def tt_bblock (tstarts, tstops, times, p0=0.05, intersect_with_bins=False): """Bayesian Blocks for time-tagged events. Arguments: *tstarts* Array of input bin start times. *tstops* Array of input bin stop times. *times* Array of event arrival times. *p0* = 0.05 Probability of preferring solutions with additional bins. *intersect_with_bins* = False If true, intersect bblock bins with input bins; can result in more bins than bblocks wants; they will have the same rate values. Returns a Holder with: *counts* Number of events in each output block. *finalp0* Final value of p0, after iteration to minimize `nblocks`. *ledges* Times of left edges of output blocks. *midpoints* Times of midpoints of output blocks. *nblocks* Number of output blocks. *ncells* Number of input cells/bins. *origp0* Original value of p0. *rates* Event rate associated with each block. *redges* Times of right edges of output blocks. *widths* Width of each output block. Bin start/stop times are best derived from a 1D Voronoi tesselation of the event arrival times, with some kind of global observation start/stop time setting the extreme edges. Or they can be set from "good time intervals" if observations were toggled on or off as in an X-ray telescope. If *intersect_with_bins* is True, the true Bayesian Blocks bins (BBBs) are intersected with the "good time intervals" (GTIs) defined by the *tstarts* and *tstops* variables. One GTI may contain multiple BBBs if the event rate appears to change within the GTI, and one BBB may span multiple GTIs if the event date does *not* appear to change between the GTIs. The intersection will ensure that no BBB intervals cross the edge of a GTI. If this would happen, the BBB is split into multiple, partially redundant records. Each of these records will have the **same** value for the *counts*, *rates*, and *widths* values. However, the *ledges*, *redges*, and *midpoints* values will be recalculated. Note that in this mode, it is not necessarily true that ``widths = ledges - redges`` as is usually the case. When this flag is true, keep in mind that multiple bins are therefore *not* necessarily independent statistical samples. """ tstarts = np.asarray (tstarts) tstops = np.asarray (tstops) times = np.asarray (times) if tstarts.size != tstops.size: raise ValueError ('must have same number of starts and stops') ngti = tstarts.size if ngti < 1: raise ValueError ('must have at least one goodtime interval') if np.any ((tstarts[1:] - tstarts[:-1]) <= 0): raise ValueError ('tstarts must be ordered and distinct') if np.any ((tstops[1:] - tstops[:-1]) <= 0): raise ValueError ('tstops must be ordered and distinct') if np.any (tstarts >= tstops): raise ValueError ('tstarts must come before tstops') if np.any ((times[1:] - times[:-1]) < 0): raise ValueError ('times must be ordered') if times.min () < tstarts[0]: raise ValueError ('no times may be smaller than first tstart') if times.max () > tstops[-1]: raise ValueError ('no times may be larger than last tstop') for i in range (1, ngti): if np.where ((times > tstops[i-1]) & (times < tstarts[i]))[0].size: raise ValueError ('no times may fall in goodtime gap #%d' % i) if p0 < 0 or p0 >= 1.: raise ValueError ('p0 must lie within [0, 1)') utimes, uidxs = np.unique (times, return_index=True) nunique = utimes.size counts = np.empty (nunique) counts[:-1] = uidxs[1:] - uidxs[:-1] counts[-1] = times.size - uidxs[-1] assert counts.sum () == times.size # we grow these arrays with concats, which will perform badly with lots of # GTIs. Not expected to be a big deal. widths = np.empty (0) ledges = np.empty (0) redges = np.empty (0) for i in range (ngti): tstart, tstop = tstarts[i], tstops[i] w = np.where ((utimes >= tstart) & (utimes <= tstop))[0] if not w.size: # No events during this goodtime! We have to insert a zero-count # event block. This may break assumptions within bin_bblock()? # j = idx of first event after this GTI wafter = np.where (utimes > tstop)[0] if wafter.size: j = wafter[0] else: j = utimes.size assert j == 0 or np.where (utimes < tstart)[0][-1] == j - 1 counts = np.concatenate ((counts[:j], [0], counts[j:])) widths = np.concatenate ((widths, [tstop - tstart])) ledges = np.concatenate ((ledges, [tstart])) redges = np.concatenate ((redges, [tstop])) else: gtutimes = utimes[w] midpoints = 0.5 * (gtutimes[1:] + gtutimes[:-1]) # size: n - 1 gtedges = np.concatenate (([tstart], midpoints, [tstop])) # size: n + 1 gtwidths = gtedges[1:] - gtedges[:-1] # size: n assert gtwidths.sum () == tstop - tstart widths = np.concatenate ((widths, gtwidths)) ledges = np.concatenate ((ledges, gtedges[:-1])) redges = np.concatenate ((redges, gtedges[1:])) assert counts.size == widths.size info = bin_bblock (widths, counts, p0=p0) info.ledges = ledges[info.blockstarts] # The right edge of the i'th block is the right edge of its rightmost # bin, which is the bin before the leftmost bin of the (i+1)'th block: info.redges = np.concatenate ((redges[info.blockstarts[1:] - 1], [redges[-1]])) info.midpoints = 0.5 * (info.ledges + info.redges) del info.blockstarts if intersect_with_bins: # OK, we now need to intersect the bblock bins with the input bins. # This can fracture one bblock bin into multiple ones but shouldn't # make any of them disappear, since they're definitionally constrained # to contain events. # # First: sorted list of all timestamps at which *something* changes: # either a bblock edge, or a input bin edge. We drop the last entry, # giving is a list of left edges of bins in which everything is the # same. all_times = set(tstarts) all_times.update(tstops) all_times.update(info.ledges) all_times.update(info.redges) all_times = np.array(sorted(all_times))[:-1] # Now, construct a lookup table of which bblock number each of these # bins corresponds to. More than one bin may have the same bblock # number, if a GTI change slices a single bblock into more than one # piece. We do this in a somewhat non-obvious way since we know that # the bblocks completely cover the overall GTI span in order. bblock_ids = np.zeros(all_times.size) for i in range(1, info.nblocks): bblock_ids[all_times >= info.ledges[i]] = i # Now, a lookup table of which bins are within a good GTI span. Again, # we know that all bins are either entirely in a good GTI or entirely # outside, so the logic is simplified but not necessarily obvious. good_timeslot = np.zeros(all_times.size, dtype=np.bool) for t0, t1 in zip(tstarts, tstops): ok = (all_times >= t0) & (all_times < t1) good_timeslot[ok] = True # Finally, look for contiguous spans that are in a good timeslot *and* # have the same underlying bblock number. These are our intersected # blocks. old_bblock_ids = [] ledges = [] redges = [] cur_bblock_id = -1 for i in range(all_times.size): if bblock_ids[i] != cur_bblock_id or not good_timeslot[i]: if cur_bblock_id >= 0: # Ending a previous span. redges.append(all_times[i]) cur_bblock_id = -1 if good_timeslot[i]: # Starting a new span. ledges.append(all_times[i]) old_bblock_ids.append(bblock_ids[i]) cur_bblock_id = bblock_ids[i] if cur_bblock_id >= 0: # End the last span. redges.append(tstops[-1]) # Finally, rewrite all of the data as planned. old_bblock_ids = np.array(old_bblock_ids, dtype=np.int) info.counts = info.counts[old_bblock_ids] info.rates = info.rates[old_bblock_ids] info.widths = info.widths[old_bblock_ids] info.ledges = np.array(ledges) info.redges = np.array(redges) info.midpoints = 0.5 * (info.ledges + info.redges) info.nblocks = info.ledges.size return info
[docs]def bs_tt_bblock (times, tstarts, tstops, p0=0.05, nbootstrap=512): """Bayesian Blocks for time-tagged events with bootstrapping uncertainty assessment. THE UNCERTAINTIES ARE NOT VERY GOOD! Arguments: tstarts - Array of input bin start times. tstops - Array of input bin stop times. times - Array of event arrival times. p0=0.05 - Probability of preferring solutions with additional bins. nbootstrap=512 - Number of bootstrap runs to perform. Returns a Holder with: blockstarts - Start times of output blocks. bsrates - Mean event rate in each bin from bootstrap analysis. bsrstds - ~Uncertainty: stddev of event rate in each bin from bootstrap analysis. counts - Number of events in each output block. finalp0 - Final value of p0, after iteration to minimize `nblocks`. ledges - Times of left edges of output blocks. midpoints - Times of midpoints of output blocks. nblocks - Number of output blocks. ncells - Number of input cells/bins. origp0 - Original value of p0. rates - Event rate associated with each block. redges - Times of right edges of output blocks. widths - Width of each output block. """ times = np.asarray (times) tstarts = np.asarray (tstarts) tstops = np.asarray (tstops) nevents = times.size if nevents < 1: raise ValueError ('must be given at least 1 event') info = tt_bblock (tstarts, tstops, times, p0) # Now bootstrap resample to assess uncertainties on the bin heights. This # is the approach recommended by Scargle+. bsrsums = np.zeros (info.nblocks) bsrsumsqs = np.zeros (info.nblocks) for _ in range (nbootstrap): bstimes = times[np.random.randint (0, times.size, times.size)] bstimes.sort () bsinfo = tt_bblock (tstarts, tstops, bstimes, p0) blocknums = np.minimum (np.searchsorted (bsinfo.redges, info.midpoints), bsinfo.nblocks - 1) samprates = bsinfo.rates[blocknums] bsrsums += samprates bsrsumsqs += samprates**2 bsrmeans = bsrsums / nbootstrap mask = bsrsumsqs / nbootstrap <= bsrmeans**2 bsrstds = np.sqrt (np.where (mask, 0, bsrsumsqs / nbootstrap - bsrmeans**2)) info.bsrates = bsrmeans info.bsrstds = bsrstds return info