From 5b7f77098071969d3c00bdfb3905ce7c8d1ae498 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 23 Feb 2021 16:41:09 +0100 Subject: [PATCH 1/3] REF: move Block construction in groupby aggregation to internals --- pandas/core/groupby/generic.py | 33 ++++++++------------------- pandas/core/internals/managers.py | 38 +++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 23 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index c1a277925de2a..dae84b007a2e7 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -15,7 +15,6 @@ from functools import partial from textwrap import dedent from typing import ( - TYPE_CHECKING, Any, Callable, Dict, @@ -25,7 +24,6 @@ List, Mapping, Optional, - Sequence, Type, TypeVar, Union, @@ -86,10 +84,7 @@ Categorical, ExtensionArray, ) -from pandas.core.base import ( - DataError, - SpecificationError, -) +from pandas.core.base import SpecificationError import pandas.core.common as com from pandas.core.construction import create_series_with_explicit_dtype from pandas.core.frame import DataFrame @@ -115,10 +110,6 @@ from pandas.plotting import boxplot_frame_groupby -if TYPE_CHECKING: - from pandas.core.internals import Block - - NamedAgg = namedtuple("NamedAgg", ["column", "aggfunc"]) # TODO(typing) the return value on this callable should be any *scalar*. AggScalar = Union[str, Callable[..., Any]] @@ -1074,7 +1065,7 @@ def _cython_agg_general( agg_mgr = self._cython_agg_blocks( how, alt=alt, numeric_only=numeric_only, min_count=min_count ) - return self._wrap_agged_blocks(agg_mgr.blocks, items=agg_mgr.items) + return self._wrap_agged_manager(agg_mgr) def _cython_agg_blocks( self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 @@ -1174,11 +1165,7 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike: # TypeError -> we may have an exception in trying to aggregate # continue and exclude the block # NotImplementedError -> "ohlc" with wrong dtype - new_mgr = data.apply(blk_func, ignore_failures=True) - - if not len(new_mgr): - raise DataError("No numeric types to aggregate") - + new_mgr = data.grouped_reduce(blk_func, ignore_failures=True) return new_mgr def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame: @@ -1748,17 +1735,17 @@ def _wrap_transformed_output( return result - def _wrap_agged_blocks(self, blocks: Sequence[Block], items: Index) -> DataFrame: + def _wrap_agged_manager(self, mgr: BlockManager) -> DataFrame: if not self.as_index: - index = np.arange(blocks[0].values.shape[-1]) - mgr = BlockManager(blocks, axes=[items, index]) + index = np.arange(mgr.shape[1]) + mgr.set_axis(1, ibase.Index(index)) result = self.obj._constructor(mgr) self._insert_inaxis_grouper_inplace(result) result = result._consolidate() else: index = self.grouper.result_index - mgr = BlockManager(blocks, axes=[items, index]) + mgr.set_axis(1, index) result = self.obj._constructor(mgr) if self.axis == 1: @@ -1808,13 +1795,13 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike: counted = lib.count_level_2d(masked, labels=ids, max_bin=ngroups, axis=1) return counted - new_mgr = data.apply(hfunc) + new_mgr = data.grouped_reduce(hfunc) # If we are grouping on categoricals we want unobserved categories to # return zero, rather than the default of NaN which the reindexing in - # _wrap_agged_blocks() returns. GH 35028 + # _wrap_agged_manager() returns. GH 35028 with com.temp_setattr(self, "observed", True): - result = self._wrap_agged_blocks(new_mgr.blocks, items=data.items) + result = self._wrap_agged_manager(new_mgr) return self._reindex_output(result, fill_value=0) diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index 86df773147e21..dbe0c75967a86 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -55,6 +55,7 @@ import pandas.core.algorithms as algos from pandas.core.arrays.sparse import SparseDtype +from pandas.core.base import DataError from pandas.core.construction import extract_array from pandas.core.indexers import maybe_convert_indices from pandas.core.indexes.api import ( @@ -403,6 +404,43 @@ def reduce( new_mgr = type(self).from_blocks(res_blocks, [self.items, index]) return new_mgr, indexer + def grouped_reduce( + self: T, func: Callable, ignore_failures: bool = False + ) -> Tuple[T, np.ndarray]: + """ + Apply grouped reduction function blockwise, returning a new BlockManager. + + Parameters + ---------- + func : grouped reduction function + ignore_failures : bool, default False + Whether to drop blocks where func raises TypeError. + + Returns + ------- + BlockManager + """ + result_blocks: List[Block] = [] + + for blk in self.blocks: + try: + applied = blk.apply(func) + except (TypeError, NotImplementedError): + if not ignore_failures: + raise + continue + result_blocks = extend_blocks(applied, result_blocks) + + if len(result_blocks) == 0: + raise DataError("No numeric types to aggregate") + + index = Index(range(result_blocks[0].values.shape[-1])) + + if ignore_failures: + return self._combine(result_blocks, index=index) + + return type(self).from_blocks(result_blocks, [self.axes[0], index]) + def operate_blockwise(self, other: BlockManager, array_op) -> BlockManager: """ Apply array_op blockwise with another (aligned) BlockManager. From 9fd90727a39482f6c94744381f18249545073764 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 23 Feb 2021 22:00:38 +0100 Subject: [PATCH 2/3] move empty check back up --- pandas/core/groupby/generic.py | 9 ++++++++- pandas/core/internals/managers.py | 11 ++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index dae84b007a2e7..f71e898b79791 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -84,7 +84,10 @@ Categorical, ExtensionArray, ) -from pandas.core.base import SpecificationError +from pandas.core.base import ( + DataError, + SpecificationError, +) import pandas.core.common as com from pandas.core.construction import create_series_with_explicit_dtype from pandas.core.frame import DataFrame @@ -1166,6 +1169,10 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike: # continue and exclude the block # NotImplementedError -> "ohlc" with wrong dtype new_mgr = data.grouped_reduce(blk_func, ignore_failures=True) + + if not len(new_mgr): + raise DataError("No numeric types to aggregate") + return new_mgr def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame: diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index dbe0c75967a86..60cdc8bc287db 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -55,7 +55,6 @@ import pandas.core.algorithms as algos from pandas.core.arrays.sparse import SparseDtype -from pandas.core.base import DataError from pandas.core.construction import extract_array from pandas.core.indexers import maybe_convert_indices from pandas.core.indexes.api import ( @@ -404,9 +403,7 @@ def reduce( new_mgr = type(self).from_blocks(res_blocks, [self.items, index]) return new_mgr, indexer - def grouped_reduce( - self: T, func: Callable, ignore_failures: bool = False - ) -> Tuple[T, np.ndarray]: + def grouped_reduce(self: T, func: Callable, ignore_failures: bool = False) -> T: """ Apply grouped reduction function blockwise, returning a new BlockManager. @@ -432,9 +429,9 @@ def grouped_reduce( result_blocks = extend_blocks(applied, result_blocks) if len(result_blocks) == 0: - raise DataError("No numeric types to aggregate") - - index = Index(range(result_blocks[0].values.shape[-1])) + index = Index([None]) # placeholder + else: + index = Index(range(result_blocks[0].values.shape[-1])) if ignore_failures: return self._combine(result_blocks, index=index) From ee6268764e59887dd05ef0b819126c5293bb0548 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 24 Feb 2021 08:59:25 +0100 Subject: [PATCH 3/3] override axis without checking length --- pandas/core/groupby/generic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index f71e898b79791..138ca9945d759 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -1745,14 +1745,14 @@ def _wrap_transformed_output( def _wrap_agged_manager(self, mgr: BlockManager) -> DataFrame: if not self.as_index: index = np.arange(mgr.shape[1]) - mgr.set_axis(1, ibase.Index(index)) + mgr.axes[1] = ibase.Index(index) result = self.obj._constructor(mgr) self._insert_inaxis_grouper_inplace(result) result = result._consolidate() else: index = self.grouper.result_index - mgr.set_axis(1, index) + mgr.axes[1] = index result = self.obj._constructor(mgr) if self.axis == 1: