
Commit 256a0c7

BUG: Fix window aggregations to skip over unused elements (pandas-dev GH-45647)
1 parent 763de5d commit 256a0c7

File tree: 3 files changed (+161 -37 lines)
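The change targets the low-level rolling kernels, which receive precomputed `start`/`end` bound arrays rather than a window size. When consecutive windows do not overlap, the kernels now reset their accumulators and rebuild the window instead of stepping the remove/add loops across every unused element in between. The sketch below is not part of the commit; it only mirrors the call pattern the new tests use, assuming the kernel signatures shown in the diff.

```python
# Illustrative only: call one of the patched kernels directly with two
# disjoint windows, the situation this commit optimizes.
import numpy as np
import pandas._libs.window.aggregations as window_aggregations

values = np.arange(20, dtype=np.float64)
start = np.array([0, 15], dtype=np.int64)  # second window starts well past
end = np.array([5, 20], dtype=np.int64)    # the first window's end
result = window_aggregations.roll_sum(values, start, end, 0)  # minp=0
print(result)  # [10., 85.]; with the fix, elements between the windows are skipped
```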

pandas/_libs/window/aggregations.pyx (+55 -36)
@@ -122,9 +122,9 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp) -> np.ndarray:
     cdef:
         Py_ssize_t i, j
-        float64_t sum_x = 0, compensation_add = 0, compensation_remove = 0
+        float64_t sum_x, compensation_add, compensation_remove
         int64_t s, e
-        int64_t nobs = 0, N = len(values)
+        int64_t nobs = 0, N = len(start)
         ndarray[float64_t] output
         bint is_monotonic_increasing_bounds

@@ -139,10 +139,12 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start,
             s = start[i]
             e = end[i]

-            if i == 0 or not is_monotonic_increasing_bounds:
+            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

                 # setup

+                sum_x = compensation_add = compensation_remove = 0
+                nobs = 0
                 for j in range(s, e):
                     add_sum(values[j], &nobs, &sum_x, &compensation_add)

@@ -226,9 +228,9 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x,
 def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
               ndarray[int64_t] end, int64_t minp) -> np.ndarray:
     cdef:
-        float64_t val, compensation_add = 0, compensation_remove = 0, sum_x = 0
+        float64_t val, compensation_add, compensation_remove, sum_x
         int64_t s, e
-        Py_ssize_t nobs = 0, i, j, neg_ct = 0, N = len(values)
+        Py_ssize_t nobs, i, j, neg_ct, N = len(start)
         ndarray[float64_t] output
         bint is_monotonic_increasing_bounds

@@ -243,8 +245,10 @@ def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
             s = start[i]
             e = end[i]

-            if i == 0 or not is_monotonic_increasing_bounds:
+            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

+                compensation_add = compensation_remove = sum_x = 0
+                nobs = neg_ct = 0
                 # setup
                 for j in range(s, e):
                     val = values[j]
@@ -349,11 +353,11 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,
     Numerically stable implementation using Welford's method.
     """
     cdef:
-        float64_t mean_x = 0, ssqdm_x = 0, nobs = 0, compensation_add = 0,
-        float64_t compensation_remove = 0,
+        float64_t mean_x, ssqdm_x, nobs, compensation_add,
+        float64_t compensation_remove,
         float64_t val, prev, delta, mean_x_old
         int64_t s, e
-        Py_ssize_t i, j, N = len(values)
+        Py_ssize_t i, j, N = len(start)
         ndarray[float64_t] output
         bint is_monotonic_increasing_bounds

@@ -372,8 +376,9 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,

             # Over the first window, observations can only be added
             # never removed
-            if i == 0 or not is_monotonic_increasing_bounds:
+            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

+                mean_x = ssqdm_x = nobs = compensation_add = compensation_remove = 0
                 for j in range(s, e):
                     add_var(values[j], &nobs, &mean_x, &ssqdm_x, &compensation_add)

@@ -500,11 +505,11 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
     cdef:
         Py_ssize_t i, j
         float64_t val, prev, min_val, mean_val, sum_val = 0
-        float64_t compensation_xxx_add = 0, compensation_xxx_remove = 0
-        float64_t compensation_xx_add = 0, compensation_xx_remove = 0
-        float64_t compensation_x_add = 0, compensation_x_remove = 0
-        float64_t x = 0, xx = 0, xxx = 0
-        int64_t nobs = 0, N = len(values), nobs_mean = 0
+        float64_t compensation_xxx_add, compensation_xxx_remove
+        float64_t compensation_xx_add, compensation_xx_remove
+        float64_t compensation_x_add, compensation_x_remove
+        float64_t x, xx, xxx
+        int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0
         int64_t s, e
         ndarray[float64_t] output, mean_array, values_copy
         bint is_monotonic_increasing_bounds
@@ -518,7 +523,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
     values_copy = np.copy(values)

     with nogil:
-        for i in range(0, N):
+        for i in range(0, V):
             val = values_copy[i]
             if notnan(val):
                 nobs_mean += 1
@@ -527,7 +532,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
         # Other cases would lead to imprecision for smallest values
         if min_val - mean_val > -1e5:
             mean_val = round(mean_val)
-            for i in range(0, N):
+            for i in range(0, V):
                 values_copy[i] = values_copy[i] - mean_val

         for i in range(0, N):
@@ -537,8 +542,13 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,

             # Over the first window, observations can only be added
             # never removed
-            if i == 0 or not is_monotonic_increasing_bounds:
+            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

+                compensation_xxx_add = compensation_xxx_remove = 0
+                compensation_xx_add = compensation_xx_remove = 0
+                compensation_x_add = compensation_x_remove = 0
+                x = xx = xxx = 0
+                nobs = 0
                 for j in range(s, e):
                     val = values_copy[j]
                     add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
@@ -682,12 +692,12 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
     cdef:
         Py_ssize_t i, j
         float64_t val, prev, mean_val, min_val, sum_val = 0
-        float64_t compensation_xxxx_add = 0, compensation_xxxx_remove = 0
-        float64_t compensation_xxx_remove = 0, compensation_xxx_add = 0
-        float64_t compensation_xx_remove = 0, compensation_xx_add = 0
-        float64_t compensation_x_remove = 0, compensation_x_add = 0
-        float64_t x = 0, xx = 0, xxx = 0, xxxx = 0
-        int64_t nobs = 0, s, e, N = len(values), nobs_mean = 0
+        float64_t compensation_xxxx_add, compensation_xxxx_remove
+        float64_t compensation_xxx_remove, compensation_xxx_add
+        float64_t compensation_xx_remove, compensation_xx_add
+        float64_t compensation_x_remove, compensation_x_add
+        float64_t x, xx, xxx, xxxx
+        int64_t nobs, s, e, N = len(start), V = len(values), nobs_mean = 0
         ndarray[float64_t] output, values_copy
         bint is_monotonic_increasing_bounds

@@ -700,7 +710,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
     min_val = np.nanmin(values)

     with nogil:
-        for i in range(0, N):
+        for i in range(0, V):
             val = values_copy[i]
             if notnan(val):
                 nobs_mean += 1
@@ -709,7 +719,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
         # Other cases would lead to imprecision for smallest values
         if min_val - mean_val > -1e4:
             mean_val = round(mean_val)
-            for i in range(0, N):
+            for i in range(0, V):
                 values_copy[i] = values_copy[i] - mean_val

         for i in range(0, N):
@@ -719,8 +729,14 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,

             # Over the first window, observations can only be added
             # never removed
-            if i == 0 or not is_monotonic_increasing_bounds:
+            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

+                compensation_xxxx_add = compensation_xxxx_remove = 0
+                compensation_xxx_remove = compensation_xxx_add = 0
+                compensation_xx_remove = compensation_xx_add = 0
+                compensation_x_remove = compensation_x_add = 0
+                x = xx = xxx = xxxx = 0
+                nobs = 0
                 for j in range(s, e):
                     add_kurt(values_copy[j], &nobs, &x, &xx, &xxx, &xxxx,
                              &compensation_x_add, &compensation_xx_add,
@@ -764,7 +780,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
         Py_ssize_t i, j
         bint err = False, is_monotonic_increasing_bounds
         int midpoint, ret = 0
-        int64_t nobs = 0, N = len(values), s, e, win
+        int64_t nobs, N = len(start), s, e, win
         float64_t val, res, prev
         skiplist_t *sl
         ndarray[float64_t] output
@@ -791,8 +807,11 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
             s = start[i]
             e = end[i]

-            if i == 0 or not is_monotonic_increasing_bounds:
+            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

+                skiplist_destroy(sl)
+                sl = skiplist_init(<int>win)
+                nobs = 0
                 # setup
                 for j in range(s, e):
                     val = values[j]
@@ -948,7 +967,7 @@ cdef _roll_min_max(ndarray[numeric_t] values,
     cdef:
         numeric_t ai
         int64_t curr_win_size, start
-        Py_ssize_t i, k, nobs = 0, N = len(values)
+        Py_ssize_t i, k, nobs = 0, N = len(starti)
         deque Q[int64_t]  # min/max always the front
         deque W[int64_t]  # track the whole window for nobs compute
         ndarray[float64_t, ndim=1] output
@@ -1031,7 +1050,7 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
     O(N log(window)) implementation using skip list
     """
     cdef:
-        Py_ssize_t i, j, s, e, N = len(values), idx
+        Py_ssize_t i, j, s, e, N = len(start), idx
         int ret = 0
         int64_t nobs = 0, win
         float64_t val, prev, midpoint, idx_with_fraction
@@ -1068,8 +1087,8 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
             s = start[i]
             e = end[i]

-            if i == 0 or not is_monotonic_increasing_bounds:
-                if not is_monotonic_increasing_bounds:
+            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
+                if not is_monotonic_increasing_bounds or s >= end[i - 1]:
                     nobs = 0
                     skiplist_destroy(skiplist)
                     skiplist = skiplist_init(<int>win)
@@ -1160,7 +1179,7 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
     derived from roll_quantile
     """
     cdef:
-        Py_ssize_t i, j, s, e, N = len(values), idx
+        Py_ssize_t i, j, s, e, N = len(start), idx
         float64_t rank_min = 0, rank = 0
         int64_t nobs = 0, win
         float64_t val
@@ -1193,8 +1212,8 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
             s = start[i]
            e = end[i]

-            if i == 0 or not is_monotonic_increasing_bounds:
-                if not is_monotonic_increasing_bounds:
+            if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
+                if not is_monotonic_increasing_bounds or s >= end[i - 1]:
                     nobs = 0
                     skiplist_destroy(skiplist)
                     skiplist = skiplist_init(<int>win)
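Every kernel gets the same treatment: accumulators that used to be zeroed once at declaration are re-zeroed whenever a window has to be rebuilt, and the rebuild is triggered not only for the first window or non-monotonic bounds but also when the new window starts at or past the previous window's end. Below is a pure-Python paraphrase of that control flow, not the Cython source; it ignores NaN handling, Kahan compensation, and the monotonic-bounds flag.

```python
import numpy as np

def roll_sum_sketch(values, start, end, minp):
    # Paraphrase of the patched control flow; not the real Cython kernel.
    N = len(start)                       # number of windows, not len(values)
    output = np.empty(N, dtype=np.float64)
    nobs, sum_x = 0, 0.0
    for i in range(N):
        s, e = start[i], end[i]
        # Rebuild for the first window or when it is disjoint from the previous
        # one (the real kernels also rebuild when bounds are not monotonic).
        if i == 0 or s >= end[i - 1]:
            nobs, sum_x = 0, 0.0
            for j in range(s, e):
                sum_x += values[j]
                nobs += 1
        else:
            for j in range(start[i - 1], s):   # drop elements that left
                sum_x -= values[j]
                nobs -= 1
            for j in range(end[i - 1], e):     # add elements that entered
                sum_x += values[j]
                nobs += 1
        output[i] = sum_x if nobs >= minp else np.nan
    return output
```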

pandas/tests/window/conftest.py (+61 -0)
@@ -2,10 +2,12 @@
     datetime,
     timedelta,
 )
+from functools import partial

 import numpy as np
 import pytest

+import pandas._libs.window.aggregations as window_aggregations
 import pandas.util._test_decorators as td

 from pandas import (
@@ -259,3 +261,62 @@ def frame():
         index=bdate_range(datetime(2009, 1, 1), periods=100),
         columns=np.arange(10),
     )
+
+
+def _named_func(name_and_func):
+    name, func = name_and_func
+    if not hasattr(func, "func"):
+        func = partial(func)
+    func.__name__ = name
+    return func
+
+
+@pytest.fixture(
+    params=[
+        _named_func(x)
+        for x in [
+            ("roll_sum", window_aggregations.roll_sum),
+            ("roll_mean", window_aggregations.roll_mean),
+        ]
+        + [
+            (f"roll_var({ddof})", partial(window_aggregations.roll_var, ddof=ddof))
+            for ddof in [0, 1]
+        ]
+        + [
+            ("roll_skew", window_aggregations.roll_skew),
+            ("roll_kurt", window_aggregations.roll_kurt),
+            ("roll_median_c", window_aggregations.roll_median_c),
+            ("roll_max", window_aggregations.roll_max),
+            ("roll_min", window_aggregations.roll_min),
+        ]
+        + [
+            (
+                f"roll_quantile({quantile},{interpolation})",
+                partial(
+                    window_aggregations.roll_quantile,
+                    quantile=quantile,
+                    interpolation=interpolation,
+                ),
+            )
+            for quantile in [0.25, 0.5, 0.75]
+            for interpolation in window_aggregations.interpolation_types
+        ]
+        + [
+            (
+                f"roll_rank({percentile},{method},{ascending})",
+                partial(
+                    window_aggregations.roll_rank,
+                    percentile=percentile,
+                    method=method,
+                    ascending=ascending,
+                ),
+            )
+            for percentile in [True, False]
+            for method in window_aggregations.rolling_rank_tiebreakers.keys()
+            for ascending in [True, False]
+        ]
+    ]
+)
+def rolling_aggregation(request):
+    """Make a named rolling aggregation function as fixture."""
+    return request.param
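The fixture gives each parametrized kernel a readable pytest id. The `partial` wrapper appears to exist because `__name__` generally cannot be assigned on the compiled Cython functions, whereas a `functools.partial` object accepts new attributes; a small, hedged illustration of that assumption (not part of the commit):

```python
from functools import partial
import pandas._libs.window.aggregations as window_aggregations

func = window_aggregations.roll_sum
try:
    func.__name__ = "roll_sum"      # may fail on a compiled function
except (AttributeError, TypeError):
    func = partial(func)            # partial objects take new attributes
    func.__name__ = "roll_sum"
print(func.__name__)
```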

pandas/tests/window/test_rolling.py (+45 -1)
@@ -2,6 +2,7 @@
     datetime,
     timedelta,
 )
+import time

 import numpy as np
 import pytest
@@ -1391,7 +1392,7 @@ def test_rolling_corr_timedelta_index(index, window):
     # GH: 31286
     x = Series([1, 2, 3, 4, 5], index=index)
     y = x.copy()
-    x[0:2] = 0.0
+    x.iloc[0:2] = 0.0
     result = x.rolling(window).corr(y)
     expected = Series([np.nan, np.nan, 1, 1, 1], index=index)
     tm.assert_almost_equal(result, expected)
@@ -1734,3 +1735,46 @@ def test_rolling_std_neg_sqrt():

     b = a.ewm(span=3).std()
     assert np.isfinite(b[2:]).all()
+
+
+def test_rolling_aggregation_boundary_consistency(rolling_aggregation):
+    minp, step, width, size, selection = 0, 1, 3, 11, [2, 7]
+    s = Series(np.arange(1, 1 + size, dtype=np.float64))
+    end = np.arange(width, size, step, dtype=np.int64)
+    start = end - width
+    selarr = np.array(selection, dtype=np.int32)
+    result = Series(rolling_aggregation(s.values, start[selarr], end[selarr], minp))
+    expected = Series(rolling_aggregation(s.values, start, end, minp)[selarr])
+    tm.assert_equal(expected, result)
+
+
+def test_rolling_aggregation_timing_with_unused_elements(rolling_aggregation):
+    minp, width = 0, 5  # width at least 4 for kurt
+
+    def time_roll_agg(extra_size):
+        t0 = time.time()
+        size = 2 * width + 2 + extra_size
+        s = Series(np.full(size, np.nan, dtype=np.float64))
+        start = np.array([1, size - width - 2], dtype=np.int64)
+        end = np.array([1 + width, size - 2], dtype=np.int64)
+        loc = np.array(
+            [j for i in range(len(start)) for j in range(start[i], end[i])],
+            dtype=np.int32,
+        )
+        for j in loc:
+            s.iloc[j] = 1 + j
+        result = Series(rolling_aggregation(s.values, start, end, minp))
+        compact_s = np.array(s.iloc[loc], dtype=np.float64)
+        compact_start = np.arange(0, len(start) * width, width, dtype=np.int64)
+        compact_end = compact_start + width
+        expected = Series(
+            rolling_aggregation(compact_s, compact_start, compact_end, minp)
+        )
+        assert np.isfinite(expected.values).all(), "Not all expected values are finite"
+        tm.assert_equal(expected, result)
+        t1 = time.time()
+        return t1 - t0
+
+    for i in range(3):  # all but last are warmup
+        t = [time_roll_agg(extra_size) for extra_size in [2, 2000]]
+    assert abs((t[0] - t[1]) / t[0]) < 2, "Unused elements significantly affect timing"
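Together the two tests pin down the fix from both sides: the first checks that evaluating only a subset of the windows returns the same values as evaluating all of them, and the second checks that run time does not grow with the number of unused elements sitting between two windows. A stand-alone sketch of that timing property, assuming only the kernel signatures shown in the diff above:

```python
import time
import numpy as np
import pandas._libs.window.aggregations as window_aggregations

def timed_roll_sum(gap):
    # Two fixed-size windows separated by `gap` unused elements.
    values = np.ones(10 + gap, dtype=np.float64)
    start = np.array([0, 5 + gap], dtype=np.int64)
    end = np.array([5, 10 + gap], dtype=np.int64)
    t0 = time.time()
    window_aggregations.roll_sum(values, start, end, 0)
    return time.time() - t0

# With the fix, both calls should take roughly the same time, because the
# second window is rebuilt from scratch instead of slid across the gap.
print(timed_roll_sum(10), timed_roll_sum(1_000_000))
```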
