Skip to content

REF: move towards making _apply_blockwise actually block-wise #35730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Aug 23, 2020
Merged
69 changes: 52 additions & 17 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,23 @@
from functools import partial
import inspect
from textwrap import dedent
from typing import Callable, Dict, List, Optional, Set, Tuple, Type, Union
from typing import (
TYPE_CHECKING,
Callable,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
)

import numpy as np

from pandas._libs.tslibs import BaseOffset, to_offset
import pandas._libs.window.aggregations as window_aggregations
from pandas._typing import ArrayLike, Axis, FrameOrSeries, Scalar
from pandas._typing import ArrayLike, Axis, FrameOrSeriesUnion, Scalar
from pandas.compat._optional import import_optional_dependency
from pandas.compat.numpy import function as nv
from pandas.util._decorators import Appender, Substitution, cache_readonly, doc
Expand Down Expand Up @@ -55,6 +65,9 @@
)
from pandas.core.window.numba_ import generate_numba_apply_func

if TYPE_CHECKING:
from pandas import Series


def calculate_center_offset(window) -> int:
"""
Expand Down Expand Up @@ -145,7 +158,7 @@ class _Window(PandasObject, ShallowMixin, SelectionMixin):

def __init__(
self,
obj: FrameOrSeries,
obj: FrameOrSeriesUnion,
window=None,
min_periods: Optional[int] = None,
center: bool = False,
Expand Down Expand Up @@ -219,7 +232,7 @@ def _validate_get_window_bounds_signature(window: BaseIndexer) -> None:
f"get_window_bounds"
)

def _create_blocks(self, obj: FrameOrSeries):
def _create_blocks(self, obj: FrameOrSeriesUnion):
"""
Split data into blocks & return conformed data.
"""
Expand Down Expand Up @@ -381,7 +394,7 @@ def _wrap_result(self, result, block=None, obj=None):
return type(obj)(result, index=index, columns=block.columns)
return result

def _wrap_results(self, results, blocks, obj, exclude=None) -> FrameOrSeries:
def _wrap_results(self, results, blocks, obj, exclude=None) -> FrameOrSeriesUnion:
"""
Wrap the results.

Expand All @@ -394,12 +407,16 @@ def _wrap_results(self, results, blocks, obj, exclude=None) -> FrameOrSeries:
"""
from pandas import Series, concat

if obj.ndim == 1:
if not results:
raise DataError("No numeric types to aggregate")
assert len(results) == 1
return Series(results[0], index=obj.index, name=obj.name)

final = []
for result, block in zip(results, blocks):

result = self._wrap_result(result, block=block, obj=obj)
if result.ndim == 1:
return result
result = type(obj)(result, index=obj.index, columns=block.columns)
final.append(result)

# if we have an 'on' column
Expand Down Expand Up @@ -487,13 +504,33 @@ def _get_window_indexer(self, window: int) -> BaseIndexer:
return VariableWindowIndexer(index_array=self._on.asi8, window_size=window)
return FixedWindowIndexer(window_size=window)

def _apply_series(self, homogeneous_func: Callable[..., ArrayLike]) -> "Series":
"""
Series version of _apply_blockwise
"""
from pandas import Series # TODO: use _constructor?

_, obj = self._create_blocks(self._selected_obj)
values = obj.values

try:
values = self._prep_values(obj.values)
except (TypeError, NotImplementedError) as err:
raise DataError("No numeric types to aggregate") from err

result = homogeneous_func(values)
return Series(result, index=obj.index, name=obj.name)

def _apply_blockwise(
self, homogeneous_func: Callable[..., ArrayLike]
) -> FrameOrSeries:
) -> FrameOrSeriesUnion:
"""
Apply the given function to the DataFrame broken down into homogeneous
sub-frames.
"""
if self._selected_obj.ndim == 1:
return self._apply_series(homogeneous_func)

# This isn't quite blockwise, since `blocks` is actually a collection
# of homogenenous DataFrames.
blocks, obj = self._create_blocks(self._selected_obj)
Expand All @@ -505,13 +542,10 @@ def _apply_blockwise(
try:
values = self._prep_values(b.values)

except (TypeError, NotImplementedError) as err:
if isinstance(obj, ABCDataFrame):
skipped.append(i)
exclude.extend(b.columns)
continue
else:
raise DataError("No numeric types to aggregate") from err
except (TypeError, NotImplementedError):
skipped.append(i)
exclude.extend(b.columns)
continue

result = homogeneous_func(values)
results.append(result)
Expand Down Expand Up @@ -2236,7 +2270,7 @@ def _apply(
def _constructor(self):
return Rolling

def _create_blocks(self, obj: FrameOrSeries):
def _create_blocks(self, obj: FrameOrSeriesUnion):
"""
Split data into blocks & return conformed data.
"""
Expand Down Expand Up @@ -2277,6 +2311,7 @@ def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer:
if isinstance(self.window, BaseIndexer):
rolling_indexer = type(self.window)
indexer_kwargs = self.window.__dict__
assert isinstance(indexer_kwargs, dict) # for mypy
# We'll be using the index of each group later
indexer_kwargs.pop("index_array", None)
elif self.is_freq_type:
Expand Down