Skip to content

Commit 408216c

Browse files
REF: move Block construction in groupby aggregation to internals (#39997)
1 parent c545701 commit 408216c

File tree

2 files changed

+44
-15
lines changed

2 files changed

+44
-15
lines changed

pandas/core/groupby/generic.py

+9-15
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from functools import partial
1616
from textwrap import dedent
1717
from typing import (
18-
TYPE_CHECKING,
1918
Any,
2019
Callable,
2120
Dict,
@@ -25,7 +24,6 @@
2524
List,
2625
Mapping,
2726
Optional,
28-
Sequence,
2927
Type,
3028
TypeVar,
3129
Union,
@@ -115,10 +113,6 @@
115113

116114
from pandas.plotting import boxplot_frame_groupby
117115

118-
if TYPE_CHECKING:
119-
from pandas.core.internals import Block
120-
121-
122116
NamedAgg = namedtuple("NamedAgg", ["column", "aggfunc"])
123117
# TODO(typing) the return value on this callable should be any *scalar*.
124118
AggScalar = Union[str, Callable[..., Any]]
@@ -1083,7 +1077,7 @@ def _cython_agg_general(
10831077
agg_mgr = self._cython_agg_blocks(
10841078
how, alt=alt, numeric_only=numeric_only, min_count=min_count
10851079
)
1086-
return self._wrap_agged_blocks(agg_mgr.blocks, items=agg_mgr.items)
1080+
return self._wrap_agged_manager(agg_mgr)
10871081

10881082
def _cython_agg_blocks(
10891083
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
@@ -1183,7 +1177,7 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:
11831177
# TypeError -> we may have an exception in trying to aggregate
11841178
# continue and exclude the block
11851179
# NotImplementedError -> "ohlc" with wrong dtype
1186-
new_mgr = data.apply(blk_func, ignore_failures=True)
1180+
new_mgr = data.grouped_reduce(blk_func, ignore_failures=True)
11871181

11881182
if not len(new_mgr):
11891183
raise DataError("No numeric types to aggregate")
@@ -1761,17 +1755,17 @@ def _wrap_transformed_output(
17611755

17621756
return result
17631757

1764-
def _wrap_agged_blocks(self, blocks: Sequence[Block], items: Index) -> DataFrame:
1758+
def _wrap_agged_manager(self, mgr: BlockManager) -> DataFrame:
17651759
if not self.as_index:
1766-
index = np.arange(blocks[0].values.shape[-1])
1767-
mgr = BlockManager(blocks, axes=[items, index])
1760+
index = np.arange(mgr.shape[1])
1761+
mgr.axes[1] = ibase.Index(index)
17681762
result = self.obj._constructor(mgr)
17691763

17701764
self._insert_inaxis_grouper_inplace(result)
17711765
result = result._consolidate()
17721766
else:
17731767
index = self.grouper.result_index
1774-
mgr = BlockManager(blocks, axes=[items, index])
1768+
mgr.axes[1] = index
17751769
result = self.obj._constructor(mgr)
17761770

17771771
if self.axis == 1:
@@ -1821,13 +1815,13 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike:
18211815
counted = lib.count_level_2d(masked, labels=ids, max_bin=ngroups, axis=1)
18221816
return counted
18231817

1824-
new_mgr = data.apply(hfunc)
1818+
new_mgr = data.grouped_reduce(hfunc)
18251819

18261820
# If we are grouping on categoricals we want unobserved categories to
18271821
# return zero, rather than the default of NaN which the reindexing in
1828-
# _wrap_agged_blocks() returns. GH 35028
1822+
# _wrap_agged_manager() returns. GH 35028
18291823
with com.temp_setattr(self, "observed", True):
1830-
result = self._wrap_agged_blocks(new_mgr.blocks, items=data.items)
1824+
result = self._wrap_agged_manager(new_mgr)
18311825

18321826
return self._reindex_output(result, fill_value=0)
18331827

pandas/core/internals/managers.py

+35
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,41 @@ def reduce(
403403
new_mgr = type(self).from_blocks(res_blocks, [self.items, index])
404404
return new_mgr, indexer
405405

406+
def grouped_reduce(self: T, func: Callable, ignore_failures: bool = False) -> T:
407+
"""
408+
Apply grouped reduction function blockwise, returning a new BlockManager.
409+
410+
Parameters
411+
----------
412+
func : grouped reduction function
413+
ignore_failures : bool, default False
414+
Whether to drop blocks where func raises TypeError.
415+
416+
Returns
417+
-------
418+
BlockManager
419+
"""
420+
result_blocks: List[Block] = []
421+
422+
for blk in self.blocks:
423+
try:
424+
applied = blk.apply(func)
425+
except (TypeError, NotImplementedError):
426+
if not ignore_failures:
427+
raise
428+
continue
429+
result_blocks = extend_blocks(applied, result_blocks)
430+
431+
if len(result_blocks) == 0:
432+
index = Index([None]) # placeholder
433+
else:
434+
index = Index(range(result_blocks[0].values.shape[-1]))
435+
436+
if ignore_failures:
437+
return self._combine(result_blocks, index=index)
438+
439+
return type(self).from_blocks(result_blocks, [self.axes[0], index])
440+
406441
def operate_blockwise(self, other: BlockManager, array_op) -> BlockManager:
407442
"""
408443
Apply array_op blockwise with another (aligned) BlockManager.

0 commit comments

Comments
 (0)