Skip to content

PERF: Rolling/Expanding.cov/corr #39591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions asv_bench/benchmarks/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,11 @@ class Pairwise:

def setup(self, window, method, pairwise):
N = 10 ** 4
n_groups = 20
groups = [i for _ in range(N // n_groups) for i in range(n_groups)]
arr = np.random.random(N)
self.df = pd.DataFrame(arr)
self.df_group = pd.DataFrame({"A": groups, "B": arr}).groupby("A")

def time_pairwise(self, window, method, pairwise):
if window is None:
Expand All @@ -150,6 +153,13 @@ def time_pairwise(self, window, method, pairwise):
r = self.df.rolling(window=window)
getattr(r, method)(self.df, pairwise=pairwise)

def time_groupby(self, window, method, pairwise):
    """Benchmark a pairwise window method on the grouped frame."""
    # No window means the expanding variant; otherwise rolling.
    windowed = (
        self.df_group.expanding()
        if window is None
        else self.df_group.rolling(window=window)
    )
    getattr(windowed, method)(self.df, pairwise=pairwise)


class Quantile:
params = (
Expand Down
5 changes: 4 additions & 1 deletion doc/source/whatsnew/v1.3.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ Performance improvements
- Performance improvement in :meth:`Series.mean` for nullable data types (:issue:`34814`)
- Performance improvement in :meth:`Series.isin` for nullable data types (:issue:`38340`)
- Performance improvement in :meth:`DataFrame.corr` for method=kendall (:issue:`28329`)
- Performance improvement in :meth:`core.window.Rolling.corr` and :meth:`core.window.Rolling.cov` (:issue:`39388`)
- Performance improvement in :meth:`core.window.rolling.Rolling.corr` and :meth:`core.window.rolling.Rolling.cov` (:issue:`39388`)
- Performance improvement in :meth:`core.window.rolling.RollingGroupby.corr`, :meth:`core.window.expanding.ExpandingGroupby.corr`, :meth:`core.window.rolling.RollingGroupby.cov` and :meth:`core.window.expanding.ExpandingGroupby.cov` (:issue:`39591`)

.. ---------------------------------------------------------------------------

Expand Down Expand Up @@ -406,6 +407,8 @@ Groupby/resample/rolling
- Bug in :meth:`.Resampler.aggregate` and :meth:`DataFrame.transform` raising ``TypeError`` instead of ``SpecificationError`` when missing keys had mixed dtypes (:issue:`39025`)
- Bug in :meth:`.DataFrameGroupBy.idxmin` and :meth:`.DataFrameGroupBy.idxmax` with ``ExtensionDtype`` columns (:issue:`38733`)
- Bug in :meth:`Series.resample` would raise when the index was a :class:`PeriodIndex` consisting of ``NaT`` (:issue:`39227`)
- Bug in :meth:`core.window.rolling.RollingGroupby.corr` and :meth:`core.window.expanding.ExpandingGroupby.corr` where the groupby column would return 0 instead of ``np.nan`` when providing ``other`` that was longer than each group (:issue:`39591`)
- Bug in :meth:`core.window.expanding.ExpandingGroupby.corr` and :meth:`core.window.expanding.ExpandingGroupby.cov` where 1 would be returned instead of ``np.nan`` when providing ``other`` that was longer than each group (:issue:`39591`)

Reshaping
^^^^^^^^^
Expand Down
104 changes: 86 additions & 18 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@
)
from pandas.core.dtypes.missing import notna

from pandas.core.algorithms import factorize
from pandas.core.apply import ResamplerWindowApply
from pandas.core.base import DataError, SelectionMixin
import pandas.core.common as common
from pandas.core.construction import extract_array
from pandas.core.groupby.base import GotItemMixin, ShallowMixin
from pandas.core.indexes.api import Index, MultiIndex
from pandas.core.reshape.concat import concat
from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba
from pandas.core.window.common import flex_binary_moment, zsqrt
from pandas.core.window.doc import (
Expand Down Expand Up @@ -423,6 +426,14 @@ def _apply_tablewise(
self._insert_on_column(out, obj)
return out

def _apply_pairwise(self, target, other, pairwise, func):
    """Evaluate the pairwise moment ``func`` over ``target`` and ``other``.

    ``other`` defaults to ``target`` itself, and an unset ``pairwise``
    defaults to True (full pairwise evaluation).
    """
    resolved_other = target if other is None else other
    # Only the unset (None) case is defaulted; any explicit value is kept.
    resolved_pairwise = bool(True if pairwise is None else pairwise)
    return flex_binary_moment(target, resolved_other, func, pairwise=resolved_pairwise)

def _apply(
self,
func: Callable[..., Any],
Expand Down Expand Up @@ -526,9 +537,6 @@ def __init__(self, obj, *args, **kwargs):
self._groupby.grouper.mutated = True
super().__init__(obj, *args, **kwargs)

corr = dispatch("corr", other=None, pairwise=None)
cov = dispatch("cov", other=None, pairwise=None)

def _apply(
self,
func: Callable[..., Any],
Expand Down Expand Up @@ -591,6 +599,79 @@ def _apply(
result.index = result_index
return result

def _apply_pairwise(self, target, other, pairwise, func):
    """
    Apply the given pairwise ``func`` (e.g. a cov/corr kernel) to ``target``
    and ``other``, then rebuild the result's index so the groupby level(s)
    come first.

    NOTE(review): the scraped source had GitHub review-comment chrome
    interleaved into this method body; it has been removed so the method is
    valid Python. The reviewer also asked for a docstring, added here.
    """
    # Manually drop the grouping column first
    target = target.drop(columns=self._groupby.grouper.names, errors="ignore")
    result = super()._apply_pairwise(target, other, pairwise, func)
    # Modify the resulting index to include the groupby level
    if other is not None:
        # When we have other, we must reindex (expand) the result
        # from flex_binary_moment to a "transform"-like result
        # per groupby combination
        old_result_len = len(result)
        result = concat(
            [
                result.take(gb_indices).reindex(result.index)
                for gb_indices in self._groupby.indices.values()
            ]
        )

        gb_pairs = (
            common.maybe_make_list(pair) for pair in self._groupby.indices.keys()
        )
        groupby_codes = []
        groupby_levels = []
        # Transpose group keys so each grouping level is handled separately,
        # e.g. [[1, 2], [4, 5]] as [[1, 4], [2, 5]]
        for gb_level_pair in map(list, zip(*gb_pairs)):
            labels = np.repeat(np.array(gb_level_pair), old_result_len)
            codes, levels = factorize(labels)
            groupby_codes.append(codes)
            groupby_levels.append(levels)

        result_codes, result_levels = factorize(result.index)
        result_codes = groupby_codes + [result_codes]
        result_levels = groupby_levels + [result_levels]
        result_names = self._groupby.grouper.names + [result.index.name]

    else:
        # When we evaluate the pairwise=True result, repeat the groupby
        # labels by the number of columns in the original object
        groupby_codes = self._groupby.grouper.codes
        groupby_levels = self._groupby.grouper.levels

        group_indices = self._groupby.grouper.indices.values()
        if group_indices:
            indexer = np.concatenate(list(group_indices))
        else:
            # No groups at all: empty positional indexer.
            indexer = np.array([], dtype=np.intp)

        if target.ndim == 1:
            repeat_by = 1
        else:
            repeat_by = len(target.columns)
        groupby_codes = [
            np.repeat(c.take(indexer), repeat_by) for c in groupby_codes
        ]
        if isinstance(result.index, MultiIndex):
            result_codes = list(result.index.codes)
            result_levels = list(result.index.levels)
            result_names = list(result.index.names)
        else:
            result_codes, result_levels = factorize(result.index)
            result_codes = [result_codes]
            result_levels = [result_levels]
            result_names = [result.index.name]

        result_codes = groupby_codes + result_codes
        result_levels = groupby_levels + result_levels
        result_names = self._groupby.grouper.names + result_names

    # verify_integrity=False: codes/levels were built consistently above,
    # skip the (expensive) validation pass.
    result_index = MultiIndex(
        result_levels, result_codes, names=result_names, verify_integrity=False
    )
    result.index = result_index
    return result

def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries:
"""
Split data into blocks & return conformed data.
Expand Down Expand Up @@ -1205,11 +1286,6 @@ def quantile(self, quantile: float, interpolation: str = "linear", **kwargs):
return self._apply(window_func, name="quantile", **kwargs)

def cov(self, other=None, pairwise=None, ddof=1, **kwargs):
if other is None:
other = self._selected_obj
# only default unset
pairwise = True if pairwise is None else pairwise

from pandas import Series

def cov_func(x, y):
Expand Down Expand Up @@ -1239,15 +1315,9 @@ def cov_func(x, y):
result = (mean_x_y - mean_x * mean_y) * (count_x_y / (count_x_y - ddof))
return Series(result, index=x.index, name=x.name)

return flex_binary_moment(
self._selected_obj, other, cov_func, pairwise=bool(pairwise)
)
return self._apply_pairwise(self._selected_obj, other, pairwise, cov_func)

def corr(self, other=None, pairwise=None, ddof=1, **kwargs):
if other is None:
other = self._selected_obj
# only default unset
pairwise = True if pairwise is None else pairwise

from pandas import Series

Expand Down Expand Up @@ -1288,9 +1358,7 @@ def corr_func(x, y):
result = numerator / denominator
return Series(result, index=x.index, name=x.name)

return flex_binary_moment(
self._selected_obj, other, corr_func, pairwise=bool(pairwise)
)
return self._apply_pairwise(self._selected_obj, other, pairwise, corr_func)


class Rolling(RollingAndExpandingMixin):
Expand Down
10 changes: 10 additions & 0 deletions pandas/tests/window/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def func(x):
return getattr(x.rolling(4), f)(self.frame)

expected = g.apply(func)
# GH 39591: The grouped column should be all np.nan
# (groupby.apply inserts 0s for cov)
expected["A"] = np.nan
tm.assert_frame_equal(result, expected)

result = getattr(r.B, f)(pairwise=True)
Expand Down Expand Up @@ -688,6 +691,13 @@ def func(x):
return getattr(x.expanding(), f)(self.frame)

expected = g.apply(func)
# GH 39591: groupby.apply returns 1 instead of nan for windows
# with all nan values
null_idx = list(range(20, 61)) + list(range(72, 113))
expected.iloc[null_idx, 1] = np.nan
# GH 39591: The grouped column should be all np.nan
# (groupby.apply inserts 0s for cov)
expected["A"] = np.nan
tm.assert_frame_equal(result, expected)

result = getattr(r.B, f)(pairwise=True)
Expand Down