Skip to content

BUG: Fix #46726; wrong result with varying window size min/max rolling calc. #61288

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
198 changes: 115 additions & 83 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ from libc.math cimport (
sqrt,
)
from libcpp.deque cimport deque
from libcpp.stack cimport stack
from libcpp.unordered_map cimport unordered_map

from pandas._libs.algos cimport TiebreakEnumType
Expand Down Expand Up @@ -988,39 +989,29 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,

# ----------------------------------------------------------------------

# Moving maximum / minimum code taken from Bottleneck
# Licence at LICENSES/BOTTLENECK_LICENCE


cdef float64_t init_mm(float64_t ai, Py_ssize_t *nobs, bint is_max) noexcept nogil:

if ai == ai:
nobs[0] = nobs[0] + 1
elif is_max:
ai = MINfloat64
else:
ai = MAXfloat64

return ai


cdef void remove_mm(float64_t aold, Py_ssize_t *nobs) noexcept nogil:
""" remove a value from the mm calc """
if aold == aold:
nobs[0] = nobs[0] - 1


cdef float64_t calc_mm(int64_t minp, Py_ssize_t nobs,
float64_t value) noexcept nogil:
cdef:
float64_t result
cdef int64_t bisect_left(
deque[int64_t]& a,
int64_t x,
int64_t lo=0,
int64_t hi=-1
) nogil:
"""Same as https://docs.python.org/3/library/bisect.html."""

cdef int64_t mid
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, can you add a similar docstring.

if hi == -1:
hi = a.size()
while lo < hi:
mid = (lo + hi) // 2
if a.at(mid) < x:
lo = mid + 1
else:
hi = mid
return lo

if nobs >= minp:
result = value
else:
result = NaN
from libc.math cimport isnan

return result
# Prior version of moving maximum / minimum code taken from Bottleneck
# Licence at LICENSES/BOTTLENECK_LICENCE


def roll_max(ndarray[float64_t] values, ndarray[int64_t] start,
Expand Down Expand Up @@ -1068,69 +1059,110 @@ def roll_min(ndarray[float64_t] values, ndarray[int64_t] start,
return _roll_min_max(values, start, end, minp, is_max=0)


cdef _roll_min_max(ndarray[float64_t] values,
ndarray[int64_t] starti,
ndarray[int64_t] endi,
int64_t minp,
bint is_max):
def _roll_min_max(
ndarray[float64_t] values,
ndarray[int64_t] start,
ndarray[int64_t] end,
int64_t minp,
bint is_max
):
cdef:
float64_t ai
int64_t curr_win_size, start
Py_ssize_t i, k, nobs = 0, N = len(starti)
deque Q[int64_t] # min/max always the front
deque W[int64_t] # track the whole window for nobs compute
Py_ssize_t i, i_next, k, valid_start, last_end, last_start, N = len(start)
# Indices of bounded extrema in `values`. `candidates[i]` is always increasing.
# `values[candidates[i]]` is decreasing for max and increasing for min.
deque candidates[int64_t]
# Indices of largest windows that "cover" preceding windows.
stack dominators[int64_t]
ndarray[float64_t, ndim=1] output

Py_ssize_t this_start, this_end, stash_start
int64_t q_idx

output = np.empty(N, dtype=np.float64)
Q = deque[int64_t]()
W = deque[int64_t]()
candidates = deque[int64_t]()
dominators = stack[int64_t]()

# This function was "ported" / translated from sliding_min_max()
# in /pandas/core/_numba/kernels/min_max_.py.
# (See there for credits and some comments.)
# Code translation assumptions/rules:
# - min_periods --> minp
# - deque[0] --> front()
# - deque[-1] --> back()
# - stack[-1] --> top()
# - bool(stack/deque) --> !empty()
# - deque.append() --> push_back()
# - stack.append() --> push()
# - deque.popleft --> pop_front()
# - deque.pop() --> pop_back()

with nogil:
if minp < 1:
minp = 1

if N>2:
i_next = N - 1
for i in range(N - 2, -1, -1):
if start[i_next] < start[i] \
and (
dominators.empty()
or start[dominators.top()] > start[i_next]
):
dominators.push(i_next)
i_next = i

# NaN tracking to guarantee minp
valid_start = -minp

last_end = 0
last_start = -1

# This is using a modified version of the C++ code in this
# SO post: https://stackoverflow.com/a/12239580
# The original impl didn't deal with variable window sizes
# So the code was optimized for that

# first window's size
curr_win_size = endi[0] - starti[0]
# GH 32865
# Anchor output index to values index to provide custom
# BaseIndexer support
for i in range(N):
this_start = start[i]
this_end = end[i]

curr_win_size = endi[i] - starti[i]
if i == 0:
start = starti[i]
else:
start = endi[i - 1]

for k in range(start, endi[i]):
ai = init_mm(values[k], &nobs, is_max)
# Discard previous entries if we find new min or max
if is_max:
while not Q.empty() and ((ai >= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
else:
while not Q.empty() and ((ai <= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
Q.push_back(k)
W.push_back(k)

# Discard entries outside and left of current window
while not Q.empty() and Q.front() <= starti[i] - 1:
Q.pop_front()
while not W.empty() and W.front() <= starti[i] - 1:
remove_mm(values[W.front()], &nobs)
W.pop_front()

# Save output based on index in input value array
if not Q.empty() and curr_win_size > 0:
output[i] = calc_mm(minp, nobs, values[Q.front()])
if (not dominators.empty() and dominators.top() == i):
dominators.pop()

if not (this_end > last_end
or (this_end == last_end and this_start >= last_start)):
raise ValueError(
"Start/End ordering requirement is violated at index {}".format(i))

if dominators.empty():
stash_start = this_start
else:
stash_start = min(this_start, start[dominators.top()])

while not candidates.empty() and candidates.front() < stash_start:
candidates.pop_front()

for k in range(last_end, this_end):
if not isnan(values[k]):
valid_start += 1
while valid_start >= 0 and isnan(values[valid_start]):
valid_start += 1

if is_max:
while (not candidates.empty()
and values[k] >= values[candidates.back()]):
candidates.pop_back()
else:
while (not candidates.empty()
and values[k] <= values[candidates.back()]):
candidates.pop_back()
candidates.push_back(k)

if candidates.empty() or this_start > valid_start:
output[i] = NaN
elif candidates.front() >= this_start:
# ^^ This is here to avoid costly bisection for fixed window sizes.
output[i] = values[candidates.front()]
else:
q_idx = bisect_left(candidates, this_start, lo=1)
output[i] = values[candidates[q_idx]]
last_end = this_end
last_start = this_start

return output

Expand Down
135 changes: 92 additions & 43 deletions pandas/core/_numba/kernels/min_max_.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import (
TYPE_CHECKING,
Any,
)

import numba
import numpy as np
Expand All @@ -18,6 +21,20 @@
from pandas._typing import npt


@numba.njit(nogil=True, parallel=False)
def bisect_left(a: list[Any], x: Any, lo: int = 0, hi: int = -1) -> int:
"""Same as https://docs.python.org/3/library/bisect.html; not in numba yet!"""
if hi == -1:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a docstring here; I think just a single line is fine. Something like

"""Get index of smallest value greater than or equal to x between lo and hi."""

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added docstrings and comments for all requests. Made a couple of more one-liner code comments in strategic spots. If you must, feel free to remove them.
Puzzled at the failed freethreading unit test (with a seemingly unrelated error message). No real changes here. Something (else) must be going on?

hi = len(a)
while lo < hi:
mid = (lo + hi) // 2
if a[mid] < x:
lo = mid + 1
else:
hi = mid
return lo


@numba.jit(nopython=True, nogil=True, parallel=False)
def sliding_min_max(
values: np.ndarray,
Expand All @@ -27,55 +44,87 @@ def sliding_min_max(
min_periods: int,
is_max: bool,
) -> tuple[np.ndarray, list[int]]:
# Basic idea of the algorithm: https://stackoverflow.com/a/12239580
# It was generalized to work with an arbitrary list of any window size and position
# by adding the Dominators stack.

N = len(start)
nobs = 0
output = np.empty(N, dtype=result_dtype)
na_pos = []
# Use deque once numba supports it
# https://github.com/numba/numba/issues/7417
Q: list = []
W: list = []
for i in range(N):
curr_win_size = end[i] - start[i]
if i == 0:
st = start[i]
else:
st = end[i - 1]

for k in range(st, end[i]):
ai = values[k]
if not np.isnan(ai):
nobs += 1
elif is_max:
ai = -np.inf
else:
ai = np.inf
# Discard previous entries if we find new min or max
if is_max:
while Q and ((ai >= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]):
Q.pop()
else:
while Q and ((ai <= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]):
Q.pop()
Q.append(k)
W.append(k)

# Discard entries outside and left of current window
while Q and Q[0] <= start[i] - 1:
Q.pop(0)
while W and W[0] <= start[i] - 1:
if not np.isnan(values[W[0]]):
nobs -= 1
W.pop(0)

# Save output based on index in input value array
if Q and curr_win_size > 0 and nobs >= min_periods:
output[i] = values[Q[0]]
output = np.empty(N, dtype=result_dtype)

def cmp(a: Any, b: Any, is_max: bool) -> bool:
if is_max:
return a >= b
else:
return a <= b

# Indices of bounded extrema in `values`. `candidates[i]` is always increasing.
# `values[candidates[i]]` is decreasing for max and increasing for min.
candidates: list[int] = [] # this is a queue
# Indices of largest windows that "cover" preceding windows.
dominators: list[int] = [] # this is a stack

if min_periods < 1:
min_periods = 1

if N > 2:
i_next = N - 1 # equivalent to i_next = i+1 inside the loop
for i in range(N - 2, -1, -1):
next_dominates = start[i_next] < start[i]
if next_dominates and (
not dominators or start[dominators[-1]] > start[i_next]
):
dominators.append(i_next)
i_next = i

# NaN tracking to guarantee min_periods
valid_start = -min_periods

last_end = 0
last_start = -1

for i in range(N):
this_start = start[i].item()
this_end = end[i].item()

if dominators and dominators[-1] == i:
dominators.pop()

if not (
this_end > last_end or (this_end == last_end and this_start >= last_start)
):
raise ValueError(
"Start/End ordering requirement is violated at index " + str(i)
)

stash_start = (
this_start if not dominators else min(this_start, start[dominators[-1]])
)
while candidates and candidates[0] < stash_start:
candidates.pop(0)

for k in range(last_end, this_end):
if not np.isnan(values[k]):
valid_start += 1
while valid_start >= 0 and np.isnan(values[valid_start]):
valid_start += 1
while candidates and cmp(values[k], values[candidates[-1]], is_max):
candidates.pop() # Q.pop_back()
candidates.append(k) # Q.push_back(k)

if not candidates or (this_start > valid_start):
if values.dtype.kind != "i":
output[i] = np.nan
else:
na_pos.append(i)
elif candidates[0] >= this_start:
# ^^ This is here to avoid costly bisection for fixed window sizes.
output[i] = values[candidates[0]]
else:
q_idx = bisect_left(candidates, this_start, lo=1)
output[i] = values[candidates[q_idx]]
last_end = this_end
last_start = this_start

return output, na_pos

Expand Down
Loading
Loading