diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index f1f24ab7a101b..4fbad4df3e78b 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -499,6 +499,7 @@ Groupby/resample/rolling - Bug in :meth:`DataFrameGroupBy.rolling` returned wrong values with timeaware window containing ``NaN``. Raises ``ValueError`` because windows are not monotonic now (:issue:`34617`) - Bug in :meth:`Rolling.__iter__` where a ``ValueError`` was not raised when ``min_periods`` was larger than ``window`` (:issue:`37156`) - Using :meth:`Rolling.var()` instead of :meth:`Rolling.std()` avoids numerical issues for :meth:`Rolling.corr()` when :meth:`Rolling.var()` is still within floating point precision while :meth:`Rolling.std()` is not (:issue:`31286`) +- Bug in :meth:`Rolling.median` and :meth:`Rolling.quantile` returned wrong values for :class:`BaseIndexer` subclasses with non-monotonic starting or ending points for windows (:issue:`37153`) Reshaping ^^^^^^^^^ diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 2c315ca13e563..b2dbf7802e6f0 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -686,6 +686,11 @@ def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start, int64_t nobs = 0, N = len(values), s, e, win int midpoint ndarray[float64_t] output + bint is_monotonic_increasing_bounds + + is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds( + start, end + ) # we use the Fixed/Variable Indexer here as the # actual skiplist ops outweigh any window computation costs @@ -705,7 +710,7 @@ def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0: + if i == 0 or not is_monotonic_increasing_bounds: # setup for j in range(s, e): @@ -733,7 +738,6 @@ def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start, if notnan(val): skiplist_remove(sl, val) nobs -= 1 - if nobs >= minp: midpoint = (nobs / 2) if nobs % 2: @@ -748,6 +752,10 @@ def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start, output[i] = res + if not is_monotonic_increasing_bounds: + nobs = 0 + sl = skiplist_init(win) + skiplist_destroy(sl) if err: raise MemoryError("skiplist_insert failed") @@ -935,7 +943,7 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, cdef: float64_t val, prev, midpoint, idx_with_fraction skiplist_t *skiplist - int64_t nobs = 0, i, j, s, e, N = len(values) + int64_t nobs = 0, i, j, s, e, N = len(values), win Py_ssize_t idx ndarray[float64_t] output float64_t vlow, vhigh @@ -950,6 +958,9 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, except KeyError: raise ValueError(f"Interpolation '{interpolation}' is not supported") + is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds( + start, end + ) # we use the Fixed/Variable Indexer here as the # actual skiplist ops outweigh any window computation costs output = np.empty(N, dtype=float) @@ -967,7 +978,10 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, s = start[i] e = end[i] - if i == 0: + if i == 0 or not is_monotonic_increasing_bounds: + if not is_monotonic_increasing_bounds: + nobs = 0 + skiplist = skiplist_init(win) # setup for j in range(s, e): @@ -977,7 +991,6 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, skiplist_insert(skiplist, val) else: - # calculate adds for j in range(end[i - 1], e): val = values[j] @@ -991,7 +1004,6 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, if notnan(val): skiplist_remove(skiplist, val) nobs -= 1 - if nobs >= minp: if nobs == 1: # Single value in skip list diff --git a/pandas/tests/window/test_base_indexer.py b/pandas/tests/window/test_base_indexer.py index 7f2d58effe1ae..1723330ec40e1 100644 --- a/pandas/tests/window/test_base_indexer.py +++ b/pandas/tests/window/test_base_indexer.py @@ -263,3 +263,31 @@ def test_fixed_forward_indexer_count(): result = df.rolling(window=indexer, min_periods=0).count() expected = DataFrame({"b": [0.0, 0.0, 1.0, 1.0]}) tm.assert_frame_equal(result, expected) + + +@pytest.mark.parametrize( + ("end_value", "values"), [(1, [0.0, 1, 1, 3, 2]), (-1, [0.0, 1, 0, 3, 1])] +) +@pytest.mark.parametrize(("func", "args"), [("median", []), ("quantile", [0.5])]) +def test_indexer_quantile_sum(end_value, values, func, args): + # GH 37153 + class CustomIndexer(BaseIndexer): + def get_window_bounds(self, num_values, min_periods, center, closed): + start = np.empty(num_values, dtype=np.int64) + end = np.empty(num_values, dtype=np.int64) + for i in range(num_values): + if self.use_expanding[i]: + start[i] = 0 + end[i] = max(i + end_value, 1) + else: + start[i] = i + end[i] = i + self.window_size + return start, end + + use_expanding = [True, False, True, False, True] + df = DataFrame({"values": range(5)}) + + indexer = CustomIndexer(window_size=1, use_expanding=use_expanding) + result = getattr(df.rolling(indexer), func)(*args) + expected = DataFrame({"values": values}) + tm.assert_frame_equal(result, expected)