Skip to content

Commit bad52a9

Browse files
authored
PERF: Use Indexers to implement groupby rolling (#34052)
1 parent a248f72 commit bad52a9

File tree

5 files changed

+188
-11
lines changed

5 files changed

+188
-11
lines changed

asv_bench/benchmarks/rolling.py

+23
Original file line numberDiff line numberDiff line change
@@ -186,4 +186,27 @@ def peakmem_rolling(self, constructor, window_size, dtype, method):
186186
getattr(self.roll, method)()
187187

188188

189+
class Groupby:
    """
    Benchmarks for rolling aggregations computed through ``groupby().rolling()``.

    Each entry of ``params`` is the name of a rolling aggregation method that
    is looked up with ``getattr`` and called on the prepared rolling objects.
    """

    # Every method is listed exactly once; a repeated entry would only make
    # the suite run the same benchmark a second time.
    params = ["sum", "median", "mean", "max", "min", "kurt"]

    def setup(self, method):
        """
        Build the grouped rolling objects reused by the timing methods.

        The frame has ``N * 10`` rows: column "A" cycles through ``N`` string
        labels ten times, "B" holds the matching integers and "C" is a
        one-minute-frequency datetime column.
        """
        N = 1000
        df = pd.DataFrame(
            {
                "A": [str(i) for i in range(N)] * 10,
                "B": list(range(N)) * 10,
                "C": pd.date_range(start="1900-01-01", freq="1min", periods=N * 10),
            }
        )
        # Integer window
        self.groupby_roll_int = df.groupby("A").rolling(window=2)
        # Offset window evaluated against the datetime column "C"
        self.groupby_roll_offset = df.groupby("A").rolling(window="30s", on="C")

    def time_rolling_int(self, method):
        """Time ``method`` on the integer-window grouped rolling object."""
        getattr(self.groupby_roll_int, method)()

    def time_rolling_offset(self, method):
        """Time ``method`` on the offset-window grouped rolling object."""
        getattr(self.groupby_roll_offset, method)()
210+
211+
189212
from .pandas_vb_common import setup # noqa: F401 isort:skip

doc/source/whatsnew/v1.1.0.rst

+1
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ Performance improvements
619619
- Performance improvement in :func:`factorize` for nullable (integer and boolean) dtypes (:issue:`33064`).
620620
- Performance improvement in reductions (sum, prod, min, max) for nullable (integer and boolean) dtypes (:issue:`30982`, :issue:`33261`, :issue:`33442`).
621621
- Performance improvement in arithmetic operations between two :class:`DataFrame` objects (:issue:`32779`)
622+
- Performance improvement in :class:`pandas.core.groupby.RollingGroupby` (:issue:`34052`)
622623

623624
.. ---------------------------------------------------------------------------
624625

pandas/core/window/ewm.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def _apply(self, func, **kwargs):
232232
-------
233233
y : same type as input argument
234234
"""
235-
blocks, obj = self._create_blocks()
235+
blocks, obj = self._create_blocks(self._selected_obj)
236236
block_list = list(blocks)
237237

238238
results = []

pandas/core/window/indexers.py

+64-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""Indexer objects for computing start/end window bounds for rolling operations"""
2-
from typing import Optional, Tuple
2+
from typing import Dict, Optional, Tuple, Type, Union
33

44
import numpy as np
55

@@ -170,3 +170,66 @@ def get_window_bounds(
170170
end = np.concatenate([end_s, end_e])
171171

172172
return start, end
173+
174+
175+
class GroupbyRollingIndexer(BaseIndexer):
    """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

    def __init__(
        self,
        index_array: Optional[np.ndarray],
        window_size: int,
        groupby_indicies: Dict,
        rolling_indexer: Union[Type[FixedWindowIndexer], Type[VariableWindowIndexer]],
        **kwargs,
    ):
        """
        Parameters
        ----------
        index_array : np.ndarray or None
            Forwarded to ``BaseIndexer``. When not None, the entries belonging
            to each group are selected with ``take`` and handed to
            ``rolling_indexer``.
        window_size : int
            Forwarded to ``BaseIndexer`` and to every per-group indexer.
        groupby_indicies : dict
            Mapping whose values are the positions belonging to each group.
        rolling_indexer : type
            Indexer class instantiated once per group to compute that group's
            window bounds.
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.groupby_indicies = groupby_indicies
        self.rolling_indexer = rolling_indexer
        super().__init__(index_array, window_size, **kwargs)

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: Optional[int] = None,
        center: Optional[bool] = None,
        closed: Optional[str] = None,
    ) -> Tuple[np.ndarray, np.ndarray]:
        # 1) For each group, get the indices that belong to the group
        # 2) Use the indices to calculate the start & end bounds of the window
        # 3) Append the window bounds in group order
        start_arrays = []
        end_arrays = []
        window_indicies_start = 0
        for indicies in self.groupby_indicies.values():
            group_size = len(indicies)
            if self.index_array is not None:
                index_array = self.index_array.take(indicies)
            else:
                index_array = self.index_array
            indexer = self.rolling_indexer(
                index_array=index_array, window_size=self.window_size,
            )
            start, end = indexer.get_window_bounds(
                group_size, min_periods, center, closed
            )
            # Cannot use groupby_indicies as they might not be monotonic with the
            # object we're rolling over, so number the positions consecutively in
            # group order instead.
            # One extra trailing position is included because windows are sliced
            # like [start, end); building it directly in arange also keeps a
            # zero-length group from failing on a ``[-1]`` lookup.
            window_indicies = np.arange(
                window_indicies_start,
                window_indicies_start + group_size + 1,
                dtype=np.int64,
            )
            window_indicies_start += group_size
            start_arrays.append(window_indicies.take(start))
            end_arrays.append(window_indicies.take(end))
        if not start_arrays:
            # np.concatenate raises on an empty sequence; with no groups there
            # are simply no windows.
            return np.array([], dtype=np.int64), np.array([], dtype=np.int64)
        start = np.concatenate(start_arrays)
        end = np.concatenate(end_arrays)
        return start, end

pandas/core/window/rolling.py

+99-9
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from functools import partial
77
import inspect
88
from textwrap import dedent
9-
from typing import Callable, Dict, List, Optional, Set, Tuple, Union
9+
from typing import Callable, Dict, List, Optional, Set, Tuple, Type, Union
1010

1111
import numpy as np
1212

@@ -37,7 +37,7 @@
3737
from pandas.core.base import DataError, PandasObject, SelectionMixin, ShallowMixin
3838
import pandas.core.common as com
3939
from pandas.core.construction import extract_array
40-
from pandas.core.indexes.api import Index, ensure_index
40+
from pandas.core.indexes.api import Index, MultiIndex, ensure_index
4141
from pandas.core.util.numba_ import NUMBA_FUNC_CACHE
4242
from pandas.core.window.common import (
4343
WindowGroupByMixin,
@@ -49,6 +49,7 @@
4949
from pandas.core.window.indexers import (
5050
BaseIndexer,
5151
FixedWindowIndexer,
52+
GroupbyRollingIndexer,
5253
VariableWindowIndexer,
5354
)
5455
from pandas.core.window.numba_ import generate_numba_apply_func
@@ -219,12 +220,10 @@ def _validate_get_window_bounds_signature(window: BaseIndexer) -> None:
219220
f"get_window_bounds"
220221
)
221222

222-
def _create_blocks(self):
223+
def _create_blocks(self, obj: FrameOrSeries):
223224
"""
224225
Split data into blocks & return conformed data.
225226
"""
226-
obj = self._selected_obj
227-
228227
# filter out the on from the object
229228
if self.on is not None and not isinstance(self.on, Index):
230229
if obj.ndim == 2:
@@ -320,7 +319,7 @@ def __repr__(self) -> str:
320319

321320
def __iter__(self):
322321
window = self._get_window(win_type=None)
323-
blocks, obj = self._create_blocks()
322+
blocks, obj = self._create_blocks(self._selected_obj)
324323
index = self._get_window_indexer(window=window)
325324

326325
start, end = index.get_window_bounds(
@@ -527,7 +526,7 @@ def _apply(
527526
win_type = self._get_win_type(kwargs)
528527
window = self._get_window(win_type=win_type)
529528

530-
blocks, obj = self._create_blocks()
529+
blocks, obj = self._create_blocks(self._selected_obj)
531530
block_list = list(blocks)
532531
window_indexer = self._get_window_indexer(window)
533532

@@ -1261,7 +1260,7 @@ def count(self):
12611260
# implementations shouldn't end up here
12621261
assert not isinstance(self.window, BaseIndexer)
12631262

1264-
blocks, obj = self._create_blocks()
1263+
blocks, obj = self._create_blocks(self._selected_obj)
12651264
results = []
12661265
for b in blocks:
12671266
result = b.notna().astype(int)
@@ -2174,12 +2173,103 @@ class RollingGroupby(WindowGroupByMixin, Rolling):
21742173
Provide a rolling groupby implementation.
21752174
"""
21762175

2176+
def _apply(
    self,
    func: Callable,
    center: bool,
    require_min_periods: int = 0,
    floor: int = 1,
    is_weighted: bool = False,
    name: Optional[str] = None,
    use_numba_cache: bool = False,
    **kwargs,
):
    """
    Run ``Rolling._apply`` and label the result by group keys and original index.

    Every argument is forwarded unchanged to ``Rolling._apply``. The index of
    the returned object is then replaced with a MultiIndex whose leading
    levels are the groupby keys and whose trailing level(s) are the index
    labels of the grouped object, listed in group order.
    """
    result = Rolling._apply(
        self,
        func,
        center,
        require_min_periods,
        floor,
        is_weighted,
        name,
        use_numba_cache,
        **kwargs,
    )
    # Cannot use _wrap_outputs because we calculate the result all at once
    # Compose MultiIndex result from grouping levels then rolling level
    # Aggregate the MultiIndex data as tuples then the level names
    grouped_object_index = self._groupby._selected_obj.index
    # A MultiIndex contributes one result level per original level; using
    # ``.name`` and the raw tuple label would collapse them into a single
    # unnamed level of tuples.
    index_is_multi = isinstance(grouped_object_index, MultiIndex)
    if index_is_multi:
        grouped_index_names = list(grouped_object_index.names)
    else:
        grouped_index_names = [grouped_object_index.name]
    groupby_keys = [grouping.name for grouping in self._groupby.grouper._groupings]
    result_index_names = groupby_keys + grouped_index_names

    result_index_data = []
    for key, values in self._groupby.grouper.indices.items():
        # The key is the same for every row of the group, so normalise it once.
        key_parts = list(key) if is_list_like(key) else [key]
        for value in values:
            label = grouped_object_index[value]
            label_parts = list(label) if index_is_multi else [label]
            result_index_data.append((*key_parts, *label_parts))

    result_index = MultiIndex.from_tuples(
        result_index_data, names=result_index_names
    )
    result.index = result_index
    return result
2220+
21772221
@property
def _constructor(self):
    """Return the ``Rolling`` class."""
    return Rolling
21802224

2181-
def _gotitem(self, key, ndim, subset=None):
2225+
def _create_blocks(self, obj: FrameOrSeries):
    """
    Reorder ``obj`` into group order, then split it into blocks.

    The reordered object is passed to the parent ``_create_blocks``, whose
    return value is returned unchanged.
    """
    # Joining the per-group positions end to end gives a single take-order in
    # which each group's rows are contiguous, so the object being rolled over
    # is monotonically sorted relative to the groups.
    positions_per_group = list(self._groupby.grouper.indices.values())
    take_order = np.concatenate(positions_per_group)
    reordered = obj.take(take_order)
    return super()._create_blocks(reordered)
2233+
2234+
def _get_cython_func_type(self, func: str) -> Callable:
2235+
"""
2236+
Return the cython function type.
2237+
2238+
RollingGroupby needs to always use "variable" algorithms since processing
2239+
the data in group order may not be monotonic with the data which
2240+
"fixed" algorithms assume
2241+
"""
2242+
return self._get_roll_func(f"{func}_variable")
2243+
2244+
def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer:
    """
    Build the indexer that computes window start and end bounds per group.

    Parameters
    ----------
    window : int
        window size for FixedWindowIndexer

    Returns
    -------
    GroupbyRollingIndexer
    """
    rolling_indexer: Union[Type[FixedWindowIndexer], Type[VariableWindowIndexer]]
    if not self.is_freq_type:
        # Non-frequency windows need no index values.
        rolling_indexer = FixedWindowIndexer
        index_array = None
    else:
        # Frequency windows are computed against the index's ``asi8`` values.
        rolling_indexer = VariableWindowIndexer
        index_array = self._groupby._selected_obj.index.asi8
    return GroupbyRollingIndexer(
        index_array=index_array,
        window_size=window,
        groupby_indicies=self._groupby.indices,
        rolling_indexer=rolling_indexer,
    )
2271+
2272+
def _gotitem(self, key, ndim, subset=None):
21832273
# we are setting the index on the actual object
21842274
# here so our index is carried thru to the selected obj
21852275
# when we do the splitting for the groupby

0 commit comments

Comments
 (0)