Skip to content

Commit 3e10b7c

Browse files
authored
REF: implement _apply_blockwise (#35359)
1 parent 11e9d11 commit 3e10b7c

File tree

2 files changed

+42
-45
lines changed

2 files changed

+42
-45
lines changed

pandas/core/window/ewm.py

+4-23
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
from pandas.util._decorators import Appender, Substitution, doc
1313

1414
from pandas.core.dtypes.common import is_datetime64_ns_dtype
15-
from pandas.core.dtypes.generic import ABCDataFrame
1615

17-
from pandas.core.base import DataError
1816
import pandas.core.common as common
1917
from pandas.core.window.common import _doc_template, _shared_docs, zsqrt
2018
from pandas.core.window.rolling import _flex_binary_moment, _Rolling
@@ -302,30 +300,13 @@ def _apply(self, func):
302300
-------
303301
y : same type as input argument
304302
"""
305-
blocks, obj = self._create_blocks(self._selected_obj)
306-
block_list = list(blocks)
307-
308-
results = []
309-
exclude = []
310-
for i, b in enumerate(blocks):
311-
try:
312-
values = self._prep_values(b.values)
313-
314-
except (TypeError, NotImplementedError) as err:
315-
if isinstance(obj, ABCDataFrame):
316-
exclude.extend(b.columns)
317-
del block_list[i]
318-
continue
319-
else:
320-
raise DataError("No numeric types to aggregate") from err
321303

304+
def homogeneous_func(values: np.ndarray):
322305
if values.size == 0:
323-
results.append(values.copy())
324-
continue
306+
return values.copy()
307+
return np.apply_along_axis(func, self.axis, values)
325308

326-
results.append(np.apply_along_axis(func, self.axis, values))
327-
328-
return self._wrap_results(results, block_list, obj, exclude)
309+
return self._apply_blockwise(homogeneous_func)
329310

330311
@Substitution(name="ewm", func_name="mean")
331312
@Appender(_doc_template)

pandas/core/window/rolling.py

+38-22
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from pandas._libs.tslibs import BaseOffset, to_offset
1414
import pandas._libs.window.aggregations as window_aggregations
15-
from pandas._typing import Axis, FrameOrSeries, Scalar
15+
from pandas._typing import ArrayLike, Axis, FrameOrSeries, Scalar
1616
from pandas.compat._optional import import_optional_dependency
1717
from pandas.compat.numpy import function as nv
1818
from pandas.util._decorators import Appender, Substitution, cache_readonly, doc
@@ -487,6 +487,38 @@ def _get_window_indexer(self, window: int) -> BaseIndexer:
487487
return VariableWindowIndexer(index_array=self._on.asi8, window_size=window)
488488
return FixedWindowIndexer(window_size=window)
489489

490+
def _apply_blockwise(
491+
self, homogeneous_func: Callable[..., ArrayLike]
492+
) -> FrameOrSeries:
493+
"""
494+
Apply the given function to the DataFrame broken down into homogeneous
495+
sub-frames.
496+
"""
497+
# This isn't quite blockwise, since `blocks` is actually a collection
498+
# of homogenenous DataFrames.
499+
blocks, obj = self._create_blocks(self._selected_obj)
500+
501+
skipped: List[int] = []
502+
results: List[ArrayLike] = []
503+
exclude: List[Scalar] = []
504+
for i, b in enumerate(blocks):
505+
try:
506+
values = self._prep_values(b.values)
507+
508+
except (TypeError, NotImplementedError) as err:
509+
if isinstance(obj, ABCDataFrame):
510+
skipped.append(i)
511+
exclude.extend(b.columns)
512+
continue
513+
else:
514+
raise DataError("No numeric types to aggregate") from err
515+
516+
result = homogeneous_func(values)
517+
results.append(result)
518+
519+
block_list = [blk for i, blk in enumerate(blocks) if i not in skipped]
520+
return self._wrap_results(results, block_list, obj, exclude)
521+
490522
def _apply(
491523
self,
492524
func: Callable,
@@ -524,30 +556,14 @@ def _apply(
524556
"""
525557
win_type = self._get_win_type(kwargs)
526558
window = self._get_window(win_type=win_type)
527-
528-
blocks, obj = self._create_blocks(self._selected_obj)
529-
block_list = list(blocks)
530559
window_indexer = self._get_window_indexer(window)
531560

532-
results = []
533-
exclude: List[Scalar] = []
534-
for i, b in enumerate(blocks):
535-
try:
536-
values = self._prep_values(b.values)
537-
538-
except (TypeError, NotImplementedError) as err:
539-
if isinstance(obj, ABCDataFrame):
540-
exclude.extend(b.columns)
541-
del block_list[i]
542-
continue
543-
else:
544-
raise DataError("No numeric types to aggregate") from err
561+
def homogeneous_func(values: np.ndarray):
562+
# calculation function
545563

546564
if values.size == 0:
547-
results.append(values.copy())
548-
continue
565+
return values.copy()
549566

550-
# calculation function
551567
offset = calculate_center_offset(window) if center else 0
552568
additional_nans = np.array([np.nan] * offset)
553569

@@ -594,9 +610,9 @@ def calc(x):
594610
if center:
595611
result = self._center_window(result, window)
596612

597-
results.append(result)
613+
return result
598614

599-
return self._wrap_results(results, block_list, obj, exclude)
615+
return self._apply_blockwise(homogeneous_func)
600616

601617
def aggregate(self, func, *args, **kwargs):
602618
result, how = self._aggregate(func, *args, **kwargs)

0 commit comments

Comments
 (0)