diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py
index 4a721ae0d4bf6..a059a90aa3a4e 100644
--- a/pandas/core/groupby/generic.py
+++ b/pandas/core/groupby/generic.py
@@ -345,6 +345,49 @@ def _aggregate_multiple_funcs(self, arg):
         )
         return self.obj._constructor_expanddim(output, columns=columns)
 
+    def _cython_agg_general(
+        self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
+    ):
+        output: dict[base.OutputKey, ArrayLike] = {}
+        # Ideally we would be able to enumerate self._iterate_slices and use
+        # the index from enumeration as the key of output, but ohlc in particular
+        # returns a (n x 4) array. Output requires 1D ndarrays as values, so we
+        # need to slice that up into 1D arrays
+        idx = 0
+        for obj in self._iterate_slices():
+            name = obj.name
+            is_numeric = is_numeric_dtype(obj.dtype)
+            if numeric_only and not is_numeric:
+                continue
+
+            result = self.grouper._cython_operation(
+                "aggregate", obj._values, how, axis=0, min_count=min_count
+            )
+
+            if how == "ohlc":
+                # e.g. ohlc
+                agg_names = ["open", "high", "low", "close"]
+                assert len(agg_names) == result.shape[1]
+                for result_column, result_name in zip(result.T, agg_names):
+                    key = base.OutputKey(label=result_name, position=idx)
+                    output[key] = result_column
+                    idx += 1
+            else:
+                assert result.ndim == 1
+                key = base.OutputKey(label=name, position=idx)
+                output[key] = result
+                idx += 1
+
+        if not output:
+            raise DataError("No numeric types to aggregate")
+
+        # error: Argument 1 to "_wrap_aggregated_output" of "BaseGroupBy" has
+        # incompatible type "Dict[OutputKey, Union[ndarray, DatetimeArray]]";
+        # expected "Mapping[OutputKey, ndarray]"
+        return self._wrap_aggregated_output(
+            output, index=self.grouper.result_index  # type: ignore[arg-type]
+        )
+
     # TODO: index should not be Optional - see GH 35490
     def _wrap_series_output(
         self,
diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py
index f2fffe4c3741c..9a4f343ab3dc2 100644
--- a/pandas/core/groupby/groupby.py
+++ b/pandas/core/groupby/groupby.py
@@ -504,7 +504,7 @@ def f(self):
 
 
 @contextmanager
-def group_selection_context(groupby: BaseGroupBy) -> Iterator[BaseGroupBy]:
+def group_selection_context(groupby: GroupBy) -> Iterator[GroupBy]:
     """
     Set / reset the group_selection_context.
     """
@@ -543,63 +543,9 @@ class BaseGroupBy(PandasObject, SelectionMixin, Generic[FrameOrSeries]):
         "squeeze",
     }
 
-    def __init__(
-        self,
-        obj: FrameOrSeries,
-        keys: _KeysArgType | None = None,
-        axis: int = 0,
-        level: IndexLabel | None = None,
-        grouper: ops.BaseGrouper | None = None,
-        exclusions: set[Hashable] | None = None,
-        selection: IndexLabel | None = None,
-        as_index: bool = True,
-        sort: bool = True,
-        group_keys: bool = True,
-        squeeze: bool = False,
-        observed: bool = False,
-        mutated: bool = False,
-        dropna: bool = True,
-    ):
-
-        self._selection = selection
-
-        assert isinstance(obj, NDFrame), type(obj)
-
-        self.level = level
-
-        if not as_index:
-            if not isinstance(obj, DataFrame):
-                raise TypeError("as_index=False only valid with DataFrame")
-            if axis != 0:
-                raise ValueError("as_index=False only valid for axis=0")
-
-        self.as_index = as_index
-        self.keys = keys
-        self.sort = sort
-        self.group_keys = group_keys
-        self.squeeze = squeeze
-        self.observed = observed
-        self.mutated = mutated
-        self.dropna = dropna
-
-        if grouper is None:
-            from pandas.core.groupby.grouper import get_grouper
-
-            grouper, exclusions, obj = get_grouper(
-                obj,
-                keys,
-                axis=axis,
-                level=level,
-                sort=sort,
-                observed=observed,
-                mutated=self.mutated,
-                dropna=self.dropna,
-            )
-
-        self.obj = obj
-        self.axis = obj._get_axis_number(axis)
-        self.grouper = grouper
-        self.exclusions = exclusions or set()
+    axis: int
+    grouper: ops.BaseGrouper
+    obj: FrameOrSeries
 
     @final
     def __len__(self) -> int:
@@ -711,90 +657,10 @@ def _selected_obj(self):
         else:
             return self.obj[self._selection]
 
-    @final
-    def _reset_group_selection(self) -> None:
-        """
-        Clear group based selection.
-
-        Used for methods needing to return info on each group regardless of
-        whether a group selection was previously set.
-        """
-        if self._group_selection is not None:
-            # GH12839 clear cached selection too when changing group selection
-            self._group_selection = None
-            self._reset_cache("_selected_obj")
-
-    @final
-    def _set_group_selection(self) -> None:
-        """
-        Create group based selection.
-
-        Used when selection is not passed directly but instead via a grouper.
-
-        NOTE: this should be paired with a call to _reset_group_selection
-        """
-        grp = self.grouper
-        if not (
-            self.as_index
-            and getattr(grp, "groupings", None) is not None
-            and self.obj.ndim > 1
-            and self._group_selection is None
-        ):
-            return
-
-        groupers = [g.name for g in grp.groupings if g.level is None and g.in_axis]
-
-        if len(groupers):
-            # GH12839 clear selected obj cache when group selection changes
-            ax = self.obj._info_axis
-            self._group_selection = ax.difference(Index(groupers), sort=False).tolist()
-            self._reset_cache("_selected_obj")
-
-    @final
-    def _set_result_index_ordered(
-        self, result: OutputFrameOrSeries
-    ) -> OutputFrameOrSeries:
-        # set the result index on the passed values object and
-        # return the new object, xref 8046
-
-        if self.grouper.is_monotonic:
-            # shortcut if we have an already ordered grouper
-            result.set_axis(self.obj._get_axis(self.axis), axis=self.axis, inplace=True)
-            return result
-
-        # row order is scrambled => sort the rows by position in original index
-        original_positions = Index(
-            np.concatenate(self._get_indices(self.grouper.result_index))
-        )
-        result.set_axis(original_positions, axis=self.axis, inplace=True)
-        result = result.sort_index(axis=self.axis)
-
-        dropped_rows = len(result.index) < len(self.obj.index)
-
-        if dropped_rows:
-            # get index by slicing original index according to original positions
-            # slice drops attrs => use set_axis when no rows were dropped
-            sorted_indexer = result.index
-            result.index = self._selected_obj.index[sorted_indexer]
-        else:
-            result.set_axis(self.obj._get_axis(self.axis), axis=self.axis, inplace=True)
-
-        return result
-
     @final
     def _dir_additions(self) -> set[str]:
         return self.obj._dir_additions() | self._apply_allowlist
 
-    def __getattr__(self, attr: str):
-        if attr in self._internal_names_set:
-            return object.__getattribute__(self, attr)
-        if attr in self.obj:
-            return self[attr]
-
-        raise AttributeError(
-            f"'{type(self).__name__}' object has no attribute '{attr}'"
-        )
-
     @Substitution(
         klass="GroupBy",
         examples=dedent(
@@ -828,44 +694,6 @@ def pipe(
 
     plot = property(GroupByPlot)
 
-    @final
-    def _make_wrapper(self, name: str) -> Callable:
-        assert name in self._apply_allowlist
-
-        with group_selection_context(self):
-            # need to setup the selection
-            # as are not passed directly but in the grouper
-            f = getattr(self._obj_with_exclusions, name)
-            if not isinstance(f, types.MethodType):
-                return self.apply(lambda self: getattr(self, name))
-
-        f = getattr(type(self._obj_with_exclusions), name)
-        sig = inspect.signature(f)
-
-        def wrapper(*args, **kwargs):
-            # a little trickery for aggregation functions that need an axis
-            # argument
-            if "axis" in sig.parameters:
-                if kwargs.get("axis", None) is None:
-                    kwargs["axis"] = self.axis
-
-            def curried(x):
-                return f(x, *args, **kwargs)
-
-            # preserve the name so we can detect it when calling plot methods,
-            # to avoid duplicates
-            curried.__name__ = name
-
-            # special case otherwise extra plots are created when catching the
-            # exception below
-            if name in base.plotting_methods:
-                return self.apply(curried)
-
-            return self._python_apply_general(curried, self._obj_with_exclusions)
-
-        wrapper.__name__ = name
-        return wrapper
-
     @final
     def get_group(self, name, obj=None):
         """
@@ -904,145 +732,329 @@ def __iter__(self) -> Iterator[tuple[Hashable, FrameOrSeries]]:
         """
         return self.grouper.get_iterator(self.obj, axis=self.axis)
 
-    @Appender(
-        _apply_docs["template"].format(
-            input="dataframe", examples=_apply_docs["dataframe_examples"]
-        )
-    )
-    def apply(self, func, *args, **kwargs):
-
-        func = com.is_builtin_func(func)
-        # this is needed so we don't try and wrap strings. If we could
-        # resolve functions to their callable functions prior, this
-        # wouldn't be needed
-        if args or kwargs:
-            if callable(func):
+# To track operations that expand dimensions, like ohlc
+OutputFrameOrSeries = TypeVar("OutputFrameOrSeries", bound=NDFrame)
 
-                @wraps(func)
-                def f(g):
-                    with np.errstate(all="ignore"):
-                        return func(g, *args, **kwargs)
 
-            elif hasattr(nanops, "nan" + func):
-                # TODO: should we wrap this in to e.g. _is_builtin_func?
-                f = getattr(nanops, "nan" + func)
+class GroupBy(BaseGroupBy[FrameOrSeries]):
+    """
+    Class for grouping and aggregating relational data.
 
-            else:
-                raise ValueError(
-                    "func must be a callable if args or kwargs are supplied"
-                )
-        else:
-            f = func
+    See aggregate, transform, and apply functions on this object.
 
-        # ignore SettingWithCopy here in case the user mutates
-        with option_context("mode.chained_assignment", None):
-            try:
-                result = self._python_apply_general(f, self._selected_obj)
-            except TypeError:
-                # gh-20949
-                # try again, with .apply acting as a filtering
-                # operation, by excluding the grouping column
-                # This would normally not be triggered
-                # except if the udf is trying an operation that
-                # fails on *some* columns, e.g. a numeric operation
-                # on a string grouper column
+    It's easiest to use obj.groupby(...) to use GroupBy, but you can also do:
 
-                with group_selection_context(self):
-                    return self._python_apply_general(f, self._selected_obj)
+    ::
 
-        return result
+        grouped = groupby(obj, ...)
 
-    @final
-    def _python_apply_general(
-        self, f: F, data: FrameOrSeriesUnion
-    ) -> FrameOrSeriesUnion:
-        """
-        Apply function f in python space
+    Parameters
+    ----------
+    obj : pandas object
+    axis : int, default 0
+    level : int, default None
+        Level of MultiIndex
+    groupings : list of Grouping objects
+        Most users should ignore this
+    exclusions : array-like, optional
+        List of columns to exclude
+    name : str
+        Most users should ignore this
 
-        Parameters
-        ----------
-        f : callable
-            Function to apply
-        data : Series or DataFrame
-            Data to apply f to
+    Returns
+    -------
+    **Attributes**
+    groups : dict
+        {group name -> group labels}
+    len(grouped) : int
+        Number of groups
 
-        Returns
-        -------
-        Series or DataFrame
-            data after applying f
-        """
-        keys, values, mutated = self.grouper.apply(f, data, self.axis)
+    Notes
+    -----
+    After grouping, see aggregate, apply, and transform functions. Here are
+    some other brief notes about usage. When grouping by multiple groups, the
+    result index will be a MultiIndex (hierarchical) by default.
 
-        return self._wrap_applied_output(
-            data, keys, values, not_indexed_same=mutated or self.mutated
+    Iteration produces (key, group) tuples, i.e. chunking the data by group. So
+    you can write code like:
+
+    ::
+
+        grouped = obj.groupby(keys, axis=axis)
+        for key, group in grouped:
+            # do something with the data
+
+    Function calls on GroupBy, if not specially implemented, "dispatch" to the
+    grouped data. So if you group a DataFrame and wish to invoke the std()
+    method on each group, you can simply do:
+
+    ::
+
+        df.groupby(mapper).std()
+
+    rather than
+
+    ::
+
+        df.groupby(mapper).aggregate(np.std)
+
+    You can pass arguments to these "wrapped" functions, too.
+
+    See the online documentation for full exposition on these topics and much
+    more
+    """
+
+    obj: FrameOrSeries
+    grouper: ops.BaseGrouper
+    as_index: bool
+
+    def __init__(
+        self,
+        obj: FrameOrSeries,
+        keys: _KeysArgType | None = None,
+        axis: int = 0,
+        level: IndexLabel | None = None,
+        grouper: ops.BaseGrouper | None = None,
+        exclusions: set[Hashable] | None = None,
+        selection: IndexLabel | None = None,
+        as_index: bool = True,
+        sort: bool = True,
+        group_keys: bool = True,
+        squeeze: bool = False,
+        observed: bool = False,
+        mutated: bool = False,
+        dropna: bool = True,
+    ):
+
+        self._selection = selection
+
+        assert isinstance(obj, NDFrame), type(obj)
+
+        self.level = level
+
+        if not as_index:
+            if not isinstance(obj, DataFrame):
+                raise TypeError("as_index=False only valid with DataFrame")
+            if axis != 0:
+                raise ValueError("as_index=False only valid for axis=0")
+
+        self.as_index = as_index
+        self.keys = keys
+        self.sort = sort
+        self.group_keys = group_keys
+        self.squeeze = squeeze
+        self.observed = observed
+        self.mutated = mutated
+        self.dropna = dropna
+
+        if grouper is None:
+            from pandas.core.groupby.grouper import get_grouper
+
+            grouper, exclusions, obj = get_grouper(
+                obj,
+                keys,
+                axis=axis,
+                level=level,
+                sort=sort,
+                observed=observed,
+                mutated=self.mutated,
+                dropna=self.dropna,
+            )
+
+        self.obj = obj
+        self.axis = obj._get_axis_number(axis)
+        self.grouper = grouper
+        self.exclusions = exclusions or set()
+
+    def __getattr__(self, attr: str):
+        if attr in self._internal_names_set:
+            return object.__getattribute__(self, attr)
+        if attr in self.obj:
+            return self[attr]
+
+        raise AttributeError(
+            f"'{type(self).__name__}' object has no attribute '{attr}'"
         )
 
-    def _iterate_slices(self) -> Iterable[Series]:
-        raise AbstractMethodError(self)
+    @final
+    def _make_wrapper(self, name: str) -> Callable:
+        assert name in self._apply_allowlist
 
-    def transform(self, func, *args, **kwargs):
-        raise AbstractMethodError(self)
+        with group_selection_context(self):
+            # need to setup the selection
+            # as they are not passed directly but via the grouper
+            f = getattr(self._obj_with_exclusions, name)
+            if not isinstance(f, types.MethodType):
+                return self.apply(lambda self: getattr(self, name))
+
+        f = getattr(type(self._obj_with_exclusions), name)
+        sig = inspect.signature(f)
+
+        def wrapper(*args, **kwargs):
+            # a little trickery for aggregation functions that need an axis
+            # argument
+            if "axis" in sig.parameters:
+                if kwargs.get("axis", None) is None:
+                    kwargs["axis"] = self.axis
+
+            def curried(x):
+                return f(x, *args, **kwargs)
+
+            # preserve the name so we can detect it when calling plot methods,
+            # to avoid duplicates
+            curried.__name__ = name
+
+            # special case otherwise extra plots are created when catching the
+            # exception below
+            if name in base.plotting_methods:
+                return self.apply(curried)
+
+            return self._python_apply_general(curried, self._obj_with_exclusions)
+
+        wrapper.__name__ = name
+        return wrapper
+
+    # -----------------------------------------------------------------
+    # Selection
 
     @final
-    def _cumcount_array(self, ascending: bool = True):
+    def _set_group_selection(self) -> None:
         """
-        Parameters
-        ----------
-        ascending : bool, default True
-            If False, number in reverse, from length of group - 1 to 0.
+        Create group based selection.
 
-        Notes
-        -----
-        this is currently implementing sort=False
-        (though the default is sort=True) for groupby in general
+        Used when selection is not passed directly but instead via a grouper.
+
+        NOTE: this should be paired with a call to _reset_group_selection
         """
-        ids, _, ngroups = self.grouper.group_info
-        sorter = get_group_index_sorter(ids, ngroups)
-        ids, count = ids[sorter], len(ids)
+        grp = self.grouper
+        if not (
+            self.as_index
+            and getattr(grp, "groupings", None) is not None
+            and self.obj.ndim > 1
+            and self._group_selection is None
+        ):
+            return
 
-        if count == 0:
-            return np.empty(0, dtype=np.int64)
+        groupers = [g.name for g in grp.groupings if g.level is None and g.in_axis]
 
-        run = np.r_[True, ids[:-1] != ids[1:]]
-        rep = np.diff(np.r_[np.nonzero(run)[0], count])
-        out = (~run).cumsum()
+        if len(groupers):
+            # GH12839 clear selected obj cache when group selection changes
+            ax = self.obj._info_axis
+            self._group_selection = ax.difference(Index(groupers), sort=False).tolist()
+            self._reset_cache("_selected_obj")
 
-        if ascending:
-            out -= np.repeat(out[run], rep)
-        else:
-            out = np.repeat(out[np.r_[run[1:], True]], rep) - out
+    @final
+    def _reset_group_selection(self) -> None:
+        """
+        Clear group based selection.
 
-        rev = np.empty(count, dtype=np.intp)
-        rev[sorter] = np.arange(count, dtype=np.intp)
-        return out[rev].astype(np.int64, copy=False)
+        Used for methods needing to return info on each group regardless of
+        whether a group selection was previously set.
+        """
+        if self._group_selection is not None:
+            # GH12839 clear cached selection too when changing group selection
+            self._group_selection = None
+            self._reset_cache("_selected_obj")
+
+    def _iterate_slices(self) -> Iterable[Series]:
+        raise AbstractMethodError(self)
+
+    # -----------------------------------------------------------------
+    # Dispatch/Wrapping
 
     @final
-    def _cython_transform(
-        self, how: str, numeric_only: bool = True, axis: int = 0, **kwargs
-    ):
-        output: dict[base.OutputKey, ArrayLike] = {}
+    def _concat_objects(self, keys, values, not_indexed_same: bool = False):
+        from pandas.core.reshape.concat import concat
 
-        for idx, obj in enumerate(self._iterate_slices()):
-            name = obj.name
-            is_numeric = is_numeric_dtype(obj.dtype)
-            if numeric_only and not is_numeric:
-                continue
+        def reset_identity(values):
+            # reset the identities of the components
+            # of the values to prevent aliasing
+            for v in com.not_none(*values):
+                ax = v._get_axis(self.axis)
+                ax._reset_identity()
+            return values
 
-            try:
-                result = self.grouper._cython_operation(
-                    "transform", obj._values, how, axis, **kwargs
+        if not not_indexed_same:
+            result = concat(values, axis=self.axis)
+            ax = self.filter(lambda x: True).axes[self.axis]
+
+            # this is a very unfortunate situation
+            # we can't use reindex to restore the original order
+            # when the ax has duplicates
+            # so we resort to this
+            # GH 14776, 30667
+            if ax.has_duplicates and not result.axes[self.axis].equals(ax):
+                indexer, _ = result.index.get_indexer_non_unique(ax._values)
+                indexer = algorithms.unique1d(indexer)
+                result = result.take(indexer, axis=self.axis)
+            else:
+                result = result.reindex(ax, axis=self.axis, copy=False)
+
+        elif self.group_keys:
+
+            values = reset_identity(values)
+            if self.as_index:
+
+                # possible MI return case
+                group_keys = keys
+                group_levels = self.grouper.levels
+                group_names = self.grouper.names
+
+                result = concat(
+                    values,
+                    axis=self.axis,
+                    keys=group_keys,
+                    levels=group_levels,
+                    names=group_names,
+                    sort=False,
                 )
-            except (NotImplementedError, TypeError):
-                continue
+            else:
 
-            key = base.OutputKey(label=name, position=idx)
-            output[key] = result
+                # GH5610, returns a MI, with the first level being a
+                # range index
+                keys = list(range(len(values)))
+                result = concat(values, axis=self.axis, keys=keys)
+        else:
+            values = reset_identity(values)
+            result = concat(values, axis=self.axis)
 
-        if not output:
-            raise DataError("No numeric types to aggregate")
+        if isinstance(result, Series) and self._selection_name is not None:
 
-        return self._wrap_transformed_output(output)
+            result.name = self._selection_name
+
+        return result
+
+    @final
+    def _set_result_index_ordered(
+        self, result: OutputFrameOrSeries
+    ) -> OutputFrameOrSeries:
+        # set the result index on the passed values object and
+        # return the new object, xref 8046
+
+        if self.grouper.is_monotonic:
+            # shortcut if we have an already ordered grouper
+            result.set_axis(self.obj._get_axis(self.axis), axis=self.axis, inplace=True)
+            return result
+
+        # row order is scrambled => sort the rows by position in original index
+        original_positions = Index(
+            np.concatenate(self._get_indices(self.grouper.result_index))
+        )
+        result.set_axis(original_positions, axis=self.axis, inplace=True)
+        result = result.sort_index(axis=self.axis)
+
+        dropped_rows = len(result.index) < len(self.obj.index)
+
+        if dropped_rows:
+            # get index by slicing original index according to original positions
+            # slice drops attrs => use set_axis when no rows were dropped
+            sorted_indexer = result.index
+            result.index = self._selected_obj.index[sorted_indexer]
+        else:
+            result.set_axis(self.obj._get_axis(self.axis), axis=self.axis, inplace=True)
+
+        return result
 
     def _wrap_aggregated_output(
         self, output: Mapping[base.OutputKey, np.ndarray], index: Index | None
@@ -1055,84 +1067,8 @@ def _wrap_transformed_output(self, output: Mapping[base.OutputKey, ArrayLike]):
     def _wrap_applied_output(self, data, keys, values, not_indexed_same: bool = False):
         raise AbstractMethodError(self)
 
-    @final
-    def _agg_general(
-        self,
-        numeric_only: bool = True,
-        min_count: int = -1,
-        *,
-        alias: str,
-        npfunc: Callable,
-    ):
-        with group_selection_context(self):
-            # try a cython aggregation if we can
-            result = None
-            try:
-                result = self._cython_agg_general(
-                    how=alias,
-                    alt=npfunc,
-                    numeric_only=numeric_only,
-                    min_count=min_count,
-                )
-            except DataError:
-                pass
-            except NotImplementedError as err:
-                if "function is not implemented for this dtype" in str(
-                    err
-                ) or "category dtype not supported" in str(err):
-                    # raised in _get_cython_function, in some cases can
-                    # be trimmed by implementing cython funcs for more dtypes
-                    pass
-                else:
-                    raise
-
-            # apply a non-cython aggregation
-            if result is None:
-                result = self.aggregate(lambda x: npfunc(x, axis=self.axis))
-            return result.__finalize__(self.obj, method="groupby")
-
-    def _cython_agg_general(
-        self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
-    ):
-        output: dict[base.OutputKey, ArrayLike] = {}
-        # Ideally we would be able to enumerate self._iterate_slices and use
-        # the index from enumeration as the key of output, but ohlc in particular
-        # returns a (n x 4) array. Output requires 1D ndarrays as values, so we
-        # need to slice that up into 1D arrays
-        idx = 0
-        for obj in self._iterate_slices():
-            name = obj.name
-            is_numeric = is_numeric_dtype(obj.dtype)
-            if numeric_only and not is_numeric:
-                continue
-
-            result = self.grouper._cython_operation(
-                "aggregate", obj._values, how, axis=0, min_count=min_count
-            )
-
-            if how == "ohlc":
-                # e.g. ohlc
-                agg_names = ["open", "high", "low", "close"]
-                assert len(agg_names) == result.shape[1]
-                for result_column, result_name in zip(result.T, agg_names):
-                    key = base.OutputKey(label=result_name, position=idx)
-                    output[key] = result_column
-                    idx += 1
-            else:
-                assert result.ndim == 1
-                key = base.OutputKey(label=name, position=idx)
-                output[key] = result
-                idx += 1
-
-        if not output:
-            raise DataError("No numeric types to aggregate")
-
-        # error: Argument 1 to "_wrap_aggregated_output" of "BaseGroupBy" has
-        # incompatible type "Dict[OutputKey, Union[ndarray, DatetimeArray]]";
-        # expected "Mapping[OutputKey, ndarray]"
-        return self._wrap_aggregated_output(
-            output, index=self.grouper.result_index  # type: ignore[arg-type]
-        )
+    # -----------------------------------------------------------------
+    # numba
 
     @final
     def _numba_prep(self, func, data):
@@ -1168,43 +1104,120 @@ def _transform_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs)
             sorted_data, sorted_index, starts, ends, len(group_keys), len(data.columns)
         )
 
-        cache_key = (func, "groupby_transform")
-        if cache_key not in NUMBA_FUNC_CACHE:
-            NUMBA_FUNC_CACHE[cache_key] = numba_transform_func
+        cache_key = (func, "groupby_transform")
+        if cache_key not in NUMBA_FUNC_CACHE:
+            NUMBA_FUNC_CACHE[cache_key] = numba_transform_func
+
+        # result values need to be resorted to their original positions since we
+        # evaluated the data sorted by group
+        return result.take(np.argsort(sorted_index), axis=0)
+
+    @final
+    def _aggregate_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs):
+        """
+        Perform groupby aggregation routine with the numba engine.
+
+        This routine mimics the data splitting routine of the DataSplitter class
+        to generate the indices of each group in the sorted data and then passes the
+        data and indices into a Numba jitted function.
+        """
+        starts, ends, sorted_index, sorted_data = self._numba_prep(func, data)
+        group_keys = self.grouper._get_group_keys()
+
+        numba_agg_func = numba_.generate_numba_agg_func(
+            tuple(args), kwargs, func, engine_kwargs
+        )
+        result = numba_agg_func(
+            sorted_data, sorted_index, starts, ends, len(group_keys), len(data.columns)
+        )
+
+        cache_key = (func, "groupby_agg")
+        if cache_key not in NUMBA_FUNC_CACHE:
+            NUMBA_FUNC_CACHE[cache_key] = numba_agg_func
+
+        if self.grouper.nkeys > 1:
+            index = MultiIndex.from_tuples(group_keys, names=self.grouper.names)
+        else:
+            index = Index(group_keys, name=self.grouper.names[0])
+        return result, index
+
+    # -----------------------------------------------------------------
+    # apply/agg/transform
+
+    @Appender(
+        _apply_docs["template"].format(
+            input="dataframe", examples=_apply_docs["dataframe_examples"]
+        )
+    )
+    def apply(self, func, *args, **kwargs):
+
+        func = com.is_builtin_func(func)
+
+        # this is needed so we don't try and wrap strings. If we could
+        # resolve functions to their callable functions prior, this
+        # wouldn't be needed
+        if args or kwargs:
+            if callable(func):
+
+                @wraps(func)
+                def f(g):
+                    with np.errstate(all="ignore"):
+                        return func(g, *args, **kwargs)
+
+            elif hasattr(nanops, "nan" + func):
+                # TODO: should we wrap this into e.g. _is_builtin_func?
+                f = getattr(nanops, "nan" + func)
+
+            else:
+                raise ValueError(
+                    "func must be a callable if args or kwargs are supplied"
+                )
+        else:
+            f = func
+
+        # ignore SettingWithCopy here in case the user mutates
+        with option_context("mode.chained_assignment", None):
+            try:
+                result = self._python_apply_general(f, self._selected_obj)
+            except TypeError:
+                # gh-20949
+                # try again, with .apply acting as a filtering
+                # operation, by excluding the grouping column
+                # This would normally not be triggered
+                # except if the udf is trying an operation that
+                # fails on *some* columns, e.g. a numeric operation
+                # on a string grouper column
+
+                with group_selection_context(self):
+                    return self._python_apply_general(f, self._selected_obj)
 
-        # result values needs to be resorted to their original positions since we
-        # evaluated the data sorted by group
-        return result.take(np.argsort(sorted_index), axis=0)
+        return result
 
     @final
-    def _aggregate_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs):
+    def _python_apply_general(
+        self, f: F, data: FrameOrSeriesUnion
+    ) -> FrameOrSeriesUnion:
         """
-        Perform groupby aggregation routine with the numba engine.
+        Apply function f in python space
 
-        This routine mimics the data splitting routine of the DataSplitter class
-        to generate the indices of each group in the sorted data and then passes the
-        data and indices into a Numba jitted function.
+        Parameters
+        ----------
+        f : callable
+            Function to apply
+        data : Series or DataFrame
+            Data to apply f to
+
+        Returns
+        -------
+        Series or DataFrame
+            data after applying f
         """
-        starts, ends, sorted_index, sorted_data = self._numba_prep(func, data)
-        group_keys = self.grouper._get_group_keys()
+        keys, values, mutated = self.grouper.apply(f, data, self.axis)
 
-        numba_agg_func = numba_.generate_numba_agg_func(
-            tuple(args), kwargs, func, engine_kwargs
-        )
-        result = numba_agg_func(
-            sorted_data, sorted_index, starts, ends, len(group_keys), len(data.columns)
+        return self._wrap_applied_output(
            data, keys, values, not_indexed_same=mutated or self.mutated
         )
 
-        cache_key = (func, "groupby_agg")
-        if cache_key not in NUMBA_FUNC_CACHE:
-            NUMBA_FUNC_CACHE[cache_key] = numba_agg_func
-
-        if self.grouper.nkeys > 1:
-            index = MultiIndex.from_tuples(group_keys, names=self.grouper.names)
-        else:
-            index = Index(group_keys, name=self.grouper.names[0])
-        return result, index
-
     @final
     def _python_agg_general(self, func, *args, **kwargs):
         func = com.is_builtin_func(func)
@@ -1249,66 +1262,78 @@ def _python_agg_general(self, func, *args, **kwargs):
         return self._wrap_aggregated_output(output, index=self.grouper.result_index)
 
     @final
-    def _concat_objects(self, keys, values, not_indexed_same: bool = False):
-        from pandas.core.reshape.concat import concat
-
-        def reset_identity(values):
-            # reset the identities of the components
-            # of the values to prevent aliasing
-            for v in com.not_none(*values):
-                ax = v._get_axis(self.axis)
-                ax._reset_identity()
-            return values
-
-        if not not_indexed_same:
-            result = concat(values, axis=self.axis)
-            ax = self.filter(lambda x: True).axes[self.axis]
+    def _agg_general(
+        self,
+        numeric_only: bool = True,
+        min_count: int = -1,
+        *,
+        alias: str,
+        npfunc: Callable,
+    ):
+        with group_selection_context(self):
+            # try a cython aggregation if we can
+            result = None
+            try:
+                result = self._cython_agg_general(
+                    how=alias,
+                    alt=npfunc,
+                    numeric_only=numeric_only,
+                    min_count=min_count,
+                )
+            except DataError:
+                pass
+            except NotImplementedError as err:
+                if "function is not implemented for this dtype" in str(
+                    err
+                ) or "category dtype not supported" in str(err):
+                    # raised in _get_cython_function, in some cases can
+                    # be trimmed by implementing cython funcs for more dtypes
+                    pass
+                else:
+                    raise
 
-            # this is a very unfortunate situation
-            # we can't use reindex to restore the original order
-            # when the ax has duplicates
-            # so we resort to this
-            # GH 14776, 30667
-            if ax.has_duplicates and not result.axes[self.axis].equals(ax):
-                indexer, _ = result.index.get_indexer_non_unique(ax._values)
-                indexer = algorithms.unique1d(indexer)
-                result = result.take(indexer, axis=self.axis)
-            else:
-                result = result.reindex(ax, axis=self.axis, copy=False)
+            # apply a non-cython aggregation
+            if result is None:
+                result = self.aggregate(lambda x: npfunc(x, axis=self.axis))
+            return result.__finalize__(self.obj, method="groupby")
 
-        elif self.group_keys:
+    def _cython_agg_general(
+        self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
+    ):
+        raise AbstractMethodError(self)
 
-            values = reset_identity(values)
-            if self.as_index:
+    @final
+    def _cython_transform(
+        self, how: str, numeric_only: bool = True, axis: int = 0, **kwargs
+    ):
+        output: dict[base.OutputKey, ArrayLike] = {}
 
-                # possible MI return case
-                group_keys = keys
-                group_levels = self.grouper.levels
-                group_names = self.grouper.names
+        for idx, obj in enumerate(self._iterate_slices()):
+            name = obj.name
+            is_numeric = is_numeric_dtype(obj.dtype)
+            if numeric_only and not is_numeric:
+                continue
 
-                result = concat(
-                    values,
-                    axis=self.axis,
-                    keys=group_keys,
-                    levels=group_levels,
-                    names=group_names,
-                    sort=False,
+            try:
+                result = self.grouper._cython_operation(
+                    "transform", obj._values, how, axis, **kwargs
                 )
-            else:
+            except (NotImplementedError, TypeError):
+                continue
 
-                # GH5610, returns a MI, with the first level being a
-                # range index
-                keys = list(range(len(values)))
-                result = concat(values, axis=self.axis, keys=keys)
-        else:
-            values = reset_identity(values)
-            result = concat(values, axis=self.axis)
+            key = base.OutputKey(label=name, position=idx)
+            output[key] = result
 
-        if isinstance(result, Series) and self._selection_name is not None:
+        if not output:
+            raise DataError("No numeric types to aggregate")
 
-            result.name = self._selection_name
+        return self._wrap_transformed_output(output)
 
-        return result
+    def transform(self, func, *args, **kwargs):
+        raise AbstractMethodError(self)
+
+    # -----------------------------------------------------------------
+    # Utilities
 
     @final
     def _apply_filter(self, indices, dropna):
@@ -1327,78 +1352,40 @@ def _apply_filter(self, indices, dropna):
             filtered = self._selected_obj.where(mask)  # Fill with NaNs.
         return filtered
 
+    @final
+    def _cumcount_array(self, ascending: bool = True):
+        """
+        Parameters
+        ----------
+        ascending : bool, default True
+            If False, number in reverse, from length of group - 1 to 0.
 
-# To track operations that expand dimensions, like ohlc
-OutputFrameOrSeries = TypeVar("OutputFrameOrSeries", bound=NDFrame)
-
-
-class GroupBy(BaseGroupBy[FrameOrSeries]):
-    """
-    Class for grouping and aggregating relational data.
-
-    See aggregate, transform, and apply functions on this object.
-
-    It's easiest to use obj.groupby(...) to use GroupBy, but you can also do:
-
-    ::
-
-        grouped = groupby(obj, ...)
-
-    Parameters
-    ----------
-    obj : pandas object
-    axis : int, default 0
-    level : int, default None
-        Level of MultiIndex
-    groupings : list of Grouping objects
-        Most users should ignore this
-    exclusions : array-like, optional
-        List of columns to exclude
-    name : str
-        Most users should ignore this
-
-    Returns
-    -------
-    **Attributes**
-    groups : dict
-        {group name -> group labels}
-    len(grouped) : int
-        Number of groups
-
-    Notes
-    -----
-    After grouping, see aggregate, apply, and transform functions. Here are
-    some other brief notes about usage. When grouping by multiple groups, the
-    result index will be a MultiIndex (hierarchical) by default.
-
-    Iteration produces (key, group) tuples, i.e. chunking the data by group. So
-    you can write code like:
-
-    ::
-
-        grouped = obj.groupby(keys, axis=axis)
-        for key, group in grouped:
-            # do something with the data
-
-    Function calls on GroupBy, if not specially implemented, "dispatch" to the
-    grouped data. So if you group a DataFrame and wish to invoke the std()
-    method on each group, you can simply do:
-
-    ::
-
-        df.groupby(mapper).std()
+        Notes
+        -----
+        this is currently implementing sort=False
+        (though the default is sort=True) for groupby in general
+        """
+        ids, _, ngroups = self.grouper.group_info
+        sorter = get_group_index_sorter(ids, ngroups)
+        ids, count = ids[sorter], len(ids)
 
-    rather than
+        if count == 0:
+            return np.empty(0, dtype=np.int64)
 
-    ::
+        run = np.r_[True, ids[:-1] != ids[1:]]
+        rep = np.diff(np.r_[np.nonzero(run)[0], count])
+        out = (~run).cumsum()
 
-        df.groupby(mapper).aggregate(np.std)
+        if ascending:
+            out -= np.repeat(out[run], rep)
+        else:
+            out = np.repeat(out[np.r_[run[1:], True]], rep) - out
 
-    You can pass arguments to these "wrapped" functions, too.
+        rev = np.empty(count, dtype=np.intp)
+        rev[sorter] = np.arange(count, dtype=np.intp)
+        return out[rev].astype(np.int64, copy=False)
 
-    See the online documentation for full exposition on these topics and much
-    more
-    """
+    # -----------------------------------------------------------------
 
     @final
     @property
diff --git a/pandas/core/resample.py b/pandas/core/resample.py
index 58003c10db9e0..95690146e94a8 100644
--- a/pandas/core/resample.py
+++ b/pandas/core/resample.py
@@ -110,6 +110,11 @@ class Resampler(BaseGroupBy, PandasObject):
     After resampling, see aggregate, apply, and transform functions.
     """
 
+    # error: Incompatible types in assignment (expression has type
+    # "Optional[BinGrouper]", base class "BaseGroupBy" defined the type as
+    # "BaseGrouper")
+    grouper: BinGrouper | None  # type: ignore[assignment]
+
     # to the groupby descriptor
     _attributes = [
         "freq",
@@ -134,9 +139,7 @@ def __init__(self, obj, groupby=None, axis=0, kind=None, **kwargs):
         self.as_index = True
         self.exclusions = set()
         self.binner = None
-        # error: Incompatible types in assignment (expression has type "None", variable
-        # has type "BaseGrouper")
-        self.grouper = None  # type: ignore[assignment]
+        self.grouper = None
 
         if self.groupby is not None:
             self.groupby._set_grouper(self._convert_obj(obj), sort=True)
@@ -1150,10 +1153,10 @@ def _downsample(self, how, **kwargs):
 
         # do we have a regular frequency
 
-        # error: "BaseGrouper" has no attribute "binlabels"
+        # error: Item "None" of "Optional[Any]" has no attribute "binlabels"
         if (
             (ax.freq is not None or ax.inferred_freq is not None)
-            and len(self.grouper.binlabels) > len(ax)  # type: ignore[attr-defined]
+            and len(self.grouper.binlabels) > len(ax)  # type: ignore[union-attr]
             and how is None
         ):
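A note on the `ohlc` special case in the `_cython_agg_general` implementation added to generic.py: `ohlc` is the one cython aggregation that returns an (ngroups x 4) block instead of a 1-D array, which is why the loop tracks `idx` by hand rather than using `enumerate`. Below is a minimal standalone sketch of just that splitting step, not the pandas internals; plain `(label, position)` tuples stand in for `base.OutputKey`, and the hypothetical `split_result` helper replaces the real `_cython_operation` plumbing:

```python
import numpy as np

def split_result(result, name, how, idx, output):
    """Append 1-D columns from `result` to `output`; return the next position."""
    if how == "ohlc":
        # the (ngroups x 4) block becomes four 1-D output columns
        agg_names = ["open", "high", "low", "close"]
        assert result.ndim == 2 and result.shape[1] == len(agg_names)
        for result_column, result_name in zip(result.T, agg_names):
            output[(result_name, idx)] = result_column
            idx += 1
    else:
        assert result.ndim == 1
        output[(name, idx)] = result
        idx += 1
    return idx

output = {}
idx = split_result(np.array([[1.0, 3.0, 0.5, 2.0]]), "price", "ohlc", 0, output)
idx = split_result(np.array([4.0]), "volume", "sum", idx, output)
print(list(output))
# [('open', 0), ('high', 1), ('low', 2), ('close', 3), ('volume', 4)]
```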
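The moved `_cumcount_array` is easy to misread, so here is a hedged re-derivation of its run-length trick as standalone numpy. `np.argsort(ids, kind="stable")` stands in for the internal `get_group_index_sorter`; the rest mirrors the method line for line. Sorting once and numbering runs keeps the whole operation vectorized, with no per-group Python loop:

```python
import numpy as np

def cumcount(ids: np.ndarray, ascending: bool = True) -> np.ndarray:
    count = len(ids)
    if count == 0:
        return np.empty(0, dtype=np.int64)
    sorter = np.argsort(ids, kind="stable")  # stand-in for get_group_index_sorter
    sorted_ids = ids[sorter]

    run = np.r_[True, sorted_ids[:-1] != sorted_ids[1:]]  # True at each run start
    rep = np.diff(np.r_[np.nonzero(run)[0], count])       # length of each run
    out = (~run).cumsum()                                 # global running counter

    if ascending:
        out -= np.repeat(out[run], rep)                   # restart at 0 per run
    else:
        out = np.repeat(out[np.r_[run[1:], True]], rep) - out  # count down to 0

    rev = np.empty(count, dtype=np.intp)
    rev[sorter] = np.arange(count, dtype=np.intp)         # invert the sort
    return out[rev].astype(np.int64, copy=False)

ids = np.array([1, 0, 1, 1, 0])
print(cumcount(ids))                   # [0 0 1 2 1]
print(cumcount(ids, ascending=False))  # [2 1 1 0 0]
```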
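Finally, a small illustration of the idea behind `_set_result_index_ordered`, now grouped under the Dispatch/Wrapping section: transform-like results come back concatenated in group order, so the method labels them with their original row positions, sorts, and then restores the original axis. This sketch uses only public pandas API and an assumed `indices` dict that plays the role of `self._get_indices(self.grouper.result_index)`:

```python
import numpy as np
import pandas as pd

obj = pd.Series([10, 20, 30, 40], index=list("wxyz"))
# group -> positions of its rows in the original object
indices = {"a": np.array([0, 2]), "b": np.array([1, 3])}

# per-group results arrive concatenated group by group, i.e. in scrambled row order
result = pd.Series([10, 30, 20, 40])

original_positions = pd.Index(np.concatenate(list(indices.values())))
result.index = original_positions  # label each row with its original position
result = result.sort_index()       # restore the original row order
result.index = obj.index           # then restore the original labels
print(result)                      # w 10, x 20, y 30, z 40
```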