Skip to content

ENH: Rolling window with step size (GH-15354) #45765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pandas/_libs/window/indexers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ def calculate_variable_window_bounds(
min_periods,
center: bool,
closed: str | None,
step: int | None,
index: np.ndarray, # const int64_t[:]
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
12 changes: 9 additions & 3 deletions pandas/_libs/window/indexers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def calculate_variable_window_bounds(
object min_periods, # unused but here to match get_window_bounds signature
bint center,
str closed,
int64_t step,
const int64_t[:] index
):
"""
Expand All @@ -38,17 +39,20 @@ def calculate_variable_window_bounds(
closed : str
string of side of the window that should be closed

step : int64
Spacing between windows

index : ndarray[int64]
time series index to roll over

Returns
-------
(ndarray[int64], ndarray[int64])
(ndarray[int64], ndarray[int64], ndarray[int64])
"""
cdef:
bint left_closed = False
bint right_closed = False
ndarray[int64_t, ndim=1] start, end
ndarray[int64_t, ndim=1] start, end, ref
int64_t start_bound, end_bound, index_growth_sign = 1
Py_ssize_t i, j

Expand Down Expand Up @@ -143,4 +147,6 @@ def calculate_variable_window_bounds(
# right endpoint is open
if not right_closed and not center:
end[i] -= 1
return start, end
ref = (None if step is None or step == 1
else np.arange(0, num_values, step, dtype='int64'))
return start[::step], end[::step], ref
3 changes: 3 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11263,6 +11263,7 @@ def rolling(
on: str | None = None,
axis: Axis = 0,
closed: str | None = None,
step: int | None = None,
method: str = "single",
):
axis = self._get_axis_number(axis)
Expand All @@ -11277,6 +11278,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
method=method,
)

Expand All @@ -11289,6 +11291,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
method=method,
)

Expand Down
146 changes: 114 additions & 32 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
center passed from the top level rolling API
closed : str, default None
closed passed from the top level rolling API
step : int, default None
step passed from the top level rolling API
win_type : str, default None
win_type passed from the top level rolling API

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
A tuple of ndarray[int64]s:
start : array of start boundaries
end : array of end boundaries
ref : array of window reference locations, or None indicating all if step is None or 1
"""


Expand All @@ -55,6 +59,16 @@ def __init__(
for key, value in kwargs.items():
setattr(self, key, value)

def _get_default_ref(self, num_values: int = 0, step: int | None = None):
"""
Return the default window reference locations.

The result is ``None`` when ``step`` is ``None`` or 1 (per the bounds
documentation, ``None`` stands for "all locations"). Otherwise it is
``np.arange(0, num_values, step)`` with dtype int64, i.e. every
``step``-th position starting at 0.
"""
return (
None
if step is None or step == 1
else np.arange(0, num_values, step, dtype="int64")
)

@Appender(get_window_bounds_doc)
def get_window_bounds(
self,
Expand All @@ -66,9 +80,23 @@ def get_window_bounds(

raise NotImplementedError

@Appender(get_window_bounds_doc)
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

# Step-aware wrapper: compute the bounds for every position with
# get_window_bounds (which takes no step argument), then subsample them.
start, end = self.get_window_bounds(num_values, min_periods, center, closed)
# ref is None when step is None or 1, else the positions 0, step, 2*step, ...
ref = self._get_default_ref(num_values, step)
# A slice step of None keeps every element, so step=None returns the
# bounds unchanged; otherwise only every step-th window is kept.
return start[::step], end[::step], ref

class FixedWindowIndexer(BaseIndexer):
"""Creates window boundaries that are of fixed length."""

class BaseIndexer2(BaseIndexer):
"""Base class for window bounds calculations with step optimization."""

@Appender(get_window_bounds_doc)
def get_window_bounds(
Expand All @@ -79,12 +107,43 @@ def get_window_bounds(
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:

start, end, ref = self.get_window_bounds2(
num_values, min_periods, center, closed
)
return start, end

@Appender(get_window_bounds_doc)
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

# No implementation at this level: the indexer subclasses visible in this
# file define their own get_window_bounds2 returning (start, end, ref).
# This class's get_window_bounds delegates here and drops the ref element.
raise NotImplementedError


class FixedWindowIndexer(BaseIndexer2):
"""Creates window boundaries that are of fixed length."""

@Appender(get_window_bounds_doc)
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

if center:
offset = (self.window_size - 1) // 2
else:
offset = 0

end = np.arange(1 + offset, num_values + 1 + offset, dtype="int64")
end = np.arange(1 + offset, num_values + 1 + offset, step, dtype="int64")
start = end - self.window_size
if closed in ["left", "both"]:
start -= 1
Expand All @@ -94,20 +153,22 @@ def get_window_bounds(
end = np.clip(end, 0, num_values)
start = np.clip(start, 0, num_values)

return start, end
ref = self._get_default_ref(num_values, step)
return start, end, ref


class VariableWindowIndexer(BaseIndexer):
class VariableWindowIndexer(BaseIndexer2):
"""Creates window boundaries that are of variable length, namely for time series."""

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

# error: Argument 4 to "calculate_variable_window_bounds" has incompatible
# type "Optional[bool]"; expected "bool"
Expand All @@ -119,6 +180,7 @@ def get_window_bounds(
min_periods,
center, # type: ignore[arg-type]
closed,
step if step is not None else 1,
self.index_array, # type: ignore[arg-type]
)

Expand Down Expand Up @@ -205,25 +267,28 @@ def get_window_bounds(
return start, end


class ExpandingIndexer(BaseIndexer):
class ExpandingIndexer(BaseIndexer2):
"""Calculate expanding window bounds, mimicking df.expanding()"""

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

return (
np.zeros(num_values, dtype=np.int64),
np.arange(1, num_values + 1, dtype=np.int64),
)
if step is None:
step = 1
end = np.arange(1, num_values + 1, step, dtype=np.int64)
start = np.zeros(len(end), dtype=np.int64)
ref = self._get_default_ref(num_values, step)
return start, end, ref


class FixedForwardWindowIndexer(BaseIndexer):
class FixedForwardWindowIndexer(BaseIndexer2):
"""
Creates window boundaries for fixed-length windows that include the
current row.
Expand All @@ -250,30 +315,34 @@ class FixedForwardWindowIndexer(BaseIndexer):
"""

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

if center:
raise ValueError("Forward-looking windows can't have center=True")
if closed is not None:
raise ValueError(
"Forward-looking windows don't support setting the closed argument"
)
if step is None:
step = 1

start = np.arange(num_values, dtype="int64")
start = np.arange(0, num_values, step, dtype="int64")
end = start + self.window_size
if self.window_size:
end[-self.window_size :] = num_values
end = np.clip(end, 0, num_values)

return start, end
ref = self._get_default_ref(num_values, step)
return start, end, ref


class GroupbyIndexer(BaseIndexer):
class GroupbyIndexer(BaseIndexer2):
"""Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

def __init__(
Expand Down Expand Up @@ -313,18 +382,21 @@ def __init__(
)

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:
# 1) For each group, get the indices that belong to the group
# 2) Use the indices to calculate the start & end bounds of the window
# 3) Append the window bounds in group order
start_arrays = []
end_arrays = []
ref_arrays = []
empty = np.array([], dtype=np.int64)
window_indices_start = 0
for key, indices in self.groupby_indices.items():
index_array: np.ndarray | None
Expand All @@ -338,11 +410,12 @@ def get_window_bounds(
window_size=self.window_size,
**self.indexer_kwargs,
)
start, end = indexer.get_window_bounds(
len(indices), min_periods, center, closed
start, end, ref = indexer.get_window_bounds2(
len(indices), min_periods, center, closed, step
)
start = start.astype(np.int64)
end = end.astype(np.int64)
ref = None if ref is None else ref.astype(np.int64)
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"
Expand All @@ -358,21 +431,30 @@ def get_window_bounds(
)
start_arrays.append(window_indices.take(ensure_platform_int(start)))
end_arrays.append(window_indices.take(ensure_platform_int(end)))
ref_arrays.append(
empty if ref is None else window_indices.take(ensure_platform_int(ref))
)
start = np.concatenate(start_arrays)
end = np.concatenate(end_arrays)
return start, end
ref = None if step is None or step == 1 else np.concatenate(ref_arrays)
return start, end, ref


class ExponentialMovingWindowIndexer(BaseIndexer):
class ExponentialMovingWindowIndexer(BaseIndexer2):
"""Calculate ewm window bounds (the entire window)"""

@Appender(get_window_bounds_doc)
def get_window_bounds(
def get_window_bounds2(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)
return (
np.array([0], dtype=np.int64),
np.array([num_values], dtype=np.int64),
None,
)
Loading