Source code for pwkit.numutil

# -*- mode: python; coding: utf-8 -*-
# Copyright 2014-2022 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.

"""The :mod:`numpy` and :mod:`scipy` packages provide a whole host of
routines, but there are still some that are missing. The :mod:`pwkit.numutil`
module provides several useful additions.

"""
from __future__ import absolute_import, division, print_function

__all__ = """broadcastize dfsmooth fits_recarray_to_data_frame make_step_lcont
           make_step_rcont make_tophat_ee make_tophat_ei make_tophat_ie
           make_tophat_ii parallel_newton parallel_quad rms unit_tophat_ee
           unit_tophat_ei unit_tophat_ie unit_tophat_ii usmooth weighted_mean
           weighted_mean_df weighted_variance""".split()

import functools
import numpy as np

from .method_decorator import method_decorator


def _broadcastize_spec_to_scalar_filter(s):
    if s is None:
        # This return value is independent of the inputs altogether.
        return lambda x: x
    if s == 0:
        # This return value has the same shape as the input(s). If we
        # promoted to a 1-element vector, we need to demote.
        return lambda x: np.asarray(x).item()
    if s == 1:
        # This return value is a larger vector of the input(s). If we promoted
        # from a scalar, we drop the final axis. We asarray() the result for
        # convenience/robustness.
        return lambda x: np.asarray(x)[..., 0]

    raise ValueError("unrecognized @broadcastize ret_spec value %r" % s)


class _Broadcaster(method_decorator):
    # _BroadcasterDecorator must set self._n_arr and _scalar_ret_filter on creation.

    def fixup(self, newobj):
        # This function is used by the method_decorator superclass.
        newobj._n_arr = object.__getattribute__(self, "_n_arr")
        newobj._force_float = object.__getattribute__(self, "_force_float")
        newobj._scalar_ret_filter = object.__getattribute__(self, "_scalar_ret_filter")

    def __call__(self, *args, **kwargs):
        n_arr = object.__getattribute__(self, "_n_arr")
        force_float = object.__getattribute__(self, "_force_float")

        if len(args) < n_arr:
            raise TypeError(
                "expected at least %d arguments, got %d" % (n_arr, len(args))
            )

        bc_raw = np.broadcast_arrays(*args[:n_arr])
        if force_float:
            bc_raw = tuple(np.asarray(a, dtype=float) for a in bc_raw)
        bc_1d = tuple(np.atleast_1d(a) for a in bc_raw)
        rest = args[n_arr:]
        result = super(_Broadcaster, self).__call__(*(bc_1d + rest), **kwargs)

        if bc_raw[0].ndim == 0:
            # Inputs were all scalars. We need to filter the output(s) to
            # remove extra axes.
            scalar_ret_filter = object.__getattribute__(self, "_scalar_ret_filter")
            result = scalar_ret_filter(result)

        return result


class _BroadcasterDecorator(object):
    """Decorator to make functions automatically work on vectorized arguments. See
    the pwkit documentation for usage information.

    """

    def __init__(self, n_arr, ret_spec=0, force_float=True):
        self._n_arr = int(n_arr)
        if self._n_arr < 1:
            raise ValueError(
                "broadcastiz'ed function must take at least 1 " "array argument"
            )

        self._force_float = bool(force_float)

        if isinstance(ret_spec, tuple):
            filters = tuple(_broadcastize_spec_to_scalar_filter(s) for s in ret_spec)
            self._scalar_ret_filter = lambda r: tuple(f(v) for f, v in zip(filters, r))
        else:
            self._scalar_ret_filter = _broadcastize_spec_to_scalar_filter(ret_spec)

    def __call__(self, subfunc):
        b = _Broadcaster(subfunc)
        b._n_arr = self._n_arr
        b._force_float = self._force_float
        b._scalar_ret_filter = self._scalar_ret_filter
        return b


broadcastize = _BroadcasterDecorator


# Very misc.


[docs] def fits_recarray_to_data_frame(recarray, drop_nonscalar_ok=True): """Convert a FITS data table, stored as a Numpy record array, into a Pandas DataFrame object. By default, non-scalar columns are discarded, but if *drop_nonscalar_ok* is False then a :exc:`ValueError` is raised. Column names are lower-cased. Example:: from pwkit import io, numutil hdu_list = io.Path('my-table.fits').read_fits() # assuming the first FITS extension is a binary table: df = numutil.fits_recarray_to_data_frame(hdu_list[1].data) FITS data are big-endian, whereas nowadays almost everything is little-endian. This seems to be an issue for Pandas DataFrames, where ``df[['col1', 'col2']]`` triggers an assertion for me if the underlying data are not native-byte-ordered. This function normalizes the read-in data to native endianness to avoid this. See also :meth:`pwkit.io.Path.read_fits_bintable`. """ from pandas import DataFrame def normalize(): for column in recarray.columns: n = column.name d = recarray[n] if d.ndim != 1: if not drop_nonscalar_ok: raise ValueError("input must have only scalar columns") continue if d.dtype.isnative: yield (n.lower(), d) else: yield (n.lower(), d.byteswap(True).newbyteorder()) return DataFrame(dict(normalize()))
[docs] def data_frame_to_astropy_table(dataframe): """This is a backport of the Astropy method :meth:`astropy.table.table.Table.from_pandas`. It converts a Pandas :class:`pandas.DataFrame` object to an Astropy :class:`astropy.table.Table`. """ from astropy.utils import OrderedDict from astropy.table import Table, Column, MaskedColumn out = OrderedDict() for name in dataframe.columns: column = dataframe[name] mask = np.array(column.isnull()) data = np.array(column) if data.dtype.kind == "O": # If all elements of an object array are string-like or np.nan # then coerce back to a native numpy str/unicode array. string_types = (str, bytes) nan = np.nan if all(isinstance(x, string_types) or x is nan for x in data): # Force any missing (null) values to b''. Numpy will # upcast to str/unicode as needed. data[mask] = b"" # When the numpy object array is represented as a list then # numpy initializes to the correct string or unicode type. data = np.array([x for x in data]) if np.any(mask): out[name] = MaskedColumn(data=data, name=name, mask=mask) else: out[name] = Column(data=data, name=name) return Table(out)
[docs] def page_data_frame(df, pager_argv=["less"], **kwargs): """Render a DataFrame as text and send it to a terminal pager program (e.g. `less`), so that one can browse a full table conveniently. df The DataFrame to view pager_argv: default ``['less']`` A list of strings passed to :class:`subprocess.Popen` that launches the pager program kwargs Additional keywords are passed to :meth:`pandas.DataFrame.to_string`. Returns ``None``. Execution blocks until the pager subprocess exits. """ import codecs, subprocess, sys pager = subprocess.Popen( pager_argv, shell=False, stdin=subprocess.PIPE, close_fds=True ) try: enc = codecs.getwriter(sys.stdout.encoding or "utf8")(pager.stdin) df.to_string(enc, **kwargs) finally: enc.close() pager.stdin.close() pager.wait()
# Chunked averaging of data tables
[docs] def slice_around_gaps(values, maxgap): """Given an ordered array of values, generate a set of slices that traverse all of the values. Within each slice, no gap between adjacent values is larger than `maxgap`. In other words, these slices break the array into chunks separated by gaps of size larger than maxgap. """ if not (maxgap > 0): # above test catches NaNs, other weird cases raise ValueError("maxgap must be positive; got %r" % maxgap) values = np.asarray(values) delta = values[1:] - values[:-1] if np.any(delta < 0): raise ValueError("values must be in nondecreasing order") whgap = np.where(delta > maxgap)[0] + 1 prev_idx = None for gap_idx in whgap: yield slice(prev_idx, gap_idx) prev_idx = gap_idx yield slice(prev_idx, None)
[docs] def slice_evenly_with_gaps(values, target_len, maxgap): """Given an ordered array of values, generate a set of slices that traverse all of the values. Each slice contains about `target_len` items. However, no slice contains a gap larger than `maxgap`, so a slice may contain only a single item (if it is surrounded on both sides by a large gap). If a non-gapped run of values does not divide evenly into `target_len`, the algorithm errs on the side of making the slices contain more than `target_len` items, rather than fewer. It also attempts to keep the slice size uniform within each non-gapped run. """ if not (target_len > 0): raise ValueError("target_len must be positive; got %r" % target_len) values = np.asarray(values) l = values.size for gapslice in slice_around_gaps(values, maxgap): start, stop, ignored_stride = gapslice.indices(l) num_elements = stop - start nsegments = int(np.floor(float(num_elements) / target_len)) nsegments = max(nsegments, 1) nsegments = min(nsegments, num_elements) segment_len = num_elements / nsegments offset = 0.0 prev = start for _ in range(nsegments): offset += segment_len next = start + int(round(offset)) if next > prev: yield slice(prev, next) prev = next
[docs] def reduce_data_frame( df, chunk_slicers, avg_cols=(), uavg_cols=(), minmax_cols=(), nchunk_colname="nchunk", uncert_prefix="u", min_points_per_chunk=3, ): """ "Reduce" a DataFrame by collapsing rows in grouped chunks. Returns another DataFrame with similar columns but fewer rows. Arguments: df The input :class:`pandas.DataFrame`. chunk_slicers An iterable that returns values that are used to slice *df* with its :meth:`pandas.DataFrame.iloc` indexer. An example value might be the generator returned from :func:`slice_evenly_with_gaps`. avg_cols An iterable of names of columns that are to be reduced by taking the mean. uavg_cols An iterable of names of columns that are to be reduced by taking a weighted mean. minmax_cols An iterable of names of columns that are to be reduced by reporting minimum and maximum values. nchunk_colname The name of a column to create reporting the number of rows contributing to each chunk. uncert_prefix The column name prefix for locating uncertainty estimates. By default, the uncertainty on the column ``"temp"`` is given in the column ``"utemp"``. min_points_per_chunk Require at least this many rows in each chunk. Smaller chunks are discarded. Returns a new :class:`pandas.DataFrame`. """ subds = [df.iloc[idx] for idx in chunk_slicers] subds = [sd for sd in subds if sd.shape[0] >= min_points_per_chunk] chunked = df.__class__({nchunk_colname: np.zeros(len(subds), dtype=int)}) # Some future-proofing: allow possibility of different ways of mapping # from a column giving a value to a column giving its uncertainty. uncert_col_name = lambda c: uncert_prefix + c for i, subd in enumerate(subds): label = chunked.index[i] chunked.loc[label, nchunk_colname] = subd.shape[0] for col in avg_cols: chunked.loc[label, col] = subd[col].mean() for col in uavg_cols: ucol = uncert_col_name(col) v, u = weighted_mean(subd[col], subd[ucol]) chunked.loc[label, col] = v chunked.loc[label, ucol] = u for col in minmax_cols: chunked.loc[label, "min_" + col] = subd[col].min() chunked.loc[label, "max_" + col] = subd[col].max() return chunked
[docs] def reduce_data_frame_evenly_with_gaps(df, valcol, target_len, maxgap, **kwargs): """ "Reduce" a DataFrame by collapsing rows in grouped chunks, grouping based on gaps in one of the columns. This function combines :func:`reduce_data_frame` with :func:`slice_evenly_with_gaps`. """ return reduce_data_frame( df, slice_evenly_with_gaps(df[valcol], target_len, maxgap), **kwargs )
# Smooth a timeseries with uncertainties
[docs] def usmooth(window, uncerts, *data, **kwargs): """Smooth data series according to a window, weighting based on uncertainties. Arguments: window The smoothing window. uncerts An array of uncertainties used to weight the smoothing. data One or more data series, of the same size as *uncerts*. k = None If specified, only every *k*-th point of the results will be kept. If k is None (the default), it is set to ``window.size``, i.e. correlated points will be discarded. Returns: ``(s_uncerts, s_data[0], s_data[1], ...)``, the smoothed uncertainties and data series. Example:: u, x, y = numutil.usmooth(np.hamming(7), u, x, y) """ window = np.asarray(window) uncerts = np.asarray(uncerts) # Hacky keyword argument handling because you can't write "def foo(*args, # k=0)". k = kwargs.pop("k", None) if len(kwargs): raise TypeError( "smooth() got an unexpected keyword argument '%s'" % kwargs.keys()[0] ) # Done with kwargs futzing. if k is None: k = window.size conv = lambda q, r: np.convolve(q, r, mode="valid") if uncerts is None: w = np.ones_like(x) else: w = uncerts**-2 cw = conv(w, window) cu = np.sqrt(conv(w, window**2)) / cw result = [cu] + [conv(w * np.asarray(x), window) / cw for x in data] if k != 1: result = [x[::k] for x in result] return result
[docs] def dfsmooth(window, df, ucol, k=None): """Smooth a :class:`pandas.DataFrame` according to a window, weighting based on uncertainties. Arguments are: window The smoothing window. df The :class:`pandas.DataFrame`. ucol The name of the column in *df* that contains the uncertainties to weight by. k = None If specified, only every *k*-th point of the results will be kept. If k is None (the default), it is set to ``window.size``, i.e. correlated points will be discarded. Returns: a smoothed data frame. The returned data frame has a default integer index. Example:: sdata = numutil.dfsmooth(np.hamming(7), data, 'u_temp') """ import pandas as pd if k is None: k = window.size conv = lambda q, r: np.convolve(q, r, mode="valid") w = df[ucol] ** -2 invcw = 1.0 / conv(w, window) # XXX: we're not smoothing the index. res = {} for col in df.columns: if col == ucol: res[col] = np.sqrt(conv(w, window**2)) * invcw else: res[col] = conv(w * df[col], window) * invcw res = pd.DataFrame(res) return res[::k]
[docs] def smooth_data_frame_with_gaps( window, df, uncert_col, time_col, max_gap, min_points_per_chunk=3, k=None, ): """Smooth a :class:`pandas.DataFrame` according to a window, weighting based on uncertainties, and breaking the smoothing process at gaps in a time axis. Arguments are: window The smoothing window. df The :class:`pandas.DataFrame`. uncert_col The name of the column in *df* that contains the uncertainties to weight by. time_col The name of the column in *df* that contains the time-like quantities used to determine where the gaps are. max_gap If a difference larger than this value is encountered in ``df[time_col]``, the smoothing will be discontinuous around this gap. Therefore the units of this column are whatever the units of ``df[time_col]`` are. k = None If specified, only every *k*-th point of the results will be kept. If k is None (the default), it is set to ``window.size``, i.e. correlated points will be discarded. This decimation does not cross over gaps. min_points_per_chunk = 3 When gaps are identified, if any chunk of data has fewer than this many elements, it is dropped altogether. This counting happens before decimation by *k*. Returns: a smoothed data frame with a default integer index. """ import pandas as pd chunk_slicers = slice_around_gaps(df[time_col], max_gap) subds = [df.iloc[idx] for idx in chunk_slicers] subds = [sd for sd in subds if sd.shape[0] >= min_points_per_chunk] chunks = [dfsmooth(window, subd, uncert_col, k=k) for subd in subds] return pd.concat(chunks, ignore_index=True)
# Parallelized versions of various routines that don't operate vectorially # even though sometimes it'd be nice to pretend that they do.
[docs] def parallel_newton( func, x0, fprime=None, par_args=(), simple_args=(), tol=1.48e-8, maxiter=50, parallel=True, **kwargs ): """A parallelized version of :func:`scipy.optimize.newton`. Arguments: func The function to search for zeros, called as ``f(x, [*par_args...], [*simple_args...])``. x0 The initial point for the zero search. fprime (Optional) The first derivative of *func*, called the same way. par_args Tuple of additional parallelized arguments. simple_args Tuple of additional arguments passed identically to every invocation. tol The allowable error of the zero value. maxiter Maximum number of iterations. parallel Controls parallelization; default uses all available cores. See :func:`pwkit.parallel.make_parallel_helper`. kwargs Passed to :func:`scipy.optimize.newton`. Returns: an array of locations of zeros. Finds zeros in parallel. The values *x0*, *tol*, *maxiter*, and the items of *par_args* should all be numeric, and may be N-dimensional Numpy arrays. They are all broadcast to a common shape, and one zero-finding run is performed for each element in the resulting array. The return value is an array of zero locations having the same shape as the common broadcast of the parameters named above. The *simple_args* are passed to each function identically for each integration. They do not need to be Pickle-able. Example:: >>> parallel_newton(lambda x, a: x - 2 * a, 2, par_args=(np.arange(6),)) <<< array([ 0., 2., 4., 6., 8., 10.]) >>> parallel_newton(lambda x: np.sin(x), np.arange(6)) <<< array([ 0.00000000e+00, 3.65526589e-26, 3.14159265e+00, 3.14159265e+00, 3.14159265e+00, 6.28318531e+00]) """ from scipy.optimize import newton from .parallel import make_parallel_helper phelp = make_parallel_helper(parallel) if not isinstance(par_args, tuple): raise ValueError("par_args must be a tuple") if not isinstance(simple_args, tuple): raise ValueError("simple_args must be a tuple") bc_raw = np.broadcast_arrays(x0, tol, maxiter, *par_args) bc_1d = tuple(np.atleast_1d(a) for a in bc_raw) def gen_var_args(): for i in range(bc_1d[0].size): yield tuple(x.flat[i] for x in bc_1d) def helper(i, _, var_args): x0, tol, maxiter = var_args[:3] args = var_args[3:] + simple_args return newton( func, x0, fprime=fprime, args=args, tol=tol, maxiter=maxiter, **kwargs ) with phelp.get_ppmap() as ppmap: result = np.asarray(ppmap(helper, None, gen_var_args())) if bc_raw[0].ndim == 0: return np.asarray(result).item() return result
[docs] def parallel_quad(func, a, b, par_args=(), simple_args=(), parallel=True, **kwargs): """A parallelized version of :func:`scipy.integrate.quad`. Arguments are: func The function to integrate, called as ``f(x, [*par_args...], [*simple_args...])``. a The lower limit(s) of integration. b The upper limits(s) of integration. par_args Tuple of additional parallelized arguments. simple_args Tuple of additional arguments passed identically to every invocation. parallel Controls parallelization; default uses all available cores. See :func:`pwkit.parallel.make_parallel_helper`. kwargs Passed to :func:`scipy.integrate.quad`. Don't set *full_output* to True. Returns: integrals and errors; see below. Computes many integrals in parallel. The values *a*, *b*, and the items of *par_args* should all be numeric, and may be N-dimensional Numpy arrays. They are all broadcast to a common shape, and one integral is performed for each element in the resulting array. If this common shape is (X,Y,Z), the return value has shape (2,X,Y,Z), where the subarray [0,...] contains the computed integrals and the subarray [1,...] contains the absolute error estimates. If *a*, *b*, and the items in *par_args* are all scalars, the return value has shape (2,). The *simple_args* are passed to each integrand function identically for each integration. They do not need to be Pickle-able. Example:: >>> parallel_quad(lambda x, u, v, q: u * x + v, 0, # a [3, 4], # b (np.arange(6).reshape((3,2)), np.arange(3).reshape((3,1))), # par_args ('hello',),) Computes six integrals and returns an array of shape ``(2,3,2)``. The functions that are evaluated are:: [[ 0*x + 0, 1*x + 0 ], [ 2*x + 1, 3*x + 1 ], [ 4*x + 2, 5*x + 2 ]] and the bounds of the integrals are:: [[ (0, 3), (0, 4) ], [ (0, 3), (0, 4) ], [ (0, 3), (0, 4) ]] In all cases the unused fourth parameter *q* is ``'hello'``. """ from scipy.integrate import quad from .parallel import make_parallel_helper phelp = make_parallel_helper(parallel) if not isinstance(par_args, tuple): raise ValueError("par_args must be a tuple") if not isinstance(simple_args, tuple): raise ValueError("simple_args must be a tuple") bc_raw = np.broadcast_arrays(a, b, *par_args) bc_1d = tuple(np.atleast_1d(a) for a in bc_raw) def gen_var_args(): for i in range(bc_1d[0].size): yield tuple(x.flat[i] for x in bc_1d) def helper(i, _, var_args): a, b = var_args[:2] return quad(func, a, b, var_args[2:] + simple_args, **kwargs) with phelp.get_ppmap() as ppmap: result_list = ppmap(helper, None, gen_var_args()) if bc_raw[0].ndim == 0: return np.asarray(result_list[0]) result_arr = np.empty((2,) + bc_raw[0].shape) for i in range(bc_1d[0].size): result_arr[0].flat[i], result_arr[1].flat[i] = result_list[i] return result_arr
# Some miscellaneous numerical tools
[docs] def rms(x): """Return the square root of the mean of the squares of ``x``.""" return np.sqrt(np.square(x).mean())
[docs] def weighted_mean(values, uncerts, **kwargs): values = np.asarray(values) uncerts = np.asarray(uncerts) weights = uncerts**-2 wt_mean, wt_sum = np.average(values, weights=weights, returned=True, **kwargs) return wt_mean, wt_sum**-0.5
[docs] def weighted_mean_df(df, **kwargs): """The same as :func:`weighted_mean`, except the argument is expected to be a two-column :class:`pandas.DataFrame` whose first column gives the data values and second column gives their uncertainties. Returns ``(weighted_mean, uncertainty_in_mean)``. """ return weighted_mean(df[df.columns[0]], df[df.columns[1]], **kwargs)
[docs] def weighted_variance(x, weights): """Return the variance of a weighted sample. The weighted sample mean is calculated and subtracted off, so the returned variance is upweighted by ``n / (n - 1)``. If the sample mean is known to be zero, you should just compute ``np.average(x**2, weights=weights)``. """ n = len(x) if n < 3: raise ValueError( "cannot calculate meaningful variance of fewer " "than three samples" ) wt_mean = np.average(x, weights=weights) return np.average(np.square(x - wt_mean), weights=weights) * n / (n - 1)
# Tophat functions -- numpy doesn't have anything built-in (that I know of) # that does this in a convenient way that I'd like. These are useful for # defining functions in a piecewise-ish way, although also pay attention to # the existence of np.piecewise! # # We're careful with inclusivity/exclusivity of the bounds since that can be # important.
[docs] def unit_tophat_ee(x): """Tophat function on the unit interval, left-exclusive and right-exclusive. Returns 1 if 0 < x < 1, 0 otherwise. """ x = np.asarray(x) x1 = np.atleast_1d(x) r = ((0 < x1) & (x1 < 1)).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r
[docs] def unit_tophat_ei(x): """Tophat function on the unit interval, left-exclusive and right-inclusive. Returns 1 if 0 < x <= 1, 0 otherwise. """ x = np.asarray(x) x1 = np.atleast_1d(x) r = ((0 < x1) & (x1 <= 1)).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r
[docs] def unit_tophat_ie(x): """Tophat function on the unit interval, left-inclusive and right-exclusive. Returns 1 if 0 <= x < 1, 0 otherwise. """ x = np.asarray(x) x1 = np.atleast_1d(x) r = ((0 <= x1) & (x1 < 1)).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r
[docs] def unit_tophat_ii(x): """Tophat function on the unit interval, left-inclusive and right-inclusive. Returns 1 if 0 <= x <= 1, 0 otherwise. """ x = np.asarray(x) x1 = np.atleast_1d(x) r = ((0 <= x1) & (x1 <= 1)).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r
[docs] def make_tophat_ee(lower, upper): """Return a ufunc-like tophat function on the defined range, left-exclusive and right-exclusive. Returns 1 if lower < x < upper, 0 otherwise. """ if not np.isfinite(lower): raise ValueError('"lower" argument must be finite number; got %r' % lower) if not np.isfinite(upper): raise ValueError('"upper" argument must be finite number; got %r' % upper) def range_tophat_ee(x): x = np.asarray(x) x1 = np.atleast_1d(x) r = ((lower < x1) & (x1 < upper)).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r range_tophat_ee.__doc__ = ( "Ranged tophat function, left-exclusive and " "right-exclusive. Returns 1 if %g < x < %g, " "0 otherwise." ) % (lower, upper) return range_tophat_ee
[docs] def make_tophat_ei(lower, upper): """Return a ufunc-like tophat function on the defined range, left-exclusive and right-inclusive. Returns 1 if lower < x <= upper, 0 otherwise. """ if not np.isfinite(lower): raise ValueError('"lower" argument must be finite number; got %r' % lower) if not np.isfinite(upper): raise ValueError('"upper" argument must be finite number; got %r' % upper) def range_tophat_ei(x): x = np.asarray(x) x1 = np.atleast_1d(x) r = ((lower < x1) & (x1 <= upper)).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r range_tophat_ei.__doc__ = ( "Ranged tophat function, left-exclusive and " "right-inclusive. Returns 1 if %g < x <= %g, " "0 otherwise." ) % (lower, upper) return range_tophat_ei
[docs] def make_tophat_ie(lower, upper): """Return a ufunc-like tophat function on the defined range, left-inclusive and right-exclusive. Returns 1 if lower <= x < upper, 0 otherwise. """ if not np.isfinite(lower): raise ValueError('"lower" argument must be finite number; got %r' % lower) if not np.isfinite(upper): raise ValueError('"upper" argument must be finite number; got %r' % upper) def range_tophat_ie(x): x = np.asarray(x) x1 = np.atleast_1d(x) r = ((lower <= x1) & (x1 < upper)).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r range_tophat_ie.__doc__ = ( "Ranged tophat function, left-inclusive and " "right-exclusive. Returns 1 if %g <= x < %g, " "0 otherwise." ) % (lower, upper) return range_tophat_ie
[docs] def make_tophat_ii(lower, upper): """Return a ufunc-like tophat function on the defined range, left-inclusive and right-inclusive. Returns 1 if lower < x < upper, 0 otherwise. """ if not np.isfinite(lower): raise ValueError('"lower" argument must be finite number; got %r' % lower) if not np.isfinite(upper): raise ValueError('"upper" argument must be finite number; got %r' % upper) def range_tophat_ii(x): x = np.asarray(x) x1 = np.atleast_1d(x) r = ((lower <= x1) & (x1 <= upper)).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r range_tophat_ii.__doc__ = ( "Ranged tophat function, left-inclusive and " "right-inclusive. Returns 1 if %g <= x <= %g, " "0 otherwise." ) % (lower, upper) return range_tophat_ii
# Step functions
[docs] def make_step_lcont(transition): """Return a ufunc-like step function that is left-continuous. Returns 1 if x > transition, 0 otherwise. """ if not np.isfinite(transition): raise ValueError( '"transition" argument must be finite number; got %r' % transition ) def step_lcont(x): x = np.asarray(x) x1 = np.atleast_1d(x) r = (x1 > transition).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r step_lcont.__doc__ = ( "Left-continuous step function. Returns 1 if x > %g, " "0 otherwise." ) % (transition,) return step_lcont
[docs] def make_step_rcont(transition): """Return a ufunc-like step function that is right-continuous. Returns 1 if x >= transition, 0 otherwise. """ if not np.isfinite(transition): raise ValueError( '"transition" argument must be finite number; got %r' % transition ) def step_rcont(x): x = np.asarray(x) x1 = np.atleast_1d(x) r = (x1 >= transition).astype(x.dtype) if x.ndim == 0: return np.asarray(r).item() return r step_rcont.__doc__ = ( "Right-continuous step function. Returns 1 if x >= " "%g, 0 otherwise." ) % (transition,) return step_rcont