diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py
index 5f8cdb2a0bdac..5738775fe2b27 100644
--- a/asv_bench/benchmarks/rolling.py
+++ b/asv_bench/benchmarks/rolling.py
@@ -140,8 +140,11 @@ class Pairwise:
 
     def setup(self, window, method, pairwise):
         N = 10 ** 4
+        n_groups = 20
+        groups = [i for _ in range(N // n_groups) for i in range(n_groups)]
         arr = np.random.random(N)
         self.df = pd.DataFrame(arr)
+        self.df_group = pd.DataFrame({"A": groups, "B": arr}).groupby("A")
 
     def time_pairwise(self, window, method, pairwise):
         if window is None:
@@ -150,6 +153,13 @@ def time_pairwise(self, window, method, pairwise):
             r = self.df.rolling(window=window)
         getattr(r, method)(self.df, pairwise=pairwise)
 
+    def time_groupby(self, window, method, pairwise):
+        if window is None:
+            r = self.df_group.expanding()
+        else:
+            r = self.df_group.rolling(window=window)
+        getattr(r, method)(self.df, pairwise=pairwise)
+
 
 class Quantile:
     params = (
diff --git a/doc/source/whatsnew/v1.3.0.rst b/doc/source/whatsnew/v1.3.0.rst
index 4d0384abbf0c6..251fbf538ca92 100644
--- a/doc/source/whatsnew/v1.3.0.rst
+++ b/doc/source/whatsnew/v1.3.0.rst
@@ -249,7 +249,8 @@ Performance improvements
 - Performance improvement in :meth:`Series.mean` for nullable data types (:issue:`34814`)
 - Performance improvement in :meth:`Series.isin` for nullable data types (:issue:`38340`)
 - Performance improvement in :meth:`DataFrame.corr` for method=kendall (:issue:`28329`)
-- Performance improvement in :meth:`core.window.Rolling.corr` and :meth:`core.window.Rolling.cov` (:issue:`39388`)
+- Performance improvement in :meth:`core.window.rolling.Rolling.corr` and :meth:`core.window.rolling.Rolling.cov` (:issue:`39388`)
+- Performance improvement in :meth:`core.window.rolling.RollingGroupby.corr`, :meth:`core.window.rolling.RollingGroupby.cov`, :meth:`core.window.expanding.ExpandingGroupby.corr` and :meth:`core.window.expanding.ExpandingGroupby.cov` (:issue:`39591`)
 
 .. ---------------------------------------------------------------------------
 
@@ -406,6 +407,8 @@ Groupby/resample/rolling
 - Bug in :meth:`.Resampler.aggregate` and :meth:`DataFrame.transform` raising ``TypeError`` instead of ``SpecificationError`` when missing keys had mixed dtypes (:issue:`39025`)
 - Bug in :meth:`.DataFrameGroupBy.idxmin` and :meth:`.DataFrameGroupBy.idxmax` with ``ExtensionDtype`` columns (:issue:`38733`)
 - Bug in :meth:`Series.resample` would raise when the index was a :class:`PeriodIndex` consisting of ``NaT`` (:issue:`39227`)
+- Bug in :meth:`core.window.rolling.RollingGroupby.corr` and :meth:`core.window.expanding.ExpandingGroupby.corr` where the groupby column would return 0 instead of ``np.nan`` when providing ``other`` that was longer than each group (:issue:`39591`)
+- Bug in :meth:`core.window.expanding.ExpandingGroupby.corr` and :meth:`core.window.expanding.ExpandingGroupby.cov` where 1 would be returned instead of ``np.nan`` when providing ``other`` that was longer than each group (:issue:`39591`)
 
 Reshaping
 ^^^^^^^^^
diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py
index 2948216e200de..e02555fb1e990 100644
--- a/pandas/core/window/ewm.py
+++ b/pandas/core/window/ewm.py
@@ -3,14 +3,14 @@
 import datetime
 from functools import partial
 from textwrap import dedent
-from typing import TYPE_CHECKING, Optional, Union
+from typing import Optional, Union
 import warnings
 
 import numpy as np
 
 from pandas._libs.tslibs import Timedelta
 import pandas._libs.window.aggregations as window_aggregations
-from pandas._typing import FrameOrSeries, TimedeltaConvertibleTypes
+from pandas._typing import FrameOrSeries, FrameOrSeriesUnion, TimedeltaConvertibleTypes
 from pandas.compat.numpy import function as nv
 from pandas.util._decorators import doc
 
@@ -19,7 +19,7 @@
 
 import pandas.core.common as common
 from pandas.core.util.numba_ import maybe_use_numba
-from pandas.core.window.common import flex_binary_moment, zsqrt
+from pandas.core.window.common import zsqrt
 from pandas.core.window.doc import (
     _shared_docs,
     args_compat,
@@ -35,10 +35,7 @@
     GroupbyIndexer,
 )
 from pandas.core.window.numba_ import generate_numba_groupby_ewma_func
-from pandas.core.window.rolling import BaseWindow, BaseWindowGroupby, dispatch
-
-if TYPE_CHECKING:
-    from pandas import Series
+from pandas.core.window.rolling import BaseWindow, BaseWindowGroupby
 
 
 def get_center_of_mass(
@@ -74,13 +71,20 @@ def get_center_of_mass(
     return float(comass)
 
 
-def wrap_result(obj: Series, result: np.ndarray) -> Series:
+def dispatch(name: str, *args, **kwargs):
     """
-    Wrap a single 1D result.
+    Dispatch to groupby apply.
""" - obj = obj._selected_obj - return obj._constructor(result, obj.index, name=obj.name) + def outer(self, *args, **kwargs): + def f(x): + x = self._shallow_copy(x, groupby=self._groupby) + return getattr(x, name)(*args, **kwargs) + + return self._groupby.apply(f) + + outer.__name__ = name + return outer class ExponentialMovingWindow(BaseWindow): @@ -443,36 +447,30 @@ def var_func(values, begin, end, min_periods): ) def cov( self, - other: Optional[Union[np.ndarray, FrameOrSeries]] = None, + other: Optional[FrameOrSeriesUnion] = None, pairwise: Optional[bool] = None, bias: bool = False, **kwargs, ): - if other is None: - other = self._selected_obj - # only default unset - pairwise = True if pairwise is None else pairwise - other = self._shallow_copy(other) - - def _get_cov(X, Y): - X = self._shallow_copy(X) - Y = self._shallow_copy(Y) - cov = window_aggregations.ewmcov( - X._prep_values(), + from pandas import Series + + def cov_func(x, y): + x_array = self._prep_values(x) + y_array = self._prep_values(y) + result = window_aggregations.ewmcov( + x_array, np.array([0], dtype=np.int64), np.array([0], dtype=np.int64), self.min_periods, - Y._prep_values(), + y_array, self.com, self.adjust, self.ignore_na, bias, ) - return wrap_result(X, cov) + return Series(result, index=x.index, name=x.name) - return flex_binary_moment( - self._selected_obj, other._selected_obj, _get_cov, pairwise=bool(pairwise) - ) + return self._apply_pairwise(self._selected_obj, other, pairwise, cov_func) @doc( template_header, @@ -502,45 +500,37 @@ def _get_cov(X, Y): ) def corr( self, - other: Optional[Union[np.ndarray, FrameOrSeries]] = None, + other: Optional[FrameOrSeriesUnion] = None, pairwise: Optional[bool] = None, **kwargs, ): - if other is None: - other = self._selected_obj - # only default unset - pairwise = True if pairwise is None else pairwise - other = self._shallow_copy(other) + from pandas import Series - def _get_corr(X, Y): - X = self._shallow_copy(X) - Y = self._shallow_copy(Y) + def cov_func(x, y): + x_array = self._prep_values(x) + y_array = self._prep_values(y) - def _cov(x, y): + def _cov(X, Y): return window_aggregations.ewmcov( - x, + X, np.array([0], dtype=np.int64), np.array([0], dtype=np.int64), self.min_periods, - y, + Y, self.com, self.adjust, self.ignore_na, 1, ) - x_values = X._prep_values() - y_values = Y._prep_values() with np.errstate(all="ignore"): - cov = _cov(x_values, y_values) - x_var = _cov(x_values, x_values) - y_var = _cov(y_values, y_values) - corr = cov / zsqrt(x_var * y_var) - return wrap_result(X, corr) - - return flex_binary_moment( - self._selected_obj, other._selected_obj, _get_corr, pairwise=bool(pairwise) - ) + cov = _cov(x_array, y_array) + x_var = _cov(x_array, x_array) + y_var = _cov(y_array, y_array) + result = cov / zsqrt(x_var * y_var) + return Series(result, index=x.index, name=x.name) + + return self._apply_pairwise(self._selected_obj, other, pairwise, cov_func) class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow): diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index f3f697bf7309d..b35d62e484280 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -48,11 +48,14 @@ ) from pandas.core.dtypes.missing import notna +from pandas.core.algorithms import factorize from pandas.core.apply import ResamplerWindowApply from pandas.core.base import DataError, SelectionMixin +import pandas.core.common as common from pandas.core.construction import extract_array from pandas.core.groupby.base 
 from pandas.core.groupby.base import GotItemMixin, ShallowMixin
 from pandas.core.indexes.api import Index, MultiIndex
+from pandas.core.reshape.concat import concat
 from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba
 from pandas.core.window.common import flex_binary_moment, zsqrt
 from pandas.core.window.doc import (
@@ -406,6 +409,9 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike:
     def _apply_tablewise(
         self, homogeneous_func: Callable[..., ArrayLike], name: Optional[str] = None
     ) -> FrameOrSeriesUnion:
+        """
+        Apply the given function to the DataFrame across the entire object
+        """
         if self._selected_obj.ndim == 1:
             raise ValueError("method='table' not applicable for Series objects.")
         obj = self._create_data(self._selected_obj)
@@ -423,6 +429,23 @@ def _apply_tablewise(
         self._insert_on_column(out, obj)
         return out
 
+    def _apply_pairwise(
+        self,
+        target: FrameOrSeriesUnion,
+        other: Optional[FrameOrSeriesUnion],
+        pairwise: Optional[bool],
+        func: Callable[[FrameOrSeriesUnion, FrameOrSeriesUnion], FrameOrSeriesUnion],
+    ) -> FrameOrSeriesUnion:
+        """
+        Apply the given pairwise function given 2 pandas objects (DataFrame/Series)
+        """
+        if other is None:
+            other = target
+            # only default unset
+            pairwise = True if pairwise is None else pairwise
+
+        return flex_binary_moment(target, other, func, pairwise=bool(pairwise))
+
     def _apply(
         self,
         func: Callable[..., Any],
@@ -495,22 +518,6 @@ def aggregate(self, func, *args, **kwargs):
     agg = aggregate
 
 
-def dispatch(name: str, *args, **kwargs):
-    """
-    Dispatch to groupby apply.
-    """
-
-    def outer(self, *args, **kwargs):
-        def f(x):
-            x = self._shallow_copy(x, groupby=self._groupby)
-            return getattr(x, name)(*args, **kwargs)
-
-        return self._groupby.apply(f)
-
-    outer.__name__ = name
-    return outer
-
-
 class BaseWindowGroupby(GotItemMixin, BaseWindow):
     """
     Provide the groupby windowing facilities.
@@ -526,9 +533,6 @@ def __init__(self, obj, *args, **kwargs):
         self._groupby.grouper.mutated = True
         super().__init__(obj, *args, **kwargs)
 
-    corr = dispatch("corr", other=None, pairwise=None)
-    cov = dispatch("cov", other=None, pairwise=None)
-
     def _apply(
         self,
         func: Callable[..., Any],
@@ -591,6 +595,85 @@ def _apply(
         result.index = result_index
         return result
 
+    def _apply_pairwise(
+        self,
+        target: FrameOrSeriesUnion,
+        other: Optional[FrameOrSeriesUnion],
+        pairwise: Optional[bool],
+        func: Callable[[FrameOrSeriesUnion, FrameOrSeriesUnion], FrameOrSeriesUnion],
+    ) -> FrameOrSeriesUnion:
+        """
+        Apply the given pairwise function given 2 pandas objects (DataFrame/Series)
+        """
+        # Manually drop the grouping column first
+        target = target.drop(columns=self._groupby.grouper.names, errors="ignore")
+        result = super()._apply_pairwise(target, other, pairwise, func)
+        # 1) Determine the levels + codes of the groupby levels
+        if other is not None:
+            # When we have other, we must reindex (expand) the result
+            # from flex_binary_moment to a "transform"-like result
+            # per groupby combination
+            old_result_len = len(result)
+            result = concat(
+                [
+                    result.take(gb_indices).reindex(result.index)
+                    for gb_indices in self._groupby.indices.values()
+                ]
+            )
+
+            gb_pairs = (
+                common.maybe_make_list(pair) for pair in self._groupby.indices.keys()
+            )
+            groupby_codes = []
+            groupby_levels = []
+            # e.g. [[1, 2], [4, 5]] as [[1, 4], [2, 5]]
+            for gb_level_pair in map(list, zip(*gb_pairs)):
+                labels = np.repeat(np.array(gb_level_pair), old_result_len)
+                codes, levels = factorize(labels)
+                groupby_codes.append(codes)
+                groupby_levels.append(levels)
+
+        else:
+            # When we evaluate the pairwise=True result, repeat the groupby
+            # labels by the number of columns in the original object
+            groupby_codes = self._groupby.grouper.codes
+            groupby_levels = self._groupby.grouper.levels
+
+            group_indices = self._groupby.grouper.indices.values()
+            if group_indices:
+                indexer = np.concatenate(list(group_indices))
+            else:
+                indexer = np.array([], dtype=np.intp)
+
+            if target.ndim == 1:
+                repeat_by = 1
+            else:
+                repeat_by = len(target.columns)
+            groupby_codes = [
+                np.repeat(c.take(indexer), repeat_by) for c in groupby_codes
+            ]
+        # 2) Determine the levels + codes of the result from super()._apply_pairwise
+        if isinstance(result.index, MultiIndex):
+            result_codes = list(result.index.codes)
+            result_levels = list(result.index.levels)
+            result_names = list(result.index.names)
+        else:
+            idx_codes, idx_levels = factorize(result.index)
+            result_codes = [idx_codes]
+            result_levels = [idx_levels]
+            result_names = [result.index.name]
+
+        # 3) Create the resulting index by combining 1) + 2)
+        result_codes = groupby_codes + result_codes
+        result_levels = groupby_levels + result_levels
+        result_names = self._groupby.grouper.names + result_names
+
+        result_index = MultiIndex(
+            result_levels, result_codes, names=result_names, verify_integrity=False
+        )
+        result.index = result_index
+        return result
+
     def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries:
         """
         Split data into blocks & return conformed data.
@@ -1205,11 +1288,6 @@ def quantile(self, quantile: float, interpolation: str = "linear", **kwargs):
         return self._apply(window_func, name="quantile", **kwargs)
 
     def cov(self, other=None, pairwise=None, ddof=1, **kwargs):
-        if other is None:
-            other = self._selected_obj
-            # only default unset
-            pairwise = True if pairwise is None else pairwise
-
         from pandas import Series
 
         def cov_func(x, y):
@@ -1239,15 +1317,9 @@ def cov_func(x, y):
             result = (mean_x_y - mean_x * mean_y) * (count_x_y / (count_x_y - ddof))
             return Series(result, index=x.index, name=x.name)
 
-        return flex_binary_moment(
-            self._selected_obj, other, cov_func, pairwise=bool(pairwise)
-        )
+        return self._apply_pairwise(self._selected_obj, other, pairwise, cov_func)
 
     def corr(self, other=None, pairwise=None, ddof=1, **kwargs):
-        if other is None:
-            other = self._selected_obj
-            # only default unset
-            pairwise = True if pairwise is None else pairwise
 
         from pandas import Series
 
@@ -1288,9 +1360,7 @@ def corr_func(x, y):
             result = numerator / denominator
             return Series(result, index=x.index, name=x.name)
 
-        return flex_binary_moment(
-            self._selected_obj, other, corr_func, pairwise=bool(pairwise)
-        )
+        return self._apply_pairwise(self._selected_obj, other, pairwise, corr_func)
 
 
 class Rolling(RollingAndExpandingMixin):
diff --git a/pandas/tests/window/test_groupby.py b/pandas/tests/window/test_groupby.py
index b89fb35ac3a70..d3c2b5467e5bb 100644
--- a/pandas/tests/window/test_groupby.py
+++ b/pandas/tests/window/test_groupby.py
@@ -117,6 +117,9 @@ def func(x):
             return getattr(x.rolling(4), f)(self.frame)
 
         expected = g.apply(func)
+        # GH 39591: The grouped column should be all np.nan
+        # (groupby.apply inserts 0s for cov)
+        expected["A"] = np.nan
         tm.assert_frame_equal(result, expected)
 
         result = getattr(r.B, f)(pairwise=True)
@@ -688,6 +691,13 @@ def func(x):
             return getattr(x.expanding(), f)(self.frame)
 
         expected = g.apply(func)
+        # GH 39591: groupby.apply returns 1 instead of nan for windows
+        # with all nan values
+        null_idx = list(range(20, 61)) + list(range(72, 113))
+        expected.iloc[null_idx, 1] = np.nan
+        # GH 39591: The grouped column should be all np.nan
+        # (groupby.apply inserts 0s for cov)
+        expected["A"] = np.nan
         tm.assert_frame_equal(result, expected)
 
         result = getattr(r.B, f)(pairwise=True)
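
Not part of the patch above: a small, illustrative sketch for reviewers who want to exercise the changed code path and the behaviour described in the whatsnew notes. The data and column names here are made up, and no exact numeric output is asserted; only the grouping-column-as-NaN behaviour comes from the whatsnew/test notes.

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({"A": [1] * 4 + [2] * 4, "B": np.arange(8, dtype=float)})
    g = df.groupby("A")

    # `other` spans the full frame, i.e. it is longer than each group; per the
    # whatsnew notes the grouping column "A" in the result should now be all
    # NaN rather than the 0/1 artifacts produced by the old groupby.apply path.
    rolling_corr = g.rolling(4).corr(df)
    expanding_cov = g.expanding().cov(df)

    print(rolling_corr)
    print(expanding_cov)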