Skip to content

BUG: Fixes to FixedForwardWindowIndexer and GroupbyIndexer (#43267) #43291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.3.4.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Fixed regressions

Bug fixes
~~~~~~~~~
- Fixed bug in :meth:`pandas.DataFrame.groupby.rolling` and :class:`pandas.api.indexers.FixedForwardWindowIndexer` leading to segfaults and window endpoints being mixed across groups (:issue:`43267`)
- Fixed bug in :meth:`.GroupBy.mean` with datetimelike values including ``NaT`` values returning incorrect results (:issue:`43132`)
- Fixed bug in :meth:`Series.aggregate` not passing the first ``args`` to the user supplied ``func`` in certain cases (:issue:`43357`)
- Fixed memory leaks in :meth:`Series.rolling.quantile` and :meth:`Series.rolling.median` (:issue:`43339`)
Expand Down
47 changes: 26 additions & 21 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ def get_window_bounds(
)

start = np.arange(num_values, dtype="int64")
end_s = start[: -self.window_size] + self.window_size
end_e = np.full(self.window_size, num_values, dtype="int64")
end = np.concatenate([end_s, end_e])
end = start + self.window_size
if self.window_size:
end[-self.window_size :] = num_values

return start, end

Expand All @@ -279,8 +279,8 @@ class GroupbyIndexer(BaseIndexer):
def __init__(
self,
index_array: np.ndarray | None = None,
window_size: int = 0,
groupby_indicies: dict | None = None,
window_size: int | BaseIndexer = 0,
groupby_indices: dict | None = None,
window_indexer: type[BaseIndexer] = BaseIndexer,
indexer_kwargs: dict | None = None,
**kwargs,
Expand All @@ -292,9 +292,9 @@ def __init__(
np.ndarray of the index of the original object that we are performing
a chained groupby operation over. This index has been pre-sorted relative to
the groups
window_size : int
window_size : int or BaseIndexer
window size during the windowing operation
groupby_indicies : dict or None
groupby_indices : dict or None
dict of {group label: [positional index of rows belonging to the group]}
window_indexer : BaseIndexer
BaseIndexer class determining the start and end bounds of each group
Expand All @@ -303,11 +303,13 @@ def __init__(
**kwargs :
keyword arguments that will be available when get_window_bounds is called
"""
self.groupby_indicies = groupby_indicies or {}
self.groupby_indices = groupby_indices or {}
self.window_indexer = window_indexer
self.indexer_kwargs = indexer_kwargs or {}
self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
super().__init__(
index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs
index_array=index_array,
window_size=self.indexer_kwargs.pop("window_size", window_size),
**kwargs,
)

@Appender(get_window_bounds_doc)
Expand All @@ -323,8 +325,8 @@ def get_window_bounds(
# 3) Append the window bounds in group order
start_arrays = []
end_arrays = []
window_indicies_start = 0
for key, indices in self.groupby_indicies.items():
window_indices_start = 0
for key, indices in self.groupby_indices.items():
index_array: np.ndarray | None

if self.index_array is not None:
Expand All @@ -341,18 +343,21 @@ def get_window_bounds(
)
start = start.astype(np.int64)
end = end.astype(np.int64)
# Cannot use groupby_indicies as they might not be monotonic with the object
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"
# Cannot use groupby_indices as they might not be monotonic with the object
# we're rolling over
window_indicies = np.arange(
window_indicies_start, window_indicies_start + len(indices)
window_indices = np.arange(
window_indices_start, window_indices_start + len(indices)
)
window_indicies_start += len(indices)
window_indices_start += len(indices)
# Extend as we'll be slicing window like [start, end)
window_indicies = np.append(
window_indicies, [window_indicies[-1] + 1]
).astype(np.int64)
start_arrays.append(window_indicies.take(ensure_platform_int(start)))
end_arrays.append(window_indicies.take(ensure_platform_int(end)))
window_indices = np.append(window_indices, [window_indices[-1] + 1]).astype(
np.int64, copy=False
)
start_arrays.append(window_indices.take(ensure_platform_int(start)))
end_arrays.append(window_indices.take(ensure_platform_int(end)))
start = np.concatenate(start_arrays)
end = np.concatenate(end_arrays)
return start, end
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExponentialMovingWindowIndexer,
)
return window_indexer
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExpandingIndexer,
)
return window_indexer
26 changes: 21 additions & 5 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,10 @@ def __iter__(self):
center=self.center,
closed=self.closed,
)
# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)

assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

for s, e in zip(start, end):
result = obj.iloc[slice(s, e)]
Expand Down Expand Up @@ -563,6 +565,10 @@ def calc(x):
center=self.center,
closed=self.closed,
)
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

return func(x, start, end, min_periods, *numba_args)

with np.errstate(all="ignore"):
Expand Down Expand Up @@ -1501,6 +1507,11 @@ def cov_func(x, y):
center=self.center,
closed=self.closed,
)

assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -1540,6 +1551,11 @@ def corr_func(x, y):
center=self.center,
closed=self.closed,
)

assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -2490,11 +2506,11 @@ def _get_window_indexer(self) -> GroupbyIndexer:
index_array = self._index_array
if isinstance(self.window, BaseIndexer):
rolling_indexer = type(self.window)
indexer_kwargs = self.window.__dict__
indexer_kwargs = self.window.__dict__.copy()
assert isinstance(indexer_kwargs, dict) # for mypy
# We'll be using the index of each group later
indexer_kwargs.pop("index_array", None)
window = 0
window = self.window
elif self._win_freq_i8 is not None:
rolling_indexer = VariableWindowIndexer
window = self._win_freq_i8
Expand All @@ -2504,7 +2520,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
window_indexer = GroupbyIndexer(
index_array=index_array,
window_size=window,
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=rolling_indexer,
indexer_kwargs=indexer_kwargs,
)
Expand Down
2 changes: 1 addition & 1 deletion pandas/tests/groupby/test_missing.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def test_min_count(func, min_count, value):
tm.assert_frame_equal(result, expected)


def test_indicies_with_missing():
def test_indices_with_missing():
# GH 9304
df = DataFrame({"a": [1, 1, np.nan], "b": [2, 3, 4], "c": [5, 6, 7]})
g = df.groupby(["a", "b"])
Expand Down
159 changes: 159 additions & 0 deletions pandas/tests/window/test_base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from pandas import (
DataFrame,
MultiIndex,
Series,
concat,
date_range,
)
import pandas._testing as tm
Expand All @@ -13,6 +15,7 @@
)
from pandas.core.indexers.objects import (
ExpandingIndexer,
FixedWindowIndexer,
VariableOffsetWindowIndexer,
)

Expand Down Expand Up @@ -293,3 +296,159 @@ def get_window_bounds(self, num_values, min_periods, center, closed):
result = getattr(df.rolling(indexer), func)(*args)
expected = DataFrame({"values": values})
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize(
"indexer_class", [FixedWindowIndexer, FixedForwardWindowIndexer, ExpandingIndexer]
)
@pytest.mark.parametrize("window_size", [1, 2, 12])
@pytest.mark.parametrize(
"df_data",
[
{"a": [1, 1], "b": [0, 1]},
{"a": [1, 2], "b": [0, 1]},
{"a": [1] * 16, "b": [np.nan, 1, 2, np.nan] + list(range(4, 16))},
],
)
def test_indexers_are_reusable_after_groupby_rolling(
indexer_class, window_size, df_data
):
# GH 43267
df = DataFrame(df_data)
num_trials = 3
indexer = indexer_class(window_size=window_size)
original_window_size = indexer.window_size
for i in range(num_trials):
df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean()
assert indexer.window_size == original_window_size


@pytest.mark.parametrize(
"window_size, num_values, expected_start, expected_end",
[
(1, 1, [0], [1]),
(1, 2, [0, 1], [1, 2]),
(2, 1, [0], [1]),
(2, 2, [0, 1], [2, 2]),
(5, 12, range(12), list(range(5, 12)) + [12] * 5),
(12, 5, range(5), [5] * 5),
(0, 0, np.array([]), np.array([])),
(1, 0, np.array([]), np.array([])),
(0, 1, [0], [0]),
],
)
def test_fixed_forward_indexer_bounds(
window_size, num_values, expected_start, expected_end
):
# GH 43267
indexer = FixedForwardWindowIndexer(window_size=window_size)
start, end = indexer.get_window_bounds(num_values=num_values)

tm.assert_numpy_array_equal(start, np.array(expected_start), check_dtype=False)
tm.assert_numpy_array_equal(end, np.array(expected_end), check_dtype=False)
assert len(start) == len(end)


@pytest.mark.parametrize(
"df, window_size, expected",
[
(
DataFrame({"b": [0, 1, 2], "a": [1, 2, 2]}),
2,
Series(
[0, 1.5, 2.0],
index=MultiIndex.from_arrays([[1, 2, 2], range(3)], names=["a", None]),
name="b",
dtype=np.float64,
),
),
(
DataFrame(
{
"b": [np.nan, 1, 2, np.nan] + list(range(4, 18)),
"a": [1] * 7 + [2] * 11,
"c": range(18),
}
),
12,
Series(
[
3.6,
3.6,
4.25,
5.0,
5.0,
5.5,
6.0,
12.0,
12.5,
13.0,
13.5,
14.0,
14.5,
15.0,
15.5,
16.0,
16.5,
17.0,
],
index=MultiIndex.from_arrays(
[[1] * 7 + [2] * 11, range(18)], names=["a", None]
),
name="b",
dtype=np.float64,
),
),
],
)
def test_rolling_groupby_with_fixed_forward_specific(df, window_size, expected):
# GH 43267
indexer = FixedForwardWindowIndexer(window_size=window_size)
result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean()
tm.assert_series_equal(result, expected)


@pytest.mark.parametrize(
"group_keys",
[
(1,),
(1, 2),
(2, 1),
(1, 1, 2),
(1, 2, 1),
(1, 1, 2, 2),
(1, 2, 3, 2, 3),
(1, 1, 2) * 4,
(1, 2, 3) * 5,
],
)
@pytest.mark.parametrize("window_size", [1, 2, 3, 4, 5, 8, 20])
def test_rolling_groupby_with_fixed_forward_many(group_keys, window_size):
# GH 43267
df = DataFrame(
{
"a": np.array(list(group_keys)),
"b": np.arange(len(group_keys), dtype=np.float64) + 17,
"c": np.arange(len(group_keys), dtype=np.int64),
}
)

indexer = FixedForwardWindowIndexer(window_size=window_size)
result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).sum()
result.index.names = ["a", "c"]

groups = df.groupby("a")[["a", "b"]]
manual = concat(
[
g.assign(
b=[
g["b"].iloc[i : i + window_size].sum(min_count=1)
for i in range(len(g))
]
)
for _, g in groups
]
)
manual = manual.set_index(["a", "c"])["b"]

tm.assert_series_equal(result, manual)