From 256a0c71246e3385f5efed95879aabf42d28447a Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 27 Jan 2022 03:45:10 -0500 Subject: [PATCH 1/5] BUG: Fix window aggregations to skip over unused elements (GH-45647) --- pandas/_libs/window/aggregations.pyx | 91 +++++++++++++++++----------- pandas/tests/window/conftest.py | 61 +++++++++++++++++++ pandas/tests/window/test_rolling.py | 46 +++++++++++++- 3 files changed, 161 insertions(+), 37 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 5ebb60dc7e41b..2dad06cb409ea 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -122,9 +122,9 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start, ndarray[int64_t] end, int64_t minp) -> np.ndarray: cdef: Py_ssize_t i, j - float64_t sum_x = 0, compensation_add = 0, compensation_remove = 0 + float64_t sum_x, compensation_add, compensation_remove int64_t s, e - int64_t nobs = 0, N = len(values) + int64_t nobs = 0, N = len(start) ndarray[float64_t] output bint is_monotonic_increasing_bounds @@ -139,10 +139,12 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: # setup + sum_x = compensation_add = compensation_remove = 0 + nobs = 0 for j in range(s, e): add_sum(values[j], &nobs, &sum_x, &compensation_add) @@ -226,9 +228,9 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x, def roll_mean(const float64_t[:] values, ndarray[int64_t] start, ndarray[int64_t] end, int64_t minp) -> np.ndarray: cdef: - float64_t val, compensation_add = 0, compensation_remove = 0, sum_x = 0 + float64_t val, compensation_add, compensation_remove, sum_x int64_t s, e - Py_ssize_t nobs = 0, i, j, neg_ct = 0, N = len(values) + Py_ssize_t nobs, i, j, neg_ct, N = len(start) ndarray[float64_t] output bint is_monotonic_increasing_bounds @@ -243,8 +245,10 @@ def roll_mean(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + compensation_add = compensation_remove = sum_x = 0 + nobs = neg_ct = 0 # setup for j in range(s, e): val = values[j] @@ -349,11 +353,11 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start, Numerically stable implementation using Welford's method. """ cdef: - float64_t mean_x = 0, ssqdm_x = 0, nobs = 0, compensation_add = 0, - float64_t compensation_remove = 0, + float64_t mean_x, ssqdm_x, nobs, compensation_add, + float64_t compensation_remove, float64_t val, prev, delta, mean_x_old int64_t s, e - Py_ssize_t i, j, N = len(values) + Py_ssize_t i, j, N = len(start) ndarray[float64_t] output bint is_monotonic_increasing_bounds @@ -372,8 +376,9 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start, # Over the first window, observations can only be added # never removed - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + mean_x = ssqdm_x = nobs = compensation_add = compensation_remove = 0 for j in range(s, e): add_var(values[j], &nobs, &mean_x, &ssqdm_x, &compensation_add) @@ -500,11 +505,11 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, cdef: Py_ssize_t i, j float64_t val, prev, min_val, mean_val, sum_val = 0 - float64_t compensation_xxx_add = 0, compensation_xxx_remove = 0 - float64_t compensation_xx_add = 0, compensation_xx_remove = 0 - float64_t compensation_x_add = 0, compensation_x_remove = 0 - float64_t x = 0, xx = 0, xxx = 0 - int64_t nobs = 0, N = len(values), nobs_mean = 0 + float64_t compensation_xxx_add, compensation_xxx_remove + float64_t compensation_xx_add, compensation_xx_remove + float64_t compensation_x_add, compensation_x_remove + float64_t x, xx, xxx + int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0 int64_t s, e ndarray[float64_t] output, mean_array, values_copy bint is_monotonic_increasing_bounds @@ -518,7 +523,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, values_copy = np.copy(values) with nogil: - for i in range(0, N): + for i in range(0, V): val = values_copy[i] if notnan(val): nobs_mean += 1 @@ -527,7 +532,7 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, # Other cases would lead to imprecision for smallest values if min_val - mean_val > -1e5: mean_val = round(mean_val) - for i in range(0, N): + for i in range(0, V): values_copy[i] = values_copy[i] - mean_val for i in range(0, N): @@ -537,8 +542,13 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, # Over the first window, observations can only be added # never removed - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + compensation_xxx_add = compensation_xxx_remove = 0 + compensation_xx_add = compensation_xx_remove = 0 + compensation_x_add = compensation_x_remove = 0 + x = xx = xxx = 0 + nobs = 0 for j in range(s, e): val = values_copy[j] add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add, @@ -682,12 +692,12 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, cdef: Py_ssize_t i, j float64_t val, prev, mean_val, min_val, sum_val = 0 - float64_t compensation_xxxx_add = 0, compensation_xxxx_remove = 0 - float64_t compensation_xxx_remove = 0, compensation_xxx_add = 0 - float64_t compensation_xx_remove = 0, compensation_xx_add = 0 - float64_t compensation_x_remove = 0, compensation_x_add = 0 - float64_t x = 0, xx = 0, xxx = 0, xxxx = 0 - int64_t nobs = 0, s, e, N = len(values), nobs_mean = 0 + float64_t compensation_xxxx_add, compensation_xxxx_remove + float64_t compensation_xxx_remove, compensation_xxx_add + float64_t compensation_xx_remove, compensation_xx_add + float64_t compensation_x_remove, compensation_x_add + float64_t x, xx, xxx, xxxx + int64_t nobs, s, e, N = len(start), V = len(values), nobs_mean = 0 ndarray[float64_t] output, values_copy bint is_monotonic_increasing_bounds @@ -700,7 +710,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, min_val = np.nanmin(values) with nogil: - for i in range(0, N): + for i in range(0, V): val = values_copy[i] if notnan(val): nobs_mean += 1 @@ -709,7 +719,7 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, # Other cases would lead to imprecision for smallest values if min_val - mean_val > -1e4: mean_val = round(mean_val) - for i in range(0, N): + for i in range(0, V): values_copy[i] = values_copy[i] - mean_val for i in range(0, N): @@ -719,8 +729,14 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, # Over the first window, observations can only be added # never removed - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + compensation_xxxx_add = compensation_xxxx_remove = 0 + compensation_xxx_remove = compensation_xxx_add = 0 + compensation_xx_remove = compensation_xx_add = 0 + compensation_x_remove = compensation_x_add = 0 + x = xx = xxx = xxxx = 0 + nobs = 0 for j in range(s, e): add_kurt(values_copy[j], &nobs, &x, &xx, &xxx, &xxxx, &compensation_x_add, &compensation_xx_add, @@ -764,7 +780,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, Py_ssize_t i, j bint err = False, is_monotonic_increasing_bounds int midpoint, ret = 0 - int64_t nobs = 0, N = len(values), s, e, win + int64_t nobs, N = len(start), s, e, win float64_t val, res, prev skiplist_t *sl ndarray[float64_t] output @@ -791,8 +807,11 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + skiplist_destroy(sl) + sl = skiplist_init(win) + nobs = 0 # setup for j in range(s, e): val = values[j] @@ -948,7 +967,7 @@ cdef _roll_min_max(ndarray[numeric_t] values, cdef: numeric_t ai int64_t curr_win_size, start - Py_ssize_t i, k, nobs = 0, N = len(values) + Py_ssize_t i, k, nobs = 0, N = len(starti) deque Q[int64_t] # min/max always the front deque W[int64_t] # track the whole window for nobs compute ndarray[float64_t, ndim=1] output @@ -1031,7 +1050,7 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start, O(N log(window)) implementation using skip list """ cdef: - Py_ssize_t i, j, s, e, N = len(values), idx + Py_ssize_t i, j, s, e, N = len(start), idx int ret = 0 int64_t nobs = 0, win float64_t val, prev, midpoint, idx_with_fraction @@ -1068,8 +1087,8 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: - if not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + if not is_monotonic_increasing_bounds or s >= end[i - 1]: nobs = 0 skiplist_destroy(skiplist) skiplist = skiplist_init(win) @@ -1160,7 +1179,7 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start, derived from roll_quantile """ cdef: - Py_ssize_t i, j, s, e, N = len(values), idx + Py_ssize_t i, j, s, e, N = len(start), idx float64_t rank_min = 0, rank = 0 int64_t nobs = 0, win float64_t val @@ -1193,8 +1212,8 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0 or not is_monotonic_increasing_bounds: - if not is_monotonic_increasing_bounds: + if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: + if not is_monotonic_increasing_bounds or s >= end[i - 1]: nobs = 0 skiplist_destroy(skiplist) skiplist = skiplist_init(win) diff --git a/pandas/tests/window/conftest.py b/pandas/tests/window/conftest.py index 09233e3f31c89..436fb96bfe813 100644 --- a/pandas/tests/window/conftest.py +++ b/pandas/tests/window/conftest.py @@ -2,10 +2,12 @@ datetime, timedelta, ) +from functools import partial import numpy as np import pytest +import pandas._libs.window.aggregations as window_aggregations import pandas.util._test_decorators as td from pandas import ( @@ -259,3 +261,62 @@ def frame(): index=bdate_range(datetime(2009, 1, 1), periods=100), columns=np.arange(10), ) + + +def _named_func(name_and_func): + name, func = name_and_func + if not hasattr(func, "func"): + func = partial(func) + func.__name__ = name + return func + + +@pytest.fixture( + params=[ + _named_func(x) + for x in [ + ("roll_sum", window_aggregations.roll_sum), + ("roll_mean", window_aggregations.roll_mean), + ] + + [ + (f"roll_var({ddof})", partial(window_aggregations.roll_var, ddof=ddof)) + for ddof in [0, 1] + ] + + [ + ("roll_skew", window_aggregations.roll_skew), + ("roll_kurt", window_aggregations.roll_kurt), + ("roll_mediac_c", window_aggregations.roll_median_c), + ("roll_max", window_aggregations.roll_max), + ("roll_min", window_aggregations.roll_min), + ] + + [ + ( + f"roll_quntile({quantile},{interpolation})", + partial( + window_aggregations.roll_quantile, + quantile=quantile, + interpolation=interpolation, + ), + ) + for quantile in [0.25, 0.5, 0.75] + for interpolation in window_aggregations.interpolation_types + ] + + [ + ( + f"roll_rank({percentile},{method},{ascending})", + partial( + window_aggregations.roll_rank, + percentile=percentile, + method=method, + ascending=ascending, + ), + ) + for percentile in [True, False] + for method in window_aggregations.rolling_rank_tiebreakers.keys() + for ascending in [True, False] + ] + ] +) +def rolling_aggregation(request): + """Make a named rolling aggregation function as fixture.""" + return request.param diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 814bd6b998182..9b4ca6475b970 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -2,6 +2,7 @@ datetime, timedelta, ) +import time import numpy as np import pytest @@ -1391,7 +1392,7 @@ def test_rolling_corr_timedelta_index(index, window): # GH: 31286 x = Series([1, 2, 3, 4, 5], index=index) y = x.copy() - x[0:2] = 0.0 + x.iloc[0:2] = 0.0 result = x.rolling(window).corr(y) expected = Series([np.nan, np.nan, 1, 1, 1], index=index) tm.assert_almost_equal(result, expected) @@ -1734,3 +1735,46 @@ def test_rolling_std_neg_sqrt(): b = a.ewm(span=3).std() assert np.isfinite(b[2:]).all() + + +def test_rolling_aggregation_boundary_consistency(rolling_aggregation): + minp, step, width, size, selection = 0, 1, 3, 11, [2, 7] + s = Series(np.arange(1, 1 + size, dtype=np.float64)) + end = np.arange(width, size, step, dtype=np.int64) + start = end - width + selarr = np.array(selection, dtype=np.int32) + result = Series(rolling_aggregation(s.values, start[selarr], end[selarr], minp)) + expected = Series(rolling_aggregation(s.values, start, end, minp)[selarr]) + tm.assert_equal(expected, result) + + +def test_rolling_aggregation_timing_with_unused_elements(rolling_aggregation): + minp, width = 0, 5 # width at least 4 for kurt + + def time_roll_agg(extra_size): + t0 = time.time() + size = 2 * width + 2 + extra_size + s = Series(np.full(size, np.nan, dtype=np.float64)) + start = np.array([1, size - width - 2], dtype=np.int64) + end = np.array([1 + width, size - 2], dtype=np.int64) + loc = np.array( + [j for i in range(len(start)) for j in range(start[i], end[i])], + dtype=np.int32, + ) + for j in loc: + s.iloc[j] = 1 + j + result = Series(rolling_aggregation(s.values, start, end, minp)) + compact_s = np.array(s.iloc[loc], dtype=np.float64) + compact_start = np.arange(0, len(start) * width, width, dtype=np.int64) + compact_end = compact_start + width + expected = Series( + rolling_aggregation(compact_s, compact_start, compact_end, minp) + ) + assert np.isfinite(expected.values).all(), "Not all expected values are finite" + tm.assert_equal(expected, result) + t1 = time.time() + return t1 - t0 + + for i in range(3): # all but last are warmup + t = [time_roll_agg(extra_size) for extra_size in [2, 2000]] + assert abs((t[0] - t[1]) / t[0]) < 2, "Unused elements significantly affect timing" From cafe909b87ea42e8c891f5545c66005e078281d4 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 27 Jan 2022 08:06:39 -0500 Subject: [PATCH 2/5] BUG: Fix testing of window aggregations with unused elements (GH-45647) --- pandas/tests/window/conftest.py | 6 ++-- pandas/tests/window/test_rolling.py | 51 ++++++++++++----------------- 2 files changed, 24 insertions(+), 33 deletions(-) diff --git a/pandas/tests/window/conftest.py b/pandas/tests/window/conftest.py index 436fb96bfe813..76170bec47bff 100644 --- a/pandas/tests/window/conftest.py +++ b/pandas/tests/window/conftest.py @@ -285,20 +285,20 @@ def _named_func(name_and_func): + [ ("roll_skew", window_aggregations.roll_skew), ("roll_kurt", window_aggregations.roll_kurt), - ("roll_mediac_c", window_aggregations.roll_median_c), + ("roll_median_c", window_aggregations.roll_median_c), ("roll_max", window_aggregations.roll_max), ("roll_min", window_aggregations.roll_min), ] + [ ( - f"roll_quntile({quantile},{interpolation})", + f"roll_quantile({quantile},{interpolation})", partial( window_aggregations.roll_quantile, quantile=quantile, interpolation=interpolation, ), ) - for quantile in [0.25, 0.5, 0.75] + for quantile in [0.0001, 0.5, 0.9999] for interpolation in window_aggregations.interpolation_types ] + [ diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 9b4ca6475b970..6d63915f1c3e6 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -2,7 +2,6 @@ datetime, timedelta, ) -import time import numpy as np import pytest @@ -1748,33 +1747,25 @@ def test_rolling_aggregation_boundary_consistency(rolling_aggregation): tm.assert_equal(expected, result) -def test_rolling_aggregation_timing_with_unused_elements(rolling_aggregation): - minp, width = 0, 5 # width at least 4 for kurt +def test_rolling_aggregation_with_unused_elements(rolling_aggregation): + import sys - def time_roll_agg(extra_size): - t0 = time.time() - size = 2 * width + 2 + extra_size - s = Series(np.full(size, np.nan, dtype=np.float64)) - start = np.array([1, size - width - 2], dtype=np.int64) - end = np.array([1 + width, size - 2], dtype=np.int64) - loc = np.array( - [j for i in range(len(start)) for j in range(start[i], end[i])], - dtype=np.int32, - ) - for j in loc: - s.iloc[j] = 1 + j - result = Series(rolling_aggregation(s.values, start, end, minp)) - compact_s = np.array(s.iloc[loc], dtype=np.float64) - compact_start = np.arange(0, len(start) * width, width, dtype=np.int64) - compact_end = compact_start + width - expected = Series( - rolling_aggregation(compact_s, compact_start, compact_end, minp) - ) - assert np.isfinite(expected.values).all(), "Not all expected values are finite" - tm.assert_equal(expected, result) - t1 = time.time() - return t1 - t0 - - for i in range(3): # all but last are warmup - t = [time_roll_agg(extra_size) for extra_size in [2, 2000]] - assert abs((t[0] - t[1]) / t[0]) < 2, "Unused elements significantly affect timing" + minp, width = 0, 5 # width at least 4 for kurt + size = 2 * width + 5 + s = Series(np.arange(1, size + 1, dtype=np.float64)) + s[width : width + 2] = sys.float_info.min + s[width + 2] = np.nan + s[width + 3 : width + 5] = sys.float_info.max + start = np.array([0, size - width], dtype=np.int64) + end = np.array([width, size], dtype=np.int64) + loc = np.array( + [j for i in range(len(start)) for j in range(start[i], end[i])], + dtype=np.int32, + ) + result = Series(rolling_aggregation(s.values, start, end, minp)) + compact_s = np.array(s.iloc[loc], dtype=np.float64) + compact_start = np.arange(0, len(start) * width, width, dtype=np.int64) + compact_end = compact_start + width + expected = Series(rolling_aggregation(compact_s, compact_start, compact_end, minp)) + assert np.isfinite(expected.values).all(), "Not all expected values are finite" + tm.assert_equal(expected, result) From 64e959f0a13a74aa462da0902ab50efe0adc8c3b Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 27 Jan 2022 09:54:55 -0500 Subject: [PATCH 3/5] Minor style fixes (GH-45647) --- pandas/tests/window/test_rolling.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 6d63915f1c3e6..8c6da79659889 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -2,6 +2,7 @@ datetime, timedelta, ) +import sys import numpy as np import pytest @@ -1737,6 +1738,7 @@ def test_rolling_std_neg_sqrt(): def test_rolling_aggregation_boundary_consistency(rolling_aggregation): + # GH-45647 minp, step, width, size, selection = 0, 1, 3, 11, [2, 7] s = Series(np.arange(1, 1 + size, dtype=np.float64)) end = np.arange(width, size, step, dtype=np.int64) @@ -1748,8 +1750,7 @@ def test_rolling_aggregation_boundary_consistency(rolling_aggregation): def test_rolling_aggregation_with_unused_elements(rolling_aggregation): - import sys - + # GH-45647 minp, width = 0, 5 # width at least 4 for kurt size = 2 * width + 5 s = Series(np.arange(1, size + 1, dtype=np.float64)) From 221ca7b4e048baf205b2def3db67fc219371c3d2 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Thu, 27 Jan 2022 09:55:38 -0500 Subject: [PATCH 4/5] Added release note (GH-45647) --- doc/source/whatsnew/v1.4.1.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/whatsnew/v1.4.1.rst b/doc/source/whatsnew/v1.4.1.rst index 1bd8da2d2b03c..ef5e7ecf44ba3 100644 --- a/doc/source/whatsnew/v1.4.1.rst +++ b/doc/source/whatsnew/v1.4.1.rst @@ -25,6 +25,7 @@ Fixed regressions Bug fixes ~~~~~~~~~ - Fixed segfault in :meth:``DataFrame.to_json`` when dumping tz-aware datetimes in Python 3.10 (:issue:`42130`) +- Fixed window aggregations to skip over unused elements (:issue:`45647`) - .. --------------------------------------------------------------------------- From a7d68789543625709d49d2c4e41c8c3da008e479 Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Fri, 28 Jan 2022 04:53:09 -0500 Subject: [PATCH 5/5] Refactored and cleaned up (GH-45647) --- doc/source/whatsnew/v1.4.1.rst | 2 +- pandas/_libs/window/aggregations.pyx | 13 +- pandas/tests/window/conftest.py | 61 ---------- .../tests/window/test_cython_aggregations.py | 111 ++++++++++++++++++ pandas/tests/window/test_rolling.py | 36 ------ 5 files changed, 119 insertions(+), 104 deletions(-) create mode 100644 pandas/tests/window/test_cython_aggregations.py diff --git a/doc/source/whatsnew/v1.4.1.rst b/doc/source/whatsnew/v1.4.1.rst index ef5e7ecf44ba3..c950a71222c6c 100644 --- a/doc/source/whatsnew/v1.4.1.rst +++ b/doc/source/whatsnew/v1.4.1.rst @@ -25,7 +25,7 @@ Fixed regressions Bug fixes ~~~~~~~~~ - Fixed segfault in :meth:``DataFrame.to_json`` when dumping tz-aware datetimes in Python 3.10 (:issue:`42130`) -- Fixed window aggregations to skip over unused elements (:issue:`45647`) +- Fixed window aggregations in :meth:`DataFrame.rolling` and :meth:`Series.rolling` to skip over unused elements (:issue:`45647`) - .. --------------------------------------------------------------------------- diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 2dad06cb409ea..ff53a577af33f 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -780,7 +780,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, Py_ssize_t i, j bint err = False, is_monotonic_increasing_bounds int midpoint, ret = 0 - int64_t nobs, N = len(start), s, e, win + int64_t nobs = 0, N = len(start), s, e, win float64_t val, res, prev skiplist_t *sl ndarray[float64_t] output @@ -809,9 +809,10 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: - skiplist_destroy(sl) - sl = skiplist_init(win) - nobs = 0 + if i != 0: + skiplist_destroy(sl) + sl = skiplist_init(win) + nobs = 0 # setup for j in range(s, e): val = values[j] @@ -1088,7 +1089,7 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start, e = end[i] if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: - if not is_monotonic_increasing_bounds or s >= end[i - 1]: + if i != 0: nobs = 0 skiplist_destroy(skiplist) skiplist = skiplist_init(win) @@ -1213,7 +1214,7 @@ def roll_rank(const float64_t[:] values, ndarray[int64_t] start, e = end[i] if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]: - if not is_monotonic_increasing_bounds or s >= end[i - 1]: + if i != 0: nobs = 0 skiplist_destroy(skiplist) skiplist = skiplist_init(win) diff --git a/pandas/tests/window/conftest.py b/pandas/tests/window/conftest.py index 76170bec47bff..09233e3f31c89 100644 --- a/pandas/tests/window/conftest.py +++ b/pandas/tests/window/conftest.py @@ -2,12 +2,10 @@ datetime, timedelta, ) -from functools import partial import numpy as np import pytest -import pandas._libs.window.aggregations as window_aggregations import pandas.util._test_decorators as td from pandas import ( @@ -261,62 +259,3 @@ def frame(): index=bdate_range(datetime(2009, 1, 1), periods=100), columns=np.arange(10), ) - - -def _named_func(name_and_func): - name, func = name_and_func - if not hasattr(func, "func"): - func = partial(func) - func.__name__ = name - return func - - -@pytest.fixture( - params=[ - _named_func(x) - for x in [ - ("roll_sum", window_aggregations.roll_sum), - ("roll_mean", window_aggregations.roll_mean), - ] - + [ - (f"roll_var({ddof})", partial(window_aggregations.roll_var, ddof=ddof)) - for ddof in [0, 1] - ] - + [ - ("roll_skew", window_aggregations.roll_skew), - ("roll_kurt", window_aggregations.roll_kurt), - ("roll_median_c", window_aggregations.roll_median_c), - ("roll_max", window_aggregations.roll_max), - ("roll_min", window_aggregations.roll_min), - ] - + [ - ( - f"roll_quantile({quantile},{interpolation})", - partial( - window_aggregations.roll_quantile, - quantile=quantile, - interpolation=interpolation, - ), - ) - for quantile in [0.0001, 0.5, 0.9999] - for interpolation in window_aggregations.interpolation_types - ] - + [ - ( - f"roll_rank({percentile},{method},{ascending})", - partial( - window_aggregations.roll_rank, - percentile=percentile, - method=method, - ascending=ascending, - ), - ) - for percentile in [True, False] - for method in window_aggregations.rolling_rank_tiebreakers.keys() - for ascending in [True, False] - ] - ] -) -def rolling_aggregation(request): - """Make a named rolling aggregation function as fixture.""" - return request.param diff --git a/pandas/tests/window/test_cython_aggregations.py b/pandas/tests/window/test_cython_aggregations.py new file mode 100644 index 0000000000000..c60cb6ea74ec0 --- /dev/null +++ b/pandas/tests/window/test_cython_aggregations.py @@ -0,0 +1,111 @@ +from functools import partial +import sys + +import numpy as np +import pytest + +import pandas._libs.window.aggregations as window_aggregations + +from pandas import Series +import pandas._testing as tm + + +def _get_rolling_aggregations(): + # list pairs of name and function + # each function has this signature: + # (const float64_t[:] values, ndarray[int64_t] start, + # ndarray[int64_t] end, int64_t minp) -> np.ndarray + named_roll_aggs = ( + [ + ("roll_sum", window_aggregations.roll_sum), + ("roll_mean", window_aggregations.roll_mean), + ] + + [ + (f"roll_var({ddof})", partial(window_aggregations.roll_var, ddof=ddof)) + for ddof in [0, 1] + ] + + [ + ("roll_skew", window_aggregations.roll_skew), + ("roll_kurt", window_aggregations.roll_kurt), + ("roll_median_c", window_aggregations.roll_median_c), + ("roll_max", window_aggregations.roll_max), + ("roll_min", window_aggregations.roll_min), + ] + + [ + ( + f"roll_quantile({quantile},{interpolation})", + partial( + window_aggregations.roll_quantile, + quantile=quantile, + interpolation=interpolation, + ), + ) + for quantile in [0.0001, 0.5, 0.9999] + for interpolation in window_aggregations.interpolation_types + ] + + [ + ( + f"roll_rank({percentile},{method},{ascending})", + partial( + window_aggregations.roll_rank, + percentile=percentile, + method=method, + ascending=ascending, + ), + ) + for percentile in [True, False] + for method in window_aggregations.rolling_rank_tiebreakers.keys() + for ascending in [True, False] + ] + ) + # unzip to a list of 2 tuples, names and functions + unzipped = list(zip(*named_roll_aggs)) + return {"ids": unzipped[0], "params": unzipped[1]} + + +_rolling_aggregations = _get_rolling_aggregations() + + +@pytest.fixture( + params=_rolling_aggregations["params"], ids=_rolling_aggregations["ids"] +) +def rolling_aggregation(request): + """Make a rolling aggregation function as fixture.""" + return request.param + + +def test_rolling_aggregation_boundary_consistency(rolling_aggregation): + # GH-45647 + minp, step, width, size, selection = 0, 1, 3, 11, [2, 7] + values = np.arange(1, 1 + size, dtype=np.float64) + end = np.arange(width, size, step, dtype=np.int64) + start = end - width + selarr = np.array(selection, dtype=np.int32) + result = Series(rolling_aggregation(values, start[selarr], end[selarr], minp)) + expected = Series(rolling_aggregation(values, start, end, minp)[selarr]) + tm.assert_equal(expected, result) + + +def test_rolling_aggregation_with_unused_elements(rolling_aggregation): + # GH-45647 + minp, width = 0, 5 # width at least 4 for kurt + size = 2 * width + 5 + values = np.arange(1, size + 1, dtype=np.float64) + values[width : width + 2] = sys.float_info.min + values[width + 2] = np.nan + values[width + 3 : width + 5] = sys.float_info.max + start = np.array([0, size - width], dtype=np.int64) + end = np.array([width, size], dtype=np.int64) + loc = np.array( + [j for i in range(len(start)) for j in range(start[i], end[i])], + dtype=np.int32, + ) + result = Series(rolling_aggregation(values, start, end, minp)) + compact_values = np.array(values[loc], dtype=np.float64) + compact_start = np.arange(0, len(start) * width, width, dtype=np.int64) + compact_end = compact_start + width + expected = Series( + rolling_aggregation(compact_values, compact_start, compact_end, minp) + ) + assert np.isfinite(expected.values).all(), "Not all expected values are finite" + tm.assert_equal(expected, result) diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 8c6da79659889..31dbcdfce44e7 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -2,7 +2,6 @@ datetime, timedelta, ) -import sys import numpy as np import pytest @@ -1735,38 +1734,3 @@ def test_rolling_std_neg_sqrt(): b = a.ewm(span=3).std() assert np.isfinite(b[2:]).all() - - -def test_rolling_aggregation_boundary_consistency(rolling_aggregation): - # GH-45647 - minp, step, width, size, selection = 0, 1, 3, 11, [2, 7] - s = Series(np.arange(1, 1 + size, dtype=np.float64)) - end = np.arange(width, size, step, dtype=np.int64) - start = end - width - selarr = np.array(selection, dtype=np.int32) - result = Series(rolling_aggregation(s.values, start[selarr], end[selarr], minp)) - expected = Series(rolling_aggregation(s.values, start, end, minp)[selarr]) - tm.assert_equal(expected, result) - - -def test_rolling_aggregation_with_unused_elements(rolling_aggregation): - # GH-45647 - minp, width = 0, 5 # width at least 4 for kurt - size = 2 * width + 5 - s = Series(np.arange(1, size + 1, dtype=np.float64)) - s[width : width + 2] = sys.float_info.min - s[width + 2] = np.nan - s[width + 3 : width + 5] = sys.float_info.max - start = np.array([0, size - width], dtype=np.int64) - end = np.array([width, size], dtype=np.int64) - loc = np.array( - [j for i in range(len(start)) for j in range(start[i], end[i])], - dtype=np.int32, - ) - result = Series(rolling_aggregation(s.values, start, end, minp)) - compact_s = np.array(s.iloc[loc], dtype=np.float64) - compact_start = np.arange(0, len(start) * width, width, dtype=np.int64) - compact_end = compact_start + width - expected = Series(rolling_aggregation(compact_s, compact_start, compact_end, minp)) - assert np.isfinite(expected.values).all(), "Not all expected values are finite" - tm.assert_equal(expected, result)