Skip to content

PERF: ExpandingGroupby #37064

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c1d6af1
Add groupby expanding indexer
Sep 27, 2020
37cefba
Implement apply for ExpandingGroupby
Sep 28, 2020
6918c28
Merge remote-tracking branch 'upstream/master' into feature/groupby_e…
Sep 28, 2020
77deb50
Get most tests to work except numerical precision test
Sep 28, 2020
43d2682
Make signature of GroupbyExpanding indexer match base class
Sep 28, 2020
409bda8
Share groupbyindexer implementation
Sep 28, 2020
1146a2f
Merge remote-tracking branch 'upstream/master' into feature/groupby_e…
Sep 28, 2020
a04702e
Merge remote-tracking branch 'upstream/master' into feature/groupby_e…
Sep 29, 2020
542444d
Merge remote-tracking branch 'upstream/master' into feature/groupby_e…
Oct 9, 2020
0dacaa5
Remove need to select variable algorithm
Oct 9, 2020
7683214
Merge remote-tracking branch 'upstream/master' into feature/groupby_e…
Oct 10, 2020
ed211e7
Create new subclass for shared window groupby algorithms
Oct 11, 2020
6885d14
Merge remote-tracking branch 'upstream/master' into feature/groupby_e…
Oct 11, 2020
57471be
Add expanding groupby benchmark
Oct 11, 2020
bff2316
Merge remote-tracking branch 'upstream/master' into feature/groupby_e…
Oct 11, 2020
6af880e
Merge remote-tracking branch 'upstream/master' into feature/groupby_e…
Oct 11, 2020
608e7d6
Add benchmarks
Oct 11, 2020
859541e
Remove unneeded args passed to _apply
Oct 12, 2020
fc43e23
Remove unnecessary popping of kwargs
Oct 12, 2020
bab0a1d
Add whatsnew
Oct 12, 2020
4babf56
Add whatsnew number
Oct 12, 2020
5788af9
Lint
Oct 12, 2020
a0ab03d
Merge remote-tracking branch 'upstream/master' into feature/groupby_e…
Oct 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions asv_bench/benchmarks/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,21 @@ class ExpandingMethods:

def setup(self, constructor, dtype, method):
N = 10 ** 5
N_groupby = 100
arr = (100 * np.random.random(N)).astype(dtype)
self.expanding = getattr(pd, constructor)(arr).expanding()
self.expanding_groupby = (
pd.DataFrame({"A": arr[:N_groupby], "B": range(N_groupby)})
.groupby("B")
.expanding()
)

def time_expanding(self, constructor, dtype, method):
    """
    Call the aggregation named ``method`` on ``self.expanding``.

    ``constructor`` and ``dtype`` are not used in the body; they are part of
    the shared benchmark signature.
    """
    aggregation = getattr(self.expanding, method)
    aggregation()

def time_expanding_groupby(self, constructor, dtype, method):
    """
    Call the aggregation named ``method`` on ``self.expanding_groupby``.

    ``constructor`` and ``dtype`` are not used in the body; they are part of
    the shared benchmark signature.
    """
    aggregation = getattr(self.expanding_groupby, method)
    aggregation()


class EWMMethods:

Expand Down
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.2.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ Performance improvements
avoiding creating these again, if created on either. This can speed up operations that depend on creating copies of existing indexes (:issue:`36840`)
- Performance improvement in :meth:`RollingGroupby.count` (:issue:`35625`)
- Small performance decrease to :meth:`Rolling.min` and :meth:`Rolling.max` for fixed windows (:issue:`36567`)
- Performance improvement in :class:`ExpandingGroupby` (:issue:`37064`)

.. ---------------------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ cdef inline float64_t calc_var(int64_t minp, int ddof, float64_t nobs,
result = ssqdm_x / (nobs - <float64_t>ddof)
# Fix for numerical imprecision.
# Can be result < 0 once Kahan Summation is implemented
if result < 1e-15:
if result < 1e-14:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xref #37051, needed to change for an expanding groupby test

result = 0
else:
result = NaN
Expand Down
65 changes: 0 additions & 65 deletions pandas/core/window/common.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""Common utility functions for rolling operations"""
from collections import defaultdict
from typing import Callable, Optional
import warnings

import numpy as np

from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries

from pandas.core.groupby.base import GotItemMixin
from pandas.core.indexes.api import MultiIndex
from pandas.core.shared_docs import _shared_docs

Expand All @@ -27,69 +25,6 @@
"""


def _dispatch(name: str, *args, **kwargs):
    """
    Build a method that runs ``name`` on every group via groupby ``apply``.

    Parameters
    ----------
    name : str
        Name of the method to look up on the per-group window object; also
        assigned to the returned function's ``__name__``.
    *args, **kwargs
        Accepted but never read: the returned wrapper takes its own
        positional and keyword arguments, and only those are forwarded.

    Returns
    -------
    callable
        Function taking ``self`` plus call-time arguments and returning
        ``self._groupby.apply(...)``.
    """

    def outer(self, *call_args, **call_kwargs):
        def f(x):
            # Re-wrap the group's data as a window object tied to the
            # same groupby, then invoke the requested method on it.
            windowed = self._shallow_copy(x, groupby=self._groupby)
            bound = getattr(windowed, name)
            return bound(*call_args, **call_kwargs)

        return self._groupby.apply(f)

    outer.__name__ = name
    return outer


class WindowGroupByMixin(GotItemMixin):
    """
    Mixin that routes window operations through a groupby object.
    """

    def __init__(self, obj, *args, **kwargs):
        """
        Store the groupby and initialise the window on the underlying object.

        If no ``groupby`` keyword is supplied, ``obj`` itself is used as the
        groupby and the window is built over ``obj._selected_obj``.
        """
        # "parent" is accepted but its value is discarded.
        kwargs.pop("parent", None)
        grouper = kwargs.pop("groupby", None)
        if grouper is None:
            selected = obj._selected_obj
            grouper = obj
            obj = selected
        self._groupby = grouper
        # Flag both the groupby and its grouper as mutated.
        self._groupby.mutated = True
        self._groupby.grouper.mutated = True
        super().__init__(obj, *args, **kwargs)

    # The keyword arguments given to _dispatch here are not forwarded; the
    # generated methods pass along only their own call-time arguments.
    corr = _dispatch("corr", other=None, pairwise=None)
    cov = _dispatch("cov", other=None, pairwise=None)

    def _apply(
        self,
        func: Callable,
        require_min_periods: int = 0,
        floor: int = 1,
        is_weighted: bool = False,
        name: Optional[str] = None,
        use_numba_cache: bool = False,
        **kwargs,
    ):
        """
        Re-run the requested operation on each group via groupby ``apply``.

        Only ``name`` and the remaining ``**kwargs`` are used; ``func``,
        ``require_min_periods``, ``floor``, ``is_weighted`` and
        ``use_numba_cache`` are accepted but not read in this body.
        """
        for dropped in ("floor", "original_func"):
            kwargs.pop(dropped, None)

        # TODO: can we de-duplicate with _dispatch?
        def f(x, name=name, *args):
            windowed = self._shallow_copy(x)

            if not isinstance(name, str):
                # Non-string ``name`` is handed to the window's ``apply``.
                return windowed.apply(name, *args, **kwargs)

            return getattr(windowed, name)(*args, **kwargs)

        return self._groupby.apply(f)


def flex_binary_moment(arg1, arg2, f, pairwise=False):

if not (
Expand Down
28 changes: 22 additions & 6 deletions pandas/core/window/expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from pandas.compat.numpy import function as nv
from pandas.util._decorators import Appender, Substitution, doc

from pandas.core.window.common import WindowGroupByMixin, _doc_template, _shared_docs
from pandas.core.window.rolling import RollingAndExpandingMixin
from pandas.core.window.common import _doc_template, _shared_docs
from pandas.core.window.indexers import ExpandingIndexer, GroupbyIndexer
from pandas.core.window.rolling import BaseWindowGroupby, RollingAndExpandingMixin


class Expanding(RollingAndExpandingMixin):
Expand Down Expand Up @@ -248,11 +249,26 @@ def corr(self, other=None, pairwise=None, **kwargs):
return super().corr(other=other, pairwise=pairwise, **kwargs)


class ExpandingGroupby(WindowGroupByMixin, Expanding):
class ExpandingGroupby(BaseWindowGroupby, Expanding):
"""
Provide a expanding groupby implementation.
"""

@property
def _constructor(self):
return Expanding
def _get_window_indexer(self, window: int) -> GroupbyIndexer:
"""
Return an indexer class that will compute the window start and end bounds

Parameters
----------
window : int
window size for FixedWindowIndexer (unused)

Returns
-------
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._groupby.indices,
window_indexer=ExpandingIndexer,
)
return window_indexer
30 changes: 21 additions & 9 deletions pandas/core/window/indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,26 +259,38 @@ def get_window_bounds(
return start, end


class GroupbyRollingIndexer(BaseIndexer):
class GroupbyIndexer(BaseIndexer):
"""Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

def __init__(
self,
index_array: Optional[np.ndarray],
window_size: int,
groupby_indicies: Dict,
rolling_indexer: Type[BaseIndexer],
indexer_kwargs: Optional[Dict],
index_array: Optional[np.ndarray] = None,
window_size: int = 0,
groupby_indicies: Optional[Dict] = None,
window_indexer: Type[BaseIndexer] = BaseIndexer,
indexer_kwargs: Optional[Dict] = None,
**kwargs,
):
"""
Parameters
----------
index_array : np.ndarray or None
np.ndarray of the index of the original object that we are performing
a chained groupby operation over. This index has been pre-sorted relative to
the groups
window_size : int
window size during the windowing operation
groupby_indicies : dict or None
dict of {group label: [positional index of rows belonging to the group]}
window_indexer : BaseIndexer
BaseIndexer class determining the start and end bounds of each group
indexer_kwargs : dict or None
Custom kwargs to be passed to window_indexer
**kwargs :
keyword arguments that will be available when get_window_bounds is called
"""
self.groupby_indicies = groupby_indicies
self.rolling_indexer = rolling_indexer
self.groupby_indicies = groupby_indicies or {}
self.window_indexer = window_indexer
self.indexer_kwargs = indexer_kwargs or {}
super().__init__(
index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs
Expand All @@ -303,7 +315,7 @@ def get_window_bounds(
index_array = self.index_array.take(ensure_platform_int(indices))
else:
index_array = self.index_array
indexer = self.rolling_indexer(
indexer = self.window_indexer(
index_array=index_array,
window_size=self.window_size,
**self.indexer_kwargs,
Expand Down
Loading