Skip to content

ENH: Rolling window with step size (GH-15354) #45735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pandas/_libs/window/indexers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ def calculate_variable_window_bounds(
min_periods,
center: bool,
closed: str | None,
step: int | None,
index: np.ndarray, # const int64_t[:]
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64], npt.NDArray[np.int64]]: ...
12 changes: 9 additions & 3 deletions pandas/_libs/window/indexers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def calculate_variable_window_bounds(
object min_periods, # unused but here to match get_window_bounds signature
bint center,
str closed,
int64_t step,
const int64_t[:] index
):
"""
Expand All @@ -38,17 +39,20 @@ def calculate_variable_window_bounds(
closed : str
string of side of the window that should be closed

step : int64
Spacing between windows

index : ndarray[int64]
time series index to roll over

Returns
-------
(ndarray[int64], ndarray[int64])
(ndarray[int64], ndarray[int64], ndarray[int64])
"""
cdef:
bint left_closed = False
bint right_closed = False
ndarray[int64_t, ndim=1] start, end
ndarray[int64_t, ndim=1] start, end, ref
int64_t start_bound, end_bound, index_growth_sign = 1
Py_ssize_t i, j

Expand Down Expand Up @@ -143,4 +147,6 @@ def calculate_variable_window_bounds(
# right endpoint is open
if not right_closed and not center:
end[i] -= 1
return start, end
ref = (None if step is None or step == 1
else np.arange(0, num_values, step, dtype='int64'))
return start[::step], end[::step], ref
3 changes: 3 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11263,6 +11263,7 @@ def rolling(
on: str | None = None,
axis: Axis = 0,
closed: str | None = None,
step: int | None = None,
method: str = "single",
):
axis = self._get_axis_number(axis)
Expand All @@ -11277,6 +11278,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
method=method,
)

Expand All @@ -11289,6 +11291,7 @@ def rolling(
on=on,
axis=axis,
closed=closed,
step=step,
method=method,
)

Expand Down
91 changes: 67 additions & 24 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
center passed from the top level rolling API
closed : str, default None
closed passed from the top level rolling API
step : int, default None
step passed from the top level rolling API
win_type : str, default None
win_type passed from the top level rolling API

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
A tuple of ndarray[int64]s:
start : array of start boundaries
end : array of end boundaries
ref : array of window reference locations, or None indicating all if step is None or 1
"""


Expand All @@ -55,14 +59,25 @@ def __init__(
for key, value in kwargs.items():
setattr(self, key, value)

def _get_default_ref(self, num_values: int = 0, step: int | None = None):
"""
Returns the default window reference locations.
"""
return (
None
if step is None or step == 1
else np.arange(0, num_values, step, dtype="int64")
)

@Appender(get_window_bounds_doc)
def get_window_bounds(
self,
num_values: int = 0,
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

raise NotImplementedError

Expand All @@ -77,14 +92,15 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

if center:
offset = (self.window_size - 1) // 2
else:
offset = 0

end = np.arange(1 + offset, num_values + 1 + offset, dtype="int64")
end = np.arange(1 + offset, num_values + 1 + offset, step, dtype="int64")
start = end - self.window_size
if closed in ["left", "both"]:
start -= 1
Expand All @@ -94,7 +110,8 @@ def get_window_bounds(
end = np.clip(end, 0, num_values)
start = np.clip(start, 0, num_values)

return start, end
ref = self._get_default_ref(num_values, step)
return start, end, ref


class VariableWindowIndexer(BaseIndexer):
Expand All @@ -107,7 +124,8 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

# error: Argument 4 to "calculate_variable_window_bounds" has incompatible
# type "Optional[bool]"; expected "bool"
Expand All @@ -119,6 +137,7 @@ def get_window_bounds(
min_periods,
center, # type: ignore[arg-type]
closed,
step if step is not None else 1,
self.index_array, # type: ignore[arg-type]
)

Expand All @@ -145,11 +164,14 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

# if windows is variable, default is 'right', otherwise default is 'both'
if closed is None:
closed = "right" if self.index is not None else "both"
if step is None:
step = 1

right_closed = closed in ["right", "both"]
left_closed = closed in ["left", "both"]
Expand Down Expand Up @@ -202,7 +224,8 @@ def get_window_bounds(
if not right_closed:
end[i] -= 1

return start, end
ref = self._get_default_ref(num_values, step)
return start[::step], end[::step], ref


class ExpandingIndexer(BaseIndexer):
Expand All @@ -215,12 +238,15 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

return (
np.zeros(num_values, dtype=np.int64),
np.arange(1, num_values + 1, dtype=np.int64),
)
if step is None:
step = 1
end = np.arange(1, num_values + 1, step, dtype=np.int64)
start = np.zeros(len(end), dtype=np.int64)
ref = self._get_default_ref(num_values, step)
return start, end, ref


class FixedForwardWindowIndexer(BaseIndexer):
Expand Down Expand Up @@ -256,21 +282,25 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

if center:
raise ValueError("Forward-looking windows can't have center=True")
if closed is not None:
raise ValueError(
"Forward-looking windows don't support setting the closed argument"
)
if step is None:
step = 1

start = np.arange(num_values, dtype="int64")
start = np.arange(0, num_values, step, dtype="int64")
end = start + self.window_size
if self.window_size:
end[-self.window_size :] = num_values
end = np.clip(end, 0, num_values)

return start, end
ref = self._get_default_ref(num_values, step)
return start, end, ref


class GroupbyIndexer(BaseIndexer):
Expand Down Expand Up @@ -319,12 +349,15 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:
# 1) For each group, get the indices that belong to the group
# 2) Use the indices to calculate the start & end bounds of the window
# 3) Append the window bounds in group order
start_arrays = []
end_arrays = []
ref_arrays = []
empty = np.array([], dtype=np.int64)
window_indices_start = 0
for key, indices in self.groupby_indices.items():
index_array: np.ndarray | None
Expand All @@ -338,11 +371,12 @@ def get_window_bounds(
window_size=self.window_size,
**self.indexer_kwargs,
)
start, end = indexer.get_window_bounds(
len(indices), min_periods, center, closed
start, end, ref = indexer.get_window_bounds(
len(indices), min_periods, center, closed, step
)
start = start.astype(np.int64)
end = end.astype(np.int64)
ref = None if ref is None else ref.astype(np.int64)
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"
Expand All @@ -358,9 +392,13 @@ def get_window_bounds(
)
start_arrays.append(window_indices.take(ensure_platform_int(start)))
end_arrays.append(window_indices.take(ensure_platform_int(end)))
ref_arrays.append(
empty if ref is None else window_indices.take(ensure_platform_int(ref))
)
start = np.concatenate(start_arrays)
end = np.concatenate(end_arrays)
return start, end
ref = None if step is None or step == 1 else np.concatenate(ref_arrays)
return start, end, ref


class ExponentialMovingWindowIndexer(BaseIndexer):
Expand All @@ -373,6 +411,11 @@ def get_window_bounds(
min_periods: int | None = None,
center: bool | None = None,
closed: str | None = None,
) -> tuple[np.ndarray, np.ndarray]:
step: int | None = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]:

return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)
return (
np.array([0], dtype=np.int64),
np.array([num_values], dtype=np.int64),
None,
)
11 changes: 8 additions & 3 deletions pandas/core/window/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ def flex_binary_moment(arg1, arg2, f, pairwise=False):
from pandas import DataFrame

def dataframe_from_int_dict(data, frame_template):
result = DataFrame(data, index=frame_template.index)
result = DataFrame(
data, index=None if len(data) > 0 else frame_template.index
)
if len(result.columns) > 0:
result.columns = frame_template.columns[result.columns]
return result
Expand All @@ -42,13 +44,16 @@ def dataframe_from_int_dict(data, frame_template):
raise ValueError("'arg2' columns are not unique")
X, Y = arg1.align(arg2, join="outer")
X, Y = prep_binary(X, Y)
result_index = X.index
res_columns = arg1.columns.union(arg2.columns)
for col in res_columns:
if col in X and col in Y:
results[col] = f(X[col], Y[col])
return DataFrame(results, index=X.index, columns=res_columns)
result_index = results[col].index
return DataFrame(results, index=result_index, columns=res_columns)
elif pairwise is True:
results = defaultdict(dict)
result_index = arg1.index.union(arg2.index)
for i in range(len(arg1.columns)):
for j in range(len(arg2.columns)):
if j < i and arg2 is arg1:
Expand All @@ -58,10 +63,10 @@ def dataframe_from_int_dict(data, frame_template):
results[i][j] = f(
*prep_binary(arg1.iloc[:, i], arg2.iloc[:, j])
)
result_index = results[i][j].index

from pandas import concat

result_index = arg1.index.union(arg2.index)
if len(result_index):

# construct result frame
Expand Down
8 changes: 5 additions & 3 deletions pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ def __init__(
)

def _check_window_bounds(
self, start: np.ndarray, end: np.ndarray, num_vals: int
self, start: np.ndarray, end: np.ndarray, ref: np.ndarray | None, num_vals: int
) -> None:
# emw algorithms are iterative with each point
# ExponentialMovingWindowIndexer "bounds" are the entire window
Expand Down Expand Up @@ -732,11 +732,12 @@ def cov_func(x, y):
if self.min_periods is not None
else window_indexer.window_size
)
start, end = window_indexer.get_window_bounds(
start, end, ref = window_indexer.get_window_bounds(
num_values=len(x_array),
min_periods=min_periods,
center=self.center,
closed=self.closed,
step=self.step,
)
result = window_aggregations.ewmcov(
x_array,
Expand Down Expand Up @@ -798,11 +799,12 @@ def cov_func(x, y):
if self.min_periods is not None
else window_indexer.window_size
)
start, end = window_indexer.get_window_bounds(
start, end, ref = window_indexer.get_window_bounds(
num_values=len(x_array),
min_periods=min_periods,
center=self.center,
closed=self.closed,
step=self.step,
)

def _cov(X, Y):
Expand Down
4 changes: 2 additions & 2 deletions pandas/core/window/numba_.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ def roll_table(
minimum_periods: int,
*args: Any,
):
result = np.empty(values.shape)
min_periods_mask = np.empty(values.shape)
result = np.empty((len(begin), values.shape[1]))
min_periods_mask = np.empty(result.shape)
for i in numba.prange(len(result)):
start = begin[i]
stop = end[i]
Expand Down
Loading