From ff6a89885dcc92c948c27bb03d71ab8a8789d85e Mon Sep 17 00:00:00 2001 From: Yaron Gvili Date: Mon, 31 Jan 2022 06:05:56 -0500 Subject: [PATCH 1/3] ENH: Rolling window with step size (GH-15354) --- pandas/_libs/window/indexers.pyi | 3 +- pandas/_libs/window/indexers.pyx | 12 +- pandas/core/generic.py | 3 + pandas/core/indexers/objects.py | 91 +++-- pandas/core/window/common.py | 11 +- pandas/core/window/ewm.py | 8 +- pandas/core/window/numba_.py | 4 +- pandas/core/window/rolling.py | 156 ++++++-- pandas/tests/window/conftest.py | 25 ++ .../test_moments_consistency_rolling.py | 96 ++--- pandas/tests/window/test_api.py | 89 ++--- pandas/tests/window/test_apply.py | 68 ++-- pandas/tests/window/test_base_indexer.py | 104 ++++-- pandas/tests/window/test_dtypes.py | 20 +- pandas/tests/window/test_groupby.py | 337 ++++++++++++------ pandas/tests/window/test_numba.py | 54 +-- pandas/tests/window/test_pairwise.py | 48 ++- pandas/tests/window/test_rolling.py | 81 +++-- pandas/tests/window/test_rolling_functions.py | 60 ++-- pandas/tests/window/test_rolling_quantile.py | 20 +- pandas/tests/window/test_rolling_skew_kurt.py | 42 ++- pandas/tests/window/test_win_type.py | 74 ++-- 22 files changed, 913 insertions(+), 493 deletions(-) diff --git a/pandas/_libs/window/indexers.pyi b/pandas/_libs/window/indexers.pyi index c9bc64be34ac9..6d4f85f350c77 100644 --- a/pandas/_libs/window/indexers.pyi +++ b/pandas/_libs/window/indexers.pyi @@ -8,5 +8,6 @@ def calculate_variable_window_bounds( min_periods, center: bool, closed: str | None, + step: int | None, index: np.ndarray, # const int64_t[:] -) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64]]: ... +) -> tuple[npt.NDArray[np.int64], npt.NDArray[np.int64], npt.NDArray[np.int64]]: ... diff --git a/pandas/_libs/window/indexers.pyx b/pandas/_libs/window/indexers.pyx index 4b3a858ade773..64b9cfc6a6979 100644 --- a/pandas/_libs/window/indexers.pyx +++ b/pandas/_libs/window/indexers.pyx @@ -16,6 +16,7 @@ def calculate_variable_window_bounds( object min_periods, # unused but here to match get_window_bounds signature bint center, str closed, + int64_t step, const int64_t[:] index ): """ @@ -38,17 +39,20 @@ def calculate_variable_window_bounds( closed : str string of side of the window that should be closed + step : int64 + Spacing between windows + index : ndarray[int64] time series index to roll over Returns ------- - (ndarray[int64], ndarray[int64]) + (ndarray[int64], ndarray[int64], ndarray[int64]) """ cdef: bint left_closed = False bint right_closed = False - ndarray[int64_t, ndim=1] start, end + ndarray[int64_t, ndim=1] start, end, ref int64_t start_bound, end_bound, index_growth_sign = 1 Py_ssize_t i, j @@ -143,4 +147,6 @@ def calculate_variable_window_bounds( # right endpoint is open if not right_closed and not center: end[i] -= 1 - return start, end + ref = (None if step is None or step == 1 + else np.arange(0, num_values, step, dtype='int64')) + return start[::step], end[::step], ref diff --git a/pandas/core/generic.py b/pandas/core/generic.py index a497475ebd182..b481ad1a90fb7 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -11263,6 +11263,7 @@ def rolling( on: str | None = None, axis: Axis = 0, closed: str | None = None, + step: int | None = None, method: str = "single", ): axis = self._get_axis_number(axis) @@ -11277,6 +11278,7 @@ def rolling( on=on, axis=axis, closed=closed, + step=step, method=method, ) @@ -11289,6 +11291,7 @@ def rolling( on=on, axis=axis, closed=closed, + step=step, method=method, ) diff --git 
a/pandas/core/indexers/objects.py b/pandas/core/indexers/objects.py index 4d5e4bbe6bd36..8a73646138abf 100644 --- a/pandas/core/indexers/objects.py +++ b/pandas/core/indexers/objects.py @@ -27,13 +27,18 @@ center passed from the top level rolling API closed : str, default None closed passed from the top level rolling API +step : int, default None + step passed from the top level rolling API win_type : str, default None win_type passed from the top level rolling API Returns ------- -A tuple of ndarray[int64]s, indicating the boundaries of each -window +A tuple of ndarray[int64]s: + start : array of start boundaries + end : array of end boundaries + ref : array of window reference locations, or None indicating all + must be None if step is None or 1 """ @@ -55,6 +60,16 @@ def __init__( for key, value in kwargs.items(): setattr(self, key, value) + def _get_default_ref(self, num_values: int = 0, step: int | None = None): + """ + Returns the default window reference locations. + """ + return ( + None + if step is None or step == 1 + else np.arange(0, num_values, step, dtype="int64") + ) + @Appender(get_window_bounds_doc) def get_window_bounds( self, @@ -62,7 +77,8 @@ def get_window_bounds( min_periods: int | None = None, center: bool | None = None, closed: str | None = None, - ) -> tuple[np.ndarray, np.ndarray]: + step: int | None = None, + ) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]: raise NotImplementedError @@ -77,14 +93,15 @@ def get_window_bounds( min_periods: int | None = None, center: bool | None = None, closed: str | None = None, - ) -> tuple[np.ndarray, np.ndarray]: + step: int | None = None, + ) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]: if center: offset = (self.window_size - 1) // 2 else: offset = 0 - end = np.arange(1 + offset, num_values + 1 + offset, dtype="int64") + end = np.arange(1 + offset, num_values + 1 + offset, step, dtype="int64") start = end - self.window_size if closed in ["left", "both"]: start -= 1 @@ -94,7 +111,8 @@ def get_window_bounds( end = np.clip(end, 0, num_values) start = np.clip(start, 0, num_values) - return start, end + ref = self._get_default_ref(num_values, step) + return start, end, ref class VariableWindowIndexer(BaseIndexer): @@ -107,7 +125,8 @@ def get_window_bounds( min_periods: int | None = None, center: bool | None = None, closed: str | None = None, - ) -> tuple[np.ndarray, np.ndarray]: + step: int | None = None, + ) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]: # error: Argument 4 to "calculate_variable_window_bounds" has incompatible # type "Optional[bool]"; expected "bool" @@ -119,6 +138,7 @@ def get_window_bounds( min_periods, center, # type: ignore[arg-type] closed, + step if step is not None else 1, self.index_array, # type: ignore[arg-type] ) @@ -145,11 +165,14 @@ def get_window_bounds( min_periods: int | None = None, center: bool | None = None, closed: str | None = None, - ) -> tuple[np.ndarray, np.ndarray]: + step: int | None = None, + ) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]: # if windows is variable, default is 'right', otherwise default is 'both' if closed is None: closed = "right" if self.index is not None else "both" + if step is None: + step = 1 right_closed = closed in ["right", "both"] left_closed = closed in ["left", "both"] @@ -202,7 +225,8 @@ def get_window_bounds( if not right_closed: end[i] -= 1 - return start, end + ref = self._get_default_ref(num_values, step) + return start[::step], end[::step], ref class ExpandingIndexer(BaseIndexer): @@ -215,12 +239,15 @@ def get_window_bounds( 
min_periods: int | None = None, center: bool | None = None, closed: str | None = None, - ) -> tuple[np.ndarray, np.ndarray]: + step: int | None = None, + ) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]: - return ( - np.zeros(num_values, dtype=np.int64), - np.arange(1, num_values + 1, dtype=np.int64), - ) + if step is None: + step = 1 + end = np.arange(1, num_values + 1, step, dtype=np.int64) + start = np.zeros(len(end), dtype=np.int64) + ref = self._get_default_ref(num_values, step) + return start, end, ref class FixedForwardWindowIndexer(BaseIndexer): @@ -256,7 +283,8 @@ def get_window_bounds( min_periods: int | None = None, center: bool | None = None, closed: str | None = None, - ) -> tuple[np.ndarray, np.ndarray]: + step: int | None = None, + ) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]: if center: raise ValueError("Forward-looking windows can't have center=True") @@ -264,13 +292,16 @@ def get_window_bounds( raise ValueError( "Forward-looking windows don't support setting the closed argument" ) + if step is None: + step = 1 - start = np.arange(num_values, dtype="int64") + start = np.arange(num_values, step=step, dtype="int64") end = start + self.window_size if self.window_size: - end[-self.window_size :] = num_values + end = np.clip(end, 0, num_values) - return start, end + ref = self._get_default_ref(num_values, step) + return start, end, ref class GroupbyIndexer(BaseIndexer): @@ -319,12 +350,14 @@ def get_window_bounds( min_periods: int | None = None, center: bool | None = None, closed: str | None = None, - ) -> tuple[np.ndarray, np.ndarray]: + step: int | None = None, + ) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]: # 1) For each group, get the indices that belong to the group # 2) Use the indices to calculate the start & end bounds of the window # 3) Append the window bounds in group order start_arrays = [] end_arrays = [] + ref_arrays = [] window_indices_start = 0 for key, indices in self.groupby_indices.items(): index_array: np.ndarray | None @@ -338,11 +371,12 @@ def get_window_bounds( window_size=self.window_size, **self.indexer_kwargs, ) - start, end = indexer.get_window_bounds( - len(indices), min_periods, center, closed + start, end, ref = indexer.get_window_bounds( + len(indices), min_periods, center, closed, step ) start = start.astype(np.int64) end = end.astype(np.int64) + ref = None if ref is None else ref.astype(np.int64) assert len(start) == len( end ), "these should be equal in length from get_window_bounds" @@ -358,9 +392,13 @@ def get_window_bounds( ) start_arrays.append(window_indices.take(ensure_platform_int(start))) end_arrays.append(window_indices.take(ensure_platform_int(end))) + ref_arrays.append( + None if ref is None else window_indices.take(ensure_platform_int(ref)) + ) start = np.concatenate(start_arrays) end = np.concatenate(end_arrays) - return start, end + ref = None if step is None or step == 1 else np.concatenate(ref_arrays) + return start, end, ref class ExponentialMovingWindowIndexer(BaseIndexer): @@ -373,6 +411,11 @@ def get_window_bounds( min_periods: int | None = None, center: bool | None = None, closed: str | None = None, - ) -> tuple[np.ndarray, np.ndarray]: + step: int | None = None, + ) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]: - return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64) + return ( + np.array([0], dtype=np.int64), + np.array([num_values], dtype=np.int64), + None, + ) diff --git a/pandas/core/window/common.py b/pandas/core/window/common.py index 15144116fa924..7035a67acaea2 
100644 --- a/pandas/core/window/common.py +++ b/pandas/core/window/common.py @@ -22,7 +22,9 @@ def flex_binary_moment(arg1, arg2, f, pairwise=False): from pandas import DataFrame def dataframe_from_int_dict(data, frame_template): - result = DataFrame(data, index=frame_template.index) + result = DataFrame( + data, index=None if len(data) > 0 else frame_template.index + ) if len(result.columns) > 0: result.columns = frame_template.columns[result.columns] return result @@ -42,13 +44,16 @@ def dataframe_from_int_dict(data, frame_template): raise ValueError("'arg2' columns are not unique") X, Y = arg1.align(arg2, join="outer") X, Y = prep_binary(X, Y) + result_index = X.index res_columns = arg1.columns.union(arg2.columns) for col in res_columns: if col in X and col in Y: results[col] = f(X[col], Y[col]) - return DataFrame(results, index=X.index, columns=res_columns) + result_index = results[col].index + return DataFrame(results, index=result_index, columns=res_columns) elif pairwise is True: results = defaultdict(dict) + result_index = arg1.index.union(arg2.index) for i in range(len(arg1.columns)): for j in range(len(arg2.columns)): if j < i and arg2 is arg1: @@ -58,10 +63,10 @@ def dataframe_from_int_dict(data, frame_template): results[i][j] = f( *prep_binary(arg1.iloc[:, i], arg2.iloc[:, j]) ) + result_index = results[i][j].index from pandas import concat - result_index = arg1.index.union(arg2.index) if len(result_index): # construct result frame diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 4bebc56273805..1b24d214d2356 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -418,7 +418,7 @@ def __init__( ) def _check_window_bounds( - self, start: np.ndarray, end: np.ndarray, num_vals: int + self, start: np.ndarray, end: np.ndarray, ref: np.ndarray, num_vals: int ) -> None: # emw algorithms are iterative with each point # ExponentialMovingWindowIndexer "bounds" are the entire window @@ -732,11 +732,12 @@ def cov_func(x, y): if self.min_periods is not None else window_indexer.window_size ) - start, end = window_indexer.get_window_bounds( + start, end, ref = window_indexer.get_window_bounds( num_values=len(x_array), min_periods=min_periods, center=self.center, closed=self.closed, + step=self.step, ) result = window_aggregations.ewmcov( x_array, @@ -798,11 +799,12 @@ def cov_func(x, y): if self.min_periods is not None else window_indexer.window_size ) - start, end = window_indexer.get_window_bounds( + start, end, ref = window_indexer.get_window_bounds( num_values=len(x_array), min_periods=min_periods, center=self.center, closed=self.closed, + step=self.step, ) def _cov(X, Y): diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index 0e8eea3ec671e..4a0f0b33fb063 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -234,8 +234,8 @@ def roll_table( minimum_periods: int, *args: Any, ): - result = np.empty(values.shape) - min_periods_mask = np.empty(values.shape) + result = np.empty((len(begin), values.shape[1])) + min_periods_mask = np.empty(result.shape) for i in numba.prange(len(result)): start = begin[i] stop = end[i] diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index bbd0181e47401..1d13475d6db95 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -126,6 +126,7 @@ def __init__( axis: Axis = 0, on: str | Index | None = None, closed: str | None = None, + step: int | None = None, method: str = "single", *, selection=None, @@ -133,6 +134,7 @@ def 
__init__( self.obj = obj self.on = on self.closed = closed + self.step = step self.window = window self.min_periods = min_periods self.center = center @@ -228,19 +230,38 @@ def _validate(self) -> None: raise ValueError("method must be 'table' or 'single") def _check_window_bounds( - self, start: np.ndarray, end: np.ndarray, num_vals: int + self, start: np.ndarray, end: np.ndarray, ref: np.ndarray, num_vals: int ) -> None: if len(start) != len(end): raise ValueError( f"start ({len(start)}) and end ({len(end)}) bounds must be the " f"same length" ) - elif len(start) != num_vals: + if ref is not None and len(start) != len(ref): + raise ValueError( + f"start ({len(start)}) and ref ({len(ref)}) arrays must be the " + f"same length" + ) + elif not isinstance(self._get_window_indexer(), GroupbyIndexer) and len( + start + ) != (num_vals + (self.step or 1) - 1) // (self.step or 1): raise ValueError( f"start and end bounds ({len(start)}) must be the same length " - f"as the object ({num_vals})" + f"as the object ({num_vals}) divided by the step ({self.step}) " + f"if given and rounded up unless groupby was used" ) + def _slice_index(self, index: Index, at: np.ndarray) -> Index: + """ + Slices the index of the object. + """ + if at is None: + return index + result = index[at] + if isinstance(index, DatetimeIndex): + result.freq = None if index.freq is None else index.freq * (self.step or 1) + return result + def _create_data(self, obj: NDFrameT) -> NDFrameT: """ Split data into blocks & return conformed data. @@ -319,13 +340,14 @@ def __iter__(self): obj = self._create_data(obj) indexer = self._get_window_indexer() - start, end = indexer.get_window_bounds( + start, end, ref = indexer.get_window_bounds( num_values=len(obj), min_periods=self.min_periods, center=self.center, closed=self.closed, + step=self.step, ) - self._check_window_bounds(start, end, len(obj)) + self._check_window_bounds(start, end, ref, len(obj)) for s, e in zip(start, end): result = obj.iloc[slice(s, e)] @@ -428,8 +450,11 @@ def _apply_series( except (TypeError, NotImplementedError) as err: raise DataError("No numeric types to aggregate") from err - result = homogeneous_func(values) - return obj._constructor(result, index=obj.index, name=obj.name) + result, ref = homogeneous_func(values), None + if type(result) is tuple: + result, ref = result + index = self._slice_index(obj.index, ref) + return obj._constructor(result, index=index, name=obj.name) def _apply_blockwise( self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None @@ -455,20 +480,30 @@ def hfunc(values: ArrayLike) -> ArrayLike: obj = obj.T taker = [] - res_values = [] + res_values: list[Any] = [] + ref_value = None for i, arr in enumerate(obj._iter_column_arrays()): # GH#42736 operate column-wise instead of block-wise try: - res = hfunc(arr) + res, ref = hfunc(arr), None except (TypeError, NotImplementedError): pass else: + if type(res) is tuple: + res, ref = res + if len(res_values) == 0: + ref_value = ref + elif not ( + (ref_value is None and ref is None) + or np.array_equal(ref_value, ref) + ): + raise ValueError("hfunc returned inconsistent ref value") res_values.append(res) taker.append(i) df = type(obj)._from_arrays( res_values, - index=obj.index, + index=self._slice_index(obj.index, ref_value), columns=obj.columns.take(taker), verify_integrity=False, ) @@ -501,9 +536,13 @@ def _apply_tablewise( obj = self._create_data(self._selected_obj) values = self._prep_values(obj.to_numpy()) values = values.T if self.axis == 1 else values - result = 
homogeneous_func(values) + result, ref = homogeneous_func(values), None + if type(result) is tuple: + result, ref = result result = result.T if self.axis == 1 else result - out = obj._constructor(result, index=obj.index, columns=obj.columns) + index = obj.index if self.axis == 1 else self._slice_index(obj.index, ref) + columns = obj.columns if self.axis != 1 else self._slice_index(obj.columns, ref) + out = obj._constructor(result, index=index, columns=columns) return self._resolve_output(out, obj) @@ -565,18 +604,19 @@ def homogeneous_func(values: np.ndarray): # calculation function if values.size == 0: - return values.copy() + return values.copy(), np.array([], dtype=np.int64) def calc(x): - start, end = window_indexer.get_window_bounds( + start, end, ref = window_indexer.get_window_bounds( num_values=len(x), min_periods=min_periods, center=self.center, closed=self.closed, + step=self.step, ) - self._check_window_bounds(start, end, len(x)) + self._check_window_bounds(start, end, ref, len(x)) - return func(x, start, end, min_periods, *numba_args) + return func(x, start, end, min_periods, *numba_args), ref with np.errstate(all="ignore"): result = calc(values) @@ -610,25 +650,30 @@ def _numba_apply( values = self._prep_values(obj.to_numpy()) if values.ndim == 1: values = values.reshape(-1, 1) - start, end = window_indexer.get_window_bounds( + start, end, ref = window_indexer.get_window_bounds( num_values=len(values), min_periods=min_periods, center=self.center, closed=self.closed, + step=self.step, ) - self._check_window_bounds(start, end, len(values)) + self._check_window_bounds(start, end, ref, len(values)) aggregator = executor.generate_shared_aggregator( func, engine_kwargs, numba_cache_key_str ) result = aggregator(values, start, end, min_periods, *func_args) NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator result = result.T if self.axis == 1 else result + index = obj.index if self.axis == 1 else self._slice_index(obj.index, ref) if obj.ndim == 1: result = result.squeeze() - out = obj._constructor(result, index=obj.index, name=obj.name) + out = obj._constructor(result, index=index, name=obj.name) return out else: - out = obj._constructor(result, index=obj.index, columns=obj.columns) + columns = ( + obj.columns if self.axis != 1 else self._slice_index(obj.columns, ref) + ) + out = obj._constructor(result, index=index, columns=columns) return self._resolve_output(out, obj) def aggregate(self, func, *args, **kwargs): @@ -707,7 +752,7 @@ def _apply( group_indices = self._grouper.indices.values() if group_indices: - indexer = np.concatenate(list(group_indices)) + indexer = np.concatenate([ind[:: self.step] for ind in group_indices]) else: indexer = np.array([], dtype=np.intp) codes = [c.take(indexer) for c in codes] @@ -730,6 +775,14 @@ def _apply( result = result.reset_index(level=list(range(len(groupby_keys)))) return result + def _adjust_pairwise_result(self, result): + return concat( + [ + result.take(gb_indices[:: self.step]).reindex(result.index) + for gb_indices in self._grouper.indices.values() + ] + ) + def _apply_pairwise( self, target: DataFrame | Series, @@ -753,12 +806,7 @@ def _apply_pairwise( # from flex_binary_moment to a "transform"-like result # per groupby combination old_result_len = len(result) - result = concat( - [ - result.take(gb_indices).reindex(result.index) - for gb_indices in self._grouper.indices.values() - ] - ) + result = self._adjust_pairwise_result(result) gb_pairs = ( com.maybe_make_list(pair) for pair in self._grouper.indices.keys() @@ -781,7 
+829,7 @@ def _apply_pairwise( group_indices = self._grouper.indices.values() if group_indices: - indexer = np.concatenate(list(group_indices)) + indexer = np.concatenate([ind[:: self.step] for ind in group_indices]) else: indexer = np.array([], dtype=np.intp) @@ -864,8 +912,8 @@ class Window(BaseWindow): If a BaseIndexer subclass, the window boundaries based on the defined ``get_window_bounds`` method. Additional rolling - keyword arguments, namely ``min_periods``, ``center``, and - ``closed`` will be passed to ``get_window_bounds``. + keyword arguments, namely ``min_periods``, ``center``, ``closed`` and + ``step`` will be passed to ``get_window_bounds``. min_periods : int, default None Minimum number of observations in window required to have a value; @@ -1059,6 +1107,7 @@ class Window(BaseWindow): "axis", "on", "closed", + "step", "method", ] @@ -1151,7 +1200,7 @@ def calc(x): return result - return self._apply_blockwise(homogeneous_func, name) + return self._apply_blockwise(homogeneous_func, name)[:: self.step] @doc( _shared_docs["aggregate"], @@ -1590,13 +1639,14 @@ def cov_func(x, y): if self.min_periods is not None else window_indexer.window_size ) - start, end = window_indexer.get_window_bounds( + start, end, ref = window_indexer.get_window_bounds( num_values=len(x_array), min_periods=min_periods, center=self.center, closed=self.closed, + step=self.step, ) - self._check_window_bounds(start, end, len(x_array)) + self._check_window_bounds(start, end, ref, len(x_array)) with np.errstate(all="ignore"): mean_x_y = window_aggregations.roll_mean( @@ -1608,7 +1658,7 @@ def cov_func(x, y): notna(x_array + y_array).astype(np.float64), start, end, 0 ) result = (mean_x_y - mean_x * mean_y) * (count_x_y / (count_x_y - ddof)) - return Series(result, index=x.index, name=x.name) + return Series(result, index=self._slice_index(x.index, ref), name=x.name) return self._apply_pairwise(self._selected_obj, other, pairwise, cov_func) @@ -1631,13 +1681,14 @@ def corr_func(x, y): if self.min_periods is not None else window_indexer.window_size ) - start, end = window_indexer.get_window_bounds( + start, end, ref = window_indexer.get_window_bounds( num_values=len(x_array), min_periods=min_periods, center=self.center, closed=self.closed, + step=self.step, ) - self._check_window_bounds(start, end, len(x_array)) + self._check_window_bounds(start, end, ref, len(x_array)) with np.errstate(all="ignore"): mean_x_y = window_aggregations.roll_mean( @@ -1659,7 +1710,7 @@ def corr_func(x, y): ) denominator = (x_var * y_var) ** 0.5 result = numerator / denominator - return Series(result, index=x.index, name=x.name) + return Series(result, index=self._slice_index(x.index, ref), name=x.name) return self._apply_pairwise(self._selected_obj, other, pairwise, corr_func) @@ -1674,6 +1725,7 @@ class Rolling(RollingAndExpandingMixin): "axis", "on", "closed", + "step", "method", ] @@ -2597,6 +2649,17 @@ class RollingGroupby(BaseWindowGroupby, Rolling): _attributes = Rolling._attributes + BaseWindowGroupby._attributes + def _slice_index(self, index: Index, at: np.ndarray) -> Index: + """ + Slices the index of the object. 
+ """ + if at is None: + return index + result = index[at] + if isinstance(index, DatetimeIndex): + result.freq = None + return result + def _get_window_indexer(self) -> GroupbyIndexer: """ Return an indexer class that will compute the window start and end bounds @@ -2639,3 +2702,22 @@ def _validate_monotonic(self): or self._on.hasnans ): self._raise_monotonic_error() + + def _adjust_pairwise_result(self, result): + gb_lens = np.array( + [ + len(gb_indices[:: self.step]) + for gb_indices in self._grouper.indices.values() + ], + dtype=np.int64, + ) + gb_ends = np.cumsum(gb_lens) + gb_starts = np.hstack((0, gb_ends[:-1])) if len(gb_ends) > 0 else gb_ends + return concat( + [ + result.take( + np.arange(gb_starts[i], gb_ends[i], dtype=np.int64) + ).reindex(result.index) + for i in range(len(gb_ends)) + ] + ) diff --git a/pandas/tests/window/conftest.py b/pandas/tests/window/conftest.py index f2832652ed58f..715e4bcf6feaf 100644 --- a/pandas/tests/window/conftest.py +++ b/pandas/tests/window/conftest.py @@ -126,3 +126,28 @@ def frame(): index=bdate_range(datetime(2009, 1, 1), periods=100), columns=np.arange(10), ) + + +@pytest.fixture(params=[None, 1, 2, 5, 10]) +def step(request): + """step keyword argument for rolling window operations.""" + return request.param + + +@pytest.fixture +def step_methods(): + """Make a step-argument helper as fixture.""" + + class StepMethods: + @staticmethod + def get_selected_indices(step, group_keys): + """Return step-selected indices within groups.""" + step = step or 1 + group_ind = {} + for i, key in enumerate(group_keys): + group_ind.setdefault(key, []).append(i) + return sorted( + ind[j] for ind in group_ind.values() for j in range(0, len(ind), step) + ) + + return StepMethods diff --git a/pandas/tests/window/moments/test_moments_consistency_rolling.py b/pandas/tests/window/moments/test_moments_consistency_rolling.py index 62bfc66b124f3..66d9737fc54a1 100644 --- a/pandas/tests/window/moments/test_moments_consistency_rolling.py +++ b/pandas/tests/window/moments/test_moments_consistency_rolling.py @@ -42,22 +42,24 @@ def test_rolling_apply_consistency_sum( @pytest.mark.parametrize("ddof", [0, 1]) -def test_moments_consistency_var(all_data, rolling_consistency_cases, center, ddof): +def test_moments_consistency_var( + all_data, rolling_consistency_cases, center, ddof, step +): window, min_periods = rolling_consistency_cases - var_x = all_data.rolling(window=window, min_periods=min_periods, center=center).var( - ddof=ddof - ) + var_x = all_data.rolling( + window=window, min_periods=min_periods, center=center, step=step + ).var(ddof=ddof) assert not (var_x < 0).any().any() if ddof == 0: # check that biased var(x) == mean(x^2) - mean(x)^2 mean_x = all_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).mean() mean_x2 = ( (all_data * all_data) - .rolling(window=window, min_periods=min_periods, center=center) + .rolling(window=window, min_periods=min_periods, center=center, step=step) .mean() ) tm.assert_equal(var_x, mean_x2 - (mean_x * mean_x)) @@ -65,20 +67,20 @@ def test_moments_consistency_var(all_data, rolling_consistency_cases, center, dd @pytest.mark.parametrize("ddof", [0, 1]) def test_moments_consistency_var_constant( - consistent_data, rolling_consistency_cases, center, ddof + consistent_data, rolling_consistency_cases, center, ddof, step ): window, min_periods = rolling_consistency_cases count_x = consistent_data.rolling( - window=window, min_periods=min_periods, 
center=center + window=window, min_periods=min_periods, center=center, step=step ).count() var_x = consistent_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).var(ddof=ddof) # check that variance of constant series is identically 0 assert not (var_x > 0).any().any() - expected = consistent_data * np.nan + expected = consistent_data[::step] * np.nan expected[count_x >= max(min_periods, 1)] = 0.0 if ddof == 1: expected[count_x < 2] = np.nan @@ -87,51 +89,53 @@ def test_moments_consistency_var_constant( @pytest.mark.parametrize("ddof", [0, 1]) def test_rolling_consistency_var_std_cov( - all_data, rolling_consistency_cases, center, ddof + all_data, rolling_consistency_cases, center, ddof, step ): window, min_periods = rolling_consistency_cases - var_x = all_data.rolling(window=window, min_periods=min_periods, center=center).var( - ddof=ddof - ) + var_x = all_data.rolling( + window=window, min_periods=min_periods, center=center, step=step + ).var(ddof=ddof) assert not (var_x < 0).any().any() - std_x = all_data.rolling(window=window, min_periods=min_periods, center=center).std( - ddof=ddof - ) + std_x = all_data.rolling( + window=window, min_periods=min_periods, center=center, step=step + ).std(ddof=ddof) assert not (std_x < 0).any().any() # check that var(x) == std(x)^2 tm.assert_equal(var_x, std_x * std_x) cov_x_x = all_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).cov(all_data, ddof=ddof) assert not (cov_x_x < 0).any().any() # check that var(x) == cov(x, x) + if step not in [None, 1]: + var_x = var_x.reindex(index=cov_x_x.index) tm.assert_equal(var_x, cov_x_x) @pytest.mark.parametrize("ddof", [0, 1]) def test_rolling_consistency_series_cov_corr( - series_data, rolling_consistency_cases, center, ddof + series_data, rolling_consistency_cases, center, ddof, step ): window, min_periods = rolling_consistency_cases var_x_plus_y = ( (series_data + series_data) - .rolling(window=window, min_periods=min_periods, center=center) + .rolling(window=window, min_periods=min_periods, center=center, step=step) .var(ddof=ddof) ) var_x = series_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).var(ddof=ddof) var_y = series_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).var(ddof=ddof) cov_x_y = series_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).cov(series_data, ddof=ddof) # check that cov(x, y) == (var(x+y) - var(x) - # var(y)) / 2 @@ -140,13 +144,13 @@ def test_rolling_consistency_series_cov_corr( # check that corr(x, y) == cov(x, y) / (std(x) * # std(y)) corr_x_y = series_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).corr(series_data) std_x = series_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).std(ddof=ddof) std_y = series_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).std(ddof=ddof) tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y)) @@ -154,31 +158,33 @@ def 
test_rolling_consistency_series_cov_corr( # check that biased cov(x, y) == mean(x*y) - # mean(x)*mean(y) mean_x = series_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).mean() mean_y = series_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).mean() mean_x_times_y = ( (series_data * series_data) - .rolling(window=window, min_periods=min_periods, center=center) + .rolling(window=window, min_periods=min_periods, center=center, step=step) .mean() ) tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y)) -def test_rolling_consistency_mean(all_data, rolling_consistency_cases, center): +def test_rolling_consistency_mean(all_data, rolling_consistency_cases, center, step): window, min_periods = rolling_consistency_cases result = all_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).mean() expected = ( - all_data.rolling(window=window, min_periods=min_periods, center=center) + all_data.rolling( + window=window, min_periods=min_periods, center=center, step=step + ) .sum() .divide( all_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).count() ) ) @@ -186,19 +192,19 @@ def test_rolling_consistency_mean(all_data, rolling_consistency_cases, center): def test_rolling_consistency_constant( - consistent_data, rolling_consistency_cases, center + consistent_data, rolling_consistency_cases, center, step ): window, min_periods = rolling_consistency_cases count_x = consistent_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).count() mean_x = consistent_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).mean() # check that correlation of a series with itself is either 1 or NaN corr_x_x = consistent_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).corr(consistent_data) exp = ( @@ -208,34 +214,38 @@ def test_rolling_consistency_constant( ) # check mean of constant series - expected = consistent_data * np.nan + expected = consistent_data[::step] * np.nan expected[count_x >= max(min_periods, 1)] = exp tm.assert_equal(mean_x, expected) # check correlation of constant series with itself is NaN + if step not in [None, 1]: + expected = expected.reindex(index=corr_x_x.index) expected[:] = np.nan tm.assert_equal(corr_x_x, expected) def test_rolling_consistency_var_debiasing_factors( - all_data, rolling_consistency_cases, center + all_data, rolling_consistency_cases, center, step ): window, min_periods = rolling_consistency_cases # check variance debiasing factors var_unbiased_x = all_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).var() var_biased_x = all_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).var(ddof=0) var_debiasing_factors_x = ( - all_data.rolling(window=window, min_periods=min_periods, center=center) + all_data.rolling( + window=window, min_periods=min_periods, center=center, step=step + ) .count() .divide( ( 
all_data.rolling( - window=window, min_periods=min_periods, center=center + window=window, min_periods=min_periods, center=center, step=step ).count() - 1.0 ).replace(0.0, np.nan) diff --git a/pandas/tests/window/test_api.py b/pandas/tests/window/test_api.py index f84a579247630..6dbcc8dfd00c0 100644 --- a/pandas/tests/window/test_api.py +++ b/pandas/tests/window/test_api.py @@ -16,20 +16,20 @@ from pandas.core.base import SpecificationError -def test_getitem(): +def test_getitem(step): frame = DataFrame(np.random.randn(5, 5)) - r = frame.rolling(window=5) - tm.assert_index_equal(r._selected_obj.columns, frame.columns) + r = frame.rolling(window=5, step=step) + tm.assert_index_equal(r._selected_obj.columns, frame[::step].columns) - r = frame.rolling(window=5)[1] - assert r._selected_obj.name == frame.columns[1] + r = frame.rolling(window=5, step=step)[1] + assert r._selected_obj.name == frame[::step].columns[1] # technically this is allowed - r = frame.rolling(window=5)[1, 3] - tm.assert_index_equal(r._selected_obj.columns, frame.columns[[1, 3]]) + r = frame.rolling(window=5, step=step)[1, 3] + tm.assert_index_equal(r._selected_obj.columns, frame[::step].columns[[1, 3]]) - r = frame.rolling(window=5)[[1, 3]] - tm.assert_index_equal(r._selected_obj.columns, frame.columns[[1, 3]]) + r = frame.rolling(window=5, step=step)[[1, 3]] + tm.assert_index_equal(r._selected_obj.columns, frame[::step].columns[[1, 3]]) def test_select_bad_cols(): @@ -53,21 +53,21 @@ def test_attribute_access(): r.F -def tests_skip_nuisance(): +def tests_skip_nuisance(step): df = DataFrame({"A": range(5), "B": range(5, 10), "C": "foo"}) - r = df.rolling(window=3) + r = df.rolling(window=3, step=step) result = r[["A", "B"]].sum() expected = DataFrame( {"A": [np.nan, np.nan, 3, 6, 9], "B": [np.nan, np.nan, 18, 21, 24]}, columns=list("AB"), - ) + )[::step] tm.assert_frame_equal(result, expected) -def test_skip_sum_object_raises(): +def test_skip_sum_object_raises(step): df = DataFrame({"A": range(5), "B": range(5, 10), "C": "foo"}) - r = df.rolling(window=3) + r = df.rolling(window=3, step=step) msg = r"nuisance columns.*Dropped columns were Index\(\['C'\], dtype='object'\)" with tm.assert_produces_warning(FutureWarning, match=msg): # GH#42738 @@ -75,14 +75,14 @@ def test_skip_sum_object_raises(): expected = DataFrame( {"A": [np.nan, np.nan, 3, 6, 9], "B": [np.nan, np.nan, 18, 21, 24]}, columns=list("AB"), - ) + )[::step] tm.assert_frame_equal(result, expected) -def test_agg(): +def test_agg(step): df = DataFrame({"A": range(5), "B": range(0, 10, 2)}) - r = df.rolling(window=3) + r = df.rolling(window=3, step=step) a_mean = r["A"].mean() a_std = r["A"].std() a_sum = r["A"].sum() @@ -141,10 +141,10 @@ def test_agg_apply(raw): tm.assert_frame_equal(result, expected, check_like=True) -def test_agg_consistency(): +def test_agg_consistency(step): df = DataFrame({"A": range(5), "B": range(0, 10, 2)}) - r = df.rolling(window=3) + r = df.rolling(window=3, step=step) result = r.agg([np.sum, np.mean]).columns expected = MultiIndex.from_product([list("AB"), ["sum", "mean"]]) @@ -182,7 +182,7 @@ def test_agg_nested_dicts(): r.agg({"A": {"ra": ["mean", "std"]}, "B": {"rb": ["mean", "std"]}}) -def test_count_nonnumeric_types(): +def test_count_nonnumeric_types(step): # GH12541 cols = [ "int", @@ -239,13 +239,13 @@ def test_count_nonnumeric_types(): "periods_nat": [1.0, 2.0, 1.0], }, columns=cols, - ) + )[::step] - result = df.rolling(window=2, min_periods=0).count() + result = df.rolling(window=2, min_periods=0, step=step).count() 
tm.assert_frame_equal(result, expected) - result = df.rolling(1, min_periods=0).count() - expected = df.notna().astype(float) + result = df.rolling(1, min_periods=0, step=step).count() + expected = df.notna().astype(float)[::step] tm.assert_frame_equal(result, expected) @@ -339,11 +339,11 @@ def test_validate_deprecated(): @pytest.mark.filterwarnings("ignore:min_periods:FutureWarning") def test_dont_modify_attributes_after_methods( - arithmetic_win_operators, closed, center, min_periods + arithmetic_win_operators, closed, center, min_periods, step ): # GH 39554 roll_obj = Series(range(1)).rolling( - 1, center=center, closed=closed, min_periods=min_periods + 1, center=center, closed=closed, min_periods=min_periods, step=step ) expected = {attr: getattr(roll_obj, attr) for attr in roll_obj._attributes} getattr(roll_obj, arithmetic_win_operators)() @@ -351,40 +351,49 @@ def test_dont_modify_attributes_after_methods( assert result == expected -def test_centered_axis_validation(): +def test_centered_axis_validation(step): # ok - Series(np.ones(10)).rolling(window=3, center=True, axis=0).mean() + Series(np.ones(10)).rolling(window=3, center=True, axis=0, step=step).mean() # bad axis msg = "No axis named 1 for object type Series" with pytest.raises(ValueError, match=msg): - Series(np.ones(10)).rolling(window=3, center=True, axis=1).mean() + Series(np.ones(10)).rolling(window=3, center=True, axis=1, step=step).mean() # ok ok - DataFrame(np.ones((10, 10))).rolling(window=3, center=True, axis=0).mean() - DataFrame(np.ones((10, 10))).rolling(window=3, center=True, axis=1).mean() + DataFrame(np.ones((10, 10))).rolling( + window=3, center=True, axis=0, step=step + ).mean() + DataFrame(np.ones((10, 10))).rolling( + window=3, center=True, axis=1, step=step + ).mean() # bad axis msg = "No axis named 2 for object type DataFrame" with pytest.raises(ValueError, match=msg): - (DataFrame(np.ones((10, 10))).rolling(window=3, center=True, axis=2).mean()) + ( + DataFrame(np.ones((10, 10))) + .rolling(window=3, center=True, axis=2, step=step) + .mean() + ) -def test_rolling_min_min_periods(): +def test_rolling_min_min_periods(step): a = Series([1, 2, 3, 4, 5]) - result = a.rolling(window=100, min_periods=1).min() - expected = Series(np.ones(len(a))) + result = a.rolling(window=100, min_periods=1, step=step).min() + expected = Series(np.ones(len(a)))[::step] tm.assert_series_equal(result, expected) msg = "min_periods 5 must be <= window 3" with pytest.raises(ValueError, match=msg): - Series([1, 2, 3]).rolling(window=3, min_periods=5).min() + Series([1, 2, 3]).rolling(window=3, min_periods=5, step=step).min() -def test_rolling_max_min_periods(): +def test_rolling_max_min_periods(step): a = Series([1, 2, 3, 4, 5], dtype=np.float64) - b = a.rolling(window=100, min_periods=1).max() - tm.assert_almost_equal(a, b) + result = a.rolling(window=100, min_periods=1, step=step).max() + expected = a[::step] + tm.assert_almost_equal(result, expected) msg = "min_periods 5 must be <= window 3" with pytest.raises(ValueError, match=msg): - Series([1, 2, 3]).rolling(window=3, min_periods=5).max() + Series([1, 2, 3]).rolling(window=3, min_periods=5, step=step).max() diff --git a/pandas/tests/window/test_apply.py b/pandas/tests/window/test_apply.py index 8e690a677aa98..20c1c55410603 100644 --- a/pandas/tests/window/test_apply.py +++ b/pandas/tests/window/test_apply.py @@ -52,7 +52,7 @@ def test_rolling_apply_out_of_bounds(engine_and_raw): @pytest.mark.parametrize("window", [2, "2s"]) -def test_rolling_apply_with_pandas_objects(window): 
+def test_rolling_apply_with_pandas_objects(window, step): # 5071 df = DataFrame( {"A": np.random.randn(5), "B": np.random.randint(0, 10, size=5)}, @@ -66,32 +66,36 @@ def f(x): return np.nan return x.iloc[-1] - result = df.rolling(window).apply(f, raw=False) - expected = df.iloc[2:].reindex_like(df) + result = df.rolling(window, step=step).apply(f, raw=False) + expected = df.iloc[2:].reindex_like(df)[::step] tm.assert_frame_equal(result, expected) with tm.external_error_raised(AttributeError): df.rolling(window).apply(f, raw=True) -def test_rolling_apply(engine_and_raw): +def test_rolling_apply(engine_and_raw, step): engine, raw = engine_and_raw expected = Series([], dtype="float64") - result = expected.rolling(10).apply(lambda x: x.mean(), engine=engine, raw=raw) + result = expected.rolling(10, step=step).apply( + lambda x: x.mean(), engine=engine, raw=raw + ) tm.assert_series_equal(result, expected) # gh-8080 s = Series([None, None, None]) - result = s.rolling(2, min_periods=0).apply(lambda x: len(x), engine=engine, raw=raw) - expected = Series([1.0, 2.0, 2.0]) + result = s.rolling(2, min_periods=0, step=step).apply( + lambda x: len(x), engine=engine, raw=raw + ) + expected = Series([1.0, 2.0, 2.0])[::step] tm.assert_series_equal(result, expected) - result = s.rolling(2, min_periods=0).apply(len, engine=engine, raw=raw) + result = s.rolling(2, min_periods=0, step=step).apply(len, engine=engine, raw=raw) tm.assert_series_equal(result, expected) -def test_all_apply(engine_and_raw): +def test_all_apply(engine_and_raw, step): engine, raw = engine_and_raw df = ( @@ -100,15 +104,15 @@ def test_all_apply(engine_and_raw): ).set_index("A") * 2 ) - er = df.rolling(window=1) - r = df.rolling(window="1s") + er = df.rolling(window=1, step=step) + r = df.rolling(window="1s", step=step) result = r.apply(lambda x: 1, engine=engine, raw=raw) expected = er.apply(lambda x: 1, engine=engine, raw=raw) tm.assert_frame_equal(result, expected) -def test_ragged_apply(engine_and_raw): +def test_ragged_apply(engine_and_raw, step): engine, raw = engine_and_raw df = DataFrame({"B": range(5)}) @@ -121,18 +125,24 @@ def test_ragged_apply(engine_and_raw): ] f = lambda x: 1 - result = df.rolling(window="1s", min_periods=1).apply(f, engine=engine, raw=raw) - expected = df.copy() + result = df.rolling(window="1s", min_periods=1, step=step).apply( + f, engine=engine, raw=raw + ) + expected = df.copy()[::step] expected["B"] = 1.0 tm.assert_frame_equal(result, expected) - result = df.rolling(window="2s", min_periods=1).apply(f, engine=engine, raw=raw) - expected = df.copy() + result = df.rolling(window="2s", min_periods=1, step=step).apply( + f, engine=engine, raw=raw + ) + expected = df.copy()[::step] expected["B"] = 1.0 tm.assert_frame_equal(result, expected) - result = df.rolling(window="5s", min_periods=1).apply(f, engine=engine, raw=raw) - expected = df.copy() + result = df.rolling(window="5s", min_periods=1, step=step).apply( + f, engine=engine, raw=raw + ) + expected = df.copy()[::step] expected["B"] = 1.0 tm.assert_frame_equal(result, expected) @@ -157,7 +167,7 @@ def test_invalid_raw_numba(): @pytest.mark.parametrize("args_kwargs", [[None, {"par": 10}], [(10,), None]]) -def test_rolling_apply_args_kwargs(args_kwargs): +def test_rolling_apply_args_kwargs(args_kwargs, step): # GH 33433 def foo(x, par): return np.sum(x + par) @@ -165,15 +175,17 @@ def foo(x, par): df = DataFrame({"gr": [1, 1], "a": [1, 2]}) idx = Index(["gr", "a"]) - expected = DataFrame([[11.0, 11.0], [11.0, 12.0]], columns=idx) + expected = 
DataFrame([[11.0, 11.0], [11.0, 12.0]], columns=idx)[::step] - result = df.rolling(1).apply(foo, args=args_kwargs[0], kwargs=args_kwargs[1]) + result = df.rolling(1, step=step).apply( + foo, args=args_kwargs[0], kwargs=args_kwargs[1] + ) tm.assert_frame_equal(result, expected) midx = MultiIndex.from_tuples([(1, 0), (1, 1)], names=["gr", None]) - expected = Series([11.0, 12.0], index=midx, name="a") + expected = Series([11.0, 12.0], index=midx, name="a")[::step] - gb_rolling = df.groupby("gr")["a"].rolling(1) + gb_rolling = df.groupby("gr")["a"].rolling(1, step=step) result = gb_rolling.apply(foo, args=args_kwargs[0], kwargs=args_kwargs[1]) tm.assert_series_equal(result, expected) @@ -266,9 +278,13 @@ def test_time_rule_frame(raw, frame): @pytest.mark.parametrize("minp", [0, 99, 100]) -def test_min_periods(raw, series, minp): - result = series.rolling(len(series) + 1, min_periods=minp).apply(f, raw=raw) - expected = series.rolling(len(series), min_periods=minp).apply(f, raw=raw) +def test_min_periods(raw, series, minp, step): + result = series.rolling(len(series) + 1, min_periods=minp, step=step).apply( + f, raw=raw + ) + expected = series.rolling(len(series), min_periods=minp, step=step).apply( + f, raw=raw + ) nan_mask = isna(result) tm.assert_series_equal(nan_mask, isna(expected)) diff --git a/pandas/tests/window/test_base_indexer.py b/pandas/tests/window/test_base_indexer.py index 5593aa8351c69..1a29427c1d5f6 100644 --- a/pandas/tests/window/test_base_indexer.py +++ b/pandas/tests/window/test_base_indexer.py @@ -46,7 +46,7 @@ def test_indexer_constructor_arg(): df = DataFrame({"values": range(5)}) class CustomIndexer(BaseIndexer): - def get_window_bounds(self, num_values, min_periods, center, closed): + def get_window_bounds(self, num_values, min_periods, center, closed, step): start = np.empty(num_values, dtype=np.int64) end = np.empty(num_values, dtype=np.int64) for i in range(num_values): @@ -56,7 +56,8 @@ def get_window_bounds(self, num_values, min_periods, center, closed): else: start[i] = i end[i] = i + self.window_size - return start, end + ref = np.arange(0, num_values, dtype=np.int64) + return start, end, ref indexer = CustomIndexer(window_size=1, use_expanding=use_expanding) result = df.rolling(indexer).sum() @@ -68,20 +69,29 @@ def test_indexer_accepts_rolling_args(): df = DataFrame({"values": range(5)}) class CustomIndexer(BaseIndexer): - def get_window_bounds(self, num_values, min_periods, center, closed): + def get_window_bounds(self, num_values, min_periods, center, closed, step): start = np.empty(num_values, dtype=np.int64) end = np.empty(num_values, dtype=np.int64) for i in range(num_values): - if center and min_periods == 1 and closed == "both" and i == 2: + if ( + center + and min_periods == 1 + and closed == "both" + and step == 1 + and i == 2 + ): start[i] = 0 end[i] = num_values else: start[i] = i end[i] = i + self.window_size - return start, end + ref = np.arange(0, num_values, dtype=np.int64) + return start, end, ref indexer = CustomIndexer(window_size=1) - result = df.rolling(indexer, center=True, min_periods=1, closed="both").sum() + result = df.rolling( + indexer, center=True, min_periods=1, closed="both", step=1 + ).sum() expected = DataFrame({"values": [0.0, 1.0, 10.0, 3.0, 4.0]}) tm.assert_frame_equal(result, expected) @@ -141,7 +151,7 @@ def get_window_bounds(self, num_values, min_periods, center, closed): ], ) @pytest.mark.filterwarnings("ignore:min_periods:FutureWarning") -def test_rolling_forward_window(constructor, func, np_func, expected, np_kwargs): 
+def test_rolling_forward_window(constructor, func, np_func, expected, np_kwargs, step): # GH 32865 values = np.arange(10.0) values[5] = 100.0 @@ -158,11 +168,11 @@ def test_rolling_forward_window(constructor, func, np_func, expected, np_kwargs) rolling = constructor(values).rolling(window=indexer, closed="right") getattr(rolling, func)() - rolling = constructor(values).rolling(window=indexer, min_periods=2) + rolling = constructor(values).rolling(window=indexer, min_periods=2, step=step) result = getattr(rolling, func)() # Check that the function output matches the explicitly provided array - expected = constructor(expected) + expected = constructor(expected)[::step] tm.assert_equal(result, expected) # Check that the rolling function output matches applying an alternative @@ -182,12 +192,12 @@ def test_rolling_forward_window(constructor, func, np_func, expected, np_kwargs) @pytest.mark.parametrize("constructor", [Series, DataFrame]) -def test_rolling_forward_skewness(constructor): +def test_rolling_forward_skewness(constructor, step): values = np.arange(10.0) values[5] = 100.0 indexer = FixedForwardWindowIndexer(window_size=5) - rolling = constructor(values).rolling(window=indexer, min_periods=3) + rolling = constructor(values).rolling(window=indexer, min_periods=3, step=step) result = rolling.skew() expected = constructor( @@ -203,7 +213,7 @@ def test_rolling_forward_skewness(constructor): np.nan, np.nan, ] - ) + )[::step] tm.assert_equal(result, expected) @@ -228,18 +238,18 @@ def test_rolling_forward_skewness(constructor): ), ], ) -def test_rolling_forward_cov_corr(func, expected): +def test_rolling_forward_cov_corr(func, expected, step): values1 = np.arange(10).reshape(-1, 1) values2 = values1 * 2 values1[5, 0] = 100 values = np.concatenate([values1, values2], axis=1) indexer = FixedForwardWindowIndexer(window_size=3) - rolling = DataFrame(values).rolling(window=indexer, min_periods=3) + rolling = DataFrame(values).rolling(window=indexer, min_periods=3, step=step) # We are interested in checking only pairwise covariance / correlation result = getattr(rolling, func)().loc[(slice(None), 1), 0] result = result.reset_index(drop=True) - expected = Series(expected) + expected = Series(expected)[::step].reset_index(drop=True) expected.name = result.name tm.assert_equal(result, expected) @@ -251,22 +261,22 @@ def test_rolling_forward_cov_corr(func, expected): ["left", [0.0, 0.0, 1.0, 2.0, 5.0, 9.0, 5.0, 6.0, 7.0, 8.0]], ], ) -def test_non_fixed_variable_window_indexer(closed, expected_data): +def test_non_fixed_variable_window_indexer(closed, expected_data, step): index = date_range("2020", periods=10) df = DataFrame(range(10), index=index) offset = BusinessDay(1) indexer = VariableOffsetWindowIndexer(index=index, offset=offset) - result = df.rolling(indexer, closed=closed).sum() - expected = DataFrame(expected_data, index=index) + result = df.rolling(indexer, closed=closed, step=step).sum() + expected = DataFrame(expected_data, index=index)[::step] tm.assert_frame_equal(result, expected) -def test_fixed_forward_indexer_count(): +def test_fixed_forward_indexer_count(step): # GH: 35579 df = DataFrame({"b": [None, None, None, 7]}) indexer = FixedForwardWindowIndexer(window_size=2) - result = df.rolling(window=indexer, min_periods=0).count() - expected = DataFrame({"b": [0.0, 0.0, 1.0, 1.0]}) + result = df.rolling(window=indexer, min_periods=0, step=step).count() + expected = DataFrame({"b": [0.0, 0.0, 1.0, 1.0]})[::step] tm.assert_frame_equal(result, expected) @@ -277,7 +287,7 @@ def 
test_fixed_forward_indexer_count(): def test_indexer_quantile_sum(end_value, values, func, args): # GH 37153 class CustomIndexer(BaseIndexer): - def get_window_bounds(self, num_values, min_periods, center, closed): + def get_window_bounds(self, num_values, min_periods, center, closed, step): start = np.empty(num_values, dtype=np.int64) end = np.empty(num_values, dtype=np.int64) for i in range(num_values): @@ -287,7 +297,8 @@ def get_window_bounds(self, num_values, min_periods, center, closed): else: start[i] = i end[i] = i + self.window_size - return start, end + ref = np.arange(0, num_values, dtype=np.int64) + return start, end, ref use_expanding = [True, False, True, False, True] df = DataFrame({"values": range(5)}) @@ -311,7 +322,7 @@ def get_window_bounds(self, num_values, min_periods, center, closed): ], ) def test_indexers_are_reusable_after_groupby_rolling( - indexer_class, window_size, df_data + indexer_class, window_size, df_data, step ): # GH 43267 df = DataFrame(df_data) @@ -319,7 +330,7 @@ def test_indexers_are_reusable_after_groupby_rolling( indexer = indexer_class(window_size=window_size) original_window_size = indexer.window_size for i in range(num_trials): - df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean() + df.groupby("a")["b"].rolling(window=indexer, min_periods=1, step=step).mean() assert indexer.window_size == original_window_size @@ -338,15 +349,22 @@ def test_indexers_are_reusable_after_groupby_rolling( ], ) def test_fixed_forward_indexer_bounds( - window_size, num_values, expected_start, expected_end + window_size, num_values, expected_start, expected_end, step ): # GH 43267 indexer = FixedForwardWindowIndexer(window_size=window_size) - start, end = indexer.get_window_bounds(num_values=num_values) + start, end, ref = indexer.get_window_bounds(num_values=num_values, step=step) - tm.assert_numpy_array_equal(start, np.array(expected_start), check_dtype=False) - tm.assert_numpy_array_equal(end, np.array(expected_end), check_dtype=False) + expected_ref = ( + None if step in [None, 1] else np.arange(0, num_values, step, dtype=np.int64) + ) + tm.assert_numpy_array_equal( + start, np.array(expected_start[::step]), check_dtype=False + ) + tm.assert_numpy_array_equal(end, np.array(expected_end[::step]), check_dtype=False) assert len(start) == len(end) + if not (expected_ref is None and ref is None): + tm.assert_numpy_array_equal(expected_ref, ref) @pytest.mark.parametrize( @@ -401,10 +419,16 @@ def test_fixed_forward_indexer_bounds( ), ], ) -def test_rolling_groupby_with_fixed_forward_specific(df, window_size, expected): +def test_rolling_groupby_with_fixed_forward_specific( + df, window_size, expected, step, step_methods +): # GH 43267 indexer = FixedForwardWindowIndexer(window_size=window_size) - result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean() + result = ( + df.groupby("a")["b"].rolling(window=indexer, min_periods=1, step=step).mean() + ) + selected = step_methods.get_selected_indices(step, df["a"]) + expected = expected.iloc[selected] tm.assert_series_equal(result, expected) @@ -423,7 +447,9 @@ def test_rolling_groupby_with_fixed_forward_specific(df, window_size, expected): ], ) @pytest.mark.parametrize("window_size", [1, 2, 3, 4, 5, 8, 20]) -def test_rolling_groupby_with_fixed_forward_many(group_keys, window_size): +def test_rolling_groupby_with_fixed_forward_many( + group_keys, window_size, step, step_methods +): # GH 43267 df = DataFrame( { @@ -434,7 +460,9 @@ def test_rolling_groupby_with_fixed_forward_many(group_keys, 
window_size): ) indexer = FixedForwardWindowIndexer(window_size=window_size) - result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).sum() + result = ( + df.groupby("a")["b"].rolling(window=indexer, min_periods=1, step=step).sum() + ) result.index.names = ["a", "c"] groups = df.groupby("a")[["a", "b", "c"]] @@ -449,6 +477,8 @@ def test_rolling_groupby_with_fixed_forward_many(group_keys, window_size): for _, g in groups ] ) + selected = step_methods.get_selected_indices(step, manual["a"]) + manual = manual.iloc[selected] manual = manual.set_index(["a", "c"])["b"] tm.assert_series_equal(result, manual) @@ -456,8 +486,8 @@ def test_rolling_groupby_with_fixed_forward_many(group_keys, window_size): def test_unequal_start_end_bounds(): class CustomIndexer(BaseIndexer): - def get_window_bounds(self, num_values, min_periods, center, closed): - return np.array([1]), np.array([1, 2]) + def get_window_bounds(self, num_values, min_periods, center, closed, step): + return np.array([1]), np.array([1, 2]), np.array([0]) indexer = CustomIndexer() roll = Series(1).rolling(indexer) @@ -478,8 +508,8 @@ def get_window_bounds(self, num_values, min_periods, center, closed): def test_unequal_bounds_to_object(): # GH 44470 class CustomIndexer(BaseIndexer): - def get_window_bounds(self, num_values, min_periods, center, closed): - return np.array([1]), np.array([2]) + def get_window_bounds(self, num_values, min_periods, center, closed, step): + return np.array([1]), np.array([2]), np.array([0]) indexer = CustomIndexer() roll = Series([1, 1]).rolling(indexer) diff --git a/pandas/tests/window/test_dtypes.py b/pandas/tests/window/test_dtypes.py index 80a96c3a8cee9..161976a6112a5 100644 --- a/pandas/tests/window/test_dtypes.py +++ b/pandas/tests/window/test_dtypes.py @@ -90,9 +90,11 @@ def dtypes(request): ), ], ) -def test_series_dtypes(method, data, expected_data, coerce_int, dtypes, min_periods): +def test_series_dtypes( + method, data, expected_data, coerce_int, dtypes, min_periods, step +): ser = Series(data, dtype=get_dtype(dtypes, coerce_int=coerce_int)) - rolled = ser.rolling(2, min_periods=min_periods) + rolled = ser.rolling(2, min_periods=min_periods, step=step) if dtypes in ("m8[ns]", "M8[ns]", "datetime64[ns, UTC]") and method != "count": msg = "No numeric types to aggregate" @@ -100,15 +102,15 @@ def test_series_dtypes(method, data, expected_data, coerce_int, dtypes, min_peri getattr(rolled, method)() else: result = getattr(rolled, method)() - expected = Series(expected_data, dtype="float64") + expected = Series(expected_data, dtype="float64")[::step] tm.assert_almost_equal(result, expected) -def test_series_nullable_int(any_signed_int_ea_dtype): +def test_series_nullable_int(any_signed_int_ea_dtype, step): # GH 43016 ser = Series([0, 1, NA], dtype=any_signed_int_ea_dtype) - result = ser.rolling(2).mean() - expected = Series([np.nan, 0.5, np.nan]) + result = ser.rolling(2, step=step).mean() + expected = Series([np.nan, 0.5, np.nan])[::step] tm.assert_series_equal(result, expected) @@ -156,10 +158,10 @@ def test_series_nullable_int(any_signed_int_ea_dtype): ), ], ) -def test_dataframe_dtypes(method, expected_data, dtypes, min_periods): +def test_dataframe_dtypes(method, expected_data, dtypes, min_periods, step): df = DataFrame(np.arange(10).reshape((5, 2)), dtype=get_dtype(dtypes)) - rolled = df.rolling(2, min_periods=min_periods) + rolled = df.rolling(2, min_periods=min_periods, step=step) if dtypes in ("m8[ns]", "M8[ns]", "datetime64[ns, UTC]") and method != "count": msg = "No numeric types 
to aggregate" @@ -167,5 +169,5 @@ def test_dataframe_dtypes(method, expected_data, dtypes, min_periods): getattr(rolled, method)() else: result = getattr(rolled, method)() - expected = DataFrame(expected_data, dtype="float64") + expected = DataFrame(expected_data, dtype="float64")[::step] tm.assert_frame_equal(result, expected) diff --git a/pandas/tests/window/test_groupby.py b/pandas/tests/window/test_groupby.py index 5125587df9ea2..fa0bbbfc15801 100644 --- a/pandas/tests/window/test_groupby.py +++ b/pandas/tests/window/test_groupby.py @@ -102,53 +102,58 @@ def test_getitem_multiple(self): "skew", ], ) - def test_rolling(self, f): + def test_rolling(self, f, step, step_methods): g = self.frame.groupby("A") - r = g.rolling(window=4) + r = g.rolling(window=4, step=step) result = getattr(r, f)() - expected = g.apply(lambda x: getattr(x.rolling(4), f)()) + expected = g.apply(lambda x: getattr(x.rolling(4, step=step), f)()) # groupby.apply doesn't drop the grouped-by column expected = expected.drop("A", axis=1) # GH 39732 - expected_index = MultiIndex.from_arrays([self.frame["A"], range(40)]) + selected = step_methods.get_selected_indices(step, self.frame["A"]) + expected_index = MultiIndex.from_arrays([self.frame["A"][selected], selected]) expected.index = expected_index tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("f", ["std", "var"]) - def test_rolling_ddof(self, f): + def test_rolling_ddof(self, f, step, step_methods): g = self.frame.groupby("A") - r = g.rolling(window=4) + r = g.rolling(window=4, step=step) result = getattr(r, f)(ddof=1) - expected = g.apply(lambda x: getattr(x.rolling(4), f)(ddof=1)) + expected = g.apply(lambda x: getattr(x.rolling(4, step=step), f)(ddof=1)) # groupby.apply doesn't drop the grouped-by column expected = expected.drop("A", axis=1) # GH 39732 - expected_index = MultiIndex.from_arrays([self.frame["A"], range(40)]) + selected = step_methods.get_selected_indices(step, self.frame["A"]) + expected_index = MultiIndex.from_arrays([self.frame["A"][selected], selected]) expected.index = expected_index tm.assert_frame_equal(result, expected) @pytest.mark.parametrize( "interpolation", ["linear", "lower", "higher", "midpoint", "nearest"] ) - def test_rolling_quantile(self, interpolation): + def test_rolling_quantile(self, interpolation, step, step_methods): g = self.frame.groupby("A") - r = g.rolling(window=4) + r = g.rolling(window=4, step=step) result = r.quantile(0.4, interpolation=interpolation) expected = g.apply( - lambda x: x.rolling(4).quantile(0.4, interpolation=interpolation) + lambda x: x.rolling(4, step=step).quantile(0.4, interpolation=interpolation) ) # groupby.apply doesn't drop the grouped-by column expected = expected.drop("A", axis=1) # GH 39732 - expected_index = MultiIndex.from_arrays([self.frame["A"], range(40)]) + selected = step_methods.get_selected_indices(step, self.frame["A"]) + expected_index = MultiIndex.from_arrays([self.frame["A"][selected], selected]) expected.index = expected_index tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("f, expected_val", [["corr", 1], ["cov", 0.5]]) - def test_rolling_corr_cov_other_same_size_as_groups(self, f, expected_val): + def test_rolling_corr_cov_other_same_size_as_groups( + self, f, expected_val, step, step_methods + ): # GH 42915 df = DataFrame( {"value": range(10), "idx1": [1] * 5 + [2] * 5, "idx2": [1, 2, 3, 4, 5] * 2} @@ -156,7 +161,8 @@ def test_rolling_corr_cov_other_same_size_as_groups(self, f, expected_val): other = DataFrame({"value": range(5), "idx2": [1, 
2, 3, 4, 5]}).set_index( "idx2" ) - result = getattr(df.groupby(level=0).rolling(2), f)(other) + result = getattr(df.groupby(level=0).rolling(2, step=step), f)(other) + selected = step_methods.get_selected_indices(step, df.index.get_level_values(0)) expected_data = ([np.nan] + [expected_val] * 4) * 2 expected = DataFrame( expected_data, @@ -169,18 +175,19 @@ def test_rolling_corr_cov_other_same_size_as_groups(self, f, expected_val): ], names=["idx1", "idx1", "idx2"], ), - ) + ).iloc[selected] tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("f", ["corr", "cov"]) - def test_rolling_corr_cov_other_diff_size_as_groups(self, f): + def test_rolling_corr_cov_other_diff_size_as_groups(self, f, step): g = self.frame.groupby("A") - r = g.rolling(window=4) + r = g.rolling(window=4, step=step) result = getattr(r, f)(self.frame) def func(x): - return getattr(x.rolling(4), f)(self.frame) + locs = np.hstack([x1.index[::step] for k1, x1 in g]) + return getattr(x.rolling(4), f)(self.frame).loc[locs] expected = g.apply(func) # GH 39591: The grouped column should be all np.nan @@ -189,14 +196,14 @@ def func(x): tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("f", ["corr", "cov"]) - def test_rolling_corr_cov_pairwise(self, f): + def test_rolling_corr_cov_pairwise(self, f, step): g = self.frame.groupby("A") - r = g.rolling(window=4) + r = g.rolling(window=4, step=step) result = getattr(r.B, f)(pairwise=True) def func(x): - return getattr(x.B.rolling(4), f)(pairwise=True) + return getattr(x.B.rolling(4, step=step), f)(pairwise=True) expected = g.apply(func) tm.assert_series_equal(result, expected) @@ -205,7 +212,9 @@ def func(x): "func, expected_values", [("cov", [[1.0, 1.0], [1.0, 4.0]]), ("corr", [[1.0, 0.5], [0.5, 1.0]])], ) - def test_rolling_corr_cov_unordered(self, func, expected_values): + def test_rolling_corr_cov_unordered( + self, func, expected_values, step, step_methods + ): # GH 43386 df = DataFrame( { @@ -214,7 +223,7 @@ def test_rolling_corr_cov_unordered(self, func, expected_values): "c": [2, 0, 6, 4], } ) - rol = df.groupby("a").rolling(3) + rol = df.groupby("a").rolling(3, step=step) result = getattr(rol, func)() expected = DataFrame( { @@ -235,23 +244,32 @@ def test_rolling_corr_cov_unordered(self, func, expected_values): names=["a", None, None], ), ) + selected = [ + 2 * x + i + for x in step_methods.get_selected_indices(step, [1, 1, 1, 2]) + for i in range(2) + ] + expected = expected.iloc[selected] tm.assert_frame_equal(result, expected) - def test_rolling_apply(self, raw): + def test_rolling_apply(self, raw, step, step_methods): g = self.frame.groupby("A") - r = g.rolling(window=4) + r = g.rolling(window=4, step=step) # reduction result = r.apply(lambda x: x.sum(), raw=raw) - expected = g.apply(lambda x: x.rolling(4).apply(lambda y: y.sum(), raw=raw)) + expected = g.apply( + lambda x: x.rolling(4, step=step).apply(lambda y: y.sum(), raw=raw) + ) # groupby.apply doesn't drop the grouped-by column expected = expected.drop("A", axis=1) # GH 39732 - expected_index = MultiIndex.from_arrays([self.frame["A"], range(40)]) + selected = step_methods.get_selected_indices(step, self.frame["A"]) + expected_index = MultiIndex.from_arrays([self.frame["A"][selected], selected]) expected.index = expected_index tm.assert_frame_equal(result, expected) - def test_rolling_apply_mutability(self): + def test_rolling_apply_mutability(self, step, step_methods): # GH 14013 df = DataFrame({"A": ["foo"] * 3 + ["bar"] * 3, "B": [1] * 6}) g = df.groupby("A") @@ -262,38 +280,41 @@ def 
test_rolling_apply_mutability(self): mi.names = ["A", None] # Grouped column should not be a part of the output - expected = DataFrame([np.nan, 2.0, 2.0] * 2, columns=["B"], index=mi) + selected = step_methods.get_selected_indices(step, df["A"]) + expected = DataFrame([np.nan, 2.0, 2.0] * 2, columns=["B"], index=mi).iloc[ + selected + ] - result = g.rolling(window=2).sum() + result = g.rolling(window=2, step=step).sum() tm.assert_frame_equal(result, expected) # Call an arbitrary function on the groupby g.sum() # Make sure nothing has been mutated - result = g.rolling(window=2).sum() + result = g.rolling(window=2, step=step).sum() tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("expected_value,raw_value", [[1.0, True], [0.0, False]]) - def test_groupby_rolling(self, expected_value, raw_value): + def test_groupby_rolling(self, expected_value, raw_value, step): # GH 31754 def foo(x): return int(isinstance(x, np.ndarray)) df = DataFrame({"id": [1, 1, 1], "value": [1, 2, 3]}) - result = df.groupby("id").value.rolling(1).apply(foo, raw=raw_value) + result = df.groupby("id").value.rolling(1, step=step).apply(foo, raw=raw_value) expected = Series( [expected_value] * 3, index=MultiIndex.from_tuples(((1, 0), (1, 1), (1, 2)), names=["id", None]), name="value", - ) + ).iloc[::step] tm.assert_series_equal(result, expected) - def test_groupby_rolling_center_center(self): + def test_groupby_rolling_center_center(self, step, step_methods): # GH 35552 series = Series(range(1, 6)) - result = series.groupby(series).rolling(center=True, window=3).mean() + result = series.groupby(series).rolling(center=True, window=3, step=step).mean() expected = Series( [np.nan] * 5, index=MultiIndex.from_tuples(((1, 0), (2, 1), (3, 2), (4, 3), (5, 4))), @@ -301,7 +322,7 @@ def test_groupby_rolling_center_center(self): tm.assert_series_equal(result, expected) series = Series(range(1, 5)) - result = series.groupby(series).rolling(center=True, window=3).mean() + result = series.groupby(series).rolling(center=True, window=3, step=step).mean() expected = Series( [np.nan] * 4, index=MultiIndex.from_tuples(((1, 0), (2, 1), (3, 2), (4, 3))), @@ -309,7 +330,8 @@ def test_groupby_rolling_center_center(self): tm.assert_series_equal(result, expected) df = DataFrame({"a": ["a"] * 5 + ["b"] * 6, "b": range(11)}) - result = df.groupby("a").rolling(center=True, window=3).mean() + result = df.groupby("a").rolling(center=True, window=3, step=step).mean() + selected = step_methods.get_selected_indices(step, df["a"]) expected = DataFrame( [np.nan, 1, 2, 3, np.nan, np.nan, 6, 7, 8, 9, np.nan], index=MultiIndex.from_tuples( @@ -329,11 +351,12 @@ def test_groupby_rolling_center_center(self): names=["a", None], ), columns=["b"], - ) + ).iloc[selected] tm.assert_frame_equal(result, expected) df = DataFrame({"a": ["a"] * 5 + ["b"] * 5, "b": range(10)}) - result = df.groupby("a").rolling(center=True, window=3).mean() + result = df.groupby("a").rolling(center=True, window=3, step=step).mean() + selected = step_methods.get_selected_indices(step, df["a"]) expected = DataFrame( [np.nan, 1, 2, 3, np.nan, np.nan, 6, 7, 8, np.nan], index=MultiIndex.from_tuples( @@ -352,10 +375,10 @@ def test_groupby_rolling_center_center(self): names=["a", None], ), columns=["b"], - ) + ).iloc[selected] tm.assert_frame_equal(result, expected) - def test_groupby_rolling_center_on(self): + def test_groupby_rolling_center_on(self, step, step_methods): # GH 37141 df = DataFrame( data={ @@ -366,9 +389,10 @@ def test_groupby_rolling_center_on(self): ) result = ( 
df.groupby("gb") - .rolling(6, on="Date", center=True, min_periods=1) + .rolling(6, on="Date", center=True, min_periods=1, step=step) .value.mean() ) + selected = step_methods.get_selected_indices(step, df["gb"]) expected = Series( [1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 7.0, 7.5, 7.5, 7.5], name="value", @@ -387,18 +411,18 @@ def test_groupby_rolling_center_on(self): ), names=["gb", "Date"], ), - ) + ).iloc[selected] tm.assert_series_equal(result, expected) @pytest.mark.parametrize("min_periods", [5, 4, 3]) - def test_groupby_rolling_center_min_periods(self, min_periods): + def test_groupby_rolling_center_min_periods(self, min_periods, step, step_methods): # GH 36040 df = DataFrame({"group": ["A"] * 10 + ["B"] * 10, "data": range(20)}) window_size = 5 result = ( df.groupby("group") - .rolling(window_size, center=True, min_periods=min_periods) + .rolling(window_size, center=True, min_periods=min_periods, step=step) .mean() ) result = result.reset_index()[["group", "data"]] @@ -411,18 +435,26 @@ def test_groupby_rolling_center_min_periods(self, min_periods): grp_A_expected = nans + grp_A_mean[num_nans : 10 - num_nans] + nans grp_B_expected = nans + grp_B_mean[num_nans : 10 - num_nans] + nans - expected = DataFrame( - {"group": ["A"] * 10 + ["B"] * 10, "data": grp_A_expected + grp_B_expected} + selected = step_methods.get_selected_indices(step, df["group"]) + expected = ( + DataFrame( + { + "group": ["A"] * 10 + ["B"] * 10, + "data": grp_A_expected + grp_B_expected, + } + ) + .iloc[selected] + .reset_index(drop=True) ) tm.assert_frame_equal(result, expected) - def test_groupby_subselect_rolling(self): + def test_groupby_subselect_rolling(self, step, step_methods): # GH 35486 df = DataFrame( {"a": [1, 2, 3, 2], "b": [4.0, 2.0, 3.0, 1.0], "c": [10, 20, 30, 20]} ) - result = df.groupby("a")[["b"]].rolling(2).max() + result = df.groupby("a")[["b"]].rolling(2, step=step).max() expected = DataFrame( [np.nan, np.nan, 2.0, np.nan], columns=["b"], @@ -430,9 +462,13 @@ def test_groupby_subselect_rolling(self): ((1, 0), (2, 1), (2, 3), (3, 2)), names=["a", None] ), ) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_frame_equal(result, expected) - result = df.groupby("a")["b"].rolling(2).max() + result = df.groupby("a")["b"].rolling(2, step=step).max() expected = Series( [np.nan, np.nan, 2.0, np.nan], index=MultiIndex.from_tuples( @@ -440,32 +476,44 @@ def test_groupby_subselect_rolling(self): ), name="b", ) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_series_equal(result, expected) - def test_groupby_rolling_custom_indexer(self): + def test_groupby_rolling_custom_indexer(self, step): # GH 35557 class SimpleIndexer(BaseIndexer): def get_window_bounds( - self, num_values=0, min_periods=None, center=None, closed=None + self, + num_values=0, + min_periods=None, + center=None, + closed=None, + step=None, ): min_periods = self.window_size if min_periods is None else 0 - end = np.arange(num_values, dtype=np.int64) + 1 + end = np.arange(num_values, dtype=np.int64)[::step] + 1 start = end.copy() - self.window_size start[start < 0] = min_periods - return start, end + ref = np.arange(0, num_values, step, dtype=np.int64) + return start, end, ref df = DataFrame( {"a": [1.0, 2.0, 3.0, 4.0, 5.0] * 3}, index=[0] * 5 + [1] * 5 + [2] * 5 ) result = ( df.groupby(df.index) - .rolling(SimpleIndexer(window_size=3), min_periods=1) + 
.rolling(SimpleIndexer(window_size=3), min_periods=1, step=step) .sum() ) - expected = df.groupby(df.index).rolling(window=3, min_periods=1).sum() + expected = ( + df.groupby(df.index).rolling(window=3, min_periods=1, step=step).sum() + ) tm.assert_frame_equal(result, expected) - def test_groupby_rolling_subset_with_closed(self): + def test_groupby_rolling_subset_with_closed(self, step, step_methods): # GH 35549 df = DataFrame( { @@ -476,7 +524,9 @@ def test_groupby_rolling_subset_with_closed(self): } ) result = ( - df.groupby("group").rolling("1D", on="date", closed="left")["column1"].sum() + df.groupby("group") + .rolling("1D", on="date", closed="left", step=step)["column1"] + .sum() ) expected = Series( [np.nan, 0.0, 2.0, np.nan, 1.0, 4.0], @@ -487,9 +537,13 @@ def test_groupby_rolling_subset_with_closed(self): ), name="column1", ) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_series_equal(result, expected) - def test_groupby_subset_rolling_subset_with_closed(self): + def test_groupby_subset_rolling_subset_with_closed(self, step, step_methods): # GH 35549 df = DataFrame( { @@ -502,7 +556,7 @@ def test_groupby_subset_rolling_subset_with_closed(self): result = ( df.groupby("group")[["column1", "date"]] - .rolling("1D", on="date", closed="left")["column1"] + .rolling("1D", on="date", closed="left", step=step)["column1"] .sum() ) expected = Series( @@ -514,10 +568,14 @@ def test_groupby_subset_rolling_subset_with_closed(self): ), name="column1", ) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_series_equal(result, expected) @pytest.mark.parametrize("func", ["max", "min"]) - def test_groupby_rolling_index_changed(self, func): + def test_groupby_rolling_index_changed(self, func, step, step_methods): # GH: #36018 nlevels of MultiIndex changed ds = Series( [1, 2, 2], @@ -527,20 +585,21 @@ def test_groupby_rolling_index_changed(self, func): name="a", ) - result = getattr(ds.groupby(ds).rolling(2), func)() + result = getattr(ds.groupby(ds).rolling(2, step=step), func)() + selected = step_methods.get_selected_indices(step, ds) expected = Series( [np.nan, np.nan, 2.0], index=MultiIndex.from_tuples( [(1, "a", "x"), (2, "a", "y"), (2, "c", "z")], names=["a", "1", "2"] ), name="a", - ) + ).iloc[selected] tm.assert_series_equal(result, expected) - def test_groupby_rolling_empty_frame(self): + def test_groupby_rolling_empty_frame(self, step): # GH 36197 expected = DataFrame({"s1": []}) - result = expected.groupby("s1").rolling(window=1).sum() + result = expected.groupby("s1").rolling(window=1, step=step).sum() # GH 32262 expected = expected.drop(columns="s1") # GH-38057 from_tuples gives empty object dtype, we now get float/int levels @@ -551,7 +610,7 @@ def test_groupby_rolling_empty_frame(self): tm.assert_frame_equal(result, expected) expected = DataFrame({"s1": [], "s2": []}) - result = expected.groupby(["s1", "s2"]).rolling(window=1).sum() + result = expected.groupby(["s1", "s2"]).rolling(window=1, step=step).sum() # GH 32262 expected = expected.drop(columns=["s1", "s2"]) expected.index = MultiIndex.from_product( @@ -564,7 +623,7 @@ def test_groupby_rolling_empty_frame(self): ) tm.assert_frame_equal(result, expected) - def test_groupby_rolling_string_index(self): + def test_groupby_rolling_string_index(self, step, step_methods): # GH: 36727 df = DataFrame( [ @@ -579,7 +638,7 @@ def 
test_groupby_rolling_string_index(self): groups = df.groupby("group") df["count_to_date"] = groups.cumcount() - rolling_groups = groups.rolling("10d", on="eventTime") + rolling_groups = groups.rolling("10d", on="eventTime", step=step) result = rolling_groups.apply(lambda df: df.shape[0]) expected = DataFrame( [ @@ -591,14 +650,18 @@ def test_groupby_rolling_string_index(self): ], columns=["index", "group", "eventTime", "count_to_date"], ).set_index(["group", "index"]) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_frame_equal(result, expected) - def test_groupby_rolling_no_sort(self): + def test_groupby_rolling_no_sort(self, step, step_methods): # GH 36889 result = ( DataFrame({"foo": [2, 1], "bar": [2, 1]}) .groupby("foo", sort=False) - .rolling(1) + .rolling(1, step=step) .min() ) expected = DataFrame( @@ -608,9 +671,13 @@ def test_groupby_rolling_no_sort(self): ) # GH 32262 expected = expected.drop(columns="foo") + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_frame_equal(result, expected) - def test_groupby_rolling_count_closed_on(self): + def test_groupby_rolling_count_closed_on(self, step, step_methods): # GH 35869 df = DataFrame( { @@ -622,7 +689,7 @@ def test_groupby_rolling_count_closed_on(self): ) result = ( df.groupby("group") - .rolling("3d", on="date", closed="left")["column1"] + .rolling("3d", on="date", closed="left", step=step)["column1"] .count() ) expected = Series( @@ -640,13 +707,21 @@ def test_groupby_rolling_count_closed_on(self): names=["group", "date"], ), ) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_series_equal(result, expected) @pytest.mark.parametrize( ("func", "kwargs"), - [("rolling", {"window": 2, "min_periods": 1}), ("expanding", {})], + [ + ("rolling", {"window": 2, "min_periods": 1, "step": step}) + for step in [None, 1, 2, 5, 10] + ] + + [("expanding", {})], ) - def test_groupby_rolling_sem(self, func, kwargs): + def test_groupby_rolling_sem(self, func, kwargs, step_methods): # GH: 26476 df = DataFrame( [["a", 1], ["a", 2], ["b", 1], ["b", 2], ["b", 3]], columns=["a", "b"] @@ -660,12 +735,18 @@ def test_groupby_rolling_sem(self, func, kwargs): ) # GH 32262 expected = expected.drop(columns="a") + if "step" in kwargs: + step = kwargs["step"] + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_frame_equal(result, expected) @pytest.mark.parametrize( ("rollings", "key"), [({"on": "a"}, "a"), ({"on": None}, "index")] ) - def test_groupby_rolling_nans_in_index(self, rollings, key): + def test_groupby_rolling_nans_in_index(self, rollings, key, step): # GH: 34617 df = DataFrame( { @@ -677,10 +758,10 @@ def test_groupby_rolling_nans_in_index(self, rollings, key): if key == "index": df = df.set_index("a") with pytest.raises(ValueError, match=f"{key} must be monotonic"): - df.groupby("c").rolling("60min", **rollings) + df.groupby("c").rolling("60min", **rollings, step=step) @pytest.mark.parametrize("group_keys", [True, False]) - def test_groupby_rolling_group_keys(self, group_keys): + def test_groupby_rolling_group_keys(self, group_keys, step, step_methods): # GH 37641 # GH 38523: GH 37641 actually was not a bug. 
# group_keys only applies to groupby.apply directly @@ -688,7 +769,11 @@ def test_groupby_rolling_group_keys(self, group_keys): index = MultiIndex.from_arrays(arrays, names=("idx1", "idx2")) s = Series([1, 2, 3], index=index) - result = s.groupby(["idx1", "idx2"], group_keys=group_keys).rolling(1).mean() + result = ( + s.groupby(["idx1", "idx2"], group_keys=group_keys) + .rolling(1, step=step) + .mean() + ) expected = Series( [1.0, 2.0, 3.0], index=MultiIndex.from_tuples( @@ -700,15 +785,19 @@ def test_groupby_rolling_group_keys(self, group_keys): names=["idx1", "idx2", "idx1", "idx2"], ), ) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_series_equal(result, expected) - def test_groupby_rolling_index_level_and_column_label(self): + def test_groupby_rolling_index_level_and_column_label(self, step, step_methods): # The groupby keys should not appear as a resulting column arrays = [["val1", "val1", "val2"], ["val1", "val1", "val2"]] index = MultiIndex.from_arrays(arrays, names=("idx1", "idx2")) df = DataFrame({"A": [1, 1, 2], "B": range(3)}, index=index) - result = df.groupby(["idx1", "A"]).rolling(1).mean() + result = df.groupby(["idx1", "A"]).rolling(1, step=step).mean() expected = DataFrame( {"B": [0.0, 1.0, 2.0]}, index=MultiIndex.from_tuples( @@ -720,25 +809,33 @@ def test_groupby_rolling_index_level_and_column_label(self): names=["idx1", "A", "idx1", "idx2"], ), ) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_frame_equal(result, expected) - def test_groupby_rolling_resulting_multiindex(self): + def test_groupby_rolling_resulting_multiindex(self, step, step_methods): # a few different cases checking the created MultiIndex of the result # https://github.com/pandas-dev/pandas/pull/38057 # grouping by 1 columns -> 2-level MI as result df = DataFrame({"a": np.arange(8.0), "b": [1, 2] * 4}) - result = df.groupby("b").rolling(3).mean() + result = df.groupby("b").rolling(3, step=step).mean() expected_index = MultiIndex.from_tuples( [(1, 0), (1, 2), (1, 4), (1, 6), (2, 1), (2, 3), (2, 5), (2, 7)], names=["b", None], ) + selected = step_methods.get_selected_indices( + step, expected_index.get_level_values(0) + ) + expected_index = expected_index[selected] tm.assert_index_equal(result.index, expected_index) - def test_groupby_rolling_resulting_multiindex2(self): + def test_groupby_rolling_resulting_multiindex2(self, step, step_methods): # grouping by 2 columns -> 3-level MI as result df = DataFrame({"a": np.arange(12.0), "b": [1, 2] * 6, "c": [1, 2, 3, 4] * 3}) - result = df.groupby(["b", "c"]).rolling(2).sum() + result = df.groupby(["b", "c"]).rolling(2, step=step).sum() expected_index = MultiIndex.from_tuples( [ (1, 1, 0), @@ -756,13 +853,17 @@ def test_groupby_rolling_resulting_multiindex2(self): ], names=["b", "c", None], ) + selected = step_methods.get_selected_indices( + step, expected_index.droplevel(2).tolist() + ) + expected_index = expected_index[selected] tm.assert_index_equal(result.index, expected_index) - def test_groupby_rolling_resulting_multiindex3(self): + def test_groupby_rolling_resulting_multiindex3(self, step, step_methods): # grouping with 1 level on dataframe with 2-level MI -> 3-level MI as result df = DataFrame({"a": np.arange(8.0), "b": [1, 2] * 4, "c": [1, 2, 3, 4] * 2}) df = df.set_index("c", append=True) - result = df.groupby("b").rolling(3).mean() + result = 
df.groupby("b").rolling(3, step=step).mean() expected_index = MultiIndex.from_tuples( [ (1, 0, 1), @@ -776,14 +877,18 @@ def test_groupby_rolling_resulting_multiindex3(self): ], names=["b", None, "c"], ) + selected = step_methods.get_selected_indices( + step, expected_index.get_level_values(0) + ) + expected_index = expected_index[selected] tm.assert_index_equal(result.index, expected_index, exact="equiv") - def test_groupby_rolling_object_doesnt_affect_groupby_apply(self): + def test_groupby_rolling_object_doesnt_affect_groupby_apply(self, step): # GH 39732 g = self.frame.groupby("A") - expected = g.apply(lambda x: x.rolling(4).sum()).index + expected = g.apply(lambda x: x.rolling(4, step=step).sum()).index _ = g.rolling(window=4) - result = g.apply(lambda x: x.rolling(4).sum()).index + result = g.apply(lambda x: x.rolling(4, step=step).sum()).index tm.assert_index_equal(result, expected) assert not g.mutated assert not g.grouper.mutated @@ -797,11 +902,13 @@ def test_groupby_rolling_object_doesnt_affect_groupby_apply(self): (4, 4, "right", [None, None, None, 5.0, None, None, None, 5.0]), ], ) - def test_groupby_rolling_var(self, window, min_periods, closed, expected): + def test_groupby_rolling_var( + self, window, min_periods, closed, expected, step, step_methods + ): df = DataFrame([1, 2, 3, 4, 5, 6, 7, 8]) result = ( df.groupby([1, 2, 1, 2, 1, 2, 1, 2]) - .rolling(window=window, min_periods=min_periods, closed=closed) + .rolling(window=window, min_periods=min_periods, closed=closed, step=step) .var(0) ) expected_result = DataFrame( @@ -811,6 +918,10 @@ def test_groupby_rolling_var(self, window, min_periods, closed, expected): codes=[[0, 0, 0, 0, 1, 1, 1, 1], [0, 2, 4, 6, 1, 3, 5, 7]], ), ) + selected = step_methods.get_selected_indices( + step, expected_result.index.get_level_values(0) + ) + expected_result = expected_result.iloc[selected] tm.assert_frame_equal(result, expected_result) @pytest.mark.parametrize( @@ -826,7 +937,7 @@ def test_by_column_not_in_values(self, columns): assert "A" not in result.columns tm.assert_frame_equal(g.obj, original_obj) - def test_groupby_level(self): + def test_groupby_level(self, step, step_methods): # GH 38523, 38787 arrays = [ ["Falcon", "Falcon", "Parrot", "Parrot"], @@ -834,7 +945,7 @@ def test_groupby_level(self): ] index = MultiIndex.from_arrays(arrays, names=("Animal", "Type")) df = DataFrame({"Max Speed": [390.0, 350.0, 30.0, 20.0]}, index=index) - result = df.groupby(level=0)["Max Speed"].rolling(2).sum() + result = df.groupby(level=0)["Max Speed"].rolling(2, step=step).sum() expected = Series( [np.nan, 740.0, np.nan, 50.0], index=MultiIndex.from_tuples( @@ -848,6 +959,10 @@ def test_groupby_level(self): ), name="Max Speed", ) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_series_equal(result, expected) @pytest.mark.parametrize( @@ -868,7 +983,7 @@ def test_groupby_level(self): ], ], ) - def test_as_index_false(self, by, expected_data): + def test_as_index_false(self, by, expected_data, step, step_methods): # GH 39433 data = [ ["A", "2018-01-01", 100.0], @@ -882,7 +997,9 @@ def test_as_index_false(self, by, expected_data): gp_by = [getattr(df, attr) for attr in by] result = ( - df.groupby(gp_by, as_index=False).rolling(window=2, min_periods=1).mean() + df.groupby(gp_by, as_index=False) + .rolling(window=2, min_periods=1, step=step) + .mean() ) expected = {"id": ["A", "A", "B", "B"]} @@ -891,9 +1008,13 @@ def test_as_index_false(self, by, 
expected_data): expected, index=df.index, ) + selected = step_methods.get_selected_indices( + step, list(zip(*[expected[x if x != "index" else "date"] for x in by])) + ) + expected = expected.iloc[selected] tm.assert_frame_equal(result, expected) - def test_nan_and_zero_endpoints(self): + def test_nan_and_zero_endpoints(self, step, step_methods): # https://github.com/twosigma/pandas/issues/53 size = 1000 idx = np.repeat(0, size) @@ -910,7 +1031,11 @@ def test_nan_and_zero_endpoints(self): "adl2": arr, } ).set_index("index") - result = df.groupby("index")["adl2"].rolling(window=10, min_periods=1).mean() + result = ( + df.groupby("index")["adl2"] + .rolling(window=10, min_periods=1, step=step) + .mean() + ) expected = Series( arr, name="adl2", @@ -918,6 +1043,10 @@ def test_nan_and_zero_endpoints(self): [[0] * 999 + [1], [0] * 999 + [1]], names=["index", "index"] ), ) + selected = step_methods.get_selected_indices( + step, expected.index.get_level_values(0) + ) + expected = expected.iloc[selected] tm.assert_series_equal(result, expected) @@ -1140,7 +1269,7 @@ def test_times_array(self, times_frame): expected = gb.ewm(halflife=halflife, times=times_frame["C"].values).mean() tm.assert_frame_equal(result, expected) - def test_dont_mutate_obj_after_slicing(self): + def test_dont_mutate_obj_after_slicing(self, step, step_methods): # GH 43355 df = DataFrame( { @@ -1149,7 +1278,7 @@ def test_dont_mutate_obj_after_slicing(self): "y": range(5), } ) - grp = df.groupby("id").rolling("1H", on="timestamp") + grp = df.groupby("id").rolling("1H", on="timestamp", step=step) result = grp.count() expected_df = DataFrame( { @@ -1160,6 +1289,10 @@ def test_dont_mutate_obj_after_slicing(self): [["a", "a", "b", "b", "b"], list(range(5))], names=["id", None] ), ) + selected = step_methods.get_selected_indices( + step, expected_df.index.get_level_values(0) + ) + expected_df = expected_df.iloc[selected] tm.assert_frame_equal(result, expected_df) result = grp["y"].count() @@ -1174,6 +1307,10 @@ def test_dont_mutate_obj_after_slicing(self): ), name="y", ) + selected = step_methods.get_selected_indices( + step, expected_series.index.get_level_values(0) + ) + expected_series = expected_series.iloc[selected] tm.assert_series_equal(result, expected_series) # This is the key test result = grp.count() diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 2c9ae3d70f218..6a8ac60253654 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -57,7 +57,7 @@ def arithmetic_numba_supported_operators(request): # Filter warnings when parallel=True and the function can't be parallelized by Numba class TestEngine: @pytest.mark.parametrize("jit", [True, False]) - def test_numba_vs_cython_apply(self, jit, nogil, parallel, nopython, center): + def test_numba_vs_cython_apply(self, jit, nogil, parallel, nopython, center, step): def f(x, *args): arg_sum = 0 for arg in args: @@ -73,10 +73,10 @@ def f(x, *args): args = (2,) s = Series(range(10)) - result = s.rolling(2, center=center).apply( + result = s.rolling(2, center=center, step=step).apply( f, args=args, engine="numba", engine_kwargs=engine_kwargs, raw=True ) - expected = s.rolling(2, center=center).apply( + expected = s.rolling(2, center=center, step=step).apply( f, engine="cython", args=args, raw=True ) tm.assert_series_equal(result, expected) @@ -85,14 +85,20 @@ def f(x, *args): "data", [DataFrame(np.eye(5)), Series(range(5), name="foo")] ) def test_numba_vs_cython_rolling_methods( - self, data, nogil, 
parallel, nopython, arithmetic_numba_supported_operators + self, + data, + nogil, + parallel, + nopython, + arithmetic_numba_supported_operators, + step, ): method, kwargs = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} - roll = data.rolling(2) + roll = data.rolling(2, step=step) result = getattr(roll, method)( engine="numba", engine_kwargs=engine_kwargs, **kwargs ) @@ -135,7 +141,7 @@ def test_numba_vs_cython_expanding_methods( tm.assert_equal(result, expected) @pytest.mark.parametrize("jit", [True, False]) - def test_cache_apply(self, jit, nogil, parallel, nopython): + def test_cache_apply(self, jit, nogil, parallel, nopython, step): # Test that the functions are cached correctly if we switch functions def func_1(x): return np.mean(x) + 4 @@ -151,7 +157,7 @@ def func_2(x): engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} - roll = Series(range(10)).rolling(2) + roll = Series(range(10)).rolling(2, step=step) result = roll.apply( func_1, engine="numba", engine_kwargs=engine_kwargs, raw=True ) @@ -323,21 +329,29 @@ def f(x): ) def test_table_method_rolling_methods( - self, axis, nogil, parallel, nopython, arithmetic_numba_supported_operators + self, + axis, + nogil, + parallel, + nopython, + arithmetic_numba_supported_operators, + step, ): method, kwargs = arithmetic_numba_supported_operators engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} df = DataFrame(np.eye(3)) - roll_table = df.rolling(2, method="table", axis=axis, min_periods=0) + roll_table = df.rolling(2, method="table", axis=axis, min_periods=0, step=step) if method in ("var", "std"): with pytest.raises(NotImplementedError, match=f"{method} not supported"): getattr(roll_table, method)( engine_kwargs=engine_kwargs, engine="numba", **kwargs ) else: - roll_single = df.rolling(2, method="single", axis=axis, min_periods=0) + roll_single = df.rolling( + 2, method="single", axis=axis, min_periods=0, step=step + ) result = getattr(roll_table, method)( engine_kwargs=engine_kwargs, engine="numba", **kwargs ) @@ -346,29 +360,29 @@ def test_table_method_rolling_methods( ) tm.assert_frame_equal(result, expected) - def test_table_method_rolling_apply(self, axis, nogil, parallel, nopython): + def test_table_method_rolling_apply(self, axis, nogil, parallel, nopython, step): engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} def f(x): return np.sum(x, axis=0) + 1 df = DataFrame(np.eye(3)) - result = df.rolling(2, method="table", axis=axis, min_periods=0).apply( - f, raw=True, engine_kwargs=engine_kwargs, engine="numba" - ) - expected = df.rolling(2, method="single", axis=axis, min_periods=0).apply( - f, raw=True, engine_kwargs=engine_kwargs, engine="numba" - ) + result = df.rolling( + 2, method="table", axis=axis, min_periods=0, step=step + ).apply(f, raw=True, engine_kwargs=engine_kwargs, engine="numba") + expected = df.rolling( + 2, method="single", axis=axis, min_periods=0, step=step + ).apply(f, raw=True, engine_kwargs=engine_kwargs, engine="numba") tm.assert_frame_equal(result, expected) - def test_table_method_rolling_weighted_mean(self): + def test_table_method_rolling_weighted_mean(self, step): def weighted_mean(x): arr = np.ones((1, x.shape[1])) arr[:, :2] = (x[:, :2] * x[:, 2]).sum(axis=0) / x[:, 2].sum() return arr df = DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]]) - result = df.rolling(2, method="table", min_periods=0).apply( + result = df.rolling(2, method="table", 
min_periods=0, step=step).apply( weighted_mean, raw=True, engine="numba" ) expected = DataFrame( @@ -378,7 +392,7 @@ def weighted_mean(x): [3.333333, 2.333333, 1.0], [1.555556, 7, 1.0], ] - ) + )[::step] tm.assert_frame_equal(result, expected) def test_table_method_expanding_apply(self, axis, nogil, parallel, nopython): diff --git a/pandas/tests/window/test_pairwise.py b/pandas/tests/window/test_pairwise.py index 6c3f3f7075ce0..bab5bd5168b58 100644 --- a/pandas/tests/window/test_pairwise.py +++ b/pandas/tests/window/test_pairwise.py @@ -47,20 +47,26 @@ def pairwise_other_frame(): ) -def test_rolling_cov(series): +def test_rolling_cov(series, step): A = series B = A + np.random.randn(len(A)) - result = A.rolling(window=50, min_periods=25).cov(B) - tm.assert_almost_equal(result[-1], np.cov(A[-50:], B[-50:])[0, 1]) + result = A.rolling(window=50, min_periods=25, step=step).cov(B) + end = range(0, len(series), step or 1)[-1] + 1 + tm.assert_almost_equal( + result[-1], np.cov(A[end - 50 : end], B[end - 50 : end])[0, 1] + ) -def test_rolling_corr(series): +def test_rolling_corr(series, step): A = series B = A + np.random.randn(len(A)) - result = A.rolling(window=50, min_periods=25).corr(B) - tm.assert_almost_equal(result[-1], np.corrcoef(A[-50:], B[-50:])[0, 1]) + result = A.rolling(window=50, min_periods=25, step=step).corr(B) + end = range(0, len(series), step or 1)[-1] + 1 + tm.assert_almost_equal( + result[-1], np.corrcoef(A[end - 50 : end], B[end - 50 : end])[0, 1] + ) # test for correct bias correction a = tm.makeTimeSeries() @@ -68,26 +74,31 @@ def test_rolling_corr(series): a[:5] = np.nan b[:10] = np.nan - result = a.rolling(window=len(a), min_periods=1).corr(b) - tm.assert_almost_equal(result[-1], a.corr(b)) + result = a.rolling(window=len(a), min_periods=1, step=step).corr(b) + end = range(0, len(a), step or 1)[-1] + 1 + tm.assert_almost_equal(result[-1], a[:end].corr(b)) @pytest.mark.parametrize("func", ["cov", "corr"]) -def test_rolling_pairwise_cov_corr(func, frame): - result = getattr(frame.rolling(window=10, min_periods=5), func)() +def test_rolling_pairwise_cov_corr(func, frame, step): + result = getattr(frame.rolling(window=10, min_periods=5, step=step), func)() result = result.loc[(slice(None), 1), 5] result.index = result.index.droplevel(1) - expected = getattr(frame[1].rolling(window=10, min_periods=5), func)(frame[5]) + expected = getattr(frame[1].rolling(window=10, min_periods=5, step=step), func)( + frame[5] + ) tm.assert_series_equal(result, expected, check_names=False) @pytest.mark.parametrize("method", ["corr", "cov"]) -def test_flex_binary_frame(method, frame): +def test_flex_binary_frame(method, frame, step): series = frame[1] - res = getattr(series.rolling(window=10), method)(frame) - res2 = getattr(frame.rolling(window=10), method)(series) - exp = frame.apply(lambda x: getattr(series.rolling(window=10), method)(x)) + res = getattr(series.rolling(window=10, step=step), method)(frame) + res2 = getattr(frame.rolling(window=10, step=step), method)(series) + exp = frame.apply( + lambda x: getattr(series.rolling(window=10, step=step), method)(x) + ) tm.assert_frame_equal(res, exp) tm.assert_frame_equal(res2, exp) @@ -95,9 +106,12 @@ def test_flex_binary_frame(method, frame): frame2 = frame.copy() frame2.values[:] = np.random.randn(*frame2.shape) - res3 = getattr(frame.rolling(window=10), method)(frame2) + res3 = getattr(frame.rolling(window=10, step=step), method)(frame2) exp = DataFrame( - {k: getattr(frame[k].rolling(window=10), method)(frame2[k]) for k in frame} + { + 
k: getattr(frame[k].rolling(window=10, step=step), method)(frame2[k]) + for k in frame + } ) tm.assert_frame_equal(res3, exp) diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index ced163178f73a..2e48473f19837 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -82,7 +82,7 @@ def test_invalid_constructor(frame_or_series, w): @pytest.mark.parametrize("window", [timedelta(days=3), Timedelta(days=3)]) -def test_constructor_with_timedelta_window(window): +def test_constructor_with_timedelta_window(window, step): # GH 15440 n = 10 df = DataFrame( @@ -91,18 +91,18 @@ def test_constructor_with_timedelta_window(window): ) expected_data = np.append([0.0, 1.0], np.arange(3.0, 27.0, 3)) - result = df.rolling(window=window).sum() + result = df.rolling(window=window, step=step).sum() expected = DataFrame( {"value": expected_data}, index=date_range("2015-12-24", periods=n, freq="D"), - ) + )[::step] tm.assert_frame_equal(result, expected) - expected = df.rolling("3D").sum() + expected = df.rolling("3D", step=step).sum() tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("window", [timedelta(days=3), Timedelta(days=3), "3D"]) -def test_constructor_timedelta_window_and_minperiods(window, raw): +def test_constructor_timedelta_window_and_minperiods(window, step, raw): # GH 15305 n = 10 df = DataFrame( @@ -112,9 +112,11 @@ def test_constructor_timedelta_window_and_minperiods(window, raw): expected = DataFrame( {"value": np.append([np.NaN, 1.0], np.arange(3.0, 27.0, 3))}, index=date_range("2017-08-08", periods=n, freq="D"), + )[::step] + result_roll_sum = df.rolling(window=window, min_periods=2, step=step).sum() + result_roll_generic = df.rolling(window=window, min_periods=2, step=step).apply( + sum, raw=raw ) - result_roll_sum = df.rolling(window=window, min_periods=2).sum() - result_roll_generic = df.rolling(window=window, min_periods=2).apply(sum, raw=raw) tm.assert_frame_equal(result_roll_sum, expected) tm.assert_frame_equal(result_roll_generic, expected) @@ -133,18 +135,20 @@ def test_numpy_compat(method): @pytest.mark.parametrize("closed", ["right", "left", "both", "neither"]) -def test_closed_fixed(closed, arithmetic_win_operators): +def test_closed_fixed(closed, arithmetic_win_operators, step): # GH 34315 func_name = arithmetic_win_operators df_fixed = DataFrame({"A": [0, 1, 2, 3, 4]}) df_time = DataFrame({"A": [0, 1, 2, 3, 4]}, index=date_range("2020", periods=5)) result = getattr( - df_fixed.rolling(2, closed=closed, min_periods=1), + df_fixed.rolling(2, closed=closed, min_periods=1, step=step), func_name, )() + if step is not None: + result = result.reset_index(drop=True) expected = getattr( - df_time.rolling("2D", closed=closed, min_periods=1), + df_time.rolling("2D", closed=closed, min_periods=1, step=step), func_name, )().reset_index(drop=True) @@ -197,7 +201,7 @@ def test_closed_fixed(closed, arithmetic_win_operators): ], ) def test_datetimelike_centered_selections( - closed, window_selections, arithmetic_win_operators + closed, window_selections, step, arithmetic_win_operators ): # GH 34315 func_name = arithmetic_win_operators @@ -208,7 +212,7 @@ def test_datetimelike_centered_selections( expected = DataFrame( {"A": [getattr(df_time["A"].iloc[s], func_name)() for s in window_selections]}, index=date_range("2020", periods=5), - ) + )[::step] if func_name == "sem": kwargs = {"ddof": 0} @@ -216,7 +220,7 @@ def test_datetimelike_centered_selections( kwargs = {} result = getattr( - df_time.rolling("2D", 
closed=closed, min_periods=1, center=True), + df_time.rolling("2D", closed=closed, min_periods=1, center=True, step=step), func_name, )(**kwargs) @@ -237,7 +241,7 @@ def test_datetimelike_centered_selections( ], ) def test_datetimelike_centered_offset_covers_all( - window, closed, expected, frame_or_series + window, closed, expected, step, frame_or_series ): # GH 42753 @@ -248,8 +252,8 @@ def test_datetimelike_centered_offset_covers_all( ] df = frame_or_series([1, 1, 1], index=index) - result = df.rolling(window, closed=closed, center=True).sum() - expected = frame_or_series(expected, index=index) + result = df.rolling(window, closed=closed, center=True, step=step).sum() + expected = frame_or_series(expected, index=index)[::step] tm.assert_equal(result, expected) @@ -263,7 +267,7 @@ def test_datetimelike_centered_offset_covers_all( ], ) def test_datetimelike_nonunique_index_centering( - window, closed, expected, frame_or_series + window, closed, expected, frame_or_series, step ): index = DatetimeIndex( [ @@ -279,28 +283,28 @@ def test_datetimelike_nonunique_index_centering( ) df = frame_or_series([1] * 8, index=index, dtype=float) - expected = frame_or_series(expected, index=index, dtype=float) + expected = frame_or_series(expected, index=index, dtype=float)[::step] - result = df.rolling(window, center=True, closed=closed).sum() + result = df.rolling(window, center=True, closed=closed, step=step).sum() tm.assert_equal(result, expected) -def test_even_number_window_alignment(): +def test_even_number_window_alignment(step): # see discussion in GH 38780 s = Series(range(3), index=date_range(start="2020-01-01", freq="D", periods=3)) # behavior of index- and datetime-based windows differs here! # s.rolling(window=2, min_periods=1, center=True).mean() - result = s.rolling(window="2D", min_periods=1, center=True).mean() + result = s.rolling(window="2D", min_periods=1, center=True, step=step).mean() - expected = Series([0.5, 1.5, 2], index=s.index) + expected = Series([0.5, 1.5, 2], index=s.index)[::step] tm.assert_series_equal(result, expected) -def test_closed_fixed_binary_col(center): +def test_closed_fixed_binary_col(center, step): # GH 34315 data = [0, 1, 1, 0, 0, 1, 0, 1] df = DataFrame( @@ -317,45 +321,51 @@ def test_closed_fixed_binary_col(center): expected_data, columns=["binary_col"], index=date_range(start="2020-01-01", freq="min", periods=len(expected_data)), - ) + )[::step] - rolling = df.rolling(window=len(df), closed="left", min_periods=1, center=center) + rolling = df.rolling( + window=len(df), closed="left", min_periods=1, center=center, step=step + ) result = rolling.mean() tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("closed", ["neither", "left"]) -def test_closed_empty(closed, arithmetic_win_operators): +def test_closed_empty(closed, arithmetic_win_operators, step): # GH 26005 func_name = arithmetic_win_operators ser = Series(data=np.arange(5), index=date_range("2000", periods=5, freq="2D")) - roll = ser.rolling("1D", closed=closed) + roll = ser.rolling("1D", closed=closed, step=step) result = getattr(roll, func_name)() - expected = Series([np.nan] * 5, index=ser.index) + expected = Series([np.nan] * 5, index=ser.index)[::step] tm.assert_series_equal(result, expected) @pytest.mark.parametrize("func", ["min", "max"]) -def test_closed_one_entry(func): +def test_closed_one_entry(func, step): # GH24718 ser = Series(data=[2], index=date_range("2000", periods=1)) - result = getattr(ser.rolling("10D", closed="left"), func)() - tm.assert_series_equal(result, 
Series([np.nan], index=ser.index)) + result = getattr(ser.rolling("10D", closed="left", step=step), func)() + index = ser.index.copy() + index.freq = index.freq * (step or 1) + tm.assert_series_equal(result, Series([np.nan], index=index)) @pytest.mark.parametrize("func", ["min", "max"]) -def test_closed_one_entry_groupby(func): +def test_closed_one_entry_groupby(func, step, step_methods): # GH24718 ser = DataFrame( data={"A": [1, 1, 2], "B": [3, 2, 1]}, index=date_range("2000", periods=3), ) result = getattr( - ser.groupby("A", sort=False)["B"].rolling("10D", closed="left"), func + ser.groupby("A", sort=False)["B"].rolling("10D", closed="left", step=step), func )() exp_idx = MultiIndex.from_arrays(arrays=[[1, 1, 2], ser.index], names=("A", None)) expected = Series(data=[np.nan, 3, np.nan], index=exp_idx, name="B") + selected = step_methods.get_selected_indices(step, exp_idx.get_level_values(0)) + expected = expected.iloc[selected] tm.assert_series_equal(result, expected) @@ -1362,7 +1372,7 @@ def test_rolling_non_monotonic(method, expected): df = DataFrame({"values": np.arange(len(use_expanding)) ** 2}) class CustomIndexer(BaseIndexer): - def get_window_bounds(self, num_values, min_periods, center, closed): + def get_window_bounds(self, num_values, min_periods, center, closed, step): start = np.empty(num_values, dtype=np.int64) end = np.empty(num_values, dtype=np.int64) for i in range(num_values): @@ -1372,7 +1382,8 @@ def get_window_bounds(self, num_values, min_periods, center, closed): else: start[i] = i end[i] = i + self.window_size - return start, end + ref = np.arange(0, num_values, dtype=np.int64) + return start, end, ref indexer = CustomIndexer(window_size=4, use_expanding=use_expanding) diff --git a/pandas/tests/window/test_rolling_functions.py b/pandas/tests/window/test_rolling_functions.py index 842c056806092..9ab4ff13796d6 100644 --- a/pandas/tests/window/test_rolling_functions.py +++ b/pandas/tests/window/test_rolling_functions.py @@ -38,10 +38,11 @@ [lambda x: np.var(x, ddof=0), "var", {"ddof": 0}], ], ) -def test_series(series, compare_func, roll_func, kwargs): - result = getattr(series.rolling(50), roll_func)(**kwargs) +def test_series(series, compare_func, roll_func, kwargs, step): + result = getattr(series.rolling(50, step=step), roll_func)(**kwargs) assert isinstance(result, Series) - tm.assert_almost_equal(result.iloc[-1], compare_func(series[-50:])) + end = range(0, len(series), step or 1)[-1] + 1 + tm.assert_almost_equal(result.iloc[-1], compare_func(series[end - 50 : end])) @pytest.mark.parametrize( @@ -64,12 +65,13 @@ def test_series(series, compare_func, roll_func, kwargs): [lambda x: np.var(x, ddof=0), "var", {"ddof": 0}], ], ) -def test_frame(raw, frame, compare_func, roll_func, kwargs): - result = getattr(frame.rolling(50), roll_func)(**kwargs) +def test_frame(raw, frame, compare_func, roll_func, kwargs, step): + result = getattr(frame.rolling(50, step=step), roll_func)(**kwargs) assert isinstance(result, DataFrame) + end = range(0, len(frame), step or 1)[-1] + 1 tm.assert_series_equal( result.iloc[-1, :], - frame.iloc[-50:, :].apply(compare_func, axis=0, raw=raw), + frame.iloc[end - 50 : end, :].apply(compare_func, axis=0, raw=raw), check_names=False, ) @@ -200,13 +202,13 @@ def test_nans_count(): ], ) @pytest.mark.parametrize("minp", [0, 99, 100]) -def test_min_periods(series, minp, roll_func, kwargs): - result = getattr(series.rolling(len(series) + 1, min_periods=minp), roll_func)( - **kwargs - ) - expected = getattr(series.rolling(len(series), 
min_periods=minp), roll_func)( - **kwargs - ) +def test_min_periods(series, minp, roll_func, kwargs, step): + result = getattr( + series.rolling(len(series) + 1, min_periods=minp, step=step), roll_func + )(**kwargs) + expected = getattr( + series.rolling(len(series), min_periods=minp, step=step), roll_func + )(**kwargs) nan_mask = isna(result) tm.assert_series_equal(nan_mask, isna(expected)) @@ -214,9 +216,9 @@ def test_min_periods(series, minp, roll_func, kwargs): tm.assert_almost_equal(result[nan_mask], expected[nan_mask]) -def test_min_periods_count(series): - result = series.rolling(len(series) + 1, min_periods=0).count() - expected = series.rolling(len(series), min_periods=0).count() +def test_min_periods_count(series, step): + result = series.rolling(len(series) + 1, min_periods=0, step=step).count() + expected = series.rolling(len(series), min_periods=0, step=step).count() nan_mask = isna(result) tm.assert_series_equal(nan_mask, isna(expected)) @@ -364,7 +366,7 @@ def test_rolling_functions_window_non_shrinkage(f): tm.assert_frame_equal(df_result, df_expected) -def test_rolling_max_gh6297(): +def test_rolling_max_gh6297(step): """Replicate result expected in GH #6297""" indices = [datetime(1975, 1, i) for i in range(1, 6)] # So that we can have 2 datapoints on one of the days @@ -378,12 +380,12 @@ def test_rolling_max_gh6297(): expected = Series( [1.0, 2.0, 6.0, 4.0, 5.0], index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - x = series.resample("D").max().rolling(window=1).max() + )[::step] + x = series.resample("D").max().rolling(window=1, step=step).max() tm.assert_series_equal(expected, x) -def test_rolling_max_resample(): +def test_rolling_max_resample(step): indices = [datetime(1975, 1, i) for i in range(1, 6)] # So that we can have 3 datapoints on last day (4, 10, and 20) @@ -399,16 +401,16 @@ def test_rolling_max_resample(): expected = Series( [0.0, 1.0, 2.0, 3.0, 20.0], index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - x = series.resample("D").max().rolling(window=1).max() + )[::step] + x = series.resample("D").max().rolling(window=1, step=step).max() tm.assert_series_equal(expected, x) # Now specify median (10.0) expected = Series( [0.0, 1.0, 2.0, 3.0, 10.0], index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - x = series.resample("D").median().rolling(window=1).max() + )[::step] + x = series.resample("D").median().rolling(window=1, step=step).max() tm.assert_series_equal(expected, x) # Now specify mean (4+10+20)/3 @@ -416,12 +418,12 @@ def test_rolling_max_resample(): expected = Series( [0.0, 1.0, 2.0, 3.0, v], index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - x = series.resample("D").mean().rolling(window=1).max() + )[::step] + x = series.resample("D").mean().rolling(window=1, step=step).max() tm.assert_series_equal(expected, x) -def test_rolling_min_resample(): +def test_rolling_min_resample(step): indices = [datetime(1975, 1, i) for i in range(1, 6)] # So that we can have 3 datapoints on last day (4, 10, and 20) @@ -437,8 +439,8 @@ def test_rolling_min_resample(): expected = Series( [0.0, 1.0, 2.0, 3.0, 4.0], index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - r = series.resample("D").min().rolling(window=1) + )[::step] + r = series.resample("D").min().rolling(window=1, step=step) tm.assert_series_equal(expected, r.min()) diff --git a/pandas/tests/window/test_rolling_quantile.py 
b/pandas/tests/window/test_rolling_quantile.py
index 56b79097a1d05..815ee419590f7 100644
--- a/pandas/tests/window/test_rolling_quantile.py
+++ b/pandas/tests/window/test_rolling_quantile.py
@@ -34,21 +34,23 @@ def scoreatpercentile(a, per):
 
 
 @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
-def test_series(series, q):
+def test_series(series, q, step):
     compare_func = partial(scoreatpercentile, per=q)
-    result = series.rolling(50).quantile(q)
+    result = series.rolling(50, step=step).quantile(q)
     assert isinstance(result, Series)
-    tm.assert_almost_equal(result.iloc[-1], compare_func(series[-50:]))
+    end = range(0, len(series), step or 1)[-1] + 1
+    tm.assert_almost_equal(result.iloc[-1], compare_func(series[end - 50 : end]))
 
 
 @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
-def test_frame(raw, frame, q):
+def test_frame(raw, frame, q, step):
     compare_func = partial(scoreatpercentile, per=q)
-    result = frame.rolling(50).quantile(q)
+    result = frame.rolling(50, step=step).quantile(q)
     assert isinstance(result, DataFrame)
+    end = range(0, len(frame), step or 1)[-1] + 1
     tm.assert_series_equal(
         result.iloc[-1, :],
-        frame.iloc[-50:, :].apply(compare_func, axis=0, raw=raw),
+        frame.iloc[end - 50 : end, :].apply(compare_func, axis=0, raw=raw),
         check_names=False,
     )
 
@@ -113,9 +115,9 @@ def test_nans(q):
 
 @pytest.mark.parametrize("minp", [0, 99, 100])
 @pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
-def test_min_periods(series, minp, q):
-    result = series.rolling(len(series) + 1, min_periods=minp).quantile(q)
-    expected = series.rolling(len(series), min_periods=minp).quantile(q)
+def test_min_periods(series, minp, q, step):
+    result = series.rolling(len(series) + 1, min_periods=minp, step=step).quantile(q)
+    expected = series.rolling(len(series), min_periods=minp, step=step).quantile(q)
     nan_mask = isna(result)
     tm.assert_series_equal(nan_mask, isna(expected))
 
diff --git a/pandas/tests/window/test_rolling_skew_kurt.py b/pandas/tests/window/test_rolling_skew_kurt.py
index 46b7eb6cbc285..152172d7b2266 100644
--- a/pandas/tests/window/test_rolling_skew_kurt.py
+++ b/pandas/tests/window/test_rolling_skew_kurt.py
@@ -112,9 +112,13 @@ def test_nans(sp_func, roll_func):
 
 @pytest.mark.parametrize("minp", [0, 99, 100])
 @pytest.mark.parametrize("roll_func", ["kurt", "skew"])
-def test_min_periods(series, minp, roll_func):
-    result = getattr(series.rolling(len(series) + 1, min_periods=minp), roll_func)()
-    expected = getattr(series.rolling(len(series), min_periods=minp), roll_func)()
+def test_min_periods(series, minp, roll_func, step):
+    result = getattr(
+        series.rolling(len(series) + 1, min_periods=minp, step=step), roll_func
+    )()
+    expected = getattr(
+        series.rolling(len(series), min_periods=minp, step=step), roll_func
+    )()
     nan_mask = isna(result)
     tm.assert_series_equal(nan_mask, isna(expected))
 
@@ -172,55 +176,55 @@ def test_center_reindex_frame(frame, roll_func):
     tm.assert_frame_equal(frame_xp, frame_rs)
 
 
-def test_rolling_skew_edge_cases():
+def test_rolling_skew_edge_cases(step):
 
-    all_nan = Series([np.NaN] * 5)
+    all_nan = Series([np.NaN] * 5)[::step]
 
     # yields all NaN (0 variance)
     d = Series([1] * 5)
-    x = d.rolling(window=5).skew()
+    x = d.rolling(window=5, step=step).skew()
     tm.assert_series_equal(all_nan, x)
 
     # yields all NaN (window too small)
     d = Series(np.random.randn(5))
-    x = d.rolling(window=2).skew()
+    x = d.rolling(window=2, step=step).skew()
     tm.assert_series_equal(all_nan, x)
 
     # yields [NaN, NaN, NaN, 0.177994, 1.548824]
     d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401])
-    expected = Series([np.NaN, np.NaN, np.NaN, 0.177994, 1.548824])
-    x = d.rolling(window=4).skew()
+    expected = Series([np.NaN, np.NaN, np.NaN, 0.177994, 1.548824])[::step]
+    x = d.rolling(window=4, step=step).skew()
     tm.assert_series_equal(expected, x)
 
 
-def test_rolling_kurt_edge_cases():
+def test_rolling_kurt_edge_cases(step):
 
-    all_nan = Series([np.NaN] * 5)
+    all_nan = Series([np.NaN] * 5)[::step]
 
     # yields all NaN (0 variance)
     d = Series([1] * 5)
-    x = d.rolling(window=5).kurt()
+    x = d.rolling(window=5, step=step).kurt()
    tm.assert_series_equal(all_nan, x)
 
     # yields all NaN (window too small)
     d = Series(np.random.randn(5))
-    x = d.rolling(window=3).kurt()
+    x = d.rolling(window=3, step=step).kurt()
     tm.assert_series_equal(all_nan, x)
 
     # yields [NaN, NaN, NaN, 1.224307, 2.671499]
     d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401])
-    expected = Series([np.NaN, np.NaN, np.NaN, 1.224307, 2.671499])
-    x = d.rolling(window=4).kurt()
+    expected = Series([np.NaN, np.NaN, np.NaN, 1.224307, 2.671499])[::step]
+    x = d.rolling(window=4, step=step).kurt()
     tm.assert_series_equal(expected, x)
 
 
-def test_rolling_skew_eq_value_fperr():
+def test_rolling_skew_eq_value_fperr(step):
     # #18804 all rolling skew for all equal values should return Nan
-    a = Series([1.1] * 15).rolling(window=10).skew()
+    a = Series([1.1] * 15).rolling(window=10, step=step).skew()
     assert np.isnan(a).all()
 
 
-def test_rolling_kurt_eq_value_fperr():
+def test_rolling_kurt_eq_value_fperr(step):
     # #18804 all rolling kurt for all equal values should return Nan
-    a = Series([1.1] * 15).rolling(window=10).kurt()
+    a = Series([1.1] * 15).rolling(window=10, step=step).kurt()
     assert np.isnan(a).all()
diff --git a/pandas/tests/window/test_win_type.py b/pandas/tests/window/test_win_type.py
index 03ea745d9cb86..4ed74fe8d2a14 100644
--- a/pandas/tests/window/test_win_type.py
+++ b/pandas/tests/window/test_win_type.py
@@ -125,10 +125,10 @@ def test_constructor_with_win_type_invalid(frame_or_series):
 
 @td.skip_if_no_scipy
 @pytest.mark.filterwarnings("ignore:can't resolve:ImportWarning")
-def test_window_with_args():
+def test_window_with_args(step):
     # make sure that we are aggregating window functions correctly with arg
     r = Series(np.random.randn(100)).rolling(
-        window=10, min_periods=1, win_type="gaussian"
+        window=10, min_periods=1, win_type="gaussian", step=step
     )
     expected = concat([r.mean(std=10), r.mean(std=0.01)], axis=1)
     expected.columns = ["", ""]
@@ -173,8 +173,8 @@ def test_win_type_freq_return_deprecation():
 @td.skip_if_no_scipy
 def test_win_type_not_implemented():
     class CustomIndexer(BaseIndexer):
-        def get_window_bounds(self, num_values, min_periods, center, closed):
-            return np.array([0, 1]), np.array([1, 2])
+        def get_window_bounds(self, num_values, min_periods, center, closed, step):
+            return np.array([0, 1]), np.array([1, 2]), np.array([0, 1])
 
     df = DataFrame({"values": range(2)})
     indexer = CustomIndexer()
@@ -183,10 +183,10 @@ def get_window_bounds(self, num_values, min_periods, center, closed):
 
 
 @td.skip_if_no_scipy
-def test_cmov_mean():
+def test_cmov_mean(step):
     # GH 8238
     vals = np.array([6.95, 15.21, 4.72, 9.12, 13.81, 13.49, 16.68, 9.48, 10.63, 14.48])
-    result = Series(vals).rolling(5, center=True).mean()
+    result = Series(vals).rolling(5, center=True, step=step).mean()
     expected_values = [
         np.nan,
         np.nan,
@@ -199,15 +199,15 @@ def test_cmov_mean():
         np.nan,
         np.nan,
     ]
-    expected = Series(expected_values)
+    expected = Series(expected_values)[::step]
     tm.assert_series_equal(expected, result)
 
 
 @td.skip_if_no_scipy
-def test_cmov_window():
+def test_cmov_window(step):
     # GH 8238
     vals = np.array([6.95, 15.21, 4.72, 9.12, 13.81, 13.49, 16.68, 9.48, 10.63, 14.48])
-    result = Series(vals).rolling(5, win_type="boxcar", center=True).mean()
+    result = Series(vals).rolling(5, win_type="boxcar", center=True, step=step).mean()
     expected_values = [
         np.nan,
         np.nan,
@@ -220,28 +220,28 @@ def test_cmov_window():
         np.nan,
         np.nan,
     ]
-    expected = Series(expected_values)
+    expected = Series(expected_values)[::step]
     tm.assert_series_equal(expected, result)
 
 
 @td.skip_if_no_scipy
-def test_cmov_window_corner():
+def test_cmov_window_corner(step):
     # GH 8238
     # all nan
     vals = Series([np.nan] * 10)
-    result = vals.rolling(5, center=True, win_type="boxcar").mean()
+    result = vals.rolling(5, center=True, win_type="boxcar", step=step).mean()
     assert np.isnan(result).all()
 
     # empty
     vals = Series([], dtype=object)
-    result = vals.rolling(5, center=True, win_type="boxcar").mean()
+    result = vals.rolling(5, center=True, win_type="boxcar", step=step).mean()
     assert len(result) == 0
 
     # shorter than window
     vals = Series(np.random.randn(5))
-    result = vals.rolling(10, win_type="boxcar").mean()
+    result = vals.rolling(10, win_type="boxcar", step=step).mean()
     assert np.isnan(result).all()
-    assert len(result) == 5
+    assert len(result) == len(range(0, 5, step or 1))
 
 
 @td.skip_if_no_scipy
@@ -310,7 +310,7 @@ def test_cmov_window_corner():
         ),
     ],
 )
-def test_cmov_window_frame(f, xp):
+def test_cmov_window_frame(f, xp, step):
     # Gh 8238
     df = DataFrame(
         np.array(
@@ -328,28 +328,30 @@ def test_cmov_window_frame(f, xp):
             ]
         )
     )
-    xp = DataFrame(np.array(xp))
+    xp = DataFrame(np.array(xp))[::step]
 
-    roll = df.rolling(5, win_type="boxcar", center=True)
+    roll = df.rolling(5, win_type="boxcar", center=True, step=step)
     rs = getattr(roll, f)()
 
     tm.assert_frame_equal(xp, rs)
 
 
 @td.skip_if_no_scipy
-def test_cmov_window_na_min_periods():
+def test_cmov_window_na_min_periods(step):
     # min_periods
     vals = Series(np.random.randn(10))
     vals[4] = np.nan
     vals[8] = np.nan
 
-    xp = vals.rolling(5, min_periods=4, center=True).mean()
-    rs = vals.rolling(5, win_type="boxcar", min_periods=4, center=True).mean()
+    xp = vals.rolling(5, min_periods=4, center=True, step=step).mean()
+    rs = vals.rolling(
+        5, win_type="boxcar", min_periods=4, center=True, step=step
+    ).mean()
     tm.assert_series_equal(xp, rs)
 
 
 @td.skip_if_no_scipy
-def test_cmov_window_regular(win_types):
+def test_cmov_window_regular(win_types, step):
     # GH 8238
     vals = np.array([6.95, 15.21, 4.72, 9.12, 13.81, 13.49, 16.68, 9.48, 10.63, 14.48])
     xps = {
@@ -451,26 +453,26 @@ def test_cmov_window_regular(win_types):
         ],
     }
 
-    xp = Series(xps[win_types])
-    rs = Series(vals).rolling(5, win_type=win_types, center=True).mean()
+    xp = Series(xps[win_types])[::step]
+    rs = Series(vals).rolling(5, win_type=win_types, center=True, step=step).mean()
     tm.assert_series_equal(xp, rs)
 
 
 @td.skip_if_no_scipy
-def test_cmov_window_regular_linear_range(win_types):
+def test_cmov_window_regular_linear_range(win_types, step):
     # GH 8238
     vals = np.array(range(10), dtype=float)
     xp = vals.copy()
     xp[:2] = np.nan
     xp[-2:] = np.nan
-    xp = Series(xp)
+    xp = Series(xp)[::step]
 
-    rs = Series(vals).rolling(5, win_type=win_types, center=True).mean()
+    rs = Series(vals).rolling(5, win_type=win_types, center=True, step=step).mean()
     tm.assert_series_equal(xp, rs)
 
 
 @td.skip_if_no_scipy
-def test_cmov_window_regular_missing_data(win_types):
+def test_cmov_window_regular_missing_data(win_types, step):
     # GH 8238
     vals = np.array(
         [6.95, 15.21, 4.72, 9.12, 13.81, 13.49, 16.68, np.nan, 10.63, 14.48]
@@ -574,13 +576,13 @@ def test_cmov_window_regular_missing_data(win_types):
         ],
     }
 
-    xp = Series(xps[win_types])
-    rs = Series(vals).rolling(5, win_type=win_types, min_periods=3).mean()
+    xp = Series(xps[win_types])[::step]
+    rs = Series(vals).rolling(5, win_type=win_types, min_periods=3, step=step).mean()
     tm.assert_series_equal(xp, rs)
 
 
 @td.skip_if_no_scipy
-def test_cmov_window_special(win_types_special):
+def test_cmov_window_special(win_types_special, step):
     # GH 8238
     kwds = {
         "kaiser": {"beta": 1.0},
@@ -642,17 +644,17 @@ def test_cmov_window_special(win_types_special):
         ],
     }
 
-    xp = Series(xps[win_types_special])
+    xp = Series(xps[win_types_special])[::step]
     rs = (
         Series(vals)
-        .rolling(5, win_type=win_types_special, center=True)
+        .rolling(5, win_type=win_types_special, center=True, step=step)
         .mean(**kwds[win_types_special])
     )
     tm.assert_series_equal(xp, rs)
 
 
 @td.skip_if_no_scipy
-def test_cmov_window_special_linear_range(win_types_special):
+def test_cmov_window_special_linear_range(win_types_special, step):
     # GH 8238
     kwds = {
         "kaiser": {"beta": 1.0},
@@ -666,11 +668,11 @@ def test_cmov_window_special_linear_range(win_types_special):
     xp = vals.copy()
     xp[:2] = np.nan
     xp[-2:] = np.nan
-    xp = Series(xp)
+    xp = Series(xp)[::step]
 
     rs = (
         Series(vals)
-        .rolling(5, win_type=win_types_special, center=True)
+        .rolling(5, win_type=win_types_special, center=True, step=step)
         .mean(**kwds[win_types_special])
     )
     tm.assert_series_equal(xp, rs)

From 8eff509c5d2652cbd120c7b6c3b5e179c9fe2a7a Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Mon, 31 Jan 2022 16:01:27 -0500
Subject: [PATCH 2/3] Fix typing

---
 pandas/core/indexers/objects.py |  5 ++--
 pandas/core/window/ewm.py       |  2 +-
 pandas/core/window/rolling.py   | 50 ++++++++++++++++++++++-----------
 3 files changed, 37 insertions(+), 20 deletions(-)

diff --git a/pandas/core/indexers/objects.py b/pandas/core/indexers/objects.py
index 8a73646138abf..427a9fb67075f 100644
--- a/pandas/core/indexers/objects.py
+++ b/pandas/core/indexers/objects.py
@@ -295,7 +295,7 @@ def get_window_bounds(
         if step is None:
             step = 1
 
-        start = np.arange(num_values, step=step, dtype="int64")
+        start = np.arange(0, num_values, step, dtype="int64")
         end = start + self.window_size
         if self.window_size:
             end = np.clip(end, 0, num_values)
@@ -358,6 +358,7 @@ def get_window_bounds(
         start_arrays = []
         end_arrays = []
         ref_arrays = []
+        empty = np.array([], dtype=np.int64)
         window_indices_start = 0
         for key, indices in self.groupby_indices.items():
             index_array: np.ndarray | None
@@ -393,7 +394,7 @@ def get_window_bounds(
             start_arrays.append(window_indices.take(ensure_platform_int(start)))
             end_arrays.append(window_indices.take(ensure_platform_int(end)))
             ref_arrays.append(
-                None if ref is None else window_indices.take(ensure_platform_int(ref))
+                empty if ref is None else window_indices.take(ensure_platform_int(ref))
             )
         start = np.concatenate(start_arrays)
         end = np.concatenate(end_arrays)
diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py
index 1b24d214d2356..74d5d75a73871 100644
--- a/pandas/core/window/ewm.py
+++ b/pandas/core/window/ewm.py
@@ -418,7 +418,7 @@ def __init__(
         )
 
     def _check_window_bounds(
-        self, start: np.ndarray, end: np.ndarray, ref: np.ndarray, num_vals: int
+        self, start: np.ndarray, end: np.ndarray, ref: np.ndarray | None, num_vals: int
     ) -> None:
         # emw algorithms are iterative with each point
         # ExponentialMovingWindowIndexer "bounds" are the entire window
diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py
index 1d13475d6db95..980ec1c0bc029 100644
--- a/pandas/core/window/rolling.py
+++ b/pandas/core/window/rolling.py
@@ -14,6 +14,7 @@
     Any,
     Callable,
     Hashable,
+    cast,
 )
 import warnings
 
@@ -230,7 +231,7 @@ def _validate(self) -> None:
             raise ValueError("method must be 'table' or 'single")
 
     def _check_window_bounds(
-        self, start: np.ndarray, end: np.ndarray, ref: np.ndarray, num_vals: int
+        self, start: np.ndarray, end: np.ndarray, ref: np.ndarray | None, num_vals: int
    ) -> None:
         if len(start) != len(end):
             raise ValueError(
@@ -251,7 +252,7 @@ def _check_window_bounds(
                 f"if given and rounded up unless groupby was used"
             )
 
-    def _slice_index(self, index: Index, at: np.ndarray) -> Index:
+    def _slice_index(self, index: Index, at: np.ndarray | None) -> Index:
         """
         Slices the index of the object.
         """
@@ -435,7 +436,11 @@ def _get_window_indexer(self) -> BaseIndexer:
         return FixedWindowIndexer(window_size=self.window)
 
     def _apply_series(
-        self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None
+        self,
+        homogeneous_func: Callable[
+            ..., ArrayLike | tuple[ArrayLike, np.ndarray | None]
+        ],
+        name: str | None = None,
     ) -> Series:
         """
         Series version of _apply_blockwise
@@ -457,7 +462,11 @@ def _apply_series(
         return obj._constructor(result, index=index, name=obj.name)
 
     def _apply_blockwise(
-        self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None
+        self,
+        homogeneous_func: Callable[
+            ..., ArrayLike | tuple[ArrayLike, np.ndarray | None]
+        ],
+        name: str | None = None,
     ) -> DataFrame | Series:
         """
         Apply the given function to the DataFrame broken down into homogeneous
@@ -472,7 +481,7 @@ def _apply_blockwise(
             obj = notna(obj).astype(int)
            obj._mgr = obj._mgr.consolidate()
 
-        def hfunc(values: ArrayLike) -> ArrayLike:
+        def hfunc(values: ArrayLike) -> ArrayLike | tuple[ArrayLike, np.ndarray | None]:
             values = self._prep_values(values)
             return homogeneous_func(values)
 
@@ -480,22 +489,24 @@ def hfunc(values: ArrayLike) -> ArrayLike:
             obj = obj.T
 
         taker = []
-        res_values: list[Any] = []
+        res_values: list[ArrayLike] = []
         ref_value = None
         for i, arr in enumerate(obj._iter_column_arrays()):
             # GH#42736 operate column-wise instead of block-wise
             try:
-                res, ref = hfunc(arr), None
+                hresult = hfunc(arr)
             except (TypeError, NotImplementedError):
                 pass
             else:
-                if type(res) is tuple:
-                    res, ref = res
+                res, ref = (
+                    hresult
+                    if type(hresult) is tuple
+                    else (cast(ArrayLike, hresult), None)
+                )
                 if len(res_values) == 0:
                     ref_value = ref
-                elif not (
-                    (ref_value is None and ref is None)
-                    or np.array_equal(ref_value, ref)
+                elif ((ref_value is None) != (ref is None)) or not np.array_equal(
+                    cast(np.ndarray, ref_value), cast(np.ndarray, ref)
                 ):
                     raise ValueError("hfunc returned inconsistent ref value")
                 res_values.append(res)
@@ -526,7 +537,11 @@ def hfunc(values: ArrayLike) -> ArrayLike:
         return self._resolve_output(df, obj)
 
     def _apply_tablewise(
-        self, homogeneous_func: Callable[..., ArrayLike], name: str | None = None
+        self,
+        homogeneous_func: Callable[
+            ..., ArrayLike | tuple[ArrayLike, np.ndarray | None]
+        ],
+        name: str | None = None,
     ) -> DataFrame | Series:
         """
         Apply the given function to the DataFrame across the entire object
@@ -536,9 +551,10 @@ def _apply_tablewise(
         obj = self._create_data(self._selected_obj)
         values = self._prep_values(obj.to_numpy())
         values = values.T if self.axis == 1 else values
-        result, ref = homogeneous_func(values), None
-        if type(result) is tuple:
-            result, ref = result
+        hresult = homogeneous_func(values)
+        result, ref = (
+            hresult if type(hresult) is tuple else (cast(ArrayLike, hresult), None)
+        )
         result = result.T if self.axis == 1 else result
         index = obj.index if self.axis == 1 else self._slice_index(obj.index, ref)
         columns = obj.columns if self.axis != 1 else self._slice_index(obj.columns, ref)
@@ -2649,7 +2665,7 @@ class RollingGroupby(BaseWindowGroupby, Rolling):
 
     _attributes = Rolling._attributes + BaseWindowGroupby._attributes
 
-    def _slice_index(self, index: Index, at: np.ndarray) -> Index:
+    def _slice_index(self, index: Index, at: np.ndarray | None) -> Index:
         """
         Slices the index of the object.
         """

From 93a8b0bb0c83faca3856bc5e3343949c5e54efe3 Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Mon, 31 Jan 2022 17:09:02 -0500
Subject: [PATCH 3/3] Fix docstrings

---
 pandas/core/indexers/objects.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pandas/core/indexers/objects.py b/pandas/core/indexers/objects.py
index 427a9fb67075f..aa98a6f6c4354 100644
--- a/pandas/core/indexers/objects.py
+++ b/pandas/core/indexers/objects.py
@@ -35,10 +35,9 @@
 Returns
 -------
 A tuple of ndarray[int64]s:
-    start : array of start boundaries
-    end : array of end boundaries
-    ref : array of window reference locations, or None indicating all
-        must be None if step is None or 1
+start : array of start boundaries
+end : array of end boundaries
+ref : array of window reference locations, or None (meaning all) when step is None or 1
 """
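For reference, a minimal usage sketch of the step argument introduced by this series (not part of the patches themselves; it assumes a pandas build that already contains these changes). As the tests above encode, a stepped rolling result evaluates only every step-th window and so matches the un-stepped result sliced with [::step]:

    import numpy as np
    import pandas as pd

    s = pd.Series(np.arange(10, dtype="float64"))

    # Every window is evaluated: one output row per input row.
    dense = s.rolling(window=3).mean()

    # With step=2 only every second window is evaluated, so the result keeps
    # the rows at positions 0, 2, 4, 6, 8 (5 rows for 10 inputs).
    sparse = s.rolling(window=3, step=2).mean()

    # Same labels and values as slicing the dense result, per the tests above.
    pd.testing.assert_series_equal(sparse, dense[::2])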