Skip to content

Commit fe47c74

Browse files
Rolling min/max: added brief comments; renamed a couple of local variables; decoupled Numba and Cython unit tests.
1 parent 93cf87f commit fe47c74

File tree

4 files changed

+128
-62
lines changed

4 files changed

+128
-62
lines changed

pandas/_libs/window/aggregations.pyx

+35-26
Original file line numberDiff line numberDiff line change
@@ -989,15 +989,14 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
989989

990990
# ----------------------------------------------------------------------
991991

992-
# Moving maximum / minimum code taken from Bottleneck
993-
# Licence at LICENSES/BOTTLENECK_LICENCE
994-
995992
cdef int64_t bisect_left(
996993
deque[int64_t]& a,
997994
int64_t x,
998995
int64_t lo=0,
999996
int64_t hi=-1
1000997
) nogil:
998+
"""Same as bisect.bisect_left (https://docs.python.org/3/library/bisect.html)."""
999+
10011000
cdef int64_t mid
10021001
if hi == -1:
10031002
hi = a.size()
@@ -1011,6 +1010,9 @@ cdef int64_t bisect_left(
10111010

10121011
from libc.math cimport isnan
10131012

1013+
# Prior version of moving maximum / minimum code taken from Bottleneck
1014+
# Licence at LICENSES/BOTTLENECK_LICENCE
1015+
10141016

10151017
def roll_max(ndarray[float64_t] values, ndarray[int64_t] start,
10161018
ndarray[int64_t] end, int64_t minp) -> np.ndarray:
@@ -1066,16 +1068,19 @@ def _roll_min_max(
10661068
):
10671069
cdef:
10681070
Py_ssize_t i, i_next, k, valid_start, last_end, last_start, N = len(start)
1069-
deque Q[int64_t]
1070-
stack Dominators[int64_t]
1071+
# Indices of bounded extrema in `values`. `candidates[i]` is always increasing.
1072+
# `values[candidates[i]]` is decreasing for max and increasing for min.
1073+
deque candidates[int64_t]
1074+
# Indices of largest windows that "cover" preceding windows.
1075+
stack dominators[int64_t]
10711076
ndarray[float64_t, ndim=1] output
10721077

10731078
Py_ssize_t this_start, this_end, stash_start
10741079
int64_t q_idx
10751080

10761081
output = np.empty(N, dtype=np.float64)
1077-
Q = deque[int64_t]()
1078-
Dominators = stack[int64_t]()
1082+
candidates = deque[int64_t]()
1083+
dominators = stack[int64_t]()
10791084

10801085
# This function was "ported" / translated from sliding_min_max()
10811086
# in /pandas/core/_numba/kernels/min_max_.py.
@@ -1100,12 +1105,13 @@ def _roll_min_max(
11001105
for i in range(N - 2, -1, -1):
11011106
if start[i_next] < start[i] \
11021107
and (
1103-
Dominators.empty()
1104-
or start[Dominators.top()] > start[i_next]
1108+
dominators.empty()
1109+
or start[dominators.top()] > start[i_next]
11051110
):
1106-
Dominators.push(i_next)
1111+
dominators.push(i_next)
11071112
i_next = i
11081113

1114+
# NaN tracking to guarantee minp
11091115
valid_start = -minp
11101116

11111117
last_end = 0
@@ -1115,21 +1121,21 @@ def _roll_min_max(
11151121
this_start = start[i]
11161122
this_end = end[i]
11171123

1118-
if (not Dominators.empty() and Dominators.top() == i):
1119-
Dominators.pop()
1124+
if (not dominators.empty() and dominators.top() == i):
1125+
dominators.pop()
11201126

11211127
if not (this_end > last_end
11221128
or (this_end == last_end and this_start >= last_start)):
11231129
raise ValueError(
11241130
"Start/End ordering requirement is violated at index {}".format(i))
11251131

1126-
if Dominators.empty():
1132+
if dominators.empty():
11271133
stash_start = this_start
11281134
else:
1129-
stash_start = min(this_start, start[Dominators.top()])
1135+
stash_start = min(this_start, start[dominators.top()])
11301136

1131-
while not Q.empty() and Q.front() < stash_start:
1132-
Q.pop_front()
1137+
while not candidates.empty() and candidates.front() < stash_start:
1138+
candidates.pop_front()
11331139

11341140
for k in range(last_end, this_end):
11351141
if not isnan(values[k]):
@@ -1138,20 +1144,23 @@ def _roll_min_max(
11381144
valid_start += 1
11391145

11401146
if is_max:
1141-
while not Q.empty() and values[k] >= values[Q.back()]:
1142-
Q.pop_back()
1147+
while (not candidates.empty()
1148+
and values[k] >= values[candidates.back()]):
1149+
candidates.pop_back()
11431150
else:
1144-
while not Q.empty() and values[k] <= values[Q.back()]:
1145-
Q.pop_back()
1146-
Q.push_back(k)
1151+
while (not candidates.empty()
1152+
and values[k] <= values[candidates.back()]):
1153+
candidates.pop_back()
1154+
candidates.push_back(k)
11471155

1148-
if Q.empty() or this_start > valid_start:
1156+
if candidates.empty() or this_start > valid_start:
11491157
output[i] = NaN
1150-
elif Q.front() >= this_start:
1151-
output[i] = values[Q.front()]
1158+
elif candidates.front() >= this_start:
1159+
# ^^ This is here to avoid costly bisection for fixed window sizes.
1160+
output[i] = values[candidates.front()]
11521161
else:
1153-
q_idx = bisect_left(Q, this_start, lo=1)
1154-
output[i] = values[Q[q_idx]]
1162+
q_idx = bisect_left(candidates, this_start, lo=1)
1163+
output[i] = values[candidates[q_idx]]
11551164
last_end = this_end
11561165
last_start = this_start
11571166

pandas/core/_numba/kernels/min_max_.py

+23-17
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
@numba.njit(nogil=True, parallel=False)
2525
def bisect_left(a: list[Any], x: Any, lo: int = 0, hi: int = -1) -> int:
26+
"""Same as bisect.bisect_left (https://docs.python.org/3/library/bisect.html), which is not available in numba yet."""
2627
if hi == -1:
2728
hi = len(a)
2829
while lo < hi:
@@ -57,8 +58,11 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool:
5758
else:
5859
return a <= b
5960

60-
Q: list = [] # this is a queue
61-
Dominators: list = [] # this is a stack
61+
# Indices of bounded extrema in `values`. `candidates[i]` is always increasing.
62+
# `values[candidates[i]]` is decreasing for max and increasing for min.
63+
candidates: list[int] = [] # this is a queue
64+
# Indices of largest windows that "cover" preceding windows.
65+
dominators: list[int] = [] # this is a stack
6266

6367
if min_periods < 1:
6468
min_periods = 1
@@ -68,11 +72,12 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool:
6872
for i in range(N - 2, -1, -1):
6973
next_dominates = start[i_next] < start[i]
7074
if next_dominates and (
71-
not Dominators or start[Dominators[-1]] > start[i_next]
75+
not dominators or start[dominators[-1]] > start[i_next]
7276
):
73-
Dominators.append(i_next)
77+
dominators.append(i_next)
7478
i_next = i
7579

80+
# NaN tracking to guarantee min_periods
7681
valid_start = -min_periods
7782

7883
last_end = 0
@@ -82,8 +87,8 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool:
8287
this_start = start[i].item()
8388
this_end = end[i].item()
8489

85-
if Dominators and Dominators[-1] == i:
86-
Dominators.pop()
90+
if dominators and dominators[-1] == i:
91+
dominators.pop()
8792

8893
if not (
8994
this_end > last_end or (this_end == last_end and this_start >= last_start)
@@ -93,30 +98,31 @@ def cmp(a: Any, b: Any, is_max: bool) -> bool:
9398
)
9499

95100
stash_start = (
96-
this_start if not Dominators else min(this_start, start[Dominators[-1]])
101+
this_start if not dominators else min(this_start, start[dominators[-1]])
97102
)
98-
while Q and Q[0] < stash_start:
99-
Q.pop(0)
103+
while candidates and candidates[0] < stash_start:
104+
candidates.pop(0)
100105

101106
for k in range(last_end, this_end):
102107
if not np.isnan(values[k]):
103108
valid_start += 1
104109
while valid_start >= 0 and np.isnan(values[valid_start]):
105110
valid_start += 1
106-
while Q and cmp(values[k], values[Q[-1]], is_max):
107-
Q.pop() # Q.pop_back()
108-
Q.append(k) # Q.push_back(k)
111+
while candidates and cmp(values[k], values[candidates[-1]], is_max):
112+
candidates.pop() # same as candidates.pop_back() in the Cython version
113+
candidates.append(k) # same as candidates.push_back(k) in the Cython version
109114

110-
if not Q or (this_start > valid_start):
115+
if not candidates or (this_start > valid_start):
111116
if values.dtype.kind != "i":
112117
output[i] = np.nan
113118
else:
114119
na_pos.append(i)
115-
elif Q[0] >= this_start:
116-
output[i] = values[Q[0]]
120+
elif candidates[0] >= this_start:
121+
# ^^ This is here to avoid costly bisection for fixed window sizes.
122+
output[i] = values[candidates[0]]
117123
else:
118-
q_idx = bisect_left(Q, this_start, lo=1)
119-
output[i] = values[Q[q_idx]]
124+
q_idx = bisect_left(candidates, this_start, lo=1)
125+
output[i] = values[candidates[q_idx]]
120126
last_end = this_end
121127
last_start = this_start
122128

pandas/tests/window/test_numba.py

+56-6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
to_datetime,
1313
)
1414
import pandas._testing as tm
15+
from pandas.api.indexers import BaseIndexer
1516
from pandas.util.version import Version
1617

1718
pytestmark = [pytest.mark.single_cpu]
@@ -583,16 +584,65 @@ def test_npfunc_no_warnings():
583584
df.col1.rolling(2).apply(np.prod, raw=True, engine="numba")
584585

585586

586-
from .test_rolling import TestMinMax
587+
class PrescribedWindowIndexer(BaseIndexer):
588+
def __init__(self, start, end):
589+
self._start = start
590+
self._end = end
591+
super().__init__()
592+
593+
def get_window_bounds(
594+
self, num_values=None, min_periods=None, center=None, closed=None, step=None
595+
):
596+
if num_values is None:
597+
num_values = len(self._start)
598+
start = np.clip(self._start, 0, num_values)
599+
end = np.clip(self._end, 0, num_values)
600+
return start, end
587601

588602

589603
@td.skip_if_no("numba")
590604
class TestMinMaxNumba:
591-
parent = TestMinMax()
592-
593-
@pytest.mark.parametrize("is_max, has_nan, exp_list", TestMinMax.TestData)
605+
@pytest.mark.parametrize(
606+
"is_max, has_nan, exp_list",
607+
[
608+
(True, False, [3.0, 5.0, 2.0, 5.0, 1.0, 5.0, 6.0, 7.0, 8.0, 9.0]),
609+
(True, True, [3.0, 4.0, 2.0, 4.0, 1.0, 4.0, 6.0, 7.0, 7.0, 9.0]),
610+
(False, False, [3.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0]),
611+
(False, True, [3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 6.0, 6.0, 7.0, 1.0]),
612+
],
613+
)
594614
def test_minmax(self, is_max, has_nan, exp_list):
595-
TestMinMaxNumba.parent.test_minmax(is_max, has_nan, exp_list, "numba")
615+
nan_idx = [0, 5, 8]
616+
df = DataFrame(
617+
{
618+
"data": [5.0, 4.0, 3.0, 2.0, 1.0, 0.0, 6.0, 7.0, 8.0, 9.0],
619+
"start": [2, 0, 3, 0, 4, 0, 5, 5, 7, 3],
620+
"end": [3, 4, 4, 5, 5, 6, 7, 8, 9, 10],
621+
}
622+
)
623+
if has_nan:
624+
df.loc[nan_idx, "data"] = np.nan
625+
expected = Series(exp_list, name="data")
626+
r = df.data.rolling(
627+
PrescribedWindowIndexer(df.start.to_numpy(), df.end.to_numpy())
628+
)
629+
if is_max:
630+
result = r.max(engine="numba")
631+
else:
632+
result = r.min(engine="numba")
633+
634+
tm.assert_series_equal(result, expected)
596635

597636
def test_wrong_order(self):
598-
TestMinMaxNumba.parent.test_wrong_order("numba")
637+
start = np.array(range(5), dtype=np.int64)
638+
end = start + 1
639+
end[3] = end[2]
640+
start[3] = start[2] - 1
641+
642+
df = DataFrame({"data": start * 1.0, "start": start, "end": end})
643+
644+
r = df.data.rolling(PrescribedWindowIndexer(start, end))
645+
with pytest.raises(
646+
ValueError, match="Start/End ordering requirement is violated at index 3"
647+
):
648+
r.max(engine="numba")

pandas/tests/window/test_rolling.py

+14-13
Original file line numberDiff line numberDiff line change
@@ -1965,15 +1965,16 @@ def get_window_bounds(
19651965

19661966

19671967
class TestMinMax:
1968-
TestData = [
1969-
(True, False, [3.0, 5.0, 2.0, 5.0, 1.0, 5.0, 6.0, 7.0, 8.0, 9.0]),
1970-
(True, True, [3.0, 4.0, 2.0, 4.0, 1.0, 4.0, 6.0, 7.0, 7.0, 9.0]),
1971-
(False, False, [3.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0]),
1972-
(False, True, [3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 6.0, 6.0, 7.0, 1.0]),
1973-
]
1974-
1975-
@pytest.mark.parametrize("is_max, has_nan, exp_list", TestData)
1976-
def test_minmax(self, is_max, has_nan, exp_list, engine=None):
1968+
@pytest.mark.parametrize(
1969+
"is_max, has_nan, exp_list",
1970+
[
1971+
(True, False, [3.0, 5.0, 2.0, 5.0, 1.0, 5.0, 6.0, 7.0, 8.0, 9.0]),
1972+
(True, True, [3.0, 4.0, 2.0, 4.0, 1.0, 4.0, 6.0, 7.0, 7.0, 9.0]),
1973+
(False, False, [3.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 7.0, 0.0]),
1974+
(False, True, [3.0, 2.0, 2.0, 1.0, 1.0, 1.0, 6.0, 6.0, 7.0, 1.0]),
1975+
],
1976+
)
1977+
def test_minmax(self, is_max, has_nan, exp_list):
19771978
nan_idx = [0, 5, 8]
19781979
df = DataFrame(
19791980
{
@@ -1989,13 +1990,13 @@ def test_minmax(self, is_max, has_nan, exp_list, engine=None):
19891990
PrescribedWindowIndexer(df.start.to_numpy(), df.end.to_numpy())
19901991
)
19911992
if is_max:
1992-
result = r.max(engine=engine)
1993+
result = r.max()
19931994
else:
1994-
result = r.min(engine=engine)
1995+
result = r.min()
19951996

19961997
tm.assert_series_equal(result, expected)
19971998

1998-
def test_wrong_order(self, engine=None):
1999+
def test_wrong_order(self):
19992000
start = np.array(range(5), dtype=np.int64)
20002001
end = start + 1
20012002
end[3] = end[2]
@@ -2007,4 +2008,4 @@ def test_wrong_order(self, engine=None):
20072008
with pytest.raises(
20082009
ValueError, match="Start/End ordering requirement is violated at index 3"
20092010
):
2010-
r.max(engine=engine)
2011+
r.max()

0 commit comments

Comments
 (0)