Skip to content

BUG: Fix window aggregations to skip over unused elements (GH-45647) #45655

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.4.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Fixed regressions
Bug fixes
~~~~~~~~~
- Fixed segfault in :meth:``DataFrame.to_json`` when dumping tz-aware datetimes in Python 3.10 (:issue:`42130`)
- Fixed window aggregations in :meth:`DataFrame.rolling` and :meth:`Series.rolling` to skip over unused elements (:issue:`45647`)
-

.. ---------------------------------------------------------------------------
Expand Down
92 changes: 56 additions & 36 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp) -> np.ndarray:
cdef:
Py_ssize_t i, j
float64_t sum_x = 0, compensation_add = 0, compensation_remove = 0
float64_t sum_x, compensation_add, compensation_remove
int64_t s, e
int64_t nobs = 0, N = len(values)
int64_t nobs = 0, N = len(start)
ndarray[float64_t] output
bint is_monotonic_increasing_bounds

Expand All @@ -139,10 +139,12 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

# setup

sum_x = compensation_add = compensation_remove = 0
nobs = 0
for j in range(s, e):
add_sum(values[j], &nobs, &sum_x, &compensation_add)

Expand Down Expand Up @@ -226,9 +228,9 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x,
def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp) -> np.ndarray:
cdef:
float64_t val, compensation_add = 0, compensation_remove = 0, sum_x = 0
float64_t val, compensation_add, compensation_remove, sum_x
int64_t s, e
Py_ssize_t nobs = 0, i, j, neg_ct = 0, N = len(values)
Py_ssize_t nobs, i, j, neg_ct, N = len(start)
ndarray[float64_t] output
bint is_monotonic_increasing_bounds

Expand All @@ -243,8 +245,10 @@ def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

compensation_add = compensation_remove = sum_x = 0
nobs = neg_ct = 0
# setup
for j in range(s, e):
val = values[j]
Expand Down Expand Up @@ -349,11 +353,11 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,
Numerically stable implementation using Welford's method.
"""
cdef:
float64_t mean_x = 0, ssqdm_x = 0, nobs = 0, compensation_add = 0,
float64_t compensation_remove = 0,
float64_t mean_x, ssqdm_x, nobs, compensation_add,
float64_t compensation_remove,
float64_t val, prev, delta, mean_x_old
int64_t s, e
Py_ssize_t i, j, N = len(values)
Py_ssize_t i, j, N = len(start)
ndarray[float64_t] output
bint is_monotonic_increasing_bounds

Expand All @@ -372,8 +376,9 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,

# Over the first window, observations can only be added
# never removed
if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

mean_x = ssqdm_x = nobs = compensation_add = compensation_remove = 0
for j in range(s, e):
add_var(values[j], &nobs, &mean_x, &ssqdm_x, &compensation_add)

Expand Down Expand Up @@ -500,11 +505,11 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
cdef:
Py_ssize_t i, j
float64_t val, prev, min_val, mean_val, sum_val = 0
float64_t compensation_xxx_add = 0, compensation_xxx_remove = 0
float64_t compensation_xx_add = 0, compensation_xx_remove = 0
float64_t compensation_x_add = 0, compensation_x_remove = 0
float64_t x = 0, xx = 0, xxx = 0
int64_t nobs = 0, N = len(values), nobs_mean = 0
float64_t compensation_xxx_add, compensation_xxx_remove
float64_t compensation_xx_add, compensation_xx_remove
float64_t compensation_x_add, compensation_x_remove
float64_t x, xx, xxx
int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0
int64_t s, e
ndarray[float64_t] output, mean_array, values_copy
bint is_monotonic_increasing_bounds
Expand All @@ -518,7 +523,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
values_copy = np.copy(values)

with nogil:
for i in range(0, N):
for i in range(0, V):
val = values_copy[i]
if notnan(val):
nobs_mean += 1
Expand All @@ -527,7 +532,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
# Other cases would lead to imprecision for smallest values
if min_val - mean_val > -1e5:
mean_val = round(mean_val)
for i in range(0, N):
for i in range(0, V):
values_copy[i] = values_copy[i] - mean_val

for i in range(0, N):
Expand All @@ -537,8 +542,13 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,

# Over the first window, observations can only be added
# never removed
if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

compensation_xxx_add = compensation_xxx_remove = 0
compensation_xx_add = compensation_xx_remove = 0
compensation_x_add = compensation_x_remove = 0
x = xx = xxx = 0
nobs = 0
for j in range(s, e):
val = values_copy[j]
add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
Expand Down Expand Up @@ -682,12 +692,12 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
cdef:
Py_ssize_t i, j
float64_t val, prev, mean_val, min_val, sum_val = 0
float64_t compensation_xxxx_add = 0, compensation_xxxx_remove = 0
float64_t compensation_xxx_remove = 0, compensation_xxx_add = 0
float64_t compensation_xx_remove = 0, compensation_xx_add = 0
float64_t compensation_x_remove = 0, compensation_x_add = 0
float64_t x = 0, xx = 0, xxx = 0, xxxx = 0
int64_t nobs = 0, s, e, N = len(values), nobs_mean = 0
float64_t compensation_xxxx_add, compensation_xxxx_remove
float64_t compensation_xxx_remove, compensation_xxx_add
float64_t compensation_xx_remove, compensation_xx_add
float64_t compensation_x_remove, compensation_x_add
float64_t x, xx, xxx, xxxx
int64_t nobs, s, e, N = len(start), V = len(values), nobs_mean = 0
ndarray[float64_t] output, values_copy
bint is_monotonic_increasing_bounds

Expand All @@ -700,7 +710,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
min_val = np.nanmin(values)

with nogil:
for i in range(0, N):
for i in range(0, V):
val = values_copy[i]
if notnan(val):
nobs_mean += 1
Expand All @@ -709,7 +719,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
# Other cases would lead to imprecision for smallest values
if min_val - mean_val > -1e4:
mean_val = round(mean_val)
for i in range(0, N):
for i in range(0, V):
values_copy[i] = values_copy[i] - mean_val

for i in range(0, N):
Expand All @@ -719,8 +729,14 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,

# Over the first window, observations can only be added
# never removed
if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

compensation_xxxx_add = compensation_xxxx_remove = 0
compensation_xxx_remove = compensation_xxx_add = 0
compensation_xx_remove = compensation_xx_add = 0
compensation_x_remove = compensation_x_add = 0
x = xx = xxx = xxxx = 0
nobs = 0
for j in range(s, e):
add_kurt(values_copy[j], &nobs, &x, &xx, &xxx, &xxxx,
&compensation_x_add, &compensation_xx_add,
Expand Down Expand Up @@ -764,7 +780,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
Py_ssize_t i, j
bint err = False, is_monotonic_increasing_bounds
int midpoint, ret = 0
int64_t nobs = 0, N = len(values), s, e, win
int64_t nobs = 0, N = len(start), s, e, win
float64_t val, res, prev
skiplist_t *sl
ndarray[float64_t] output
Expand All @@ -791,8 +807,12 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:

if i != 0:
skiplist_destroy(sl)
sl = skiplist_init(<int>win)
nobs = 0
# setup
for j in range(s, e):
val = values[j]
Expand Down Expand Up @@ -948,7 +968,7 @@ cdef _roll_min_max(ndarray[numeric_t] values,
cdef:
numeric_t ai
int64_t curr_win_size, start
Py_ssize_t i, k, nobs = 0, N = len(values)
Py_ssize_t i, k, nobs = 0, N = len(starti)
deque Q[int64_t] # min/max always the front
deque W[int64_t] # track the whole window for nobs compute
ndarray[float64_t, ndim=1] output
Expand Down Expand Up @@ -1031,7 +1051,7 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
O(N log(window)) implementation using skip list
"""
cdef:
Py_ssize_t i, j, s, e, N = len(values), idx
Py_ssize_t i, j, s, e, N = len(start), idx
int ret = 0
int64_t nobs = 0, win
float64_t val, prev, midpoint, idx_with_fraction
Expand Down Expand Up @@ -1068,8 +1088,8 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
if i != 0:
nobs = 0
skiplist_destroy(skiplist)
skiplist = skiplist_init(<int>win)
Expand Down Expand Up @@ -1160,7 +1180,7 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
derived from roll_quantile
"""
cdef:
Py_ssize_t i, j, s, e, N = len(values), idx
Py_ssize_t i, j, s, e, N = len(start), idx
float64_t rank_min = 0, rank = 0
int64_t nobs = 0, win
float64_t val
Expand Down Expand Up @@ -1193,8 +1213,8 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0 or not is_monotonic_increasing_bounds:
if not is_monotonic_increasing_bounds:
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
if i != 0:
nobs = 0
skiplist_destroy(skiplist)
skiplist = skiplist_init(<int>win)
Expand Down
111 changes: 111 additions & 0 deletions pandas/tests/window/test_cython_aggregations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from functools import partial
import sys

import numpy as np
import pytest

import pandas._libs.window.aggregations as window_aggregations

from pandas import Series
import pandas._testing as tm


def _get_rolling_aggregations():
# list pairs of name and function
# each function has this signature:
# (const float64_t[:] values, ndarray[int64_t] start,
# ndarray[int64_t] end, int64_t minp) -> np.ndarray
named_roll_aggs = (
[
("roll_sum", window_aggregations.roll_sum),
("roll_mean", window_aggregations.roll_mean),
]
+ [
(f"roll_var({ddof})", partial(window_aggregations.roll_var, ddof=ddof))
for ddof in [0, 1]
]
+ [
("roll_skew", window_aggregations.roll_skew),
("roll_kurt", window_aggregations.roll_kurt),
("roll_median_c", window_aggregations.roll_median_c),
("roll_max", window_aggregations.roll_max),
("roll_min", window_aggregations.roll_min),
]
+ [
(
f"roll_quantile({quantile},{interpolation})",
partial(
window_aggregations.roll_quantile,
quantile=quantile,
interpolation=interpolation,
),
)
for quantile in [0.0001, 0.5, 0.9999]
for interpolation in window_aggregations.interpolation_types
]
+ [
(
f"roll_rank({percentile},{method},{ascending})",
partial(
window_aggregations.roll_rank,
percentile=percentile,
method=method,
ascending=ascending,
),
)
for percentile in [True, False]
for method in window_aggregations.rolling_rank_tiebreakers.keys()
for ascending in [True, False]
]
)
# unzip to a list of 2 tuples, names and functions
unzipped = list(zip(*named_roll_aggs))
return {"ids": unzipped[0], "params": unzipped[1]}


_rolling_aggregations = _get_rolling_aggregations()


@pytest.fixture(
params=_rolling_aggregations["params"], ids=_rolling_aggregations["ids"]
)
def rolling_aggregation(request):
"""Make a rolling aggregation function as fixture."""
return request.param


def test_rolling_aggregation_boundary_consistency(rolling_aggregation):
# GH-45647
minp, step, width, size, selection = 0, 1, 3, 11, [2, 7]
values = np.arange(1, 1 + size, dtype=np.float64)
end = np.arange(width, size, step, dtype=np.int64)
start = end - width
selarr = np.array(selection, dtype=np.int32)
result = Series(rolling_aggregation(values, start[selarr], end[selarr], minp))
expected = Series(rolling_aggregation(values, start, end, minp)[selarr])
tm.assert_equal(expected, result)


def test_rolling_aggregation_with_unused_elements(rolling_aggregation):
# GH-45647
minp, width = 0, 5 # width at least 4 for kurt
size = 2 * width + 5
values = np.arange(1, size + 1, dtype=np.float64)
values[width : width + 2] = sys.float_info.min
values[width + 2] = np.nan
values[width + 3 : width + 5] = sys.float_info.max
start = np.array([0, size - width], dtype=np.int64)
end = np.array([width, size], dtype=np.int64)
loc = np.array(
[j for i in range(len(start)) for j in range(start[i], end[i])],
dtype=np.int32,
)
result = Series(rolling_aggregation(values, start, end, minp))
compact_values = np.array(values[loc], dtype=np.float64)
compact_start = np.arange(0, len(start) * width, width, dtype=np.int64)
compact_end = compact_start + width
expected = Series(
rolling_aggregation(compact_values, compact_start, compact_end, minp)
)
assert np.isfinite(expected.values).all(), "Not all expected values are finite"
tm.assert_equal(expected, result)