diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index c1a277925de2a..138ca9945d759 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -15,7 +15,6 @@ from functools import partial from textwrap import dedent from typing import ( - TYPE_CHECKING, Any, Callable, Dict, @@ -25,7 +24,6 @@ List, Mapping, Optional, - Sequence, Type, TypeVar, Union, @@ -115,10 +113,6 @@ from pandas.plotting import boxplot_frame_groupby -if TYPE_CHECKING: - from pandas.core.internals import Block - - NamedAgg = namedtuple("NamedAgg", ["column", "aggfunc"]) # TODO(typing) the return value on this callable should be any *scalar*. AggScalar = Union[str, Callable[..., Any]] @@ -1074,7 +1068,7 @@ def _cython_agg_general( agg_mgr = self._cython_agg_blocks( how, alt=alt, numeric_only=numeric_only, min_count=min_count ) - return self._wrap_agged_blocks(agg_mgr.blocks, items=agg_mgr.items) + return self._wrap_agged_manager(agg_mgr) def _cython_agg_blocks( self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 @@ -1174,7 +1168,7 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike: # TypeError -> we may have an exception in trying to aggregate # continue and exclude the block # NotImplementedError -> "ohlc" with wrong dtype - new_mgr = data.apply(blk_func, ignore_failures=True) + new_mgr = data.grouped_reduce(blk_func, ignore_failures=True) if not len(new_mgr): raise DataError("No numeric types to aggregate") @@ -1748,17 +1742,17 @@ def _wrap_transformed_output( return result - def _wrap_agged_blocks(self, blocks: Sequence[Block], items: Index) -> DataFrame: + def _wrap_agged_manager(self, mgr: BlockManager) -> DataFrame: if not self.as_index: - index = np.arange(blocks[0].values.shape[-1]) - mgr = BlockManager(blocks, axes=[items, index]) + index = np.arange(mgr.shape[1]) + mgr.axes[1] = ibase.Index(index) result = self.obj._constructor(mgr) self._insert_inaxis_grouper_inplace(result) result = result._consolidate() else: index = self.grouper.result_index - mgr = BlockManager(blocks, axes=[items, index]) + mgr.axes[1] = index result = self.obj._constructor(mgr) if self.axis == 1: @@ -1808,13 +1802,13 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike: counted = lib.count_level_2d(masked, labels=ids, max_bin=ngroups, axis=1) return counted - new_mgr = data.apply(hfunc) + new_mgr = data.grouped_reduce(hfunc) # If we are grouping on categoricals we want unobserved categories to # return zero, rather than the default of NaN which the reindexing in - # _wrap_agged_blocks() returns. GH 35028 + # _wrap_agged_manager() returns. GH 35028 with com.temp_setattr(self, "observed", True): - result = self._wrap_agged_blocks(new_mgr.blocks, items=data.items) + result = self._wrap_agged_manager(new_mgr) return self._reindex_output(result, fill_value=0) diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index 86df773147e21..60cdc8bc287db 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -403,6 +403,41 @@ def reduce( new_mgr = type(self).from_blocks(res_blocks, [self.items, index]) return new_mgr, indexer + def grouped_reduce(self: T, func: Callable, ignore_failures: bool = False) -> T: + """ + Apply grouped reduction function blockwise, returning a new BlockManager. + + Parameters + ---------- + func : grouped reduction function + ignore_failures : bool, default False + Whether to drop blocks where func raises TypeError. + + Returns + ------- + BlockManager + """ + result_blocks: List[Block] = [] + + for blk in self.blocks: + try: + applied = blk.apply(func) + except (TypeError, NotImplementedError): + if not ignore_failures: + raise + continue + result_blocks = extend_blocks(applied, result_blocks) + + if len(result_blocks) == 0: + index = Index([None]) # placeholder + else: + index = Index(range(result_blocks[0].values.shape[-1])) + + if ignore_failures: + return self._combine(result_blocks, index=index) + + return type(self).from_blocks(result_blocks, [self.axes[0], index]) + def operate_blockwise(self, other: BlockManager, array_op) -> BlockManager: """ Apply array_op blockwise with another (aligned) BlockManager.