diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 2baed13cbd7be..04b3f8ab461fa 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -6,6 +6,7 @@ from libc.math cimport ( sqrt, ) from libcpp.deque cimport deque +from libcpp.stack cimport stack from libcpp.unordered_map cimport unordered_map from pandas._libs.algos cimport TiebreakEnumType @@ -988,39 +989,29 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, # ---------------------------------------------------------------------- -# Moving maximum / minimum code taken from Bottleneck -# Licence at LICENSES/BOTTLENECK_LICENCE - - -cdef float64_t init_mm(float64_t ai, Py_ssize_t *nobs, bint is_max) noexcept nogil: - - if ai == ai: - nobs[0] = nobs[0] + 1 - elif is_max: - ai = MINfloat64 - else: - ai = MAXfloat64 - - return ai - - -cdef void remove_mm(float64_t aold, Py_ssize_t *nobs) noexcept nogil: - """ remove a value from the mm calc """ - if aold == aold: - nobs[0] = nobs[0] - 1 - - -cdef float64_t calc_mm(int64_t minp, Py_ssize_t nobs, - float64_t value) noexcept nogil: - cdef: - float64_t result +cdef int64_t bisect_left( + deque[int64_t]& a, + int64_t x, + int64_t lo=0, + int64_t hi=-1 +) nogil: + """Same as https://docs.python.org/3/library/bisect.html.""" + + cdef int64_t mid + if hi == -1: + hi = a.size() + while lo < hi: + mid = (lo + hi) // 2 + if a.at(mid) < x: + lo = mid + 1 + else: + hi = mid + return lo - if nobs >= minp: - result = value - else: - result = NaN +from libc.math cimport isnan - return result +# Prior version of moving maximum / minimum code taken from Bottleneck +# Licence at LICENSES/BOTTLENECK_LICENCE def roll_max(ndarray[float64_t] values, ndarray[int64_t] start, @@ -1068,69 +1059,110 @@ def roll_min(ndarray[float64_t] values, ndarray[int64_t] start, return _roll_min_max(values, start, end, minp, is_max=0) -cdef _roll_min_max(ndarray[float64_t] values, - ndarray[int64_t] starti, - ndarray[int64_t] endi, - int64_t minp, - bint is_max): +def _roll_min_max( + ndarray[float64_t] values, + ndarray[int64_t] start, + ndarray[int64_t] end, + int64_t minp, + bint is_max +): cdef: - float64_t ai - int64_t curr_win_size, start - Py_ssize_t i, k, nobs = 0, N = len(starti) - deque Q[int64_t] # min/max always the front - deque W[int64_t] # track the whole window for nobs compute + Py_ssize_t i, i_next, k, valid_start, last_end, last_start, N = len(start) + # Indices of bounded extrema in `values`. `candidates[i]` is always increasing. + # `values[candidates[i]]` is decreasing for max and increasing for min. + deque candidates[int64_t] + # Indices of largest windows that "cover" preceding windows. + stack dominators[int64_t] ndarray[float64_t, ndim=1] output + Py_ssize_t this_start, this_end, stash_start + int64_t q_idx + output = np.empty(N, dtype=np.float64) - Q = deque[int64_t]() - W = deque[int64_t]() + candidates = deque[int64_t]() + dominators = stack[int64_t]() + + # This function was "ported" / translated from sliding_min_max() + # in /pandas/core/_numba/kernels/min_max_.py. + # (See there for credits and some comments.) + # Code translation assumptions/rules: + # - min_periods --> minp + # - deque[0] --> front() + # - deque[-1] --> back() + # - stack[-1] --> top() + # - bool(stack/deque) --> !empty() + # - deque.append() --> push_back() + # - stack.append() --> push() + # - deque.popleft --> pop_front() + # - deque.pop() --> pop_back() with nogil: + if minp < 1: + minp = 1 + + if N>2: + i_next = N - 1 + for i in range(N - 2, -1, -1): + if start[i_next] < start[i] \ + and ( + dominators.empty() + or start[dominators.top()] > start[i_next] + ): + dominators.push(i_next) + i_next = i + + # NaN tracking to guarantee minp + valid_start = -minp + + last_end = 0 + last_start = -1 - # This is using a modified version of the C++ code in this - # SO post: https://stackoverflow.com/a/12239580 - # The original impl didn't deal with variable window sizes - # So the code was optimized for that - - # first window's size - curr_win_size = endi[0] - starti[0] - # GH 32865 - # Anchor output index to values index to provide custom - # BaseIndexer support for i in range(N): + this_start = start[i] + this_end = end[i] - curr_win_size = endi[i] - starti[i] - if i == 0: - start = starti[i] - else: - start = endi[i - 1] - - for k in range(start, endi[i]): - ai = init_mm(values[k], &nobs, is_max) - # Discard previous entries if we find new min or max - if is_max: - while not Q.empty() and ((ai >= values[Q.back()]) or - values[Q.back()] != values[Q.back()]): - Q.pop_back() - else: - while not Q.empty() and ((ai <= values[Q.back()]) or - values[Q.back()] != values[Q.back()]): - Q.pop_back() - Q.push_back(k) - W.push_back(k) - - # Discard entries outside and left of current window - while not Q.empty() and Q.front() <= starti[i] - 1: - Q.pop_front() - while not W.empty() and W.front() <= starti[i] - 1: - remove_mm(values[W.front()], &nobs) - W.pop_front() - - # Save output based on index in input value array - if not Q.empty() and curr_win_size > 0: - output[i] = calc_mm(minp, nobs, values[Q.front()]) + if (not dominators.empty() and dominators.top() == i): + dominators.pop() + + if not (this_end > last_end + or (this_end == last_end and this_start >= last_start)): + raise ValueError( + "Start/End ordering requirement is violated at index {}".format(i)) + + if dominators.empty(): + stash_start = this_start else: + stash_start = min(this_start, start[dominators.top()]) + + while not candidates.empty() and candidates.front() < stash_start: + candidates.pop_front() + + for k in range(last_end, this_end): + if not isnan(values[k]): + valid_start += 1 + while valid_start >= 0 and isnan(values[valid_start]): + valid_start += 1 + + if is_max: + while (not candidates.empty() + and values[k] >= values[candidates.back()]): + candidates.pop_back() + else: + while (not candidates.empty() + and values[k] <= values[candidates.back()]): + candidates.pop_back() + candidates.push_back(k) + + if candidates.empty() or this_start > valid_start: output[i] = NaN + elif candidates.front() >= this_start: + # ^^ This is here to avoid costly bisection for fixed window sizes. + output[i] = values[candidates.front()] + else: + q_idx = bisect_left(candidates, this_start, lo=1) + output[i] = values[candidates[q_idx]] + last_end = this_end + last_start = this_start return output diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py index 68aa1446bbe3c..c03f20c871012 100644 --- a/pandas/core/_numba/kernels/min_max_.py +++ b/pandas/core/_numba/kernels/min_max_.py @@ -9,7 +9,10 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import ( + TYPE_CHECKING, + Any, +) import numba import numpy as np @@ -18,6 +21,20 @@ from pandas._typing import npt +@numba.njit(nogil=True, parallel=False) +def bisect_left(a: list[Any], x: Any, lo: int = 0, hi: int = -1) -> int: + """Same as https://docs.python.org/3/library/bisect.html; not in numba yet!""" + if hi == -1: + hi = len(a) + while lo < hi: + mid = (lo + hi) // 2 + if a[mid] < x: + lo = mid + 1 + else: + hi = mid + return lo + + @numba.jit(nopython=True, nogil=True, parallel=False) def sliding_min_max( values: np.ndarray, @@ -27,55 +44,87 @@ def sliding_min_max( min_periods: int, is_max: bool, ) -> tuple[np.ndarray, list[int]]: + # Basic idea of the algorithm: https://stackoverflow.com/a/12239580 + # It was generalized to work with an arbitrary list of any window size and position + # by adding the Dominators stack. + N = len(start) - nobs = 0 - output = np.empty(N, dtype=result_dtype) na_pos = [] - # Use deque once numba supports it - # https://github.com/numba/numba/issues/7417 - Q: list = [] - W: list = [] - for i in range(N): - curr_win_size = end[i] - start[i] - if i == 0: - st = start[i] - else: - st = end[i - 1] - - for k in range(st, end[i]): - ai = values[k] - if not np.isnan(ai): - nobs += 1 - elif is_max: - ai = -np.inf - else: - ai = np.inf - # Discard previous entries if we find new min or max - if is_max: - while Q and ((ai >= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]): - Q.pop() - else: - while Q and ((ai <= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]): - Q.pop() - Q.append(k) - W.append(k) - - # Discard entries outside and left of current window - while Q and Q[0] <= start[i] - 1: - Q.pop(0) - while W and W[0] <= start[i] - 1: - if not np.isnan(values[W[0]]): - nobs -= 1 - W.pop(0) - - # Save output based on index in input value array - if Q and curr_win_size > 0 and nobs >= min_periods: - output[i] = values[Q[0]] + output = np.empty(N, dtype=result_dtype) + + def cmp(a: Any, b: Any, is_max: bool) -> bool: + if is_max: + return a >= b else: + return a <= b + + # Indices of bounded extrema in `values`. `candidates[i]` is always increasing. + # `values[candidates[i]]` is decreasing for max and increasing for min. + candidates: list[int] = [] # this is a queue + # Indices of largest windows that "cover" preceding windows. + dominators: list[int] = [] # this is a stack + + if min_periods < 1: + min_periods = 1 + + if N > 2: + i_next = N - 1 # equivalent to i_next = i+1 inside the loop + for i in range(N - 2, -1, -1): + next_dominates = start[i_next] < start[i] + if next_dominates and ( + not dominators or start[dominators[-1]] > start[i_next] + ): + dominators.append(i_next) + i_next = i + + # NaN tracking to guarantee min_periods + valid_start = -min_periods + + last_end = 0 + last_start = -1 + + for i in range(N): + this_start = start[i].item() + this_end = end[i].item() + + if dominators and dominators[-1] == i: + dominators.pop() + + if not ( + this_end > last_end or (this_end == last_end and this_start >= last_start) + ): + raise ValueError( + "Start/End ordering requirement is violated at index " + str(i) + ) + + stash_start = ( + this_start if not dominators else min(this_start, start[dominators[-1]]) + ) + while candidates and candidates[0] < stash_start: + candidates.pop(0) + + for k in range(last_end, this_end): + if not np.isnan(values[k]): + valid_start += 1 + while valid_start >= 0 and np.isnan(values[valid_start]): + valid_start += 1 + while candidates and cmp(values[k], values[candidates[-1]], is_max): + candidates.pop() # Q.pop_back() + candidates.append(k) # Q.push_back(k) + + if not candidates or (this_start > valid_start): if values.dtype.kind != "i": output[i] = np.nan else: na_pos.append(i) + elif candidates[0] >= this_start: + # ^^ This is here to avoid costly bisection for fixed window sizes. + output[i] = values[candidates[0]] + else: + q_idx = bisect_left(candidates, this_start, lo=1) + output[i] = values[candidates[q_idx]] + last_end = this_end + last_start = this_start return output, na_pos diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 887aeca6590dc..ff6a616bc5264 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -12,6 +12,7 @@ to_datetime, ) import pandas._testing as tm +from pandas.api.indexers import BaseIndexer from pandas.util.version import Version pytestmark = [pytest.mark.single_cpu] @@ -581,3 +582,67 @@ def test_npfunc_no_warnings(): df = DataFrame({"col1": [1, 2, 3, 4, 5]}) with tm.assert_produces_warning(False): df.col1.rolling(2).apply(np.prod, raw=True, engine="numba") + + +class PrescribedWindowIndexer(BaseIndexer): + def __init__(self, start, end): + self._start = start + self._end = end + super().__init__() + + def get_window_bounds( + self, num_values=None, min_periods=None, center=None, closed=None, step=None + ): + if num_values is None: + num_values = len(self._start) + start = np.clip(self._start, 0, num_values) + end = np.clip(self._end, 0, num_values) + return start, end + + +@td.skip_if_no("numba") +class TestMinMaxNumba: + @pytest.mark.parametrize( + "is_max, has_nan, exp_list", + [ + (True, False, [3.0, 5.0, 2.0, 5.0, 1.0, 5.0, 6.0, 7.0, 8.0, 9.0]), + (True, True, [3.0, 4.0, 2.0, 4.0, 1.0, 4.0, 6.0, 7.0, 7.0, 9.0]), + (False, False, [3.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0]), + (False, True, [3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 6.0, 6.0, 7.0, 1.0]), + ], + ) + def test_minmax(self, is_max, has_nan, exp_list): + nan_idx = [0, 5, 8] + df = DataFrame( + { + "data": [5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 6.0, 7.0, 8.0, 9.0], + "start": [2, 0, 3, 0, 4, 0, 5, 5, 7, 3], + "end": [3, 4, 4, 5, 5, 6, 7, 8, 9, 10], + } + ) + if has_nan: + df.loc[nan_idx, "data"] = np.nan + expected = Series(exp_list, name="data") + r = df.data.rolling( + PrescribedWindowIndexer(df.start.to_numpy(), df.end.to_numpy()) + ) + if is_max: + result = r.max(engine="numba") + else: + result = r.min(engine="numba") + + tm.assert_series_equal(result, expected) + + def test_wrong_order(self): + start = np.array(range(5), dtype=np.int64) + end = start + 1 + end[3] = end[2] + start[3] = start[2] - 1 + + df = DataFrame({"data": start * 1.0, "start": start, "end": end}) + + r = df.data.rolling(PrescribedWindowIndexer(start, end)) + with pytest.raises( + ValueError, match="Start/End ordering requirement is violated at index 3" + ): + r.max(engine="numba") diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 8c57781c1447c..6e8f075d35490 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -1946,3 +1946,66 @@ def test_rolling_timedelta_window_non_nanoseconds(unit, tz): df.index = df.index.as_unit("ns") tm.assert_frame_equal(ref_df, df) + + +class PrescribedWindowIndexer(BaseIndexer): + def __init__(self, start, end): + self._start = start + self._end = end + super().__init__() + + def get_window_bounds( + self, num_values=None, min_periods=None, center=None, closed=None, step=None + ): + if num_values is None: + num_values = len(self._start) + start = np.clip(self._start, 0, num_values) + end = np.clip(self._end, 0, num_values) + return start, end + + +class TestMinMax: + @pytest.mark.parametrize( + "is_max, has_nan, exp_list", + [ + (True, False, [3.0, 5.0, 2.0, 5.0, 1.0, 5.0, 6.0, 7.0, 8.0, 9.0]), + (True, True, [3.0, 4.0, 2.0, 4.0, 1.0, 4.0, 6.0, 7.0, 7.0, 9.0]), + (False, False, [3.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0]), + (False, True, [3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 6.0, 6.0, 7.0, 1.0]), + ], + ) + def test_minmax(self, is_max, has_nan, exp_list): + nan_idx = [0, 5, 8] + df = DataFrame( + { + "data": [5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 6.0, 7.0, 8.0, 9.0], + "start": [2, 0, 3, 0, 4, 0, 5, 5, 7, 3], + "end": [3, 4, 4, 5, 5, 6, 7, 8, 9, 10], + } + ) + if has_nan: + df.loc[nan_idx, "data"] = np.nan + expected = Series(exp_list, name="data") + r = df.data.rolling( + PrescribedWindowIndexer(df.start.to_numpy(), df.end.to_numpy()) + ) + if is_max: + result = r.max() + else: + result = r.min() + + tm.assert_series_equal(result, expected) + + def test_wrong_order(self): + start = np.array(range(5), dtype=np.int64) + end = start + 1 + end[3] = end[2] + start[3] = start[2] - 1 + + df = DataFrame({"data": start * 1.0, "start": start, "end": end}) + + r = df.data.rolling(PrescribedWindowIndexer(start, end)) + with pytest.raises( + ValueError, match="Start/End ordering requirement is violated at index 3" + ): + r.max()