diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 2a8b6fe3ade6a..1f7cb2d64bcdb 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -347,6 +347,7 @@ Groupby/resample/rolling - Bug when subsetting columns on a :class:`~pandas.core.groupby.DataFrameGroupBy` (e.g. ``df.groupby('a')[['b']])``) would reset the attributes ``axis``, ``dropna``, ``group_keys``, ``level``, ``mutated``, ``sort``, and ``squeeze`` to their default values. (:issue:`9959`) - Bug in :meth:`DataFrameGroupby.tshift` failing to raise ``ValueError`` when a frequency cannot be inferred for the index of a group (:issue:`35937`) - Bug in :meth:`DataFrame.groupby` does not always maintain column index name for ``any``, ``all``, ``bfill``, ``ffill``, ``shift`` (:issue:`29764`) +- Bug in :meth:`DataFrame.groupby.rolling` output incorrect when using a partial window (:issue:`36040`) - Bug in :meth:`DataFrameGroupBy.apply` raising error with ``np.nan`` group(s) when ``dropna=False`` (:issue:`35889`) - Bug in :meth:`Rolling.sum()` returned wrong values when dtypes where mixed between float and integer and axis was equal to one (:issue:`20649`, :issue:`35596`) diff --git a/pandas/core/window/common.py b/pandas/core/window/common.py index 6452eb8c6b3a9..3f7323abfb552 100644 --- a/pandas/core/window/common.py +++ b/pandas/core/window/common.py @@ -71,6 +71,7 @@ def _apply( is_weighted: bool = False, name: Optional[str] = None, use_numba_cache: bool = False, + skip_offset: bool = False, **kwargs, ): """ diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index a21521f4ce8bb..d8aeed0578836 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -78,17 +78,17 @@ def get_window_bounds( closed: Optional[str] = None, ) -> Tuple[np.ndarray, np.ndarray]: - start_s = np.zeros(self.window_size, dtype="int64") - start_e = ( - np.arange(self.window_size, num_values, dtype="int64") - - self.window_size - + 1 - ) - start = np.concatenate([start_s, start_e])[:num_values] + if center: + offset = (self.window_size - 1) // 2 + else: + offset = 0 + + end = np.arange(1 + offset, num_values + 1 + offset).astype("int64") + start = end - self.window_size + + end = np.clip(end, 0, num_values) + start = np.clip(start, 0, num_values) - end_s = np.arange(self.window_size, dtype="int64") + 1 - end_e = start_e + self.window_size - end = np.concatenate([end_s, end_e])[:num_values] return start, end diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 335fc3db5cd86..94de560dbdbcb 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -72,24 +72,6 @@ from pandas.core.internals import Block # noqa:F401 -def calculate_center_offset(window) -> int: - """ - Calculate an offset necessary to have the window label to be centered. - - Parameters - ---------- - window: ndarray or int - window weights or window - - Returns - ------- - int - """ - if not is_integer(window): - window = len(window) - return int((window - 1) / 2.0) - - def calculate_min_periods( window: int, min_periods: Optional[int], @@ -417,18 +399,44 @@ def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"): # insert at the end result[name] = extra_col - def _center_window(self, result: np.ndarray, window) -> np.ndarray: + def calculate_center_offset(self, window, center: bool) -> int: + """ + Calculate an offset necessary to have the window label to be centered. + + Parameters + ---------- + window : ndarray or int + window weights or window + center : bool + Set the labels at the center of the window. + + Returns + ------- + int + """ + if not center: + return 0 + + if self.is_freq_type or isinstance(self.window, BaseIndexer): + return 0 + + if not is_integer(window): + window = len(window) + return int((window - 1) / 2.0) + + def _center_window(self, result: np.ndarray, window, center) -> np.ndarray: """ Center the result in the window. """ if self.axis > result.ndim - 1: raise ValueError("Requested axis is larger then no. of argument dimensions") - offset = calculate_center_offset(window) + offset = self.calculate_center_offset(window, center) if offset > 0: lead_indexer = [slice(None)] * result.ndim lead_indexer[self.axis] = slice(offset, None) result = np.copy(result[tuple(lead_indexer)]) + return result def _get_roll_func(self, func_name: str) -> Callable: @@ -524,6 +532,7 @@ def _apply( is_weighted: bool = False, name: Optional[str] = None, use_numba_cache: bool = False, + skip_offset: bool = False, **kwargs, ): """ @@ -543,6 +552,8 @@ def _apply( use_numba_cache : bool whether to cache a numba compiled function. Only available for numba enabled methods (so far only apply) + skip_offset : bool + whether to skip offsetting x **kwargs additional arguments for rolling function and window function @@ -560,7 +571,11 @@ def homogeneous_func(values: np.ndarray): if values.size == 0: return values.copy() - offset = calculate_center_offset(window) if center else 0 + if skip_offset: + offset = 0 + else: + offset = self.calculate_center_offset(window, center) + additional_nans = np.array([np.nan] * offset) if not is_weighted: @@ -603,8 +618,8 @@ def calc(x): if use_numba_cache: NUMBA_FUNC_CACHE[(kwargs["original_func"], "rolling_apply")] = func - if center: - result = self._center_window(result, window) + if not skip_offset: + result = self._center_window(result, window, center) return result @@ -1189,7 +1204,7 @@ def sum(self, *args, **kwargs): window_func = self._get_roll_func("roll_weighted_sum") window_func = get_weighted_roll_func(window_func) return self._apply( - window_func, center=self.center, is_weighted=True, name="sum", **kwargs + window_func, center=self.center, is_weighted=True, name="sum", **kwargs, ) @Substitution(name="window") @@ -1210,7 +1225,7 @@ def var(self, ddof=1, *args, **kwargs): window_func = get_weighted_roll_func(window_func) kwargs.pop("name", None) return self._apply( - window_func, center=self.center, is_weighted=True, name="var", **kwargs + window_func, center=self.center, is_weighted=True, name="var", **kwargs, ) @Substitution(name="window", versionadded="\n.. versionadded:: 1.0.0\n") @@ -1388,7 +1403,8 @@ def apply( # Cython apply functions handle center, so don't need to use # _apply's center handling window = self._get_window() - offset = calculate_center_offset(window) if self.center else 0 + + offset = self.calculate_center_offset(window, self.center) apply_func = self._generate_cython_apply_func( args, kwargs, raw, offset, func ) @@ -1406,19 +1422,17 @@ def apply( raw=raw, original_func=func, args=args, + skip_offset=True, kwargs=kwargs, ) def _generate_cython_apply_func(self, args, kwargs, raw, offset, func): from pandas import Series + cython_func = self._get_cython_func_type("roll_generic") + window_func = partial( - self._get_cython_func_type("roll_generic"), - args=args, - kwargs=kwargs, - raw=raw, - offset=offset, - func=func, + cython_func, args=args, kwargs=kwargs, raw=raw, offset=offset, func=func, ) def apply_func(values, begin, end, min_periods, raw=raw): @@ -1433,7 +1447,7 @@ def sum(self, *args, **kwargs): window_func = self._get_cython_func_type("roll_sum") kwargs.pop("floor", None) return self._apply( - window_func, center=self.center, floor=0, name="sum", **kwargs + window_func, center=self.center, floor=0, name="sum", **kwargs, ) _shared_docs["max"] = dedent( @@ -1540,7 +1554,9 @@ def median(self, **kwargs): window_func = self._get_roll_func("roll_median_c") # GH 32865. Move max window size calculation to # the median function implementation - return self._apply(window_func, center=self.center, name="median", **kwargs) + return self._apply( + window_func, center=self.center, name="median", skip_offset=True, **kwargs + ) def std(self, ddof=1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) @@ -1563,7 +1579,8 @@ def zsqrt_func(values, begin, end, min_periods): def var(self, ddof=1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) kwargs.pop("require_min_periods", None) - window_func = partial(self._get_cython_func_type("roll_var"), ddof=ddof) + cython_func = self._get_cython_func_type("roll_var") + window_func = partial(cython_func, ddof=ddof) # ddof passed again for compat with groupby.rolling return self._apply( window_func, @@ -1696,8 +1713,10 @@ def kurt(self, **kwargs): def quantile(self, quantile, interpolation="linear", **kwargs): if quantile == 1.0: window_func = self._get_cython_func_type("roll_max") + skip_offset = False elif quantile == 0.0: window_func = self._get_cython_func_type("roll_min") + skip_offset = False else: window_func = partial( self._get_roll_func("roll_quantile"), @@ -1705,11 +1724,18 @@ def quantile(self, quantile, interpolation="linear", **kwargs): quantile=quantile, interpolation=interpolation, ) + skip_offset = True # Pass through for groupby.rolling kwargs["quantile"] = quantile kwargs["interpolation"] = interpolation - return self._apply(window_func, center=self.center, name="quantile", **kwargs) + return self._apply( + window_func, + center=self.center, + name="quantile", + skip_offset=skip_offset, + **kwargs, + ) _shared_docs[ "cov" @@ -2189,6 +2215,7 @@ def _apply( is_weighted: bool = False, name: Optional[str] = None, use_numba_cache: bool = False, + skip_offset: bool = True, **kwargs, ): result = Rolling._apply( @@ -2200,6 +2227,7 @@ def _apply( is_weighted, name, use_numba_cache, + skip_offset, **kwargs, ) # Cannot use _wrap_outputs because we calculate the result all at once @@ -2243,6 +2271,31 @@ def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: obj = obj.take(groupby_order) return super()._create_data(obj) + def calculate_center_offset(self, window, center: bool) -> int: + """ + Calculate an offset necessary to have the window label to be centered. + + Parameters + ---------- + window : ndarray or int + window weights or window + center : bool + Set the labels at the center of the window. + + Returns + ------- + int + """ + if not center or not self.win_type: + return 0 + + if self.is_freq_type or isinstance(self.window, BaseIndexer): + return 0 + + if not is_integer(window): + window = len(window) + return int((window - 1) / 2.0) + def _get_cython_func_type(self, func: str) -> Callable: """ Return the cython function type. diff --git a/pandas/tests/window/moments/test_moments_consistency_rolling.py b/pandas/tests/window/moments/test_moments_consistency_rolling.py index dfcbdde466d44..42c3d02089c6f 100644 --- a/pandas/tests/window/moments/test_moments_consistency_rolling.py +++ b/pandas/tests/window/moments/test_moments_consistency_rolling.py @@ -136,6 +136,53 @@ def test_rolling_apply_consistency( tm.assert_equal(rolling_f_result, rolling_apply_f_result) +@pytest.mark.parametrize( + "window,min_periods,center", list(_rolling_consistency_cases()) +) +def test_rolling_groupby(base_functions, window, min_periods, center): + base_df = DataFrame({"group": "A", "data": randn(20)}) + + b_df = base_df.copy() + b_df["group"] = "B" + + grp_df = pd.concat([base_df, b_df]).groupby("group") + + for (f, require_min_periods, name) in base_functions: + if ( + require_min_periods + and (min_periods is not None) + and (min_periods < require_min_periods) + ): + continue + + base_rolling_f = getattr( + base_df[["data"]].rolling( + window=window, center=center, min_periods=min_periods + ), + name, + ) + + grp_rolling_f = getattr( + grp_df[["data"]].rolling( + window=window, center=center, min_periods=min_periods + ), + name, + ) + + base_result = base_rolling_f().reset_index(drop=True) + grp_result = grp_rolling_f().reset_index() + + a_result = grp_result[grp_result["group"] == "A"][["data"]].reset_index( + drop=True + ) + b_result = grp_result[grp_result["group"] == "B"][["data"]].reset_index( + drop=True + ) + + tm.assert_frame_equal(base_result, a_result) + tm.assert_frame_equal(base_result, b_result) + + @pytest.mark.parametrize("window", range(7)) def test_rolling_corr_with_zero_variance(window): # GH 18430 diff --git a/pandas/tests/window/test_grouper.py b/pandas/tests/window/test_grouper.py index 786cf68d28871..20797095a5692 100644 --- a/pandas/tests/window/test_grouper.py +++ b/pandas/tests/window/test_grouper.py @@ -274,6 +274,32 @@ def test_groupby_rolling_center_center(self): ) tm.assert_frame_equal(result, expected) + @pytest.mark.parametrize("min_periods", [5, 4, 3]) + def test_groupby_rolling_center_min_periods(self, min_periods): + df = pd.DataFrame({"group": ["A"] * 10 + ["B"] * 10, "data": range(20)}) + + window_size = 5 + result = ( + df.groupby("group") + .rolling(window_size, center=True, min_periods=min_periods) + .mean() + ) + result = result.reset_index()[["group", "data"]] + + grp_A_mean = [1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 7.5, 8.0] + grp_B_mean = [x + 10.0 for x in grp_A_mean] + + num_nans = max(0, min_periods - 3) # For window_size of 5 + nans = [np.nan] * num_nans + grp_A_expected = nans + grp_A_mean[num_nans : 10 - num_nans] + nans + grp_B_expected = nans + grp_B_mean[num_nans : 10 - num_nans] + nans + + expected = pd.DataFrame( + {"group": ["A"] * 10 + ["B"] * 10, "data": grp_A_expected + grp_B_expected} + ) + + tm.assert_frame_equal(result, expected) + def test_groupby_subselect_rolling(self): # GH 35486 df = DataFrame(