From 338279763e599628093da162addc7d2a00e6d4b7 Mon Sep 17 00:00:00 2001 From: viable-alternative Date: Thu, 17 Apr 2025 14:00:01 -0400 Subject: [PATCH 1/7] BUG: Fix #46726; wrong result with varying window size min/max rolling calc. --- pandas/_libs/window/aggregations.pyx | 198 ++++++++++++++--------- pandas/core/_numba/kernels/min_max_.py | 207 ++++++++++++++++++++----- pandas/tests/window/test_numba.py | 57 +++++++ pandas/tests/window/test_rolling.py | 170 ++++++++++++++++++++ 4 files changed, 515 insertions(+), 117 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 2baed13cbd7be..d8c006be34350 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -6,6 +6,7 @@ from libc.math cimport ( sqrt, ) from libcpp.deque cimport deque +from libcpp.stack cimport stack from libcpp.unordered_map cimport unordered_map from pandas._libs.algos cimport TiebreakEnumType @@ -991,36 +992,24 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, # Moving maximum / minimum code taken from Bottleneck # Licence at LICENSES/BOTTLENECK_LICENCE +cdef int64_t bisect_left( + deque[int64_t]& a, + int64_t x, + int64_t lo=0, + int64_t hi=-1 +) nogil: + cdef int64_t mid + if hi == -1: + hi = a.size() + while lo < hi: + mid = (lo + hi) // 2 + if a.at(mid) < x: + lo = mid + 1 + else: + hi = mid + return lo -cdef float64_t init_mm(float64_t ai, Py_ssize_t *nobs, bint is_max) noexcept nogil: - - if ai == ai: - nobs[0] = nobs[0] + 1 - elif is_max: - ai = MINfloat64 - else: - ai = MAXfloat64 - - return ai - - -cdef void remove_mm(float64_t aold, Py_ssize_t *nobs) noexcept nogil: - """ remove a value from the mm calc """ - if aold == aold: - nobs[0] = nobs[0] - 1 - - -cdef float64_t calc_mm(int64_t minp, Py_ssize_t nobs, - float64_t value) noexcept nogil: - cdef: - float64_t result - - if nobs >= minp: - result = value - else: - result = NaN - - return result +from libc.math cimport isnan def roll_max(ndarray[float64_t] values, ndarray[int64_t] start, @@ -1068,69 +1057,124 @@ def roll_min(ndarray[float64_t] values, ndarray[int64_t] start, return _roll_min_max(values, start, end, minp, is_max=0) -cdef _roll_min_max(ndarray[float64_t] values, - ndarray[int64_t] starti, - ndarray[int64_t] endi, - int64_t minp, - bint is_max): +def _roll_min_max( + ndarray[float64_t] values, + ndarray[int64_t] start, + ndarray[int64_t] end, + int64_t minp, + bint is_max +): cdef: - float64_t ai - int64_t curr_win_size, start - Py_ssize_t i, k, nobs = 0, N = len(starti) - deque Q[int64_t] # min/max always the front - deque W[int64_t] # track the whole window for nobs compute + Py_ssize_t i, i_next, k, valid_start, last_end, last_start, N = len(start) + deque Q[int64_t] + stack Dominators[int64_t] ndarray[float64_t, ndim=1] output + # ideally want these in the i-loop scope + Py_ssize_t this_start, this_end, stash_start + int64_t q_idx + output = np.empty(N, dtype=np.float64) Q = deque[int64_t]() - W = deque[int64_t]() + Dominators = stack[int64_t]() + + # This function was "ported" / translated from sliding_min_max() + # in /pandas/core/_numba/kernels/min_max_.py. (See there for detailed + # comments and credits.) + # Code translation assumptions/rules: + # - min_periods --> minp + # - deque[0] --> front() + # - deque[-1] --> back() + # - stack[-1] --> top() + # - bool(stack/deque) --> !empty() + # - deque.append() --> push_back() + # - stack.append() --> push() + # - deque.popleft --> pop_front() + # - deque.pop() --> pop_back() with nogil: + if minp < 1: + minp = 1 + + if N>2: + i_next = N - 1 + for i in range(N - 2, -1, -1): + if start[i_next] < start[i] \ + and ( + Dominators.empty() + or start[Dominators.top()] > start[i_next] + ): + Dominators.push(i_next) + i_next = i - # This is using a modified version of the C++ code in this - # SO post: https://stackoverflow.com/a/12239580 - # The original impl didn't deal with variable window sizes - # So the code was optimized for that + valid_start = -minp + + last_end =0 + last_start=-1 - # first window's size - curr_win_size = endi[0] - starti[0] - # GH 32865 - # Anchor output index to values index to provide custom - # BaseIndexer support for i in range(N): + this_start = start[i] + this_end = end[i] + + if (not Dominators.empty() and Dominators.top() == i): + Dominators.pop() - curr_win_size = endi[i] - starti[i] - if i == 0: - start = starti[i] + if not (this_end > last_end + or (this_end == last_end and this_start >= last_start)): + raise ValueError( + "Start/End ordering requirement is violated at index {}".format(i)) + + if Dominators.empty(): + stash_start = this_start else: - start = endi[i - 1] - - for k in range(start, endi[i]): - ai = init_mm(values[k], &nobs, is_max) - # Discard previous entries if we find new min or max - if is_max: - while not Q.empty() and ((ai >= values[Q.back()]) or - values[Q.back()] != values[Q.back()]): - Q.pop_back() - else: - while not Q.empty() and ((ai <= values[Q.back()]) or - values[Q.back()] != values[Q.back()]): - Q.pop_back() - Q.push_back(k) - W.push_back(k) - - # Discard entries outside and left of current window - while not Q.empty() and Q.front() <= starti[i] - 1: + stash_start = min(this_start, start[Dominators.top()]) + + while not Q.empty() and Q.front() < stash_start: Q.pop_front() - while not W.empty() and W.front() <= starti[i] - 1: - remove_mm(values[W.front()], &nobs) - W.pop_front() - # Save output based on index in input value array - if not Q.empty() and curr_win_size > 0: - output[i] = calc_mm(minp, nobs, values[Q.front()]) - else: + for k in range(last_end, this_end): + if not isnan(values[k]): + valid_start += 1 + while valid_start>=0 and isnan(values[valid_start]): + valid_start += 1 + + # Sadly, this runs more than 15% faster than trying to use + # generic comparison functions. + # That is, I tried: + # + # | cdef inline bint le(float64_t a, float64_t b) nogil: + # | return a <= b + # | cdef inline bint ge(float64_t a, float64_t b) nogil: + # | return a >= b + # | ctypedef bint (*cmp_func_t) (float64_t a, float64_t b) nogil + # | ... + # | cmp_func_t cmp + # | + # | if is_max: + # | cmp = ge + # | else: + # | cmp = le + # and, finally + # | while not Q.empty() and cmp(values[k], values[Q.back()]): + # | Q.pop_back() + + if is_max: + while not Q.empty() and values[k] >= values[Q.back()]: + Q.pop_back() + else: + while not Q.empty() and values[k] <= values[Q.back()]: + Q.pop_back() + Q.push_back(k) + + if Q.empty() or this_start > valid_start: output[i] = NaN + elif Q.front() >= this_start: + output[i] = values[Q.front()] + else: + q_idx = bisect_left(Q, this_start, lo=1) + output[i] = values[Q[q_idx]] + last_end = this_end + last_start = this_start return output diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py index 68aa1446bbe3c..3104d306be481 100644 --- a/pandas/core/_numba/kernels/min_max_.py +++ b/pandas/core/_numba/kernels/min_max_.py @@ -9,7 +9,10 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import ( + TYPE_CHECKING, + Any, +) import numba import numpy as np @@ -18,6 +21,45 @@ from pandas._typing import npt +@numba.njit(nogil=True, parallel=False) +def bisect_left(a: list[Any], x: Any, lo: int = 0, hi: int = -1) -> int: + if hi == -1: + hi = len(a) + while lo < hi: + mid = (lo + hi) // 2 + if a[mid] < x: + lo = mid + 1 + else: + hi = mid + return lo + + +# *** Notations *** +# N: size of the values[] array. +# NN: last accessed element (1-based) in the values[] array, that is max_(end[]). +# In most cases NN==N (unless you are using a custom window indexer). +# M: number of min/max "jobs", that is, size of start[] and end[] arrays. +# In pandas' common usage, M==N==NN, but it does not have to! +# k: maximum window size, that is max(end[] - start[]) +# +# *** Complexity *** +# - O(max(NN,M)) for constant window sizes. +# - O(max(NN,M)*log(k)) for arbitrary window sizes. +# +# *** Assumptions *** +# The min/max "jobs" have to be ordered in the lexiographical (end[i], start[i]) order. +# In the regular pandas' case with constant window size these array ARE properly +# sorted by construction. +# In other words, for each i = 0..N-2, this condition must hold: +# - (end[i+1] > end[i]) OR +# - (end[i+1] == end[i] AND start[i+1] >= start[i]) +# +# To debug this with your favorite Python debugger: +# - Comment out the "numba.jit" line right below this comment above the function def. +# - Find and comment out a similar line in column_looper() defined in +# generate_apply_looper() in executor.py. +# - Place a breakpoint in this function. Your Python debugger will stop there! +# - (Debugging was tested with VSCode on WSL.) @numba.jit(nopython=True, nogil=True, parallel=False) def sliding_min_max( values: np.ndarray, @@ -27,55 +69,140 @@ def sliding_min_max( min_periods: int, is_max: bool, ) -> tuple[np.ndarray, list[int]]: + # numba-only init part N = len(start) - nobs = 0 - output = np.empty(N, dtype=result_dtype) na_pos = [] - # Use deque once numba supports it - # https://github.com/numba/numba/issues/7417 - Q: list = [] - W: list = [] - for i in range(N): - curr_win_size = end[i] - start[i] - if i == 0: - st = start[i] + output = np.empty(N, dtype=result_dtype) + + def cmp(a: Any, b: Any, is_max: bool) -> bool: + if is_max: + return a >= b else: - st = end[i - 1] - - for k in range(st, end[i]): - ai = values[k] - if not np.isnan(ai): - nobs += 1 - elif is_max: - ai = -np.inf - else: - ai = np.inf - # Discard previous entries if we find new min or max - if is_max: - while Q and ((ai >= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]): - Q.pop() - else: - while Q and ((ai <= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]): - Q.pop() - Q.append(k) - W.append(k) + return a <= b + + # All comments below are for the case of a maximum, that is, is_max = True. + # I will call this Q a "stash": preliminary calculations minimally necessary to + # finish the job. Q will always be in ascending order regardless of min/max: + # these are indices into the "values" array. The values[Q[i]], however, will + # be in non-descending order for max, and non-ascending for min. + # Think of this deque as indices of maximums found so far for varying window + # positions. That is, there is only one maximum in the source array, but it may + # not fit each window. So there are many "secondary maximums", and each of them + # may potentially fit multiple windows (unless, of course you get a very special + # case of an arary of strictly descending values and constant rolling window size). + # In that case Q will be the longest, so here is an idea for the worst case + # scenario testing. + + # We have to pretend, given that Numba has neither queue nor stack. + Q: list = [] # this is a queue + Dominators: list = [] # this is a stack + # END-OF numba-only init part + + # Basic idea of the algorithm: https://stackoverflow.com/a/12239580 + # It was generalized to work with an arbitrary list of any window size and position - # Discard entries outside and left of current window - while Q and Q[0] <= start[i] - 1: + # Zero is apparently passed here as a default. + # It is important to have this value precise for calculations. + if min_periods < 1: + min_periods = 1 + + # We will say that node j "dominates" node i if j comes after i, yet requires a + # deeper deque Q at the time node i is processed in order to be able to finish + # the job for node j. This precisely means the following two conditions: + # - j > i + # - start[j] < start[i]. + # We keep track of such nodes in the Dominators queue. + # In addition, if it so happens that two nodes j1 and j2 dominate another node, + # and j2 > j1, yet start[j2] <= start[j1], then we only need to keep track of j2. + # (To understand why this works, note that the algorithm requires that + # the "end" array is sorted in non-descending order, among other things.) + if N > 2: + i_next = N - 1 # equivalent to i_next = i+1 inside the loop + for i in range(N - 2, -1, -1): + next_dominates = start[i_next] < start[i] + if next_dominates and ( + not Dominators or start[Dominators[-1]] > start[i_next] + ): + # Both ">" and ">=" would have been (logically) equivalent, but we are + # shooting for the shortest size of the Dominators list, hence the + # usage of ">" + Dominators.append(i_next) + i_next = i + + valid_start = -min_periods + + # Having these relieves us from having "if i>0" on each iteration for special + # handling + last_end = 0 + last_start = -1 + + for i in range(N): + this_start = start[i].item() + this_end = end[i].item() + + if Dominators and Dominators[-1] == i: + Dominators.pop() + + # TODO: Arguably there are benefits to having this consistency check before + # this function is even called (e.g. in rolling.py). + # Given the current implementation, it will be rather tricky at the moment + # to have this check in rolling.py. Additionally, this is only required for + # min/max, and may only ever be violated if user-defined window indexer is + # used. Thus this is the best spot for it, given the circumstances. + if not ( + this_end > last_end or (this_end == last_end and this_start >= last_start) + ): + raise ValueError( + "Start/End ordering requirement is violated at index " + str(i) + ) + + # This is the least restrictive starting index that will take care of current + # item (i) and all remaining items + stash_start = ( + this_start if not Dominators else min(this_start, start[Dominators[-1]]) + ) + # Discard entries outside of the "needed" window. Do it first as to keep the + # stash small. + while Q and Q[0] < stash_start: Q.pop(0) - while W and W[0] <= start[i] - 1: - if not np.isnan(values[W[0]]): - nobs -= 1 - W.pop(0) - # Save output based on index in input value array - if Q and curr_win_size > 0 and nobs >= min_periods: - output[i] = values[Q[0]] - else: + for k in range(last_end, this_end): + if not np.isnan(values[k]): + valid_start += 1 + while valid_start >= 0 and np.isnan(values[valid_start]): + valid_start += 1 + while Q and cmp(values[k], values[Q[-1]], is_max): + Q.pop() # Q.pop_back() + Q.append(k) # Q.push_back(k) + + if not Q or (this_start > valid_start): + # The "not Q" condition means we have not seen anything but NaNs yet in + # values[:this_end-1]. The "this_start > valid_start" condition means we + # have not accumulated enough (min_periods or more) non-NaN values. if values.dtype.kind != "i": output[i] = np.nan else: na_pos.append(i) + elif Q[0] >= this_start: + # This is the only read-from-the-stash scenario that ever happens when + # window size is constant across the set. This is also likely 99+% of + # all use cases, thus we want to make sure we do not go into bisection + # as to incur neither the *log(k) penalty nor the function call penalty + # for this very common case. If we are here, then our stash is as "deep" + # as what the current node ("job") requires. Thus take the front item. + output[i] = values[Q[0]] + else: + # In this case our stash is bigger than what is necessary to compute this + # node's output due to a wider search window at (one of) the nodes that + # follow. We have to locate our value in the middle of the stash. + # Since our stash is sorted, we can use binary search: + # here we need to output the item closest to the front (idx=0) of the + # stash that fits our window bounds. Item 0 has been looked at (and + # discarded) by now, so lo=1 + q_idx = bisect_left(Q, this_start, lo=1) + output[i] = values[Q[q_idx]] + last_end = this_end + last_start = this_start return output, na_pos diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 887aeca6590dc..78fffa7666f74 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -581,3 +581,60 @@ def test_npfunc_no_warnings(): df = DataFrame({"col1": [1, 2, 3, 4, 5]}) with tm.assert_produces_warning(False): df.col1.rolling(2).apply(np.prod, raw=True, engine="numba") + + +from .test_rolling import ( + ArbitraryWindowIndexer, + CustomLengthWindowIndexer, +) + + +@td.skip_if_no("numba") +class TestMinMax: + @pytest.mark.parametrize("is_max", [True, False]) + @pytest.mark.parametrize( + "seed, n, win_len, min_obs, frac_nan, indexer_t", + [ + (42, 1000, 80, 15, 0.3, CustomLengthWindowIndexer), + (52, 1000, 80, 15, 0.3, ArbitraryWindowIndexer), + (1984, 1000, 40, 25, 0.3, None), + ], + ) + def test_minmax(self, is_max, seed, n, win_len, min_obs, frac_nan, indexer_t): + if seed is not None and isinstance(seed, np.random._generator.Generator): + rng = np.random.default_rng(seed) + rng.bit_generator.state = seed.bit_generator.state + else: + rng = np.random.default_rng(seed) + + vals = DataFrame({"Data": rng.random(n)}) + if frac_nan > 0: + is_nan = rng.random(len(vals)) < frac_nan + vals.Data = np.where(is_nan, np.nan, vals.Data) + + ind_param = indexer_t(rng, len(vals), win_len) if indexer_t else win_len + + r = vals.rolling(ind_param, min_periods=min_obs) + f = r.max if is_max else r.min + test_cython = f(engine="cython") + test_numba = f(engine="numba") + tm.assert_series_equal(test_numba.Data, test_cython.Data) + + @pytest.mark.parametrize( + "seed, n, win_len, indexer_t", + [ + (42, 15, 7, ArbitraryWindowIndexer), + ], + ) + def test_wrong_order(self, seed, n, win_len, indexer_t): + rng = np.random.default_rng(seed) + vals = DataFrame({"Data": rng.random(n)}) + + ind_obj = indexer_t(rng, len(vals), win_len) + ind_obj._end[[14, 7]] = ind_obj._end[[7, 14]] + + f = vals.rolling(ind_obj).max + with pytest.raises( + ValueError, match="Start/End ordering requirement is violated at index 8" + ): + f(engine="numba") diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 8c57781c1447c..fc547b9d66bb8 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -2,6 +2,7 @@ datetime, timedelta, ) +from typing import Any import numpy as np import pytest @@ -1946,3 +1947,172 @@ def test_rolling_timedelta_window_non_nanoseconds(unit, tz): df.index = df.index.as_unit("ns") tm.assert_frame_equal(ref_df, df) + + +class StandardWindowIndexer(BaseIndexer): + def __init__(self, n, win_len): + self.n = n + self.win_len = win_len + super().__init__() + + def get_window_bounds( + self, num_values=None, min_periods=None, center=None, closed=None, step=None + ): + if num_values is None: + num_values = self.n + end = np.arange(num_values, dtype="int64") + 1 + start = np.clip(end - self.win_len, 0, num_values) + return start, end + + +class CustomLengthWindowIndexer(BaseIndexer): + def __init__(self, rnd: np.random.Generator, n, win_len): + self.window = rnd.integers(win_len, size=n) + super().__init__() + + def get_window_bounds( + self, num_values=None, min_periods=None, center=None, closed=None, step=None + ): + if num_values is None: + num_values = len(self.window) + end = np.arange(num_values, dtype="int64") + 1 + start = np.clip(end - self.window, 0, num_values) + return start, end + + +class ArbitraryWindowIndexer(BaseIndexer): + def __init__(self, rnd: np.random.Generator, n, win_len): + start = rnd.integers(n, size=n) + win_len = rnd.integers(win_len, size=n) + end = np.where(start - win_len >= 0, start - win_len, start + win_len) + + (start, end) = ( + np.where(end >= start, start, end), + np.where(end >= start, end, start), + ) + + # It is extremely unlikely that a random array would come sorted, + # so we proceed with sort without checking if it is sorted. + prm = sorted(range(len(start)), key=lambda i: (end[i], start[i])) + + self._start = np.array(start)[prm] + self._end = np.array(end)[prm] + super().__init__() + + def get_window_bounds( + self, num_values=None, min_periods=None, center=None, closed=None, step=None + ): + if num_values is None: + num_values = len(self._start) + start = np.clip(self._start, 0, num_values) + end = np.clip(self._end, 0, num_values) + return start, end + + +class TestMinMax: + # Pytest cache will not be a good choice here, because it appears + # pytest persists data on disk, and we are not really interested + # in flooding your hard drive with random numbers. + # Thus we just cache control data in memory to avoid repetititve calculations. + class Cache: + def __init__(self) -> None: + self.ctrl: dict[Any, Any] = {} + + @pytest.fixture(scope="class") + def cache(self) -> Cache: + return self.Cache() + + @pytest.mark.parametrize("is_max", [True, False]) + # @pytest.mark.parametrize("engine", ["python", "cython", "numba"]) + @pytest.mark.parametrize("engine", ["cython"]) + @pytest.mark.parametrize( + "seed, n, win_len, min_obs, frac_nan, indexer_t", + [ + (42, 1000, 80, 15, 0.3, CustomLengthWindowIndexer), + (52, 1000, 80, 15, 0.3, ArbitraryWindowIndexer), + (1984, 1000, 40, 25, 0.3, None), + ], + ) + def test_minmax( + self, is_max, engine, seed, n, win_len, min_obs, frac_nan, indexer_t, cache + ): + if seed is not None and isinstance(seed, np.random._generator.Generator): + rng = np.random.default_rng(seed) + rng.bit_generator.state = seed.bit_generator.state + else: + rng = np.random.default_rng(seed) + + if seed is None or isinstance(seed, np.random._generator.Generator): + rng_state_for_key = ( + rng.bit_generator.state["bit_generator"], + rng.bit_generator.state["state"]["state"], + rng.bit_generator.state["state"]["inc"], + rng.bit_generator.state["has_uint32"], + rng.bit_generator.state["uinteger"], + ) + else: + rng_state_for_key = seed + self.last_rng_state = rng.bit_generator.state + vals = DataFrame({"Data": rng.random(n)}) + if frac_nan > 0: + is_nan = rng.random(len(vals)) < frac_nan + vals.Data = np.where(is_nan, np.nan, vals.Data) + + ind_obj = ( + indexer_t(rng, len(vals), win_len) + if indexer_t + else StandardWindowIndexer(len(vals), win_len) + ) + ind_param = ind_obj if indexer_t else win_len + + (start, end) = ind_obj.get_window_bounds() + ctrl_key = (is_max, rng_state_for_key, n, win_len, min_obs, frac_nan, indexer_t) + if ctrl_key in cache.ctrl: + ctrl = cache.ctrl[ctrl_key] + else: + # This is brute force calculation, and may get expensive when n is + # large, so we cache it. + ctrl = calc_minmax_control(vals.Data, start, end, min_obs, is_max) + cache.ctrl[ctrl_key] = ctrl + + r = vals.rolling(ind_param, min_periods=min_obs) + f = r.max if is_max else r.min + test = f(engine=engine) + tm.assert_series_equal(test.Data, ctrl.Data) + + # @pytest.mark.parametrize("engine", ["python", "cython", "numba"]) + @pytest.mark.parametrize("engine", ["cython"]) + @pytest.mark.parametrize( + "seed, n, win_len, indexer_t", + [ + (42, 15, 7, ArbitraryWindowIndexer), + ], + ) + def test_wrong_order(self, engine, seed, n, win_len, indexer_t): + rng = np.random.default_rng(seed) + vals = DataFrame({"Data": rng.random(n)}) + + ind_obj = indexer_t(rng, len(vals), win_len) + ind_obj._end[[14, 7]] = ind_obj._end[[7, 14]] + + f = vals.rolling(ind_obj).max + with pytest.raises( + ValueError, match="Start/End ordering requirement is violated at index 8" + ): + f(engine=engine) + + +def calc_minmax_control(vals, start, end, min_periods, ismax): + func = np.nanmax if ismax else np.nanmin + outp = np.full(vals.shape, np.nan) + for i in range(len(start)): + if start[i] >= end[i]: + outp[i] = np.nan + else: + rng = vals[start[i] : end[i]] + non_nan_cnt = np.count_nonzero(~np.isnan(rng)) + if non_nan_cnt >= min_periods: + outp[i] = func(rng) + else: + outp[i] = np.nan + return DataFrame({"Data": outp}) From 7c41898549c1c89c0926475e4b22b3f8a10bd8ef Mon Sep 17 00:00:00 2001 From: viable-alternative Date: Fri, 18 Apr 2025 12:33:37 -0400 Subject: [PATCH 2/7] Whitespace recommendations from code review. Co-authored-by: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> --- pandas/_libs/window/aggregations.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index d8c006be34350..9a4c29d48004e 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1109,8 +1109,8 @@ def _roll_min_max( valid_start = -minp - last_end =0 - last_start=-1 + last_end = 0 + last_start = -1 for i in range(N): this_start = start[i] From ba94a736ddae91fed46cbe3dfd6f157e9fd66142 Mon Sep 17 00:00:00 2001 From: viable-alternative Date: Fri, 18 Apr 2025 12:34:08 -0400 Subject: [PATCH 3/7] Whitespace recommendations from code review. Co-authored-by: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> --- pandas/_libs/window/aggregations.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 9a4c29d48004e..bb393a84ab78a 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1135,7 +1135,7 @@ def _roll_min_max( for k in range(last_end, this_end): if not isnan(values[k]): valid_start += 1 - while valid_start>=0 and isnan(values[valid_start]): + while valid_start >= 0 and isnan(values[valid_start]): valid_start += 1 # Sadly, this runs more than 15% faster than trying to use From a1c68ee208e637eac385f74fd27ee9d295ee2611 Mon Sep 17 00:00:00 2001 From: viable-alternative Date: Fri, 18 Apr 2025 16:12:14 -0400 Subject: [PATCH 4/7] removed comments in code Co-authored-by: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> --- pandas/core/_numba/kernels/min_max_.py | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py index 3104d306be481..f6ad977e41c18 100644 --- a/pandas/core/_numba/kernels/min_max_.py +++ b/pandas/core/_numba/kernels/min_max_.py @@ -34,32 +34,6 @@ def bisect_left(a: list[Any], x: Any, lo: int = 0, hi: int = -1) -> int: return lo -# *** Notations *** -# N: size of the values[] array. -# NN: last accessed element (1-based) in the values[] array, that is max_(end[]). -# In most cases NN==N (unless you are using a custom window indexer). -# M: number of min/max "jobs", that is, size of start[] and end[] arrays. -# In pandas' common usage, M==N==NN, but it does not have to! -# k: maximum window size, that is max(end[] - start[]) -# -# *** Complexity *** -# - O(max(NN,M)) for constant window sizes. -# - O(max(NN,M)*log(k)) for arbitrary window sizes. -# -# *** Assumptions *** -# The min/max "jobs" have to be ordered in the lexiographical (end[i], start[i]) order. -# In the regular pandas' case with constant window size these array ARE properly -# sorted by construction. -# In other words, for each i = 0..N-2, this condition must hold: -# - (end[i+1] > end[i]) OR -# - (end[i+1] == end[i] AND start[i+1] >= start[i]) -# -# To debug this with your favorite Python debugger: -# - Comment out the "numba.jit" line right below this comment above the function def. -# - Find and comment out a similar line in column_looper() defined in -# generate_apply_looper() in executor.py. -# - Place a breakpoint in this function. Your Python debugger will stop there! -# - (Debugging was tested with VSCode on WSL.) @numba.jit(nopython=True, nogil=True, parallel=False) def sliding_min_max( values: np.ndarray, From 4ee752a7d61b71b138e7372597e56cf0103c154e Mon Sep 17 00:00:00 2001 From: viable-alternative Date: Fri, 18 Apr 2025 16:29:15 -0400 Subject: [PATCH 5/7] In-code comments replaced with minimal comment at the top of the fuinction, as requested by @mroeschke --- pandas/_libs/window/aggregations.pyx | 25 +--------- pandas/core/_numba/kernels/min_max_.py | 66 ++------------------------ 2 files changed, 6 insertions(+), 85 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index bb393a84ab78a..6b15a3f98aa74 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1070,7 +1070,6 @@ def _roll_min_max( stack Dominators[int64_t] ndarray[float64_t, ndim=1] output - # ideally want these in the i-loop scope Py_ssize_t this_start, this_end, stash_start int64_t q_idx @@ -1079,8 +1078,8 @@ def _roll_min_max( Dominators = stack[int64_t]() # This function was "ported" / translated from sliding_min_max() - # in /pandas/core/_numba/kernels/min_max_.py. (See there for detailed - # comments and credits.) + # in /pandas/core/_numba/kernels/min_max_.py. + # (See there for credits and some comments.) # Code translation assumptions/rules: # - min_periods --> minp # - deque[0] --> front() @@ -1138,26 +1137,6 @@ def _roll_min_max( while valid_start >= 0 and isnan(values[valid_start]): valid_start += 1 - # Sadly, this runs more than 15% faster than trying to use - # generic comparison functions. - # That is, I tried: - # - # | cdef inline bint le(float64_t a, float64_t b) nogil: - # | return a <= b - # | cdef inline bint ge(float64_t a, float64_t b) nogil: - # | return a >= b - # | ctypedef bint (*cmp_func_t) (float64_t a, float64_t b) nogil - # | ... - # | cmp_func_t cmp - # | - # | if is_max: - # | cmp = ge - # | else: - # | cmp = le - # and, finally - # | while not Q.empty() and cmp(values[k], values[Q.back()]): - # | Q.pop_back() - if is_max: while not Q.empty() and values[k] >= values[Q.back()]: Q.pop_back() diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py index f6ad977e41c18..8e377e88bcc9e 100644 --- a/pandas/core/_numba/kernels/min_max_.py +++ b/pandas/core/_numba/kernels/min_max_.py @@ -43,7 +43,10 @@ def sliding_min_max( min_periods: int, is_max: bool, ) -> tuple[np.ndarray, list[int]]: - # numba-only init part + # Basic idea of the algorithm: https://stackoverflow.com/a/12239580 + # It was generalized to work with an arbitrary list of any window size and position + # by adding the Dominators stack. + N = len(start) na_pos = [] output = np.empty(N, dtype=result_dtype) @@ -54,42 +57,12 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool: else: return a <= b - # All comments below are for the case of a maximum, that is, is_max = True. - # I will call this Q a "stash": preliminary calculations minimally necessary to - # finish the job. Q will always be in ascending order regardless of min/max: - # these are indices into the "values" array. The values[Q[i]], however, will - # be in non-descending order for max, and non-ascending for min. - # Think of this deque as indices of maximums found so far for varying window - # positions. That is, there is only one maximum in the source array, but it may - # not fit each window. So there are many "secondary maximums", and each of them - # may potentially fit multiple windows (unless, of course you get a very special - # case of an arary of strictly descending values and constant rolling window size). - # In that case Q will be the longest, so here is an idea for the worst case - # scenario testing. - - # We have to pretend, given that Numba has neither queue nor stack. Q: list = [] # this is a queue Dominators: list = [] # this is a stack - # END-OF numba-only init part - - # Basic idea of the algorithm: https://stackoverflow.com/a/12239580 - # It was generalized to work with an arbitrary list of any window size and position - # Zero is apparently passed here as a default. - # It is important to have this value precise for calculations. if min_periods < 1: min_periods = 1 - # We will say that node j "dominates" node i if j comes after i, yet requires a - # deeper deque Q at the time node i is processed in order to be able to finish - # the job for node j. This precisely means the following two conditions: - # - j > i - # - start[j] < start[i]. - # We keep track of such nodes in the Dominators queue. - # In addition, if it so happens that two nodes j1 and j2 dominate another node, - # and j2 > j1, yet start[j2] <= start[j1], then we only need to keep track of j2. - # (To understand why this works, note that the algorithm requires that - # the "end" array is sorted in non-descending order, among other things.) if N > 2: i_next = N - 1 # equivalent to i_next = i+1 inside the loop for i in range(N - 2, -1, -1): @@ -97,16 +70,11 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool: if next_dominates and ( not Dominators or start[Dominators[-1]] > start[i_next] ): - # Both ">" and ">=" would have been (logically) equivalent, but we are - # shooting for the shortest size of the Dominators list, hence the - # usage of ">" Dominators.append(i_next) i_next = i valid_start = -min_periods - # Having these relieves us from having "if i>0" on each iteration for special - # handling last_end = 0 last_start = -1 @@ -117,12 +85,6 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool: if Dominators and Dominators[-1] == i: Dominators.pop() - # TODO: Arguably there are benefits to having this consistency check before - # this function is even called (e.g. in rolling.py). - # Given the current implementation, it will be rather tricky at the moment - # to have this check in rolling.py. Additionally, this is only required for - # min/max, and may only ever be violated if user-defined window indexer is - # used. Thus this is the best spot for it, given the circumstances. if not ( this_end > last_end or (this_end == last_end and this_start >= last_start) ): @@ -130,13 +92,9 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool: "Start/End ordering requirement is violated at index " + str(i) ) - # This is the least restrictive starting index that will take care of current - # item (i) and all remaining items stash_start = ( this_start if not Dominators else min(this_start, start[Dominators[-1]]) ) - # Discard entries outside of the "needed" window. Do it first as to keep the - # stash small. while Q and Q[0] < stash_start: Q.pop(0) @@ -150,29 +108,13 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool: Q.append(k) # Q.push_back(k) if not Q or (this_start > valid_start): - # The "not Q" condition means we have not seen anything but NaNs yet in - # values[:this_end-1]. The "this_start > valid_start" condition means we - # have not accumulated enough (min_periods or more) non-NaN values. if values.dtype.kind != "i": output[i] = np.nan else: na_pos.append(i) elif Q[0] >= this_start: - # This is the only read-from-the-stash scenario that ever happens when - # window size is constant across the set. This is also likely 99+% of - # all use cases, thus we want to make sure we do not go into bisection - # as to incur neither the *log(k) penalty nor the function call penalty - # for this very common case. If we are here, then our stash is as "deep" - # as what the current node ("job") requires. Thus take the front item. output[i] = values[Q[0]] else: - # In this case our stash is bigger than what is necessary to compute this - # node's output due to a wider search window at (one of) the nodes that - # follow. We have to locate our value in the middle of the stash. - # Since our stash is sorted, we can use binary search: - # here we need to output the item closest to the front (idx=0) of the - # stash that fits our window bounds. Item 0 has been looked at (and - # discarded) by now, so lo=1 q_idx = bisect_left(Q, this_start, lo=1) output[i] = values[Q[q_idx]] last_end = this_end From c8761eca1a084553766aae732509194d74cc8e3a Mon Sep 17 00:00:00 2001 From: viable-alternative Date: Sat, 19 Apr 2025 13:42:17 -0400 Subject: [PATCH 6/7] Randomized deterministic unit tests replaced with 10-row prebuilt tests as requested by @mroeschke --- pandas/tests/window/test_numba.py | 58 ++------- pandas/tests/window/test_rolling.py | 190 ++++++---------------------- 2 files changed, 49 insertions(+), 199 deletions(-) diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 78fffa7666f74..f9b71bf4835fe 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -583,58 +583,16 @@ def test_npfunc_no_warnings(): df.col1.rolling(2).apply(np.prod, raw=True, engine="numba") -from .test_rolling import ( - ArbitraryWindowIndexer, - CustomLengthWindowIndexer, -) +from .test_rolling import TestMinMax @td.skip_if_no("numba") -class TestMinMax: - @pytest.mark.parametrize("is_max", [True, False]) - @pytest.mark.parametrize( - "seed, n, win_len, min_obs, frac_nan, indexer_t", - [ - (42, 1000, 80, 15, 0.3, CustomLengthWindowIndexer), - (52, 1000, 80, 15, 0.3, ArbitraryWindowIndexer), - (1984, 1000, 40, 25, 0.3, None), - ], - ) - def test_minmax(self, is_max, seed, n, win_len, min_obs, frac_nan, indexer_t): - if seed is not None and isinstance(seed, np.random._generator.Generator): - rng = np.random.default_rng(seed) - rng.bit_generator.state = seed.bit_generator.state - else: - rng = np.random.default_rng(seed) - - vals = DataFrame({"Data": rng.random(n)}) - if frac_nan > 0: - is_nan = rng.random(len(vals)) < frac_nan - vals.Data = np.where(is_nan, np.nan, vals.Data) - - ind_param = indexer_t(rng, len(vals), win_len) if indexer_t else win_len +class TestMinMaxNumba: + parent = TestMinMax() - r = vals.rolling(ind_param, min_periods=min_obs) - f = r.max if is_max else r.min - test_cython = f(engine="cython") - test_numba = f(engine="numba") - tm.assert_series_equal(test_numba.Data, test_cython.Data) + @pytest.mark.parametrize("is_max, has_nan, exp_list", TestMinMax.TestData) + def test_minmax(self, is_max, has_nan, exp_list): + TestMinMaxNumba.parent.test_minmax(is_max, has_nan, exp_list, "numba") - @pytest.mark.parametrize( - "seed, n, win_len, indexer_t", - [ - (42, 15, 7, ArbitraryWindowIndexer), - ], - ) - def test_wrong_order(self, seed, n, win_len, indexer_t): - rng = np.random.default_rng(seed) - vals = DataFrame({"Data": rng.random(n)}) - - ind_obj = indexer_t(rng, len(vals), win_len) - ind_obj._end[[14, 7]] = ind_obj._end[[7, 14]] - - f = vals.rolling(ind_obj).max - with pytest.raises( - ValueError, match="Start/End ordering requirement is violated at index 8" - ): - f(engine="numba") + def test_wrong_order(self): + TestMinMaxNumba.parent.test_wrong_order("numba") diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index fc547b9d66bb8..7388b8b3cc4bd 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -2,7 +2,6 @@ datetime, timedelta, ) -from typing import Any import numpy as np import pytest @@ -1949,54 +1948,10 @@ def test_rolling_timedelta_window_non_nanoseconds(unit, tz): tm.assert_frame_equal(ref_df, df) -class StandardWindowIndexer(BaseIndexer): - def __init__(self, n, win_len): - self.n = n - self.win_len = win_len - super().__init__() - - def get_window_bounds( - self, num_values=None, min_periods=None, center=None, closed=None, step=None - ): - if num_values is None: - num_values = self.n - end = np.arange(num_values, dtype="int64") + 1 - start = np.clip(end - self.win_len, 0, num_values) - return start, end - - -class CustomLengthWindowIndexer(BaseIndexer): - def __init__(self, rnd: np.random.Generator, n, win_len): - self.window = rnd.integers(win_len, size=n) - super().__init__() - - def get_window_bounds( - self, num_values=None, min_periods=None, center=None, closed=None, step=None - ): - if num_values is None: - num_values = len(self.window) - end = np.arange(num_values, dtype="int64") + 1 - start = np.clip(end - self.window, 0, num_values) - return start, end - - -class ArbitraryWindowIndexer(BaseIndexer): - def __init__(self, rnd: np.random.Generator, n, win_len): - start = rnd.integers(n, size=n) - win_len = rnd.integers(win_len, size=n) - end = np.where(start - win_len >= 0, start - win_len, start + win_len) - - (start, end) = ( - np.where(end >= start, start, end), - np.where(end >= start, end, start), - ) - - # It is extremely unlikely that a random array would come sorted, - # so we proceed with sort without checking if it is sorted. - prm = sorted(range(len(start)), key=lambda i: (end[i], start[i])) - - self._start = np.array(start)[prm] - self._end = np.array(end)[prm] +class PrescribedWindowIndexer(BaseIndexer): + def __init__(self, start, end): + self._start = start + self._end = end super().__init__() def get_window_bounds( @@ -2010,109 +1965,46 @@ def get_window_bounds( class TestMinMax: - # Pytest cache will not be a good choice here, because it appears - # pytest persists data on disk, and we are not really interested - # in flooding your hard drive with random numbers. - # Thus we just cache control data in memory to avoid repetititve calculations. - class Cache: - def __init__(self) -> None: - self.ctrl: dict[Any, Any] = {} - - @pytest.fixture(scope="class") - def cache(self) -> Cache: - return self.Cache() - - @pytest.mark.parametrize("is_max", [True, False]) - # @pytest.mark.parametrize("engine", ["python", "cython", "numba"]) - @pytest.mark.parametrize("engine", ["cython"]) - @pytest.mark.parametrize( - "seed, n, win_len, min_obs, frac_nan, indexer_t", - [ - (42, 1000, 80, 15, 0.3, CustomLengthWindowIndexer), - (52, 1000, 80, 15, 0.3, ArbitraryWindowIndexer), - (1984, 1000, 40, 25, 0.3, None), - ], - ) - def test_minmax( - self, is_max, engine, seed, n, win_len, min_obs, frac_nan, indexer_t, cache - ): - if seed is not None and isinstance(seed, np.random._generator.Generator): - rng = np.random.default_rng(seed) - rng.bit_generator.state = seed.bit_generator.state - else: - rng = np.random.default_rng(seed) - - if seed is None or isinstance(seed, np.random._generator.Generator): - rng_state_for_key = ( - rng.bit_generator.state["bit_generator"], - rng.bit_generator.state["state"]["state"], - rng.bit_generator.state["state"]["inc"], - rng.bit_generator.state["has_uint32"], - rng.bit_generator.state["uinteger"], - ) - else: - rng_state_for_key = seed - self.last_rng_state = rng.bit_generator.state - vals = DataFrame({"Data": rng.random(n)}) - if frac_nan > 0: - is_nan = rng.random(len(vals)) < frac_nan - vals.Data = np.where(is_nan, np.nan, vals.Data) - - ind_obj = ( - indexer_t(rng, len(vals), win_len) - if indexer_t - else StandardWindowIndexer(len(vals), win_len) - ) - ind_param = ind_obj if indexer_t else win_len + TestData = [ + (True, False, [3.0, 5.0, 2.0, 5.0, 1.0, 5.0, 6.0, 7.0, 8.0, 9.0]), + (True, True, [3.0, 4.0, 2.0, 4.0, 1.0, 4.0, 6.0, 7.0, 7.0, 9.0]), + (False, False, [3.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0]), + (False, True, [3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 6.0, 6.0, 7.0, 1.0]), + ] - (start, end) = ind_obj.get_window_bounds() - ctrl_key = (is_max, rng_state_for_key, n, win_len, min_obs, frac_nan, indexer_t) - if ctrl_key in cache.ctrl: - ctrl = cache.ctrl[ctrl_key] + @pytest.mark.parametrize("is_max, has_nan, exp_list", TestData) + def test_minmax(self, is_max, has_nan, exp_list, engine=None): + nan_idx = [0, 5, 8] + df = DataFrame( + { + "data": [5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 6.0, 7.0, 8.0, 9.0], + "start": [2, 0, 3, 0, 4, 0, 5, 5, 7, 3], + "end": [3, 4, 4, 5, 5, 6, 7, 8, 9, 10], + } + ) + if has_nan: + df.loc[nan_idx, "data"] = np.nan + expected = Series(exp_list, name="data") + r = df.data.rolling( + PrescribedWindowIndexer(df.start.to_numpy(), df.end.to_numpy()) + ) + if is_max: + result = r.max(engine=engine) else: - # This is brute force calculation, and may get expensive when n is - # large, so we cache it. - ctrl = calc_minmax_control(vals.Data, start, end, min_obs, is_max) - cache.ctrl[ctrl_key] = ctrl - - r = vals.rolling(ind_param, min_periods=min_obs) - f = r.max if is_max else r.min - test = f(engine=engine) - tm.assert_series_equal(test.Data, ctrl.Data) - - # @pytest.mark.parametrize("engine", ["python", "cython", "numba"]) - @pytest.mark.parametrize("engine", ["cython"]) - @pytest.mark.parametrize( - "seed, n, win_len, indexer_t", - [ - (42, 15, 7, ArbitraryWindowIndexer), - ], - ) - def test_wrong_order(self, engine, seed, n, win_len, indexer_t): - rng = np.random.default_rng(seed) - vals = DataFrame({"Data": rng.random(n)}) + result = r.min(engine=engine) + + tm.assert_series_equal(result, expected) + + def test_wrong_order(self, engine=None): + start = np.array(range(5), dtype=np.int64) + end = start + 1 + end[3] = end[2] + start[3] = start[2] - 1 - ind_obj = indexer_t(rng, len(vals), win_len) - ind_obj._end[[14, 7]] = ind_obj._end[[7, 14]] + df = DataFrame({"data": start * 1.0, "start": start, "end": end}) - f = vals.rolling(ind_obj).max + r = df.data.rolling(PrescribedWindowIndexer(start, end)) with pytest.raises( - ValueError, match="Start/End ordering requirement is violated at index 8" + ValueError, match="Start/End ordering requirement is violated at index 3" ): - f(engine=engine) - - -def calc_minmax_control(vals, start, end, min_periods, ismax): - func = np.nanmax if ismax else np.nanmin - outp = np.full(vals.shape, np.nan) - for i in range(len(start)): - if start[i] >= end[i]: - outp[i] = np.nan - else: - rng = vals[start[i] : end[i]] - non_nan_cnt = np.count_nonzero(~np.isnan(rng)) - if non_nan_cnt >= min_periods: - outp[i] = func(rng) - else: - outp[i] = np.nan - return DataFrame({"Data": outp}) + r.max(engine=engine) From ee74058603b3ba21718d42179ac625262cf1fab4 Mon Sep 17 00:00:00 2001 From: viable-alternative Date: Mon, 21 Apr 2025 20:36:21 -0400 Subject: [PATCH 7/7] Rolling min/max: added brief comments; renamed a couple of local variables; de-coupled numba and cython unit tests. --- pandas/_libs/window/aggregations.pyx | 61 ++++++++++++++----------- pandas/core/_numba/kernels/min_max_.py | 40 ++++++++++------- pandas/tests/window/test_numba.py | 62 +++++++++++++++++++++++--- pandas/tests/window/test_rolling.py | 27 +++++------ 4 files changed, 128 insertions(+), 62 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 6b15a3f98aa74..04b3f8ab461fa 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -989,15 +989,14 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, # ---------------------------------------------------------------------- -# Moving maximum / minimum code taken from Bottleneck -# Licence at LICENSES/BOTTLENECK_LICENCE - cdef int64_t bisect_left( deque[int64_t]& a, int64_t x, int64_t lo=0, int64_t hi=-1 ) nogil: + """Same as https://docs.python.org/3/library/bisect.html.""" + cdef int64_t mid if hi == -1: hi = a.size() @@ -1011,6 +1010,9 @@ cdef int64_t bisect_left( from libc.math cimport isnan +# Prior version of moving maximum / minimum code taken from Bottleneck +# Licence at LICENSES/BOTTLENECK_LICENCE + def roll_max(ndarray[float64_t] values, ndarray[int64_t] start, ndarray[int64_t] end, int64_t minp) -> np.ndarray: @@ -1066,16 +1068,19 @@ def _roll_min_max( ): cdef: Py_ssize_t i, i_next, k, valid_start, last_end, last_start, N = len(start) - deque Q[int64_t] - stack Dominators[int64_t] + # Indices of bounded extrema in `values`. `candidates[i]` is always increasing. + # `values[candidates[i]]` is decreasing for max and increasing for min. + deque candidates[int64_t] + # Indices of largest windows that "cover" preceding windows. + stack dominators[int64_t] ndarray[float64_t, ndim=1] output Py_ssize_t this_start, this_end, stash_start int64_t q_idx output = np.empty(N, dtype=np.float64) - Q = deque[int64_t]() - Dominators = stack[int64_t]() + candidates = deque[int64_t]() + dominators = stack[int64_t]() # This function was "ported" / translated from sliding_min_max() # in /pandas/core/_numba/kernels/min_max_.py. @@ -1100,12 +1105,13 @@ def _roll_min_max( for i in range(N - 2, -1, -1): if start[i_next] < start[i] \ and ( - Dominators.empty() - or start[Dominators.top()] > start[i_next] + dominators.empty() + or start[dominators.top()] > start[i_next] ): - Dominators.push(i_next) + dominators.push(i_next) i_next = i + # NaN tracking to guarantee minp valid_start = -minp last_end = 0 @@ -1115,21 +1121,21 @@ def _roll_min_max( this_start = start[i] this_end = end[i] - if (not Dominators.empty() and Dominators.top() == i): - Dominators.pop() + if (not dominators.empty() and dominators.top() == i): + dominators.pop() if not (this_end > last_end or (this_end == last_end and this_start >= last_start)): raise ValueError( "Start/End ordering requirement is violated at index {}".format(i)) - if Dominators.empty(): + if dominators.empty(): stash_start = this_start else: - stash_start = min(this_start, start[Dominators.top()]) + stash_start = min(this_start, start[dominators.top()]) - while not Q.empty() and Q.front() < stash_start: - Q.pop_front() + while not candidates.empty() and candidates.front() < stash_start: + candidates.pop_front() for k in range(last_end, this_end): if not isnan(values[k]): @@ -1138,20 +1144,23 @@ def _roll_min_max( valid_start += 1 if is_max: - while not Q.empty() and values[k] >= values[Q.back()]: - Q.pop_back() + while (not candidates.empty() + and values[k] >= values[candidates.back()]): + candidates.pop_back() else: - while not Q.empty() and values[k] <= values[Q.back()]: - Q.pop_back() - Q.push_back(k) + while (not candidates.empty() + and values[k] <= values[candidates.back()]): + candidates.pop_back() + candidates.push_back(k) - if Q.empty() or this_start > valid_start: + if candidates.empty() or this_start > valid_start: output[i] = NaN - elif Q.front() >= this_start: - output[i] = values[Q.front()] + elif candidates.front() >= this_start: + # ^^ This is here to avoid costly bisection for fixed window sizes. + output[i] = values[candidates.front()] else: - q_idx = bisect_left(Q, this_start, lo=1) - output[i] = values[Q[q_idx]] + q_idx = bisect_left(candidates, this_start, lo=1) + output[i] = values[candidates[q_idx]] last_end = this_end last_start = this_start diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py index 8e377e88bcc9e..c03f20c871012 100644 --- a/pandas/core/_numba/kernels/min_max_.py +++ b/pandas/core/_numba/kernels/min_max_.py @@ -23,6 +23,7 @@ @numba.njit(nogil=True, parallel=False) def bisect_left(a: list[Any], x: Any, lo: int = 0, hi: int = -1) -> int: + """Same as https://docs.python.org/3/library/bisect.html; not in numba yet!""" if hi == -1: hi = len(a) while lo < hi: @@ -57,8 +58,11 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool: else: return a <= b - Q: list = [] # this is a queue - Dominators: list = [] # this is a stack + # Indices of bounded extrema in `values`. `candidates[i]` is always increasing. + # `values[candidates[i]]` is decreasing for max and increasing for min. + candidates: list[int] = [] # this is a queue + # Indices of largest windows that "cover" preceding windows. + dominators: list[int] = [] # this is a stack if min_periods < 1: min_periods = 1 @@ -68,11 +72,12 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool: for i in range(N - 2, -1, -1): next_dominates = start[i_next] < start[i] if next_dominates and ( - not Dominators or start[Dominators[-1]] > start[i_next] + not dominators or start[dominators[-1]] > start[i_next] ): - Dominators.append(i_next) + dominators.append(i_next) i_next = i + # NaN tracking to guarantee min_periods valid_start = -min_periods last_end = 0 @@ -82,8 +87,8 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool: this_start = start[i].item() this_end = end[i].item() - if Dominators and Dominators[-1] == i: - Dominators.pop() + if dominators and dominators[-1] == i: + dominators.pop() if not ( this_end > last_end or (this_end == last_end and this_start >= last_start) @@ -93,30 +98,31 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool: ) stash_start = ( - this_start if not Dominators else min(this_start, start[Dominators[-1]]) + this_start if not dominators else min(this_start, start[dominators[-1]]) ) - while Q and Q[0] < stash_start: - Q.pop(0) + while candidates and candidates[0] < stash_start: + candidates.pop(0) for k in range(last_end, this_end): if not np.isnan(values[k]): valid_start += 1 while valid_start >= 0 and np.isnan(values[valid_start]): valid_start += 1 - while Q and cmp(values[k], values[Q[-1]], is_max): - Q.pop() # Q.pop_back() - Q.append(k) # Q.push_back(k) + while candidates and cmp(values[k], values[candidates[-1]], is_max): + candidates.pop() # Q.pop_back() + candidates.append(k) # Q.push_back(k) - if not Q or (this_start > valid_start): + if not candidates or (this_start > valid_start): if values.dtype.kind != "i": output[i] = np.nan else: na_pos.append(i) - elif Q[0] >= this_start: - output[i] = values[Q[0]] + elif candidates[0] >= this_start: + # ^^ This is here to avoid costly bisection for fixed window sizes. + output[i] = values[candidates[0]] else: - q_idx = bisect_left(Q, this_start, lo=1) - output[i] = values[Q[q_idx]] + q_idx = bisect_left(candidates, this_start, lo=1) + output[i] = values[candidates[q_idx]] last_end = this_end last_start = this_start diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index f9b71bf4835fe..ff6a616bc5264 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -12,6 +12,7 @@ to_datetime, ) import pandas._testing as tm +from pandas.api.indexers import BaseIndexer from pandas.util.version import Version pytestmark = [pytest.mark.single_cpu] @@ -583,16 +584,65 @@ def test_npfunc_no_warnings(): df.col1.rolling(2).apply(np.prod, raw=True, engine="numba") -from .test_rolling import TestMinMax +class PrescribedWindowIndexer(BaseIndexer): + def __init__(self, start, end): + self._start = start + self._end = end + super().__init__() + + def get_window_bounds( + self, num_values=None, min_periods=None, center=None, closed=None, step=None + ): + if num_values is None: + num_values = len(self._start) + start = np.clip(self._start, 0, num_values) + end = np.clip(self._end, 0, num_values) + return start, end @td.skip_if_no("numba") class TestMinMaxNumba: - parent = TestMinMax() - - @pytest.mark.parametrize("is_max, has_nan, exp_list", TestMinMax.TestData) + @pytest.mark.parametrize( + "is_max, has_nan, exp_list", + [ + (True, False, [3.0, 5.0, 2.0, 5.0, 1.0, 5.0, 6.0, 7.0, 8.0, 9.0]), + (True, True, [3.0, 4.0, 2.0, 4.0, 1.0, 4.0, 6.0, 7.0, 7.0, 9.0]), + (False, False, [3.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0]), + (False, True, [3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 6.0, 6.0, 7.0, 1.0]), + ], + ) def test_minmax(self, is_max, has_nan, exp_list): - TestMinMaxNumba.parent.test_minmax(is_max, has_nan, exp_list, "numba") + nan_idx = [0, 5, 8] + df = DataFrame( + { + "data": [5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 6.0, 7.0, 8.0, 9.0], + "start": [2, 0, 3, 0, 4, 0, 5, 5, 7, 3], + "end": [3, 4, 4, 5, 5, 6, 7, 8, 9, 10], + } + ) + if has_nan: + df.loc[nan_idx, "data"] = np.nan + expected = Series(exp_list, name="data") + r = df.data.rolling( + PrescribedWindowIndexer(df.start.to_numpy(), df.end.to_numpy()) + ) + if is_max: + result = r.max(engine="numba") + else: + result = r.min(engine="numba") + + tm.assert_series_equal(result, expected) def test_wrong_order(self): - TestMinMaxNumba.parent.test_wrong_order("numba") + start = np.array(range(5), dtype=np.int64) + end = start + 1 + end[3] = end[2] + start[3] = start[2] - 1 + + df = DataFrame({"data": start * 1.0, "start": start, "end": end}) + + r = df.data.rolling(PrescribedWindowIndexer(start, end)) + with pytest.raises( + ValueError, match="Start/End ordering requirement is violated at index 3" + ): + r.max(engine="numba") diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 7388b8b3cc4bd..6e8f075d35490 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -1965,15 +1965,16 @@ def get_window_bounds( class TestMinMax: - TestData = [ - (True, False, [3.0, 5.0, 2.0, 5.0, 1.0, 5.0, 6.0, 7.0, 8.0, 9.0]), - (True, True, [3.0, 4.0, 2.0, 4.0, 1.0, 4.0, 6.0, 7.0, 7.0, 9.0]), - (False, False, [3.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0]), - (False, True, [3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 6.0, 6.0, 7.0, 1.0]), - ] - - @pytest.mark.parametrize("is_max, has_nan, exp_list", TestData) - def test_minmax(self, is_max, has_nan, exp_list, engine=None): + @pytest.mark.parametrize( + "is_max, has_nan, exp_list", + [ + (True, False, [3.0, 5.0, 2.0, 5.0, 1.0, 5.0, 6.0, 7.0, 8.0, 9.0]), + (True, True, [3.0, 4.0, 2.0, 4.0, 1.0, 4.0, 6.0, 7.0, 7.0, 9.0]), + (False, False, [3.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0]), + (False, True, [3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 6.0, 6.0, 7.0, 1.0]), + ], + ) + def test_minmax(self, is_max, has_nan, exp_list): nan_idx = [0, 5, 8] df = DataFrame( { @@ -1989,13 +1990,13 @@ def test_minmax(self, is_max, has_nan, exp_list, engine=None): PrescribedWindowIndexer(df.start.to_numpy(), df.end.to_numpy()) ) if is_max: - result = r.max(engine=engine) + result = r.max() else: - result = r.min(engine=engine) + result = r.min() tm.assert_series_equal(result, expected) - def test_wrong_order(self, engine=None): + def test_wrong_order(self): start = np.array(range(5), dtype=np.int64) end = start + 1 end[3] = end[2] @@ -2007,4 +2008,4 @@ def test_wrong_order(self, engine=None): with pytest.raises( ValueError, match="Start/End ordering requirement is violated at index 3" ): - r.max(engine=engine) + r.max()