Skip to content

CLN: rolling step followups #46191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pandas/_libs/window/indexers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ def calculate_variable_window_bounds(
min_periods,
center: bool,
closed: str | None,
step: int | None,
index: np.ndarray, # const int64_t[:]
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
6 changes: 1 addition & 5 deletions pandas/_libs/window/indexers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def calculate_variable_window_bounds(
object min_periods, # unused but here to match get_window_bounds signature
bint center,
str closed,
int64_t step,
const int64_t[:] index
):
"""
Expand All @@ -39,9 +38,6 @@ def calculate_variable_window_bounds(
closed : str
string of side of the window that should be closed

step : int64
Spacing between windows

index : ndarray[int64]
time series index to roll over

Expand Down Expand Up @@ -150,4 +146,4 @@ def calculate_variable_window_bounds(
# right endpoint is open
if not right_closed and not center:
end[i] -= 1
return start[::step], end[::step]
return start, end
25 changes: 5 additions & 20 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ def get_window_bounds(
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:

if step is not None:
raise NotImplementedError("step not implemented for variable window")

# error: Argument 4 to "calculate_variable_window_bounds" has incompatible
# type "Optional[bool]"; expected "bool"
# error: Argument 6 to "calculate_variable_window_bounds" has incompatible
Expand All @@ -128,7 +125,6 @@ def get_window_bounds(
min_periods,
center, # type: ignore[arg-type]
closed,
1,
self.index_array, # type: ignore[arg-type]
)

Expand Down Expand Up @@ -234,12 +230,10 @@ def get_window_bounds(
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:

if step is not None:
raise NotImplementedError("step not implemented for expanding window")

end = np.arange(1, num_values + 1, dtype=np.int64)
start = np.zeros(len(end), dtype=np.int64)
return start, end
return (
np.zeros(num_values, dtype=np.int64),
np.arange(1, num_values + 1, dtype=np.int64),
)


class FixedForwardWindowIndexer(BaseIndexer):
Expand Down Expand Up @@ -343,8 +337,6 @@ def get_window_bounds(
closed: str | None = None,
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:
if step is not None:
raise NotImplementedError("step not implemented for groupby window")

# 1) For each group, get the indices that belong to the group
# 2) Use the indices to calculate the start & end bounds of the window
Expand Down Expand Up @@ -404,11 +396,4 @@ def get_window_bounds(
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:

if step is not None:
raise NotImplementedError(
"step not implemented for exponentail moving window"
)
return (
np.array([0], dtype=np.int64),
np.array([num_values], dtype=np.int64),
)
return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)
59 changes: 23 additions & 36 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ def _validate(self) -> None:
)
if self.method not in ["table", "single"]:
raise ValueError("method must be 'table' or 'single")
if self.step is not None:
if not is_integer(self.step):
raise ValueError("step must be an integer")
elif self.step < 0:
raise ValueError("step must be >= 0")

def _check_window_bounds(
self, start: np.ndarray, end: np.ndarray, num_vals: int
Expand All @@ -238,16 +243,14 @@ def _check_window_bounds(
f"start ({len(start)}) and end ({len(end)}) bounds must be the "
f"same length"
)
elif not isinstance(self._get_window_indexer(), GroupbyIndexer) and len(
start
) != (num_vals + (self.step or 1) - 1) // (self.step or 1):
elif len(start) != (num_vals + (self.step or 1) - 1) // (self.step or 1):
raise ValueError(
f"start and end bounds ({len(start)}) must be the same length "
f"as the object ({num_vals}) divided by the step ({self.step}) "
f"if given and rounded up"
)

def _slice_index(self, index: Index, result: Sized | None = None) -> Index:
def _slice_axis_for_step(self, index: Index, result: Sized | None = None) -> Index:
"""
Slices the index for a given result and the preset step.
"""
Expand Down Expand Up @@ -446,7 +449,7 @@ def _apply_series(
raise DataError("No numeric types to aggregate") from err

result = homogeneous_func(values)
index = self._slice_index(obj.index, result)
index = self._slice_axis_for_step(obj.index, result)
return obj._constructor(result, index=index, name=obj.name)

def _apply_blockwise(
Expand Down Expand Up @@ -484,7 +487,7 @@ def hfunc(values: ArrayLike) -> ArrayLike:
res_values.append(res)
taker.append(i)

index = self._slice_index(
index = self._slice_axis_for_step(
obj.index, res_values[0] if len(res_values) > 0 else None
)
df = type(obj)._from_arrays(
Expand Down Expand Up @@ -524,7 +527,7 @@ def _apply_tablewise(
values = values.T if self.axis == 1 else values
result = homogeneous_func(values)
result = result.T if self.axis == 1 else result
index = self._slice_index(obj.index, result)
index = self._slice_axis_for_step(obj.index, result)
columns = (
obj.columns
if result.shape[1] == len(obj.columns)
Expand Down Expand Up @@ -644,13 +647,13 @@ def _numba_apply(
)
result = aggregator(values, start, end, min_periods, *func_args)
result = result.T if self.axis == 1 else result
index = self._slice_index(obj.index, result)
index = self._slice_axis_for_step(obj.index, result)
if obj.ndim == 1:
result = result.squeeze()
out = obj._constructor(result, index=index, name=obj.name)
return out
else:
columns = self._slice_index(obj.columns, result.T)
columns = self._slice_axis_for_step(obj.columns, result.T)
out = obj._constructor(result, index=index, columns=columns)
return self._resolve_output(out, obj)

Expand Down Expand Up @@ -692,7 +695,7 @@ def __init__(
obj = obj.drop(columns=self._grouper.names, errors="ignore")
# GH 15354
if kwargs.get("step") is not None:
raise NotImplementedError("step not implemented for rolling groupby")
raise NotImplementedError("step not implemented for groupby")
super().__init__(obj, *args, **kwargs)

def _apply(
Expand Down Expand Up @@ -938,15 +941,12 @@ class Window(BaseWindow):
The closed parameter with fixed windows is now supported.

step : int, default None
When supported, applies ``[::step]`` to the resulting sequence of windows, in a
computationally efficient manner. Currently supported only with fixed-length
window indexers. Note that using a step argument other than None or 1 will
produce a result with a different shape than the input.
Evaluate the window at every ``step`` result, equivalent to slicing as
``[::step]``. ``window`` must be an integer. Using a step argument other
than None or 1 will produce a result with a different shape than the input.

.. versionadded:: 1.5.0

The step parameter is only supported with fixed windows.

method : str {'single', 'table'}, default 'single'

.. versionadded:: 1.3.0
Expand Down Expand Up @@ -1605,9 +1605,7 @@ def cov(
**kwargs,
):
if self.step is not None:
raise NotImplementedError(
"step not implemented for rolling and expanding cov"
)
raise NotImplementedError("step not implemented for cov")

from pandas import Series

Expand Down Expand Up @@ -1650,11 +1648,8 @@ def corr(
ddof: int = 1,
**kwargs,
):

if self.step is not None:
raise NotImplementedError(
"step not implemented for rolling and expanding corr"
)
raise NotImplementedError("step not implemented for corr")

from pandas import Series

Expand Down Expand Up @@ -1749,24 +1744,16 @@ def _validate(self):
if self.min_periods is None:
self.min_periods = 1

if self.step is not None:
raise NotImplementedError(
"step is not supported with frequency windows"
)

elif isinstance(self.window, BaseIndexer):
# Passed BaseIndexer subclass should handle all other rolling kwargs
pass
elif not is_integer(self.window) or self.window < 0:
raise ValueError("window must be an integer 0 or greater")
# GH 15354:
# validate window indexer parameters do not raise in get_window_bounds
# this cannot be done in BaseWindow._validate because there _get_window_indexer
# would erroneously create a fixed window given a window argument like "1s" due
# to _win_freq_i8 not being set
indexer = self._get_window_indexer()
indexer.get_window_bounds(
num_values=0,
min_periods=self.min_periods,
center=self.center,
closed=self.closed,
step=self.step,
)

def _validate_datetimelike_monotonic(self):
"""
Expand Down
35 changes: 13 additions & 22 deletions pandas/tests/window/test_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ def test_rolling_apply_out_of_bounds(engine_and_raw):


@pytest.mark.parametrize("window", [2, "2s"])
@pytest.mark.parametrize("step", [None])
def test_rolling_apply_with_pandas_objects(window, step):
def test_rolling_apply_with_pandas_objects(window):
# 5071
df = DataFrame(
{"A": np.random.randn(5), "B": np.random.randint(0, 10, size=5)},
Expand All @@ -67,8 +66,8 @@ def f(x):
return np.nan
return x.iloc[-1]

result = df.rolling(window, step=step).apply(f, raw=False)
expected = df.iloc[2:].reindex_like(df)[::step]
result = df.rolling(window).apply(f, raw=False)
expected = df.iloc[2:].reindex_like(df)
tm.assert_frame_equal(result, expected)

with tm.external_error_raised(AttributeError):
Expand Down Expand Up @@ -96,8 +95,7 @@ def test_rolling_apply(engine_and_raw, step):
tm.assert_series_equal(result, expected)


@pytest.mark.parametrize("step", [None])
def test_all_apply(engine_and_raw, step):
def test_all_apply(engine_and_raw):
engine, raw = engine_and_raw

df = (
Expand All @@ -106,16 +104,15 @@ def test_all_apply(engine_and_raw, step):
).set_index("A")
* 2
)
er = df.rolling(window=1, step=step)
r = df.rolling(window="1s", step=step)
er = df.rolling(window=1)
r = df.rolling(window="1s")

result = r.apply(lambda x: 1, engine=engine, raw=raw)
expected = er.apply(lambda x: 1, engine=engine, raw=raw)
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize("step", [None])
def test_ragged_apply(engine_and_raw, step):
def test_ragged_apply(engine_and_raw):
engine, raw = engine_and_raw

df = DataFrame({"B": range(5)})
Expand All @@ -128,24 +125,18 @@ def test_ragged_apply(engine_and_raw, step):
]

f = lambda x: 1
result = df.rolling(window="1s", min_periods=1, step=step).apply(
f, engine=engine, raw=raw
)
expected = df.copy()[::step]
result = df.rolling(window="1s", min_periods=1).apply(f, engine=engine, raw=raw)
expected = df.copy()
expected["B"] = 1.0
tm.assert_frame_equal(result, expected)

result = df.rolling(window="2s", min_periods=1, step=step).apply(
f, engine=engine, raw=raw
)
expected = df.copy()[::step]
result = df.rolling(window="2s", min_periods=1).apply(f, engine=engine, raw=raw)
expected = df.copy()
expected["B"] = 1.0
tm.assert_frame_equal(result, expected)

result = df.rolling(window="5s", min_periods=1, step=step).apply(
f, engine=engine, raw=raw
)
expected = df.copy()[::step]
result = df.rolling(window="5s", min_periods=1).apply(f, engine=engine, raw=raw)
expected = df.copy()
expected["B"] = 1.0
tm.assert_frame_equal(result, expected)

Expand Down
7 changes: 3 additions & 4 deletions pandas/tests/window/test_base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,13 @@ def test_rolling_forward_cov_corr(func, expected):
["left", [0.0, 0.0, 1.0, 2.0, 5.0, 9.0, 5.0, 6.0, 7.0, 8.0]],
],
)
@pytest.mark.parametrize("step", [None])
def test_non_fixed_variable_window_indexer(closed, expected_data, step):
def test_non_fixed_variable_window_indexer(closed, expected_data):
index = date_range("2020", periods=10)
df = DataFrame(range(10), index=index)
offset = BusinessDay(1)
indexer = VariableOffsetWindowIndexer(index=index, offset=offset)
result = df.rolling(indexer, closed=closed, step=step).sum()
expected = DataFrame(expected_data, index=index)[::step]
result = df.rolling(indexer, closed=closed).sum()
expected = DataFrame(expected_data, index=index)
tm.assert_frame_equal(result, expected)


Expand Down
Loading