Skip to content

Commit 81e3236

Browse files
authored
REF: use BlockManager.apply for cython_agg_blocks, apply_blockwise (#35900)
1 parent 76f74d5 commit 81e3236

File tree

3 files changed

+33
-39
lines changed

3 files changed

+33
-39
lines changed

pandas/core/groupby/generic.py

+5-16
Original file line numberDiff line numberDiff line change
@@ -1035,8 +1035,6 @@ def _cython_agg_blocks(
10351035
if numeric_only:
10361036
data = data.get_numeric_data(copy=False)
10371037

1038-
agg_blocks: List["Block"] = []
1039-
10401038
no_result = object()
10411039

10421040
def cast_agg_result(result, values: ArrayLike, how: str) -> ArrayLike:
@@ -1118,23 +1116,14 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:
11181116
res_values = cast_agg_result(result, bvalues, how)
11191117
return res_values
11201118

1121-
for i, block in enumerate(data.blocks):
1122-
try:
1123-
nbs = block.apply(blk_func)
1124-
except (NotImplementedError, TypeError):
1125-
# TypeError -> we may have an exception in trying to aggregate
1126-
# continue and exclude the block
1127-
# NotImplementedError -> "ohlc" with wrong dtype
1128-
pass
1129-
else:
1130-
agg_blocks.extend(nbs)
1119+
# TypeError -> we may have an exception in trying to aggregate
1120+
# continue and exclude the block
1121+
# NotImplementedError -> "ohlc" with wrong dtype
1122+
new_mgr = data.apply(blk_func, ignore_failures=True)
11311123

1132-
if not agg_blocks:
1124+
if not len(new_mgr):
11331125
raise DataError("No numeric types to aggregate")
11341126

1135-
# reset the locs in the blocks to correspond to our
1136-
# current ordering
1137-
new_mgr = data._combine(agg_blocks)
11381127
return new_mgr
11391128

11401129
def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame:

pandas/core/internals/managers.py

+24-6
Original file line numberDiff line numberDiff line change
@@ -350,14 +350,24 @@ def operate_blockwise(self, other: "BlockManager", array_op) -> "BlockManager":
350350
"""
351351
return operate_blockwise(self, other, array_op)
352352

353-
def apply(self: T, f, align_keys=None, **kwargs) -> T:
353+
def apply(
354+
self: T,
355+
f,
356+
align_keys: Optional[List[str]] = None,
357+
ignore_failures: bool = False,
358+
**kwargs,
359+
) -> T:
354360
"""
355361
Iterate over the blocks, collect and create a new BlockManager.
356362
357363
Parameters
358364
----------
359365
f : str or callable
360366
Name of the Block method to apply.
367+
align_keys: List[str] or None, default None
368+
ignore_failures: bool, default False
369+
**kwargs
370+
Keywords to pass to `f`
361371
362372
Returns
363373
-------
@@ -387,12 +397,20 @@ def apply(self: T, f, align_keys=None, **kwargs) -> T:
387397
# otherwise we have an ndarray
388398
kwargs[k] = obj[b.mgr_locs.indexer]
389399

390-
if callable(f):
391-
applied = b.apply(f, **kwargs)
392-
else:
393-
applied = getattr(b, f)(**kwargs)
400+
try:
401+
if callable(f):
402+
applied = b.apply(f, **kwargs)
403+
else:
404+
applied = getattr(b, f)(**kwargs)
405+
except (TypeError, NotImplementedError):
406+
if not ignore_failures:
407+
raise
408+
continue
394409
result_blocks = _extend_blocks(applied, result_blocks)
395410

411+
if ignore_failures:
412+
return self._combine(result_blocks)
413+
396414
if len(result_blocks) == 0:
397415
return self.make_empty(self.axes)
398416

@@ -704,7 +722,7 @@ def get_numeric_data(self, copy: bool = False) -> "BlockManager":
704722
self._consolidate_inplace()
705723
return self._combine([b for b in self.blocks if b.is_numeric], copy)
706724

707-
def _combine(self, blocks: List[Block], copy: bool = True) -> "BlockManager":
725+
def _combine(self: T, blocks: List[Block], copy: bool = True) -> T:
708726
""" return a new manager with the blocks """
709727
if len(blocks) == 0:
710728
return self.make_empty()

pandas/core/window/rolling.py

+4-17
Original file line numberDiff line numberDiff line change
@@ -489,8 +489,6 @@ def _apply_blockwise(
489489
if self._selected_obj.ndim == 1:
490490
return self._apply_series(homogeneous_func)
491491

492-
# This isn't quite blockwise, since `blocks` is actually a collection
493-
# of homogenenous DataFrames.
494492
_, obj = self._create_blocks(self._selected_obj)
495493
mgr = obj._mgr
496494

@@ -500,25 +498,14 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike:
500498
res_values = homogeneous_func(values)
501499
return getattr(res_values, "T", res_values)
502500

503-
skipped: List[int] = []
504-
res_blocks: List["Block"] = []
505-
for i, blk in enumerate(mgr.blocks):
506-
try:
507-
nbs = blk.apply(hfunc)
508-
509-
except (TypeError, NotImplementedError):
510-
skipped.append(i)
511-
continue
512-
513-
res_blocks.extend(nbs)
501+
new_mgr = mgr.apply(hfunc, ignore_failures=True)
502+
out = obj._constructor(new_mgr)
514503

515-
if not len(res_blocks) and skipped:
504+
if out.shape[1] == 0 and obj.shape[1] > 0:
516505
raise DataError("No numeric types to aggregate")
517-
elif not len(res_blocks):
506+
elif out.shape[1] == 0:
518507
return obj.astype("float64")
519508

520-
new_mgr = mgr._combine(res_blocks)
521-
out = obj._constructor(new_mgr)
522509
self._insert_on_column(out, obj)
523510
return out
524511

0 commit comments

Comments
 (0)