Skip to content

REF: implement _apply_blockwise #35359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 6, 2020
27 changes: 4 additions & 23 deletions pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
from pandas.util._decorators import Appender, Substitution, doc

from pandas.core.dtypes.common import is_datetime64_ns_dtype
from pandas.core.dtypes.generic import ABCDataFrame

from pandas.core.base import DataError
import pandas.core.common as common
from pandas.core.window.common import _doc_template, _shared_docs, zsqrt
from pandas.core.window.rolling import _flex_binary_moment, _Rolling
Expand Down Expand Up @@ -302,30 +300,13 @@ def _apply(self, func):
-------
y : same type as input argument
"""
blocks, obj = self._create_blocks(self._selected_obj)
block_list = list(blocks)

results = []
exclude = []
for i, b in enumerate(blocks):
try:
values = self._prep_values(b.values)

except (TypeError, NotImplementedError) as err:
if isinstance(obj, ABCDataFrame):
exclude.extend(b.columns)
del block_list[i]
continue
else:
raise DataError("No numeric types to aggregate") from err

def homogeneous_func(values: np.ndarray):
if values.size == 0:
results.append(values.copy())
continue
return values.copy()
return np.apply_along_axis(func, self.axis, values)

results.append(np.apply_along_axis(func, self.axis, values))

return self._wrap_results(results, block_list, obj, exclude)
return self._apply_blockwise(homogeneous_func)

@Substitution(name="ewm", func_name="mean")
@Appender(_doc_template)
Expand Down
58 changes: 36 additions & 22 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from pandas._libs.tslibs import BaseOffset, to_offset
import pandas._libs.window.aggregations as window_aggregations
from pandas._typing import Axis, FrameOrSeries, Scalar
from pandas._typing import ArrayLike, Axis, FrameOrSeries, Scalar
from pandas.compat._optional import import_optional_dependency
from pandas.compat.numpy import function as nv
from pandas.util._decorators import Appender, Substitution, cache_readonly, doc
Expand Down Expand Up @@ -487,6 +487,36 @@ def _get_window_indexer(self, window: int) -> BaseIndexer:
return VariableWindowIndexer(index_array=self._on.asi8, window_size=window)
return FixedWindowIndexer(window_size=window)

def _apply_blockwise(self, homogeneous_func: Callable) -> FrameOrSeries:
"""
Apply the given function to the DataFrame broken down into homogeneous
sub-frames.
"""
# This isn't quite blockwise, since `blocks` is actually a collection
# of homogenenous DataFrames.
blocks, obj = self._create_blocks(self._selected_obj)

skipped: List[int] = []
results: List[ArrayLike] = []
exclude: List[Scalar] = []
for i, b in enumerate(blocks):
try:
values = self._prep_values(b.values)

except (TypeError, NotImplementedError) as err:
if isinstance(obj, ABCDataFrame):
skipped.append(i)
exclude.extend(b.columns)
continue
else:
raise DataError("No numeric types to aggregate") from err

result = homogeneous_func(values)
results.append(result)

block_list = [blk for i, blk in enumerate(blocks) if i not in skipped]
return self._wrap_results(results, block_list, obj, exclude)

def _apply(
self,
func: Callable,
Expand Down Expand Up @@ -524,30 +554,14 @@ def _apply(
"""
win_type = self._get_win_type(kwargs)
window = self._get_window(win_type=win_type)

blocks, obj = self._create_blocks(self._selected_obj)
block_list = list(blocks)
window_indexer = self._get_window_indexer(window)

results = []
exclude: List[Scalar] = []
for i, b in enumerate(blocks):
try:
values = self._prep_values(b.values)

except (TypeError, NotImplementedError) as err:
if isinstance(obj, ABCDataFrame):
exclude.extend(b.columns)
del block_list[i]
continue
else:
raise DataError("No numeric types to aggregate") from err
def homogeneous_func(values: np.ndarray):
# calculation function

if values.size == 0:
results.append(values.copy())
continue
return values.copy()

# calculation function
offset = calculate_center_offset(window) if center else 0
additional_nans = np.array([np.nan] * offset)

Expand Down Expand Up @@ -594,9 +608,9 @@ def calc(x):
if center:
result = self._center_window(result, window)

results.append(result)
return result

return self._wrap_results(results, block_list, obj, exclude)
return self._apply_blockwise(homogeneous_func)

def aggregate(self, func, *args, **kwargs):
result, how = self._aggregate(func, *args, **kwargs)
Expand Down