diff --git a/doc/source/reference/window.rst b/doc/source/reference/window.rst index d09ac0d1fa7f7..3db1aa12a4275 100644 --- a/doc/source/reference/window.rst +++ b/doc/source/reference/window.rst @@ -74,3 +74,14 @@ Exponentially-weighted moving window functions EWM.var EWM.corr EWM.cov + +Window Indexer +-------------- +.. currentmodule:: pandas + +Base class for defining custom window boundaries. + +.. autosummary:: + :toctree: api/ + + api.indexers.BaseIndexer diff --git a/doc/source/user_guide/computation.rst b/doc/source/user_guide/computation.rst index bc00cd7f13e13..627a83b7359bb 100644 --- a/doc/source/user_guide/computation.rst +++ b/doc/source/user_guide/computation.rst @@ -466,6 +466,64 @@ default of the index) in a DataFrame. dft dft.rolling('2s', on='foo').sum() +.. _stats.custom_rolling_window: + +Custom window rolling +~~~~~~~~~~~~~~~~~~~~~ + +.. versionadded:: 1.0 + +In addition to accepting an integer or offset as a ``window`` argument, ``rolling`` also accepts +a ``BaseIndexer`` subclass that allows a user to define a custom method for calculating window bounds. +The ``BaseIndexer`` subclass will need to define a ``get_window_bounds`` method that returns +a tuple of two arrays, the first being the starting indices of the windows and second being the +ending indices of the windows. Additionally, ``num_values``, ``min_periods``, ``center``, ``closed`` +and will automatically be passed to ``get_window_bounds`` and the defined method must +always accept these arguments. + +For example, if we have the following ``DataFrame``: + +.. ipython:: python + + use_expanding = [True, False, True, False, True] + use_expanding + df = pd.DataFrame({'values': range(5)}) + df + +and we want to use an expanding window where ``use_expanding`` is ``True`` otherwise a window of size +1, we can create the following ``BaseIndexer``: + +.. code-block:: ipython + + In [2]: from pandas.api.indexers import BaseIndexer + ...: + ...: class CustomIndexer(BaseIndexer): + ...: + ...: def get_window_bounds(self, num_values, min_periods, center, closed): + ...: start = np.empty(num_values, dtype=np.int64) + ...: end = np.empty(num_values, dtype=np.int64) + ...: for i in range(num_values): + ...: if self.use_expanding[i]: + ...: start[i] = 0 + ...: end[i] = i + 1 + ...: else: + ...: start[i] = i + ...: end[i] = i + self.window_size + ...: return start, end + ...: + + In [3]: indexer = CustomIndexer(window_size=1, use_expanding=use_expanding) + + In [4]: df.rolling(indexer).sum() + Out[4]: + values + 0 0.0 + 1 1.0 + 2 3.0 + 3 3.0 + 4 10.0 + + .. _stats.rolling_window.endpoints: Rolling window endpoints diff --git a/doc/source/whatsnew/v1.0.0.rst b/doc/source/whatsnew/v1.0.0.rst index 5c9543580be26..1032f2e73531d 100644 --- a/doc/source/whatsnew/v1.0.0.rst +++ b/doc/source/whatsnew/v1.0.0.rst @@ -169,6 +169,16 @@ You can use the alias ``"boolean"`` as well. s = pd.Series([True, False, None], dtype="boolean") s +.. _whatsnew_1000.custom_window: + +Defining custom windows for rolling operations +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +We've added a :func:`pandas.api.indexers.BaseIndexer` class that allows users to define how +window bounds are created during ``rolling`` operations. Users can define their own ``get_window_bounds`` +method on a :func:`pandas.api.indexers.BaseIndexer` subclass that will generate the start and end +indices used for each window during the rolling aggregation. For more details and example usage, see +the :ref:`custom window rolling documentation ` .. _whatsnew_1000.enhancements.other: diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 303b4f6f24eac..1fdecbca32102 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -183,7 +183,8 @@ cdef inline void remove_sum(float64_t val, int64_t *nobs, float64_t *sum_x) nogi def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): + ndarray[int64_t] end, int64_t minp, + bint is_monotonic_bounds=True): cdef: float64_t sum_x = 0 int64_t s, e @@ -198,11 +199,10 @@ def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0: + if i == 0 or not is_monotonic_bounds: # setup - sum_x = 0.0 - nobs = 0 + for j in range(s, e): add_sum(values[j], &nobs, &sum_x) @@ -218,6 +218,10 @@ def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start, output[i] = calc_sum(minp, nobs, sum_x) + if not is_monotonic_bounds: + for j in range(s, e): + remove_sum(values[j], &nobs, &sum_x) + return output @@ -327,7 +331,8 @@ def roll_mean_fixed(ndarray[float64_t] values, ndarray[int64_t] start, def roll_mean_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): + ndarray[int64_t] end, int64_t minp, + bint is_monotonic_bounds=True): cdef: float64_t val, sum_x = 0 int64_t s, e @@ -342,11 +347,9 @@ def roll_mean_variable(ndarray[float64_t] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0: + if i == 0 or not is_monotonic_bounds: # setup - sum_x = 0.0 - nobs = 0 for j in range(s, e): val = values[j] add_mean(val, &nobs, &sum_x, &neg_ct) @@ -365,6 +368,10 @@ def roll_mean_variable(ndarray[float64_t] values, ndarray[int64_t] start, output[i] = calc_mean(minp, nobs, neg_ct, sum_x) + if not is_monotonic_bounds: + for j in range(s, e): + val = values[j] + remove_mean(val, &nobs, &sum_x, &neg_ct) return output # ---------------------------------------------------------------------- @@ -486,7 +493,8 @@ def roll_var_fixed(ndarray[float64_t] values, ndarray[int64_t] start, def roll_var_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int ddof=1): + ndarray[int64_t] end, int64_t minp, int ddof=1, + bint is_monotonic_bounds=True): """ Numerically stable implementation using Welford's method. """ @@ -508,7 +516,7 @@ def roll_var_variable(ndarray[float64_t] values, ndarray[int64_t] start, # Over the first window, observations can only be added # never removed - if i == 0: + if i == 0 or not is_monotonic_bounds: for j in range(s, e): add_var(values[j], &nobs, &mean_x, &ssqdm_x) @@ -528,6 +536,10 @@ def roll_var_variable(ndarray[float64_t] values, ndarray[int64_t] start, output[i] = calc_var(minp, ddof, nobs, ssqdm_x) + if not is_monotonic_bounds: + for j in range(s, e): + remove_var(values[j], &nobs, &mean_x, &ssqdm_x) + return output # ---------------------------------------------------------------------- @@ -629,7 +641,8 @@ def roll_skew_fixed(ndarray[float64_t] values, ndarray[int64_t] start, def roll_skew_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): + ndarray[int64_t] end, int64_t minp, + bint is_monotonic_bounds=True): cdef: float64_t val, prev float64_t x = 0, xx = 0, xxx = 0 @@ -648,7 +661,7 @@ def roll_skew_variable(ndarray[float64_t] values, ndarray[int64_t] start, # Over the first window, observations can only be added # never removed - if i == 0: + if i == 0 or not is_monotonic_bounds: for j in range(s, e): val = values[j] @@ -671,6 +684,11 @@ def roll_skew_variable(ndarray[float64_t] values, ndarray[int64_t] start, output[i] = calc_skew(minp, nobs, x, xx, xxx) + if not is_monotonic_bounds: + for j in range(s, e): + val = values[j] + remove_skew(val, &nobs, &x, &xx, &xxx) + return output # ---------------------------------------------------------------------- @@ -776,7 +794,8 @@ def roll_kurt_fixed(ndarray[float64_t] values, ndarray[int64_t] start, def roll_kurt_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): + ndarray[int64_t] end, int64_t minp, + bint is_monotonic_bounds=True): cdef: float64_t val, prev float64_t x = 0, xx = 0, xxx = 0, xxxx = 0 @@ -794,7 +813,7 @@ def roll_kurt_variable(ndarray[float64_t] values, ndarray[int64_t] start, # Over the first window, observations can only be added # never removed - if i == 0: + if i == 0 or not is_monotonic_bounds: for j in range(s, e): add_kurt(values[j], &nobs, &x, &xx, &xxx, &xxxx) @@ -814,6 +833,10 @@ def roll_kurt_variable(ndarray[float64_t] values, ndarray[int64_t] start, output[i] = calc_kurt(minp, nobs, x, xx, xxx, xxxx) + if not is_monotonic_bounds: + for j in range(s, e): + remove_kurt(values[j], &nobs, &x, &xx, &xxx, &xxxx) + return output @@ -1007,7 +1030,8 @@ def roll_min_fixed(ndarray[float64_t] values, ndarray[int64_t] start, def roll_min_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): + ndarray[int64_t] end, int64_t minp, + bint is_monotonic_bounds=True): """ Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. @@ -1400,7 +1424,10 @@ def roll_generic_variable(object obj, ndarray[int64_t] start, ndarray[int64_t] end, int64_t minp, int offset, object func, bint raw, - object args, object kwargs): + object args, object kwargs, + bint is_monotonic_bounds=True): + # is_monotonic_bounds unused since variable algorithm doesn't calculate + # adds/subtracts across windows, but matches other *_variable functions cdef: ndarray[float64_t] output, counts, bufarr ndarray[float64_t, cast=True] arr diff --git a/pandas/_libs/window/indexers.pyx b/pandas/_libs/window/indexers.pyx index eab9f0f8aab43..2d01d1964c043 100644 --- a/pandas/_libs/window/indexers.pyx +++ b/pandas/_libs/window/indexers.pyx @@ -1,140 +1,105 @@ # cython: boundscheck=False, wraparound=False, cdivision=True -from typing import Tuple - import numpy as np from numpy cimport ndarray, int64_t -# ---------------------------------------------------------------------- -# The indexer objects for rolling -# These define start/end indexers to compute offsets +# Cython routines for window indexers -class FixedWindowIndexer: +def calculate_variable_window_bounds( + int64_t num_values, + int64_t window_size, + object min_periods, # unused but here to match get_window_bounds signature + object center, # unused but here to match get_window_bounds signature + object closed, + const int64_t[:] index +): """ - create a fixed length window indexer object - that has start & end, that point to offsets in - the index object; these are defined based on the win - arguments + Calculate window boundaries for rolling windows from a time offset. Parameters ---------- - values: ndarray - values data array - win: int64_t - window size - index: object - index of the values - closed: string - closed behavior - """ - def __init__(self, ndarray values, int64_t win, object closed, object index=None): - cdef: - ndarray[int64_t, ndim=1] start_s, start_e, end_s, end_e - int64_t N = len(values) + num_values : int64 + total number of values - start_s = np.zeros(win, dtype='int64') - start_e = np.arange(win, N, dtype='int64') - win + 1 - self.start = np.concatenate([start_s, start_e])[:N] + window_size : int64 + window size calculated from the offset - end_s = np.arange(win, dtype='int64') + 1 - end_e = start_e + win - self.end = np.concatenate([end_s, end_e])[:N] + min_periods : object + ignored, exists for compatibility - def get_window_bounds(self) -> Tuple[np.ndarray, np.ndarray]: - return self.start, self.end + center : object + ignored, exists for compatibility + closed : str + string of side of the window that should be closed -class VariableWindowIndexer: - """ - create a variable length window indexer object - that has start & end, that point to offsets in - the index object; these are defined based on the win - arguments + index : ndarray[int64] + time series index to roll over - Parameters - ---------- - values: ndarray - values data array - win: int64_t - window size - index: ndarray - index of the values - closed: string - closed behavior + Returns + ------- + (ndarray[int64], ndarray[int64]) """ - def __init__(self, ndarray values, int64_t win, object closed, ndarray index): - cdef: - bint left_closed = False - bint right_closed = False - int64_t N = len(index) - - # if windows is variable, default is 'right', otherwise default is 'both' - if closed is None: - closed = 'right' if index is not None else 'both' - - if closed in ['right', 'both']: - right_closed = True - - if closed in ['left', 'both']: - left_closed = True - - self.start, self.end = self.build(index, win, left_closed, right_closed, N) - - @staticmethod - def build(const int64_t[:] index, int64_t win, bint left_closed, - bint right_closed, int64_t N) -> Tuple[np.ndarray, np.ndarray]: - - cdef: - ndarray[int64_t] start, end - int64_t start_bound, end_bound - Py_ssize_t i, j - - start = np.empty(N, dtype='int64') - start.fill(-1) - end = np.empty(N, dtype='int64') - end.fill(-1) - - start[0] = 0 - - # right endpoint is closed - if right_closed: - end[0] = 1 - # right endpoint is open - else: - end[0] = 0 - - with nogil: - - # start is start of slice interval (including) - # end is end of slice interval (not including) - for i in range(1, N): - end_bound = index[i] - start_bound = index[i] - win - - # left endpoint is closed - if left_closed: - start_bound -= 1 - - # advance the start bound until we are - # within the constraint - start[i] = i - for j in range(start[i - 1], i): - if index[j] > start_bound: - start[i] = j - break - - # end bound is previous end - # or current index - if index[end[i - 1]] <= end_bound: - end[i] = i + 1 - else: - end[i] = end[i - 1] - - # right endpoint is open - if not right_closed: - end[i] -= 1 - return start, end - - def get_window_bounds(self) -> Tuple[np.ndarray, np.ndarray]: - return self.start, self.end + cdef: + bint left_closed = False + bint right_closed = False + ndarray[int64_t, ndim=1] start, end + int64_t start_bound, end_bound + Py_ssize_t i, j + + # if windows is variable, default is 'right', otherwise default is 'both' + if closed is None: + closed = 'right' if index is not None else 'both' + + if closed in ['right', 'both']: + right_closed = True + + if closed in ['left', 'both']: + left_closed = True + + start = np.empty(num_values, dtype='int64') + start.fill(-1) + end = np.empty(num_values, dtype='int64') + end.fill(-1) + + start[0] = 0 + + # right endpoint is closed + if right_closed: + end[0] = 1 + # right endpoint is open + else: + end[0] = 0 + + with nogil: + + # start is start of slice interval (including) + # end is end of slice interval (not including) + for i in range(1, num_values): + end_bound = index[i] + start_bound = index[i] - window_size + + # left endpoint is closed + if left_closed: + start_bound -= 1 + + # advance the start bound until we are + # within the constraint + start[i] = i + for j in range(start[i - 1], i): + if index[j] > start_bound: + start[i] = j + break + + # end bound is previous end + # or current index + if index[end[i - 1]] <= end_bound: + end[i] = i + 1 + else: + end[i] = end[i - 1] + + # right endpoint is open + if not right_closed: + end[i] -= 1 + return start, end diff --git a/pandas/api/__init__.py b/pandas/api/__init__.py index 58422811990c4..d0a26864a1102 100644 --- a/pandas/api/__init__.py +++ b/pandas/api/__init__.py @@ -1,2 +1,2 @@ """ public toolkit API """ -from . import extensions, types # noqa +from . import extensions, indexers, types # noqa diff --git a/pandas/api/indexers/__init__.py b/pandas/api/indexers/__init__.py new file mode 100644 index 0000000000000..a5d6bc07da3eb --- /dev/null +++ b/pandas/api/indexers/__init__.py @@ -0,0 +1,2 @@ +"""Public API for Rolling Window Indexers""" +from pandas.core.window.indexers import BaseIndexer # noqa: F401 diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py new file mode 100644 index 0000000000000..0fa24a0ba1b5a --- /dev/null +++ b/pandas/core/window/indexers.py @@ -0,0 +1,122 @@ +"""Indexer objects for computing start/end window bounds for rolling operations""" +from typing import Optional, Tuple + +import numpy as np + +from pandas._libs.window.indexers import calculate_variable_window_bounds +from pandas.util._decorators import Appender + +get_window_bounds_doc = """ +Computes the bounds of a window. + +Parameters +---------- +num_values : int, default 0 + number of values that will be aggregated over +window_size : int, default 0 + the number of rows in a window +min_periods : int, default None + min_periods passed from the top level rolling API +center : bool, default None + center passed from the top level rolling API +closed : str, default None + closed passed from the top level rolling API +win_type : str, default None + win_type passed from the top level rolling API + +Returns +------- +A tuple of ndarray[int64]s, indicating the boundaries of each +window +""" + + +class BaseIndexer: + """Base class for window bounds calculations""" + + def __init__( + self, index_array: Optional[np.ndarray] = None, window_size: int = 0, **kwargs, + ): + """ + Parameters + ---------- + **kwargs : + keyword arguments that will be available when get_window_bounds is called + """ + self.index_array = index_array + self.window_size = window_size + # Set user defined kwargs as attributes that can be used in get_window_bounds + for key, value in kwargs.items(): + setattr(self, key, value) + + @Appender(get_window_bounds_doc) + def get_window_bounds( + self, + num_values: int = 0, + min_periods: Optional[int] = None, + center: Optional[bool] = None, + closed: Optional[str] = None, + ) -> Tuple[np.ndarray, np.ndarray]: + + raise NotImplementedError + + +class FixedWindowIndexer(BaseIndexer): + """Creates window boundaries that are of fixed length.""" + + @Appender(get_window_bounds_doc) + def get_window_bounds( + self, + num_values: int = 0, + min_periods: Optional[int] = None, + center: Optional[bool] = None, + closed: Optional[str] = None, + ) -> Tuple[np.ndarray, np.ndarray]: + + start_s = np.zeros(self.window_size, dtype="int64") + start_e = ( + np.arange(self.window_size, num_values, dtype="int64") + - self.window_size + + 1 + ) + start = np.concatenate([start_s, start_e])[:num_values] + + end_s = np.arange(self.window_size, dtype="int64") + 1 + end_e = start_e + self.window_size + end = np.concatenate([end_s, end_e])[:num_values] + return start, end + + +class VariableWindowIndexer(BaseIndexer): + """Creates window boundaries that are of variable length, namely for time series.""" + + @Appender(get_window_bounds_doc) + def get_window_bounds( + self, + num_values: int = 0, + min_periods: Optional[int] = None, + center: Optional[bool] = None, + closed: Optional[str] = None, + ) -> Tuple[np.ndarray, np.ndarray]: + + return calculate_variable_window_bounds( + num_values, self.window_size, min_periods, center, closed, self.index_array, + ) + + +class ExpandingIndexer(BaseIndexer): + """Calculate expanding window bounds, mimicking df.expanding()""" + + @Appender(get_window_bounds_doc) + def get_window_bounds( + self, + num_values: int = 0, + min_periods: Optional[int] = None, + center: Optional[bool] = None, + closed: Optional[str] = None, + ) -> Tuple[np.ndarray, np.ndarray]: + + return ( + np.zeros(num_values, dtype=np.int64), + np.arange(1, num_values + 1, dtype=np.int64), + ) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 19ec4e335ee21..9f804584f532a 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -4,13 +4,13 @@ """ from datetime import timedelta from functools import partial +import inspect from textwrap import dedent from typing import Callable, Dict, List, Optional, Set, Tuple, Union import numpy as np import pandas._libs.window.aggregations as window_aggregations -import pandas._libs.window.indexers as window_indexers from pandas.compat._optional import import_optional_dependency from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, cache_readonly @@ -49,6 +49,11 @@ _zsqrt, calculate_min_periods, ) +from pandas.core.window.indexers import ( + BaseIndexer, + FixedWindowIndexer, + VariableWindowIndexer, +) class _Window(PandasObject, ShallowMixin, SelectionMixin): @@ -118,6 +123,26 @@ def validate(self): raise ValueError("closed must be 'right', 'left', 'both' or 'neither'") if not isinstance(self.obj, (ABCSeries, ABCDataFrame)): raise TypeError(f"invalid type: {type(self)}") + if isinstance(self.window, BaseIndexer): + self._validate_get_window_bounds_signature(self.window) + + @staticmethod + def _validate_get_window_bounds_signature(window: BaseIndexer) -> None: + """ + Validate that the passed BaseIndexer subclass has + a get_window_bounds with the correct signature. + """ + get_window_bounds_signature = inspect.signature( + window.get_window_bounds + ).parameters.keys() + expected_signature = inspect.signature( + BaseIndexer().get_window_bounds + ).parameters.keys() + if get_window_bounds_signature != expected_signature: + raise ValueError( + f"{type(window).__name__} does not implement the correct signature for " + f"get_window_bounds" + ) def _create_blocks(self): """ @@ -200,6 +225,8 @@ def _get_window(self, other=None, win_type: Optional[str] = None) -> int: ------- window : int """ + if isinstance(self.window, BaseIndexer): + return self.min_periods or 0 return self.window @property @@ -391,17 +418,21 @@ def _get_cython_func_type(self, func): Variable algorithms do not use window while fixed do. """ - if self.is_freq_type: + if self.is_freq_type or isinstance(self.window, BaseIndexer): return self._get_roll_func(f"{func}_variable") return partial(self._get_roll_func(f"{func}_fixed"), win=self._get_window()) - def _get_window_indexer(self): + def _get_window_indexer( + self, index_as_array: Optional[np.ndarray], window: int + ) -> BaseIndexer: """ Return an indexer class that will compute the window start and end bounds """ + if isinstance(self.window, BaseIndexer): + return self.window if self.is_freq_type: - return window_indexers.VariableWindowIndexer - return window_indexers.FixedWindowIndexer + return VariableWindowIndexer(index_array=index_as_array, window_size=window) + return FixedWindowIndexer(index_array=index_as_array, window_size=window) def _apply( self, @@ -440,7 +471,7 @@ def _apply( blocks, obj = self._create_blocks() block_list = list(blocks) index_as_array = self._get_index() - window_indexer = self._get_window_indexer() + window_indexer = self._get_window_indexer(index_as_array, window) results = [] exclude: List[Scalar] = [] @@ -468,12 +499,31 @@ def _apply( def calc(x): x = np.concatenate((x, additional_nans)) - min_periods = calculate_min_periods( - window, self.min_periods, len(x), require_min_periods, floor + if not isinstance(window, BaseIndexer): + min_periods = calculate_min_periods( + window, self.min_periods, len(x), require_min_periods, floor + ) + else: + min_periods = calculate_min_periods( + self.min_periods or 1, + self.min_periods, + len(x), + require_min_periods, + floor, + ) + start, end = window_indexer.get_window_bounds( + num_values=len(x), + min_periods=self.min_periods, + center=self.center, + closed=self.closed, ) - start, end = window_indexer( - x, window, self.closed, index_as_array - ).get_window_bounds() + if np.any(np.diff(start) < 0) or np.any(np.diff(end) < 0): + # Our "variable" algorithms assume start/end are + # monotonically increasing. A custom window indexer + # can produce a non monotonic start/end. + return func( + x, start, end, min_periods, is_monotonic_bounds=False + ) return func(x, start, end, min_periods) else: @@ -754,13 +804,18 @@ class Window(_Window): Parameters ---------- - window : int, or offset + window : int, offset, or BaseIndexer subclass Size of the moving window. This is the number of observations used for calculating the statistic. Each window will be a fixed size. If its an offset then this will be the time period of each window. Each window will be a variable sized based on the observations included in the time-period. This is only valid for datetimelike indexes. + + If a BaseIndexer subclass is passed, calculates the window boundaries + based on the defined ``get_window_bounds`` method. Additional rolling + keyword arguments, namely `min_periods`, `center`, and + `closed` will be passed to `get_window_bounds`. min_periods : int, default None Minimum number of observations in window required to have a value (otherwise result is NA). For a window that is specified by an offset, @@ -901,7 +956,11 @@ def validate(self): super().validate() window = self.window - if isinstance(window, (list, tuple, np.ndarray)): + if isinstance(window, BaseIndexer): + raise NotImplementedError( + "BaseIndexer subclasses not implemented with win_types." + ) + elif isinstance(window, (list, tuple, np.ndarray)): pass elif is_integer(window): if window <= 0: @@ -1755,6 +1814,9 @@ def validate(self): if self.min_periods is None: self.min_periods = 1 + elif isinstance(self.window, BaseIndexer): + # Passed BaseIndexer subclass should handle all other rolling kwargs + return elif not is_integer(self.window): raise ValueError("window must be an integer") elif self.window < 0: diff --git a/pandas/tests/api/test_api.py b/pandas/tests/api/test_api.py index 3c0abd7fca830..76141dceae930 100644 --- a/pandas/tests/api/test_api.py +++ b/pandas/tests/api/test_api.py @@ -219,7 +219,7 @@ def test_api(self): class TestApi(Base): - allowed = ["types", "extensions"] + allowed = ["types", "extensions", "indexers"] def test_api(self): diff --git a/pandas/tests/window/test_base_indexer.py b/pandas/tests/window/test_base_indexer.py new file mode 100644 index 0000000000000..6a3f2c19babdc --- /dev/null +++ b/pandas/tests/window/test_base_indexer.py @@ -0,0 +1,82 @@ +import numpy as np +import pytest + +from pandas import DataFrame, Series +from pandas.api.indexers import BaseIndexer +from pandas.core.window.indexers import ExpandingIndexer +import pandas.util.testing as tm + + +def test_bad_get_window_bounds_signature(): + class BadIndexer(BaseIndexer): + def get_window_bounds(self): + return None + + indexer = BadIndexer() + with pytest.raises(ValueError, match="BadIndexer does not implement"): + Series(range(5)).rolling(indexer) + + +def test_expanding_indexer(): + s = Series(range(10)) + indexer = ExpandingIndexer() + result = s.rolling(indexer).mean() + expected = s.expanding().mean() + tm.assert_series_equal(result, expected) + + +def test_indexer_constructor_arg(): + # Example found in computation.rst + use_expanding = [True, False, True, False, True] + df = DataFrame({"values": range(5)}) + + class CustomIndexer(BaseIndexer): + def get_window_bounds(self, num_values, min_periods, center, closed): + start = np.empty(num_values, dtype=np.int64) + end = np.empty(num_values, dtype=np.int64) + for i in range(num_values): + if self.use_expanding[i]: + start[i] = 0 + end[i] = i + 1 + else: + start[i] = i + end[i] = i + self.window_size + return start, end + + indexer = CustomIndexer(window_size=1, use_expanding=use_expanding) + result = df.rolling(indexer).sum() + expected = DataFrame({"values": [0.0, 1.0, 3.0, 3.0, 10.0]}) + tm.assert_frame_equal(result, expected) + + +def test_indexer_accepts_rolling_args(): + df = DataFrame({"values": range(5)}) + + class CustomIndexer(BaseIndexer): + def get_window_bounds(self, num_values, min_periods, center, closed): + start = np.empty(num_values, dtype=np.int64) + end = np.empty(num_values, dtype=np.int64) + for i in range(num_values): + if center and min_periods == 1 and closed == "both" and i == 2: + start[i] = 0 + end[i] = num_values + else: + start[i] = i + end[i] = i + self.window_size + return start, end + + indexer = CustomIndexer(window_size=1) + result = df.rolling(indexer, center=True, min_periods=1, closed="both").sum() + expected = DataFrame({"values": [0.0, 1.0, 10.0, 3.0, 4.0]}) + tm.assert_frame_equal(result, expected) + + +def test_win_type_not_implemented(): + class CustomIndexer(BaseIndexer): + def get_window_bounds(self, num_values, min_periods, center, closed): + return np.array([0, 1]), np.array([1, 2]) + + df = DataFrame({"values": range(2)}) + indexer = CustomIndexer() + with pytest.raises(NotImplementedError, match="BaseIndexer subclasses not"): + df.rolling(indexer, win_type="boxcar")