diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index f516871f789d0..f7e81f41b8675 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -6,13 +6,23 @@ from functools import partial import inspect from textwrap import dedent -from typing import Callable, Dict, List, Optional, Set, Tuple, Type, Union +from typing import ( + TYPE_CHECKING, + Callable, + Dict, + List, + Optional, + Set, + Tuple, + Type, + Union, +) import numpy as np from pandas._libs.tslibs import BaseOffset, to_offset import pandas._libs.window.aggregations as window_aggregations -from pandas._typing import ArrayLike, Axis, FrameOrSeries, Label +from pandas._typing import ArrayLike, Axis, FrameOrSeriesUnion, Label from pandas.compat._optional import import_optional_dependency from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, cache_readonly, doc @@ -55,6 +65,9 @@ ) from pandas.core.window.numba_ import generate_numba_apply_func +if TYPE_CHECKING: + from pandas import Series + def calculate_center_offset(window) -> int: """ @@ -145,7 +158,7 @@ class _Window(PandasObject, ShallowMixin, SelectionMixin): def __init__( self, - obj: FrameOrSeries, + obj: FrameOrSeriesUnion, window=None, min_periods: Optional[int] = None, center: bool = False, @@ -219,7 +232,7 @@ def _validate_get_window_bounds_signature(window: BaseIndexer) -> None: f"get_window_bounds" ) - def _create_blocks(self, obj: FrameOrSeries): + def _create_blocks(self, obj: FrameOrSeriesUnion): """ Split data into blocks & return conformed data. """ @@ -381,7 +394,7 @@ def _wrap_result(self, result, block=None, obj=None): return type(obj)(result, index=index, columns=block.columns) return result - def _wrap_results(self, results, obj, skipped: List[int]) -> FrameOrSeries: + def _wrap_results(self, results, obj, skipped: List[int]) -> FrameOrSeriesUnion: """ Wrap the results. @@ -394,22 +407,23 @@ def _wrap_results(self, results, obj, skipped: List[int]) -> FrameOrSeries: """ from pandas import Series, concat + if obj.ndim == 1: + if not results: + raise DataError("No numeric types to aggregate") + assert len(results) == 1 + return Series(results[0], index=obj.index, name=obj.name) + exclude: List[Label] = [] - if obj.ndim == 2: - orig_blocks = list(obj._to_dict_of_blocks(copy=False).values()) - for i in skipped: - exclude.extend(orig_blocks[i].columns) - else: - orig_blocks = [obj] + orig_blocks = list(obj._to_dict_of_blocks(copy=False).values()) + for i in skipped: + exclude.extend(orig_blocks[i].columns) kept_blocks = [blk for i, blk in enumerate(orig_blocks) if i not in skipped] final = [] for result, block in zip(results, kept_blocks): - result = self._wrap_result(result, block=block, obj=obj) - if result.ndim == 1: - return result + result = type(obj)(result, index=obj.index, columns=block.columns) final.append(result) exclude = exclude or [] @@ -488,13 +502,31 @@ def _get_window_indexer(self, window: int) -> BaseIndexer: return VariableWindowIndexer(index_array=self._on.asi8, window_size=window) return FixedWindowIndexer(window_size=window) + def _apply_series(self, homogeneous_func: Callable[..., ArrayLike]) -> "Series": + """ + Series version of _apply_blockwise + """ + _, obj = self._create_blocks(self._selected_obj) + values = obj.values + + try: + values = self._prep_values(obj.values) + except (TypeError, NotImplementedError) as err: + raise DataError("No numeric types to aggregate") from err + + result = homogeneous_func(values) + return obj._constructor(result, index=obj.index, name=obj.name) + def _apply_blockwise( self, homogeneous_func: Callable[..., ArrayLike] - ) -> FrameOrSeries: + ) -> FrameOrSeriesUnion: """ Apply the given function to the DataFrame broken down into homogeneous sub-frames. """ + if self._selected_obj.ndim == 1: + return self._apply_series(homogeneous_func) + # This isn't quite blockwise, since `blocks` is actually a collection # of homogenenous DataFrames. blocks, obj = self._create_blocks(self._selected_obj) @@ -505,12 +537,9 @@ def _apply_blockwise( try: values = self._prep_values(b.values) - except (TypeError, NotImplementedError) as err: - if isinstance(obj, ABCDataFrame): - skipped.append(i) - continue - else: - raise DataError("No numeric types to aggregate") from err + except (TypeError, NotImplementedError): + skipped.append(i) + continue result = homogeneous_func(values) results.append(result) @@ -2234,7 +2263,7 @@ def _apply( def _constructor(self): return Rolling - def _create_blocks(self, obj: FrameOrSeries): + def _create_blocks(self, obj: FrameOrSeriesUnion): """ Split data into blocks & return conformed data. """ @@ -2275,7 +2304,7 @@ def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer: if isinstance(self.window, BaseIndexer): rolling_indexer = type(self.window) indexer_kwargs = self.window.__dict__ - assert isinstance(indexer_kwargs, dict) + assert isinstance(indexer_kwargs, dict) # for mypy # We'll be using the index of each group later indexer_kwargs.pop("index_array", None) elif self.is_freq_type: