Skip to content

REF: implement _apply_blockwise #35359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 6, 2020
27 changes: 4 additions & 23 deletions pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
from pandas.util._decorators import Appender, Substitution, doc

from pandas.core.dtypes.common import is_datetime64_ns_dtype
from pandas.core.dtypes.generic import ABCDataFrame

from pandas.core.base import DataError
import pandas.core.common as common
from pandas.core.window.common import _doc_template, _shared_docs, zsqrt
from pandas.core.window.rolling import _flex_binary_moment, _Rolling
Expand Down Expand Up @@ -302,30 +300,13 @@ def _apply(self, func):
-------
y : same type as input argument
"""
blocks, obj = self._create_blocks(self._selected_obj)
block_list = list(blocks)

results = []
exclude = []
for i, b in enumerate(blocks):
try:
values = self._prep_values(b.values)

except (TypeError, NotImplementedError) as err:
if isinstance(obj, ABCDataFrame):
exclude.extend(b.columns)
del block_list[i]
continue
else:
raise DataError("No numeric types to aggregate") from err

def homogeneous_func(values: np.ndarray):
if values.size == 0:
results.append(values.copy())
continue
return values.copy()
return np.apply_along_axis(func, self.axis, values)

results.append(np.apply_along_axis(func, self.axis, values))

return self._wrap_results(results, block_list, obj, exclude)
return self._apply_blockwise(homogeneous_func)

@Substitution(name="ewm", func_name="mean")
@Appender(_doc_template)
Expand Down
56 changes: 35 additions & 21 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,36 @@ def _get_window_indexer(self, window: int) -> BaseIndexer:
return VariableWindowIndexer(index_array=self._on.asi8, window_size=window)
return FixedWindowIndexer(window_size=window)

def _apply_blockwise(self, homogeneous_func: Callable):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you type the output

"""
Apply the given function to the DataFrame broken down into homogeneous
sub-frames.
"""
# This isn't quite blockwise, since `blocks` is actually a collection
# of homogenenous DataFrames.
blocks, obj = self._create_blocks(self._selected_obj)

skipped = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you type these

results = []
exclude: List[Scalar] = []
for i, b in enumerate(blocks):
try:
values = self._prep_values(b.values)

except (TypeError, NotImplementedError) as err:
if isinstance(obj, ABCDataFrame):
skipped.append(i)
exclude.extend(b.columns)
continue
else:
raise DataError("No numeric types to aggregate") from err

result = homogeneous_func(values)
results.append(result)

block_list = [blk for i, blk in enumerate(blocks) if i not in skipped]
return self._wrap_results(results, block_list, obj, exclude)

def _apply(
self,
func: Callable,
Expand Down Expand Up @@ -524,30 +554,14 @@ def _apply(
"""
win_type = self._get_win_type(kwargs)
window = self._get_window(win_type=win_type)

blocks, obj = self._create_blocks(self._selected_obj)
block_list = list(blocks)
window_indexer = self._get_window_indexer(window)

results = []
exclude: List[Scalar] = []
for i, b in enumerate(blocks):
try:
values = self._prep_values(b.values)

except (TypeError, NotImplementedError) as err:
if isinstance(obj, ABCDataFrame):
exclude.extend(b.columns)
del block_list[i]
continue
else:
raise DataError("No numeric types to aggregate") from err
def homogeneous_func(values: np.ndarray):
# calculation function

if values.size == 0:
results.append(values.copy())
continue
return values.copy()

# calculation function
offset = calculate_center_offset(window) if center else 0
additional_nans = np.array([np.nan] * offset)

Expand Down Expand Up @@ -594,9 +608,9 @@ def calc(x):
if center:
result = self._center_window(result, window)

results.append(result)
return result

return self._wrap_results(results, block_list, obj, exclude)
return self._apply_blockwise(homogeneous_func)

def aggregate(self, func, *args, **kwargs):
result, how = self._aggregate(func, *args, **kwargs)
Expand Down