Skip to content

BUG: Fixes to FixedForwardWindowIndexer and GroupbyIndexer (#43267) #43291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.3.4.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Fixed regressions

Bug fixes
~~~~~~~~~
- Bug in :meth:`pandas.DataFrame.groupby.rolling` and :class:`pandas.api.indexers.FixedForwardWindowIndexer` leading to segfaults and window endpoints being mixed across groups (:issue:`43267`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. for consistency

Suggested change
- Bug in :meth:`pandas.DataFrame.groupby.rolling` and :class:`pandas.api.indexers.FixedForwardWindowIndexer` leading to segfaults and window endpoints being mixed across groups (:issue:`43267`)
- Fixed bug in :meth:`pandas.DataFrame.groupby.rolling` and :class:`pandas.api.indexers.FixedForwardWindowIndexer` leading to segfaults and window endpoints being mixed across groups (:issue:`43267`)

-
-

Expand Down
42 changes: 23 additions & 19 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,8 @@ def get_window_bounds(
)

start = np.arange(num_values, dtype="int64")
end_s = start[: -self.window_size] + self.window_size
end_e = np.full(self.window_size, num_values, dtype="int64")
end = np.concatenate([end_s, end_e])
end = start + self.window_size
end[end > num_values] = num_values

return start, end

Expand All @@ -280,7 +279,7 @@ def __init__(
self,
index_array: np.ndarray | None = None,
window_size: int = 0,
groupby_indicies: dict | None = None,
groupby_indices: dict | None = None,
window_indexer: type[BaseIndexer] = BaseIndexer,
indexer_kwargs: dict | None = None,
**kwargs,
Expand All @@ -294,7 +293,7 @@ def __init__(
the groups
window_size : int
window size during the windowing operation
groupby_indicies : dict or None
groupby_indices : dict or None
dict of {group label: [positional index of rows belonging to the group]}
window_indexer : BaseIndexer
BaseIndexer class determining the start and end bounds of each group
Expand All @@ -303,11 +302,13 @@ def __init__(
**kwargs :
keyword arguments that will be available when get_window_bounds is called
"""
self.groupby_indicies = groupby_indicies or {}
self.groupby_indices = groupby_indices or {}
self.window_indexer = window_indexer
self.indexer_kwargs = indexer_kwargs or {}
self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
super().__init__(
index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs
index_array=index_array,
window_size=self.indexer_kwargs.pop("window_size", window_size),
**kwargs,
)

@Appender(get_window_bounds_doc)
Expand All @@ -323,8 +324,8 @@ def get_window_bounds(
# 3) Append the window bounds in group order
start_arrays = []
end_arrays = []
window_indicies_start = 0
for key, indices in self.groupby_indicies.items():
window_indices_start = 0
for key, indices in self.groupby_indices.items():
index_array: np.ndarray | None

if self.index_array is not None:
Expand All @@ -341,18 +342,21 @@ def get_window_bounds(
)
start = start.astype(np.int64)
end = end.astype(np.int64)
# Cannot use groupby_indicies as they might not be monotonic with the object
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"
# Cannot use groupby_indices as they might not be monotonic with the object
# we're rolling over
window_indicies = np.arange(
window_indicies_start, window_indicies_start + len(indices)
window_indices = np.arange(
window_indices_start, window_indices_start + len(indices)
)
window_indicies_start += len(indices)
window_indices_start += len(indices)
# Extend as we'll be slicing window like [start, end)
window_indicies = np.append(
window_indicies, [window_indicies[-1] + 1]
).astype(np.int64)
start_arrays.append(window_indicies.take(ensure_platform_int(start)))
end_arrays.append(window_indicies.take(ensure_platform_int(end)))
window_indices = np.append(window_indices, [window_indices[-1] + 1]).astype(
np.int64, copy=False
)
start_arrays.append(window_indices.take(ensure_platform_int(start)))
end_arrays.append(window_indices.take(ensure_platform_int(end)))
start = np.concatenate(start_arrays)
end = np.concatenate(end_arrays)
return start, end
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExponentialMovingWindowIndexer,
)
return window_indexer
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExpandingIndexer,
)
return window_indexer
31 changes: 26 additions & 5 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,10 @@ def __iter__(self):
center=self.center,
closed=self.closed,
)
# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)

assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

for s, e in zip(start, end):
result = obj.iloc[slice(s, e)]
Expand Down Expand Up @@ -533,6 +535,7 @@ def _apply(
y : type of input
"""
window_indexer = self._get_window_indexer()

min_periods = (
self.min_periods
if self.min_periods is not None
Expand All @@ -546,12 +549,17 @@ def homogeneous_func(values: np.ndarray):
return values.copy()

def calc(x):

start, end = window_indexer.get_window_bounds(
num_values=len(x),
min_periods=min_periods,
center=self.center,
closed=self.closed,
)
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

return func(x, start, end, min_periods, *numba_args)

with np.errstate(all="ignore"):
Expand Down Expand Up @@ -1434,6 +1442,11 @@ def cov_func(x, y):
center=self.center,
closed=self.closed,
)

assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -1473,6 +1486,11 @@ def corr_func(x, y):
center=self.center,
closed=self.closed,
)

assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -2346,26 +2364,29 @@ def _get_window_indexer(self) -> GroupbyIndexer:
rolling_indexer: type[BaseIndexer]
indexer_kwargs: dict[str, Any] | None = None
index_array = self._index_array

if isinstance(self.window, BaseIndexer):
rolling_indexer = type(self.window)
indexer_kwargs = self.window.__dict__
indexer_kwargs = self.window.__dict__.copy()
assert isinstance(indexer_kwargs, dict) # for mypy
# We'll be using the index of each group later
indexer_kwargs.pop("index_array", None)
window = 0
window = self.window
elif self._win_freq_i8 is not None:
rolling_indexer = VariableWindowIndexer
window = self._win_freq_i8
else:
rolling_indexer = FixedWindowIndexer
window = self.window

Copy link
Member

@mroeschke mroeschke Sep 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you undo these whitespace changes? (Seems to be in other places as well)

window_indexer = GroupbyIndexer(
index_array=index_array,
window_size=window,
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=rolling_indexer,
indexer_kwargs=indexer_kwargs,
)

return window_indexer

def _validate_monotonic(self):
Expand Down
2 changes: 1 addition & 1 deletion pandas/tests/groupby/test_missing.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def test_min_count(func, min_count, value):
tm.assert_frame_equal(result, expected)


def test_indicies_with_missing():
def test_indices_with_missing():
# GH 9304
df = DataFrame({"a": [1, 1, np.nan], "b": [2, 3, 4], "c": [5, 6, 7]})
g = df.groupby(["a", "b"])
Expand Down
159 changes: 159 additions & 0 deletions pandas/tests/window/test_base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from pandas import (
DataFrame,
MultiIndex,
Series,
concat,
date_range,
)
import pandas._testing as tm
Expand All @@ -13,6 +15,7 @@
)
from pandas.core.indexers.objects import (
ExpandingIndexer,
FixedWindowIndexer,
VariableOffsetWindowIndexer,
)

Expand Down Expand Up @@ -293,3 +296,159 @@ def get_window_bounds(self, num_values, min_periods, center, closed):
result = getattr(df.rolling(indexer), func)(*args)
expected = DataFrame({"values": values})
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize(
"indexer_class", [FixedWindowIndexer, FixedForwardWindowIndexer, ExpandingIndexer]
)
@pytest.mark.parametrize("window_size", [1, 2, 12])
@pytest.mark.parametrize(
"df_data",
[
{"a": [1, 1], "b": [0, 1]},
{"a": [1, 2], "b": [0, 1]},
{"a": [1] * 16, "b": [np.nan, 1, 2, np.nan] + list(range(4, 16))},
],
)
def test_indexers_are_reusable_after_groupby_rolling(
indexer_class, window_size, df_data
):
# GH 43267
df = DataFrame(df_data)
num_trials = 3
indexer = indexer_class(window_size=window_size)
original_window_size = indexer.window_size
for i in range(num_trials):
df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean()
assert indexer.window_size == original_window_size


@pytest.mark.parametrize(
"window_size, num_values, expected_start, expected_end",
[
(1, 1, [0], [1]),
(1, 2, [0, 1], [1, 2]),
(2, 1, [0], [1]),
(2, 2, [0, 1], [2, 2]),
(5, 12, range(12), list(range(5, 12)) + [12] * 5),
(12, 5, range(5), [5] * 5),
(0, 0, np.array([]), np.array([])),
(1, 0, np.array([]), np.array([])),
(0, 1, [0], [0]),
],
)
def test_fixed_forward_indexer_bounds(
window_size, num_values, expected_start, expected_end
):
# GH 43267
indexer = FixedForwardWindowIndexer(window_size=window_size)
start, end = indexer.get_window_bounds(num_values=num_values)

tm.assert_numpy_array_equal(start, np.array(expected_start), check_dtype=False)
tm.assert_numpy_array_equal(end, np.array(expected_end), check_dtype=False)
assert len(start) == len(end)


@pytest.mark.parametrize(
"df, window_size, expected",
[
(
DataFrame({"b": [0, 1, 2], "a": [1, 2, 2]}),
2,
Series(
[0, 1.5, 2.0],
index=MultiIndex.from_arrays([[1, 2, 2], range(3)], names=["a", None]),
name="b",
dtype=np.float64,
),
),
(
DataFrame(
{
"b": [np.nan, 1, 2, np.nan] + list(range(4, 18)),
"a": [1] * 7 + [2] * 11,
"c": range(18),
}
),
12,
Series(
[
3.6,
3.6,
4.25,
5.0,
5.0,
5.5,
6.0,
12.0,
12.5,
13.0,
13.5,
14.0,
14.5,
15.0,
15.5,
16.0,
16.5,
17.0,
],
index=MultiIndex.from_arrays(
[[1] * 7 + [2] * 11, range(18)], names=["a", None]
),
name="b",
dtype=np.float64,
),
),
],
)
def test_rolling_groupby_with_fixed_forward_specific(df, window_size, expected):
# GH 43267
indexer = FixedForwardWindowIndexer(window_size=window_size)
result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean()
tm.assert_series_equal(result, expected)


@pytest.mark.parametrize(
"group_keys",
[
(1,),
(1, 2),
(2, 1),
(1, 1, 2),
(1, 2, 1),
(1, 1, 2, 2),
(1, 2, 3, 2, 3),
(1, 1, 2) * 4,
(1, 2, 3) * 5,
],
)
@pytest.mark.parametrize("window_size", [1, 2, 3, 4, 5, 8, 20])
def test_rolling_groupby_with_fixed_forward_many(group_keys, window_size):
# GH 43267
df = DataFrame(
{
"a": np.array(list(group_keys)),
"b": np.arange(len(group_keys), dtype=np.float64) + 17,
"c": np.arange(len(group_keys), dtype=np.int64),
}
)

indexer = FixedForwardWindowIndexer(window_size=window_size)
result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).sum()
result.index.names = ["a", "c"]

groups = df.groupby("a")[["a", "b"]]
manual = concat(
[
g.assign(
b=[
g["b"].iloc[i : i + window_size].sum(min_count=1)
for i in range(len(g))
]
)
for _, g in groups
]
)
manual = manual.set_index(["a", "c"])["b"]

tm.assert_series_equal(result, manual)