From 6047dea13207dd3be3337f7e08294bab836d45bd Mon Sep 17 00:00:00 2001 From: Brock Date: Mon, 20 Jul 2020 14:34:19 -0700 Subject: [PATCH 1/3] REF: implement _apply_blockwise --- pandas/core/window/ewm.py | 27 +++-------------- pandas/core/window/rolling.py | 56 ++++++++++++++++++++++------------- 2 files changed, 39 insertions(+), 44 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 7a2d8e84bec76..c57c434dd3040 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -12,9 +12,7 @@ from pandas.util._decorators import Appender, Substitution, doc from pandas.core.dtypes.common import is_datetime64_ns_dtype -from pandas.core.dtypes.generic import ABCDataFrame -from pandas.core.base import DataError import pandas.core.common as common from pandas.core.window.common import _doc_template, _shared_docs, zsqrt from pandas.core.window.rolling import _flex_binary_moment, _Rolling @@ -302,30 +300,13 @@ def _apply(self, func): ------- y : same type as input argument """ - blocks, obj = self._create_blocks(self._selected_obj) - block_list = list(blocks) - - results = [] - exclude = [] - for i, b in enumerate(blocks): - try: - values = self._prep_values(b.values) - - except (TypeError, NotImplementedError) as err: - if isinstance(obj, ABCDataFrame): - exclude.extend(b.columns) - del block_list[i] - continue - else: - raise DataError("No numeric types to aggregate") from err + def homogeneous_func(values: np.ndarray): if values.size == 0: - results.append(values.copy()) - continue + return values.copy() + return np.apply_along_axis(func, self.axis, values) - results.append(np.apply_along_axis(func, self.axis, values)) - - return self._wrap_results(results, block_list, obj, exclude) + return self._apply_blockwise(homogeneous_func) @Substitution(name="ewm", func_name="mean") @Appender(_doc_template) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 48953f6a75487..f80e9a0e84778 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -487,6 +487,36 @@ def _get_window_indexer(self, window: int) -> BaseIndexer: return VariableWindowIndexer(index_array=self._on.asi8, window_size=window) return FixedWindowIndexer(window_size=window) + def _apply_blockwise(self, homogeneous_func: Callable): + """ + Apply the given function to the DataFrame broken down into homogeneous + sub-frames. + """ + # This isn't quite blockwise, since `blocks` is actually a collection + # of homogenenous DataFrames. + blocks, obj = self._create_blocks(self._selected_obj) + + skipped = [] + results = [] + exclude: List[Scalar] = [] + for i, b in enumerate(blocks): + try: + values = self._prep_values(b.values) + + except (TypeError, NotImplementedError) as err: + if isinstance(obj, ABCDataFrame): + skipped.append(i) + exclude.extend(b.columns) + continue + else: + raise DataError("No numeric types to aggregate") from err + + result = homogeneous_func(values) + results.append(result) + + block_list = [blk for i, blk in enumerate(blocks) if i not in skipped] + return self._wrap_results(results, block_list, obj, exclude) + def _apply( self, func: Callable, @@ -524,30 +554,14 @@ def _apply( """ win_type = self._get_win_type(kwargs) window = self._get_window(win_type=win_type) - - blocks, obj = self._create_blocks(self._selected_obj) - block_list = list(blocks) window_indexer = self._get_window_indexer(window) - results = [] - exclude: List[Scalar] = [] - for i, b in enumerate(blocks): - try: - values = self._prep_values(b.values) - - except (TypeError, NotImplementedError) as err: - if isinstance(obj, ABCDataFrame): - exclude.extend(b.columns) - del block_list[i] - continue - else: - raise DataError("No numeric types to aggregate") from err + def homogeneous_func(values: np.ndarray): + # calculation function if values.size == 0: - results.append(values.copy()) - continue + return values.copy() - # calculation function offset = calculate_center_offset(window) if center else 0 additional_nans = np.array([np.nan] * offset) @@ -594,9 +608,9 @@ def calc(x): if center: result = self._center_window(result, window) - results.append(result) + return result - return self._wrap_results(results, block_list, obj, exclude) + return self._apply_blockwise(homogeneous_func) def aggregate(self, func, *args, **kwargs): result, how = self._aggregate(func, *args, **kwargs) From a74e2e297e41015c5ccdab9d07e7b8afed2ef5c1 Mon Sep 17 00:00:00 2001 From: Brock Date: Tue, 4 Aug 2020 20:37:02 -0700 Subject: [PATCH 2/3] Annotate --- pandas/core/window/rolling.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 95aa7ab3e7d49..b98831b488c7d 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -12,7 +12,7 @@ from pandas._libs.tslibs import BaseOffset, to_offset import pandas._libs.window.aggregations as window_aggregations -from pandas._typing import Axis, FrameOrSeries, Scalar +from pandas._typing import ArrayLike, Axis, FrameOrSeries, Scalar from pandas.compat._optional import import_optional_dependency from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, cache_readonly, doc @@ -487,7 +487,7 @@ def _get_window_indexer(self, window: int) -> BaseIndexer: return VariableWindowIndexer(index_array=self._on.asi8, window_size=window) return FixedWindowIndexer(window_size=window) - def _apply_blockwise(self, homogeneous_func: Callable): + def _apply_blockwise(self, homogeneous_func: Callable) -> FrameOrSeries: """ Apply the given function to the DataFrame broken down into homogeneous sub-frames. @@ -496,8 +496,8 @@ def _apply_blockwise(self, homogeneous_func: Callable): # of homogenenous DataFrames. blocks, obj = self._create_blocks(self._selected_obj) - skipped = [] - results = [] + skipped: List[int] = [] + results: List[ArrayLike] = [] exclude: List[Scalar] = [] for i, b in enumerate(blocks): try: From 80df9bc30e8dff20f52804373f5aa995da6df8b2 Mon Sep 17 00:00:00 2001 From: jbrockmendel Date: Thu, 6 Aug 2020 08:35:40 -0700 Subject: [PATCH 3/3] Update pandas/core/window/rolling.py Co-authored-by: Simon Hawkins --- pandas/core/window/rolling.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index b98831b488c7d..e1be7c72f274f 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -487,7 +487,9 @@ def _get_window_indexer(self, window: int) -> BaseIndexer: return VariableWindowIndexer(index_array=self._on.asi8, window_size=window) return FixedWindowIndexer(window_size=window) - def _apply_blockwise(self, homogeneous_func: Callable) -> FrameOrSeries: + def _apply_blockwise( + self, homogeneous_func: Callable[..., ArrayLike] + ) -> FrameOrSeries: """ Apply the given function to the DataFrame broken down into homogeneous sub-frames.