# -*- mode: python; coding: utf-8 -*-
# Copyright 2014 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.
"""pwkit.bblocks - Bayesian Blocks analysis, with a few extensions.
Bayesian Blocks analysis for the "time tagged" case described by Scargle+
2013. Inspired by the bayesian_blocks implementation by Jake Vanderplas in the
AstroML package, but that turned out to have some limitations.
We have iterative determination of the best number of blocks (using an ad-hoc
routine described in Scargle+ 2013) and bootstrap-based determination of
uncertainties on the block heights (ditto).
Functions are:
:func:`bin_bblock`
Bayesian Blocks analysis with counts and bins.
:func:`tt_bblock`
BB analysis of time-tagged events.
:func:`bs_tt_bblock`
Like :func:`tt_bblock` with bootstrap-based uncertainty assessment. NOTE:
the uncertainties are not very reliable!
"""
from __future__ import absolute_import, division, print_function, unicode_literals
__all__ = str("nlogn bin_bblock tt_bblock bs_tt_bblock").split()
import numpy as np
from . import Holder
def nlogn(n, dt):
# I really feel like there must be a cleverer way to do this
# scalar-or-vector possible-bad-value masking.
if np.isscalar(n):
if n == 0:
return 0.0
return n * (np.log(n) - np.log(dt))
n = np.asarray(n)
mask = n == 0
r = n * (np.log(np.where(mask, 1, n)) - np.log(dt))
return np.where(mask, 0, r)
[docs]
def bin_bblock(widths, counts, p0=0.05):
"""Fundamental Bayesian Blocks algorithm. Arguments:
widths - Array of consecutive cell widths.
counts - Array of numbers of counts in each cell.
p0=0.05 - Probability of preferring solutions with additional bins.
Returns a Holder with:
blockstarts - Start times of output blocks.
counts - Number of events in each output block.
finalp0 - Final value of p0, after iteration to minimize `nblocks`.
nblocks - Number of output blocks.
ncells - Number of input cells/bins.
origp0 - Original value of p0.
rates - Event rate associated with each block.
widths - Width of each output block.
"""
widths = np.asarray(widths)
counts = np.asarray(counts)
ncells = widths.size
origp0 = p0
if np.any(widths <= 0):
raise ValueError("bin widths must be positive")
if widths.size != counts.size:
raise ValueError("widths and counts must have same size")
if p0 < 0 or p0 >= 1.0:
raise ValueError("p0 must lie within [0, 1)")
vedges = np.cumsum(np.concatenate(([0], widths))) # size: ncells + 1
block_remainders = vedges[-1] - vedges # size: nedges = ncells + 1
ccounts = np.cumsum(np.concatenate(([0], counts)))
count_remainders = ccounts[-1] - ccounts
prev_blockstarts = None
best = np.zeros(ncells, dtype=float)
last = np.zeros(ncells, dtype=int)
for _ in range(10):
# Pluggable num-change-points prior-weight expression:
ncp_prior = 4 - np.log(p0 / (0.0136 * ncells**0.478))
for r in range(ncells):
tk = block_remainders[: r + 1] - block_remainders[r + 1]
nk = count_remainders[: r + 1] - count_remainders[r + 1]
# Pluggable fitness expression:
fit_vec = nlogn(nk, tk)
# This incrementally penalizes partitions with more blocks:
tmp = fit_vec - ncp_prior
tmp[1:] += best[:r]
imax = np.argmax(tmp)
last[r] = imax
best[r] = tmp[imax]
# different semantics than Scargle impl: our blockstarts is similar to
# their changepoints, but we always finish with blockstarts[0] = 0.
work = np.zeros(ncells, dtype=int)
workidx = 0
ind = last[-1]
while True:
work[workidx] = ind
workidx += 1
if ind == 0:
break
ind = last[ind - 1]
blockstarts = work[:workidx][::-1]
if prev_blockstarts is not None:
if (
blockstarts.size == prev_blockstarts.size
and (blockstarts == prev_blockstarts).all()
):
break # converged
if blockstarts.size == 1:
break # can't shrink any farther
# Recommended ad-hoc iteration to favor fewer blocks above and beyond
# the value of p0:
p0 = 1.0 - (1.0 - p0) ** (1.0 / (blockstarts.size - 1))
prev_blockstarts = blockstarts
assert blockstarts[0] == 0
nblocks = blockstarts.size
info = Holder()
info.ncells = ncells
info.nblocks = nblocks
info.origp0 = origp0
info.finalp0 = p0
info.blockstarts = blockstarts
info.counts = np.empty(nblocks, dtype=int)
info.widths = np.empty(nblocks)
for iblk in range(nblocks):
cellstart = blockstarts[iblk]
if iblk == nblocks - 1:
cellend = ncells - 1
else:
cellend = blockstarts[iblk + 1] - 1
info.widths[iblk] = widths[cellstart : cellend + 1].sum()
info.counts[iblk] = counts[cellstart : cellend + 1].sum()
info.rates = info.counts / info.widths
return info
[docs]
def tt_bblock(tstarts, tstops, times, p0=0.05, intersect_with_bins=False):
"""Bayesian Blocks for time-tagged events. Arguments:
*tstarts*
Array of input bin start times.
*tstops*
Array of input bin stop times.
*times*
Array of event arrival times.
*p0* = 0.05
Probability of preferring solutions with additional bins.
*intersect_with_bins* = False
If true, intersect bblock bins with input bins; can result in more bins
than bblocks wants; they will have the same rate values.
Returns a Holder with:
*counts*
Number of events in each output block.
*finalp0*
Final value of p0, after iteration to minimize `nblocks`.
*ledges*
Times of left edges of output blocks.
*midpoints*
Times of midpoints of output blocks.
*nblocks*
Number of output blocks.
*ncells*
Number of input cells/bins.
*origp0*
Original value of p0.
*rates*
Event rate associated with each block.
*redges*
Times of right edges of output blocks.
*widths*
Width of each output block.
Bin start/stop times are best derived from a 1D Voronoi tesselation of the
event arrival times, with some kind of global observation start/stop time
setting the extreme edges. Or they can be set from "good time intervals"
if observations were toggled on or off as in an X-ray telescope.
If *intersect_with_bins* is True, the true Bayesian Blocks bins (BBBs) are
intersected with the "good time intervals" (GTIs) defined by the *tstarts*
and *tstops* variables. One GTI may contain multiple BBBs if the event
rate appears to change within the GTI, and one BBB may span multiple GTIs
if the event date does *not* appear to change between the GTIs. The
intersection will ensure that no BBB intervals cross the edge of a GTI. If
this would happen, the BBB is split into multiple, partially redundant
records. Each of these records will have the **same** value for the
*counts*, *rates*, and *widths* values. However, the *ledges*, *redges*,
and *midpoints* values will be recalculated. Note that in this mode, it is
not necessarily true that ``widths = ledges - redges`` as is usually the
case. When this flag is true, keep in mind that multiple bins are
therefore *not* necessarily independent statistical samples.
"""
tstarts = np.asarray(tstarts)
tstops = np.asarray(tstops)
times = np.asarray(times)
if tstarts.size != tstops.size:
raise ValueError("must have same number of starts and stops")
ngti = tstarts.size
if ngti < 1:
raise ValueError("must have at least one goodtime interval")
if np.any((tstarts[1:] - tstarts[:-1]) <= 0):
raise ValueError("tstarts must be ordered and distinct")
if np.any((tstops[1:] - tstops[:-1]) <= 0):
raise ValueError("tstops must be ordered and distinct")
if np.any(tstarts >= tstops):
raise ValueError("tstarts must come before tstops")
if np.any((times[1:] - times[:-1]) < 0):
raise ValueError("times must be ordered")
if times.min() < tstarts[0]:
raise ValueError("no times may be smaller than first tstart")
if times.max() > tstops[-1]:
raise ValueError("no times may be larger than last tstop")
for i in range(1, ngti):
if np.where((times > tstops[i - 1]) & (times < tstarts[i]))[0].size:
raise ValueError("no times may fall in goodtime gap #%d" % i)
if p0 < 0 or p0 >= 1.0:
raise ValueError("p0 must lie within [0, 1)")
utimes, uidxs = np.unique(times, return_index=True)
nunique = utimes.size
counts = np.empty(nunique)
counts[:-1] = uidxs[1:] - uidxs[:-1]
counts[-1] = times.size - uidxs[-1]
assert counts.sum() == times.size
# we grow these arrays with concats, which will perform badly with lots of
# GTIs. Not expected to be a big deal.
widths = np.empty(0)
ledges = np.empty(0)
redges = np.empty(0)
for i in range(ngti):
tstart, tstop = tstarts[i], tstops[i]
w = np.where((utimes >= tstart) & (utimes <= tstop))[0]
if not w.size:
# No events during this goodtime! We have to insert a zero-count
# event block. This may break assumptions within bin_bblock()?
# j = idx of first event after this GTI
wafter = np.where(utimes > tstop)[0]
if wafter.size:
j = wafter[0]
else:
j = utimes.size
assert j == 0 or np.where(utimes < tstart)[0][-1] == j - 1
counts = np.concatenate((counts[:j], [0], counts[j:]))
widths = np.concatenate((widths, [tstop - tstart]))
ledges = np.concatenate((ledges, [tstart]))
redges = np.concatenate((redges, [tstop]))
else:
gtutimes = utimes[w]
midpoints = 0.5 * (gtutimes[1:] + gtutimes[:-1]) # size: n - 1
gtedges = np.concatenate(([tstart], midpoints, [tstop])) # size: n + 1
gtwidths = gtedges[1:] - gtedges[:-1] # size: n
assert gtwidths.sum() == tstop - tstart
widths = np.concatenate((widths, gtwidths))
ledges = np.concatenate((ledges, gtedges[:-1]))
redges = np.concatenate((redges, gtedges[1:]))
assert counts.size == widths.size
info = bin_bblock(widths, counts, p0=p0)
info.ledges = ledges[info.blockstarts]
# The right edge of the i'th block is the right edge of its rightmost
# bin, which is the bin before the leftmost bin of the (i+1)'th block:
info.redges = np.concatenate((redges[info.blockstarts[1:] - 1], [redges[-1]]))
info.midpoints = 0.5 * (info.ledges + info.redges)
del info.blockstarts
if intersect_with_bins:
# OK, we now need to intersect the bblock bins with the input bins.
# This can fracture one bblock bin into multiple ones but shouldn't
# make any of them disappear, since they're definitionally constrained
# to contain events.
#
# First: sorted list of all timestamps at which *something* changes:
# either a bblock edge, or a input bin edge. We drop the last entry,
# giving is a list of left edges of bins in which everything is the
# same.
all_times = set(tstarts)
all_times.update(tstops)
all_times.update(info.ledges)
all_times.update(info.redges)
all_times = np.array(sorted(all_times))[:-1]
# Now, construct a lookup table of which bblock number each of these
# bins corresponds to. More than one bin may have the same bblock
# number, if a GTI change slices a single bblock into more than one
# piece. We do this in a somewhat non-obvious way since we know that
# the bblocks completely cover the overall GTI span in order.
bblock_ids = np.zeros(all_times.size)
for i in range(1, info.nblocks):
bblock_ids[all_times >= info.ledges[i]] = i
# Now, a lookup table of which bins are within a good GTI span. Again,
# we know that all bins are either entirely in a good GTI or entirely
# outside, so the logic is simplified but not necessarily obvious.
good_timeslot = np.zeros(all_times.size, dtype=bool)
for t0, t1 in zip(tstarts, tstops):
ok = (all_times >= t0) & (all_times < t1)
good_timeslot[ok] = True
# Finally, look for contiguous spans that are in a good timeslot *and*
# have the same underlying bblock number. These are our intersected
# blocks.
old_bblock_ids = []
ledges = []
redges = []
cur_bblock_id = -1
for i in range(all_times.size):
if bblock_ids[i] != cur_bblock_id or not good_timeslot[i]:
if cur_bblock_id >= 0:
# Ending a previous span.
redges.append(all_times[i])
cur_bblock_id = -1
if good_timeslot[i]:
# Starting a new span.
ledges.append(all_times[i])
old_bblock_ids.append(bblock_ids[i])
cur_bblock_id = bblock_ids[i]
if cur_bblock_id >= 0:
# End the last span.
redges.append(tstops[-1])
# Finally, rewrite all of the data as planned.
old_bblock_ids = np.array(old_bblock_ids, dtype=int)
info.counts = info.counts[old_bblock_ids]
info.rates = info.rates[old_bblock_ids]
info.widths = info.widths[old_bblock_ids]
info.ledges = np.array(ledges)
info.redges = np.array(redges)
info.midpoints = 0.5 * (info.ledges + info.redges)
info.nblocks = info.ledges.size
return info
[docs]
def bs_tt_bblock(times, tstarts, tstops, p0=0.05, nbootstrap=512):
"""Bayesian Blocks for time-tagged events with bootstrapping uncertainty
assessment. THE UNCERTAINTIES ARE NOT VERY GOOD! Arguments:
tstarts - Array of input bin start times.
tstops - Array of input bin stop times.
times - Array of event arrival times.
p0=0.05 - Probability of preferring solutions with additional bins.
nbootstrap=512 - Number of bootstrap runs to perform.
Returns a Holder with:
blockstarts - Start times of output blocks.
bsrates - Mean event rate in each bin from bootstrap analysis.
bsrstds - ~Uncertainty: stddev of event rate in each bin from bootstrap analysis.
counts - Number of events in each output block.
finalp0 - Final value of p0, after iteration to minimize `nblocks`.
ledges - Times of left edges of output blocks.
midpoints - Times of midpoints of output blocks.
nblocks - Number of output blocks.
ncells - Number of input cells/bins.
origp0 - Original value of p0.
rates - Event rate associated with each block.
redges - Times of right edges of output blocks.
widths - Width of each output block.
"""
times = np.asarray(times)
tstarts = np.asarray(tstarts)
tstops = np.asarray(tstops)
nevents = times.size
if nevents < 1:
raise ValueError("must be given at least 1 event")
info = tt_bblock(tstarts, tstops, times, p0)
# Now bootstrap resample to assess uncertainties on the bin heights. This
# is the approach recommended by Scargle+.
bsrsums = np.zeros(info.nblocks)
bsrsumsqs = np.zeros(info.nblocks)
for _ in range(nbootstrap):
bstimes = times[np.random.randint(0, times.size, times.size)]
bstimes.sort()
bsinfo = tt_bblock(tstarts, tstops, bstimes, p0)
blocknums = np.minimum(
np.searchsorted(bsinfo.redges, info.midpoints), bsinfo.nblocks - 1
)
samprates = bsinfo.rates[blocknums]
bsrsums += samprates
bsrsumsqs += samprates**2
bsrmeans = bsrsums / nbootstrap
mask = bsrsumsqs / nbootstrap <= bsrmeans**2
bsrstds = np.sqrt(np.where(mask, 0, bsrsumsqs / nbootstrap - bsrmeans**2))
info.bsrates = bsrmeans
info.bsrstds = bsrstds
return info