diff --git a/doc/source/user_guide/computation.rst b/doc/source/user_guide/computation.rst index 19fdb541a6a45..897e5d5fb0e24 100644 --- a/doc/source/user_guide/computation.rst +++ b/doc/source/user_guide/computation.rst @@ -561,7 +561,7 @@ For example, if we have the following ``DataFrame``: df and we want to use an expanding window where ``use_expanding`` is ``True`` otherwise a window of size -1, we can create the following ``BaseIndexer``: +1, we can create the following ``BaseIndexer`` subclass: .. code-block:: ipython @@ -593,6 +593,8 @@ and we want to use an expanding window where ``use_expanding`` is ``True`` other 3 3.0 4 10.0 +You can view other examples of ``BaseIndexer`` subclasses `here <https://github.com/pandas-dev/pandas/blob/master/pandas/core/window/indexers.py>`__ + .. versionadded:: 1.1 For some problems knowledge of the future is available for analysis. For example, this occurs when diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index f0a76dc17b411..b710a35410458 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -1,4 +1,5 @@ """Indexer objects for computing start/end window bounds for rolling operations""" +from datetime import timedelta from typing import Dict, Optional, Tuple, Type, Union import numpy as np @@ -6,6 +7,8 @@ from pandas._libs.window.indexers import calculate_variable_window_bounds from pandas.util._decorators import Appender +from pandas.tseries.offsets import Nano + get_window_bounds_doc = """ Computes the bounds of a window. 
class NonFixedVariableWindowIndexer(BaseIndexer):
    """
    Calculate window boundaries based on a non-fixed offset such as a BusinessDay.

    Parameters
    ----------
    index_array : np.ndarray, optional
        Forwarded unchanged to ``BaseIndexer.__init__``; not read by this class.
    window_size : int, default 0
        Forwarded unchanged to ``BaseIndexer.__init__``; not read by this class.
    index : optional
        Labels of the observations. ``get_window_bounds`` indexes it
        positionally and subtracts ``offset`` from its elements, so it must
        be supplied whenever ``num_values`` is positive.
        NOTE(review): presumably a monotonic ``DatetimeIndex`` - the direction
        is inferred from the first and last labels only.
    offset : optional
        Length of the window, subtracted from each label to find where the
        window starts (e.g. ``BusinessDay(1)``).
    **kwargs
        Forwarded unchanged to ``BaseIndexer.__init__``.
    """

    def __init__(
        self,
        index_array: Optional[np.ndarray] = None,
        window_size: int = 0,
        index=None,
        offset=None,
        **kwargs,
    ):
        super().__init__(index_array, window_size, **kwargs)
        self.index = index
        self.offset = offset

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: Optional[int] = None,
        center: Optional[bool] = None,
        closed: Optional[str] = None,
    ) -> Tuple[np.ndarray, np.ndarray]:
        # ``min_periods`` and ``center`` are accepted for signature
        # compatibility only; nothing below reads them.

        # Nothing to bound: return empty arrays instead of failing on
        # ``self.index[0]`` / ``start[0]`` further down.
        if num_values <= 0:
            return np.empty(0, dtype="int64"), np.empty(0, dtype="int64")

        # if window is variable, default is 'right', otherwise default is 'both'
        if closed is None:
            closed = "right" if self.index is not None else "both"

        right_closed = closed in ("right", "both")
        left_closed = closed in ("left", "both")

        # +1 for a non-decreasing index, -1 when the last label precedes the
        # first one; every comparison below is multiplied by this sign so the
        # same code handles both directions.
        if self.index[num_values - 1] < self.index[0]:
            index_growth_sign = -1
        else:
            index_growth_sign = 1

        zero = timedelta(0)

        start = np.full(num_values, -1, dtype="int64")
        end = np.full(num_values, -1, dtype="int64")

        # The first window can only contain the first observation, and only
        # when the right endpoint is closed.
        start[0] = 0
        end[0] = 1 if right_closed else 0

        # start is start of slice interval (including)
        # end is end of slice interval (not including)
        for i in range(1, num_values):
            end_bound = self.index[i]
            start_bound = end_bound - index_growth_sign * self.offset

            # left endpoint is closed: push the bound one nanosecond further
            # away from the current label so that the strict comparison below
            # also accepts a label lying exactly on the bound. The nudge must
            # follow the direction of the index, otherwise a decreasing index
            # would get a narrower window instead of an inclusive one.
            if left_closed:
                start_bound -= index_growth_sign * Nano(1)

            # advance the start bound until we are within the constraint;
            # windows never move backwards, so resume from the previous start
            start[i] = i
            for j in range(start[i - 1], i):
                if (self.index[j] - start_bound) * index_growth_sign > zero:
                    start[i] = j
                    break

            # end bound is previous end or current index
            if (self.index[end[i - 1]] - end_bound) * index_growth_sign <= zero:
                end[i] = i + 1
            else:
                end[i] = end[i - 1]

            # right endpoint is open: drop the current observation
            if not right_closed:
                end[i] -= 1

        return start, end