From 7a4bed53673e6d7f965f2552bdc6836fe16ca9de Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Mon, 7 Oct 2019 22:20:53 -0700 Subject: [PATCH 1/4] Moved NDFrame into DataFrameGroupBy --- pandas/core/groupby/generic.py | 2180 ++++++++++++++++---------------- 1 file changed, 1089 insertions(+), 1091 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index e556708dc9283..3eb66e5f4bf6c 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -129,663 +129,637 @@ def pinner(cls): return pinner -class NDFrameGroupBy(GroupBy): - def _iterate_slices(self): - if self.axis == 0: - # kludge - if self._selection is None: - slice_axis = self.obj.columns - else: - slice_axis = self._selection_list - slicer = lambda x: self.obj[x] - else: - slice_axis = self.obj.index - slicer = self.obj.xs - - for val in slice_axis: - if val in self.exclusions: - continue - yield val, slicer(val) - - def _cython_agg_general(self, how, alt=None, numeric_only=True, min_count=-1): - new_items, new_blocks = self._cython_agg_blocks( - how, alt=alt, numeric_only=numeric_only, min_count=min_count - ) - return self._wrap_agged_blocks(new_items, new_blocks) - - _block_agg_axis = 0 - - def _cython_agg_blocks(self, how, alt=None, numeric_only=True, min_count=-1): - # TODO: the actual managing of mgr_locs is a PITA - # here, it should happen via BlockManager.combine - - data, agg_axis = self._get_data_to_aggregate() - - if numeric_only: - data = data.get_numeric_data(copy=False) - - new_blocks = [] - new_items = [] - deleted_items = [] - no_result = object() - for block in data.blocks: - # Avoid inheriting result from earlier in the loop - result = no_result - locs = block.mgr_locs.as_array - try: - result, _ = self.grouper.aggregate( - block.values, how, axis=agg_axis, min_count=min_count - ) - except NotImplementedError: - # generally if we have numeric_only=False - # and non-applicable functions - # try to python agg +@pin_whitelisted_properties(Series, base.series_apply_whitelist) +class SeriesGroupBy(GroupBy): + _apply_whitelist = base.series_apply_whitelist - if alt is None: - # we cannot perform the operation - # in an alternate way, exclude the block - deleted_items.append(locs) - continue + @property + def _selection_name(self): + """ + since we are a series, we by definition only have + a single name, but may be the result of a selection or + the name of our object + """ + if self._selection is None: + return self.obj.name + else: + return self._selection - # call our grouper again with only this block - obj = self.obj[data.items[locs]] - s = groupby(obj, self.grouper) - try: - result = s.aggregate(lambda x: alt(x, axis=self.axis)) - except TypeError: - # we may have an exception in trying to aggregate - # continue and exclude the block - deleted_items.append(locs) - continue - finally: - if result is not no_result: - # see if we can cast the block back to the original dtype - result = maybe_downcast_numeric(result, block.dtype) - newb = block.make_block(result) + _agg_see_also_doc = dedent( + """ + See Also + -------- + pandas.Series.groupby.apply + pandas.Series.groupby.transform + pandas.Series.aggregate + """ + ) - new_items.append(locs) - new_blocks.append(newb) + _agg_examples_doc = dedent( + """ + Examples + -------- + >>> s = pd.Series([1, 2, 3, 4]) - if len(new_blocks) == 0: - raise DataError("No numeric types to aggregate") + >>> s + 0 1 + 1 2 + 2 3 + 3 4 + dtype: int64 - # reset the locs in the blocks to correspond to our - # current ordering - 
indexer = np.concatenate(new_items) - new_items = data.items.take(np.sort(indexer)) + >>> s.groupby([1, 1, 2, 2]).min() + 1 1 + 2 3 + dtype: int64 - if len(deleted_items): + >>> s.groupby([1, 1, 2, 2]).agg('min') + 1 1 + 2 3 + dtype: int64 - # we need to adjust the indexer to account for the - # items we have removed - # really should be done in internals :< + >>> s.groupby([1, 1, 2, 2]).agg(['min', 'max']) + min max + 1 1 2 + 2 3 4 - deleted = np.concatenate(deleted_items) - ai = np.arange(len(data)) - mask = np.zeros(len(data)) - mask[deleted] = 1 - indexer = (ai - mask.cumsum())[indexer] + The output column names can be controlled by passing + the desired column names and aggregations as keyword arguments. - offset = 0 - for b in new_blocks: - loc = len(b.mgr_locs) - b.mgr_locs = indexer[offset : (offset + loc)] - offset += loc + >>> s.groupby([1, 1, 2, 2]).agg( + ... minimum='min', + ... maximum='max', + ... ) + minimum maximum + 1 1 2 + 2 3 4 + """ + ) - return new_items, new_blocks + @Appender( + _apply_docs["template"].format( + input="series", examples=_apply_docs["series_examples"] + ) + ) + def apply(self, func, *args, **kwargs): + return super().apply(func, *args, **kwargs) - def aggregate(self, func, *args, **kwargs): + @Substitution( + see_also=_agg_see_also_doc, + examples=_agg_examples_doc, + versionadded="", + klass="Series", + axis="", + ) + @Appender(_shared_docs["aggregate"]) + def aggregate(self, func=None, *args, **kwargs): _level = kwargs.pop("_level", None) - relabeling = func is None and _is_multi_agg_with_relabel(**kwargs) + relabeling = func is None + columns = None + no_arg_message = "Must provide 'func' or named aggregation **kwargs." if relabeling: - func, columns, order = _normalize_keyword_aggregation(kwargs) + columns = list(kwargs) + if not PY36: + # sort for 3.5 and earlier + columns = list(sorted(columns)) + func = [kwargs[col] for col in columns] kwargs = {} - elif func is None: - # nicer error message - raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).") - - func = _maybe_mangle_lambdas(func) + if not columns: + raise TypeError(no_arg_message) - result, how = self._aggregate(func, _level=_level, *args, **kwargs) - if how is None: - return result + if isinstance(func, str): + return getattr(self, func)(*args, **kwargs) - if result is None: + if isinstance(func, abc.Iterable): + # Catch instances of lists / tuples + # but not the class list / tuple itself. 
+ func = _maybe_mangle_lambdas(func) + ret = self._aggregate_multiple_funcs(func, (_level or 0) + 1) + if relabeling: + ret.columns = columns + else: + cyfunc = self._get_cython_func(func) + if cyfunc and not args and not kwargs: + return getattr(self, cyfunc)() - # grouper specific aggregations if self.grouper.nkeys > 1: return self._python_agg_general(func, *args, **kwargs) - elif args or kwargs: - result = self._aggregate_generic(func, *args, **kwargs) - else: - # try to treat as if we are passing a list - try: - result = self._aggregate_multiple_funcs( - [func], _level=_level, _axis=self.axis - ) - except Exception: - result = self._aggregate_generic(func) - else: - result.columns = Index( - result.columns.levels[0], name=self._selected_obj.columns.name - ) + try: + return self._python_agg_general(func, *args, **kwargs) + except Exception: + result = self._aggregate_named(func, *args, **kwargs) - if not self.as_index: - self._insert_inaxis_grouper_inplace(result) - result.index = np.arange(len(result)) + index = Index(sorted(result), name=self.grouper.names[0]) + ret = Series(result, index=index) - if relabeling: + if not self.as_index: # pragma: no cover + print("Warning, ignoring as_index=True") - # used reordered index of columns - result = result.iloc[:, order] - result.columns = columns + # _level handled at higher + if not _level and isinstance(ret, dict): + from pandas import concat - return result._convert(datetime=True) + ret = concat(ret, axis=1) + return ret agg = aggregate - def _aggregate_generic(self, func, *args, **kwargs): - if self.grouper.nkeys != 1: - raise AssertionError("Number of keys must be 1") + def _aggregate_multiple_funcs(self, arg, _level): + if isinstance(arg, dict): - axis = self.axis - obj = self._obj_with_exclusions + # show the deprecation, but only if we + # have not shown a higher level one + # GH 15931 + if isinstance(self._selected_obj, Series) and _level <= 1: + msg = dedent( + """\ + using a dict on a Series for aggregation + is deprecated and will be removed in a future version. Use \ + named aggregation instead. 
- result = OrderedDict() - if axis != obj._info_axis_number: - try: - for name, data in self: - result[name] = self._try_cast(func(data, *args, **kwargs), data) - except Exception: - return self._aggregate_item_by_item(func, *args, **kwargs) - else: - for name in self.indices: - try: - data = self.get_group(name, obj=obj) - result[name] = self._try_cast(func(data, *args, **kwargs), data) - except Exception: - wrapper = lambda x: func(x, *args, **kwargs) - result[name] = data.apply(wrapper, axis=axis) - - return self._wrap_generic_output(result, obj) - - def _wrap_aggregated_output(self, output, names=None): - raise AbstractMethodError(self) + >>> grouper.agg(name_1=func_1, name_2=func_2) + """ + ) + warnings.warn(msg, FutureWarning, stacklevel=3) - def _aggregate_item_by_item(self, func, *args, **kwargs): - # only for axis==0 + columns = list(arg.keys()) + arg = arg.items() + elif any(isinstance(x, (tuple, list)) for x in arg): + arg = [(x, x) if not isinstance(x, (tuple, list)) else x for x in arg] - obj = self._obj_with_exclusions - result = OrderedDict() - cannot_agg = [] - errors = None - for item in obj: - data = obj[item] - colg = SeriesGroupBy(data, selection=item, grouper=self.grouper) + # indicated column order + columns = next(zip(*arg)) + else: + # list of functions / function names + columns = [] + for f in arg: + columns.append(com.get_callable_name(f) or f) - try: - cast = self._transform_should_cast(func) + arg = zip(columns, arg) - result[item] = colg.aggregate(func, *args, **kwargs) - if cast: - result[item] = self._try_cast(result[item], data) + results = OrderedDict() + for name, func in arg: + obj = self + if name in results: + raise SpecificationError( + "Function names must be unique, found multiple named " + "{}".format(name) + ) - except ValueError as err: - if "Must produce aggregated value" in str(err): - # raised in _aggregate_named, handle at higher level - # see test_apply_with_mutated_index - raise - cannot_agg.append(item) - continue - except TypeError as e: - cannot_agg.append(item) - errors = e - continue + # reset the cache so that we + # only include the named selection + if name in self._selected_obj: + obj = copy.copy(obj) + obj._reset_cache() + obj._selection = name + results[name] = obj.aggregate(func) - result_columns = obj.columns - if cannot_agg: - result_columns = result_columns.drop(cannot_agg) + if any(isinstance(x, DataFrame) for x in results.values()): + # let higher level handle + if _level: + return results - # GH6337 - if not len(result_columns) and errors is not None: - raise errors + return DataFrame(results, columns=columns) - return DataFrame(result, columns=result_columns) + def _wrap_output(self, output, index, names=None): + """ common agg/transform wrapping logic """ + output = output[self._selection_name] - def _decide_output_index(self, output, labels): - if len(output) == len(labels): - output_keys = labels + if names is not None: + return DataFrame(output, index=index, columns=names) else: - output_keys = sorted(output) - try: - output_keys.sort() - except TypeError: - pass + name = self._selection_name + if name is None: + name = self._selected_obj.name + return Series(output, index=index, name=name) - if isinstance(labels, MultiIndex): - output_keys = MultiIndex.from_tuples(output_keys, names=labels.names) + def _wrap_aggregated_output(self, output, names=None): + result = self._wrap_output( + output=output, index=self.grouper.result_index, names=names + ) + return self._reindex_output(result)._convert(datetime=True) - return 
output_keys + def _wrap_transformed_output(self, output, names=None): + return self._wrap_output(output=output, index=self.obj.index, names=names) def _wrap_applied_output(self, keys, values, not_indexed_same=False): if len(keys) == 0: - return DataFrame(index=keys) - - key_names = self.grouper.names + # GH #6265 + return Series([], name=self._selection_name, index=keys) - # GH12824. - def first_not_none(values): - try: - return next(com.not_none(*values)) - except StopIteration: - return None + def _get_index(): + if self.grouper.nkeys > 1: + index = MultiIndex.from_tuples(keys, names=self.grouper.names) + else: + index = Index(keys, name=self.grouper.names[0]) + return index - v = first_not_none(values) + if isinstance(values[0], dict): + # GH #823 #24880 + index = _get_index() + result = self._reindex_output(DataFrame(values, index=index)) + # if self.observed is False, + # keep all-NaN rows created while re-indexing + result = result.stack(dropna=self.observed) + result.name = self._selection_name + return result - if v is None: - # GH9684. If all values are None, then this will throw an error. - # We'd prefer it return an empty dataframe. - return DataFrame() - elif isinstance(v, DataFrame): + if isinstance(values[0], Series): return self._concat_objects(keys, values, not_indexed_same=not_indexed_same) - elif self.grouper.groupings is not None: - if len(self.grouper.groupings) > 1: - key_index = self.grouper.result_index + elif isinstance(values[0], DataFrame): + # possible that Series -> DataFrame by applied function + return self._concat_objects(keys, values, not_indexed_same=not_indexed_same) + else: + # GH #6265 #24880 + result = Series(data=values, index=_get_index(), name=self._selection_name) + return self._reindex_output(result) - else: - ping = self.grouper.groupings[0] - if len(keys) == ping.ngroups: - key_index = ping.group_index - key_index.name = key_names[0] + def _aggregate_named(self, func, *args, **kwargs): + result = OrderedDict() - key_lookup = Index(keys) - indexer = key_lookup.get_indexer(key_index) + for name, group in self: + group.name = name + output = func(group, *args, **kwargs) + if isinstance(output, (Series, Index, np.ndarray)): + raise ValueError("Must produce aggregated value") + result[name] = self._try_cast(output, group) - # reorder the values - values = [values[i] for i in indexer] - else: + return result - key_index = Index(keys, name=key_names[0]) + @Substitution(klass="Series", selected="A.") + @Appender(_transform_template) + def transform(self, func, *args, **kwargs): + func = self._get_cython_func(func) or func - # don't use the key indexer - if not self.as_index: - key_index = None + if isinstance(func, str): + if not (func in base.transform_kernel_whitelist): + msg = "'{func}' is not a valid function name for transform(name)" + raise ValueError(msg.format(func=func)) + if func in base.cythonized_kernels: + # cythonized transform or canned "agg+broadcast" + return getattr(self, func)(*args, **kwargs) + else: + # If func is a reduction, we need to broadcast the + # result to the whole group. Compute func result + # and deal with possible broadcasting below. 
+ return self._transform_fast( + lambda: getattr(self, func)(*args, **kwargs), func + ) - # make Nones an empty object - v = first_not_none(values) - if v is None: - return DataFrame() - elif isinstance(v, NDFrame): - values = [ - x if x is not None else v._constructor(**v._construct_axes_dict()) - for x in values - ] + # reg transform + klass = self._selected_obj.__class__ + results = [] + wrapper = lambda x: func(x, *args, **kwargs) + for name, group in self: + object.__setattr__(group, "name", name) + res = wrapper(group) - v = values[0] + if isinstance(res, (ABCDataFrame, ABCSeries)): + res = res._values - if isinstance(v, (np.ndarray, Index, Series)): - if isinstance(v, Series): - applied_index = self._selected_obj._get_axis(self.axis) - all_indexed_same = _all_indexes_same([x.index for x in values]) - singular_series = len(values) == 1 and applied_index.nlevels == 1 + indexer = self._get_index(name) + s = klass(res, indexer) + results.append(s) - # GH3596 - # provide a reduction (Frame -> Series) if groups are - # unique - if self.squeeze: - # assign the name to this series - if singular_series: - values[0].name = keys[0] + # check for empty "results" to avoid concat ValueError + if results: + from pandas.core.reshape.concat import concat - # GH2893 - # we have series in the values array, we want to - # produce a series: - # if any of the sub-series are not indexed the same - # OR we don't have a multi-index and we have only a - # single values - return self._concat_objects( - keys, values, not_indexed_same=not_indexed_same - ) + result = concat(results).sort_index() + else: + result = Series() - # still a series - # path added as of GH 5545 - elif all_indexed_same: - from pandas.core.reshape.concat import concat + # we will only try to coerce the result type if + # we have a numeric dtype, as these are *always* udfs + # the cython take a different path (and casting) + dtype = self._selected_obj.dtype + if is_numeric_dtype(dtype): + result = maybe_downcast_to_dtype(result, dtype) - return concat(values) + result.name = self._selected_obj.name + result.index = self._selected_obj.index + return result - if not all_indexed_same: - # GH 8467 - return self._concat_objects(keys, values, not_indexed_same=True) + def _transform_fast(self, func, func_nm): + """ + fast version of transform, only applicable to + builtin/cythonizable functions + """ + if isinstance(func, str): + func = getattr(self, func) - try: - if self.axis == 0: - # GH6124 if the list of Series have a consistent name, - # then propagate that name to the result. - index = v.index.copy() - if index.name is None: - # Only propagate the series name to the result - # if all series have a consistent name. If the - # series do not have a consistent name, do - # nothing. 
- names = {v.name for v in values} - if len(names) == 1: - index.name = list(names)[0] + ids, _, ngroup = self.grouper.group_info + cast = self._transform_should_cast(func_nm) + out = algorithms.take_1d(func()._values, ids) + if cast: + out = self._try_cast(out, self.obj) + return Series(out, index=self.obj.index, name=self.obj.name) - # normally use vstack as its faster than concat - # and if we have mi-columns - if ( - isinstance(v.index, MultiIndex) - or key_index is None - or isinstance(key_index, MultiIndex) - ): - stacked_values = np.vstack([np.asarray(v) for v in values]) - result = DataFrame( - stacked_values, index=key_index, columns=index - ) - else: - # GH5788 instead of stacking; concat gets the - # dtypes correct - from pandas.core.reshape.concat import concat + def filter(self, func, dropna=True, *args, **kwargs): # noqa + """ + Return a copy of a Series excluding elements from groups that + do not satisfy the boolean criterion specified by func. - result = concat( - values, - keys=key_index, - names=key_index.names, - axis=self.axis, - ).unstack() - result.columns = index - else: - stacked_values = np.vstack([np.asarray(v) for v in values]) - result = DataFrame( - stacked_values.T, index=v.index, columns=key_index - ) + Parameters + ---------- + func : function + To apply to each group. Should return True or False. + dropna : Drop groups that do not pass the filter. True by default; + if False, groups that evaluate False are filled with NaNs. - except (ValueError, AttributeError): - # GH1738: values is list of arrays of unequal lengths fall - # through to the outer else caluse - return Series(values, index=key_index, name=self._selection_name) + Examples + -------- + >>> df = pd.DataFrame({'A' : ['foo', 'bar', 'foo', 'bar', + ... 'foo', 'bar'], + ... 'B' : [1, 2, 3, 4, 5, 6], + ... 'C' : [2.0, 5., 8., 1., 2., 9.]}) + >>> grouped = df.groupby('A') + >>> df.groupby('A').B.filter(lambda x: x.mean() > 3.) + 1 2 + 3 4 + 5 6 + Name: B, dtype: int64 - # if we have date/time like in the original, then coerce dates - # as we are stacking can easily have object dtypes here - so = self._selected_obj - if so.ndim == 2 and so.dtypes.apply(is_datetimelike).any(): - result = _recast_datetimelike_result(result) - else: - result = result._convert(datetime=True) + Returns + ------- + filtered : Series + """ + if isinstance(func, str): + wrapper = lambda x: getattr(x, func)(*args, **kwargs) + else: + wrapper = lambda x: func(x, *args, **kwargs) - return self._reindex_output(result) + # Interpret np.nan as False. + def true_and_notna(x, *args, **kwargs): + b = wrapper(x, *args, **kwargs) + return b and notna(b) - # values are not series or array-like but scalars - else: - # only coerce dates if we find at least 1 datetime - coerce = any(isinstance(x, Timestamp) for x in values) - # self._selection_name not passed through to Series as the - # result should not take the name of original selection - # of columns - return Series(values, index=key_index)._convert( - datetime=True, coerce=coerce - ) + try: + indices = [ + self._get_index(name) for name, group in self if true_and_notna(group) + ] + except ValueError: + raise TypeError("the filter must return a boolean result") + except TypeError: + raise TypeError("the filter must return a boolean result") + + filtered = self._apply_filter(indices, dropna) + return filtered + + def nunique(self, dropna=True): + """ + Return number of unique elements in the group. + + Returns + ------- + Series + Number of unique values within each group. 
+ """ + ids, _, _ = self.grouper.group_info + + val = self.obj._internal_get_values() + + # GH 27951 + # temporary fix while we wait for NumPy bug 12629 to be fixed + val[isna(val)] = np.datetime64("NaT") + try: + sorter = np.lexsort((val, ids)) + except TypeError: # catches object dtypes + msg = "val.dtype must be object, got {}".format(val.dtype) + assert val.dtype == object, msg + val, _ = algorithms.factorize(val, sort=False) + sorter = np.lexsort((val, ids)) + _isna = lambda a: a == -1 else: - # Handle cases like BinGrouper - return self._concat_objects(keys, values, not_indexed_same=not_indexed_same) + _isna = isna - def _transform_general(self, func, *args, **kwargs): - from pandas.core.reshape.concat import concat + ids, val = ids[sorter], val[sorter] - applied = [] - obj = self._obj_with_exclusions - gen = self.grouper.get_iterator(obj, axis=self.axis) - fast_path, slow_path = self._define_paths(func, *args, **kwargs) + # group boundaries are where group ids change + # unique observations are where sorted values change + idx = np.r_[0, 1 + np.nonzero(ids[1:] != ids[:-1])[0]] + inc = np.r_[1, val[1:] != val[:-1]] - path = None - for name, group in gen: - object.__setattr__(group, "name", name) + # 1st item of each group is a new unique observation + mask = _isna(val) + if dropna: + inc[idx] = 1 + inc[mask] = 0 + else: + inc[mask & np.r_[False, mask[:-1]]] = 0 + inc[idx] = 1 - if path is None: - # Try slow path and fast path. - try: - path, res = self._choose_path(fast_path, slow_path, group) - except TypeError: - return self._transform_item_by_item(obj, fast_path) - except ValueError: - msg = "transform must return a scalar value for each group" - raise ValueError(msg) + out = np.add.reduceat(inc, idx).astype("int64", copy=False) + if len(ids): + # NaN/NaT group exists if the head of ids is -1, + # so remove it from res and exclude its index from idx + if ids[0] == -1: + res = out[1:] + idx = idx[np.flatnonzero(idx)] else: - res = path(group) + res = out + else: + res = out[1:] + ri = self.grouper.result_index - if isinstance(res, Series): + # we might have duplications among the bins + if len(res) != len(ri): + res, out = np.zeros(len(ri), dtype=out.dtype), res + res[ids[idx]] = out - # we need to broadcast across the - # other dimension; this will preserve dtypes - # GH14457 - if not np.prod(group.shape): - continue - elif res.index.is_(obj.index): - r = concat([res] * len(group.columns), axis=1) - r.columns = group.columns - r.index = group.index - else: - r = DataFrame( - np.concatenate([res.values] * len(group.index)).reshape( - group.shape - ), - columns=group.columns, - index=group.index, - ) + return Series(res, index=ri, name=self._selection_name) - applied.append(r) - else: - applied.append(res) + @Appender(Series.describe.__doc__) + def describe(self, **kwargs): + result = self.apply(lambda x: x.describe(**kwargs)) + if self.axis == 1: + return result.T + return result.unstack() - concat_index = obj.columns if self.axis == 0 else obj.index - other_axis = 1 if self.axis == 0 else 0 # switches between 0 & 1 - concatenated = concat(applied, axis=self.axis, verify_integrity=False) - concatenated = concatenated.reindex(concat_index, axis=other_axis, copy=False) - return self._set_result_index_ordered(concatenated) + def value_counts( + self, normalize=False, sort=True, ascending=False, bins=None, dropna=True + ): - @Substitution(klass="DataFrame", selected="") - @Appender(_transform_template) - def transform(self, func, *args, **kwargs): + from pandas.core.reshape.tile import 
cut + from pandas.core.reshape.merge import _get_join_indexers - # optimized transforms - func = self._get_cython_func(func) or func + if bins is not None and not np.iterable(bins): + # scalar bins cannot be done at top level + # in a backward compatible way + return self.apply( + Series.value_counts, + normalize=normalize, + sort=sort, + ascending=ascending, + bins=bins, + ) - if isinstance(func, str): - if not (func in base.transform_kernel_whitelist): - msg = "'{func}' is not a valid function name for transform(name)" - raise ValueError(msg.format(func=func)) - if func in base.cythonized_kernels: - # cythonized transformation or canned "reduction+broadcast" - return getattr(self, func)(*args, **kwargs) - else: - # If func is a reduction, we need to broadcast the - # result to the whole group. Compute func result - # and deal with possible broadcasting below. - result = getattr(self, func)(*args, **kwargs) + ids, _, _ = self.grouper.group_info + val = self.obj._internal_get_values() + + # groupby removes null keys from groupings + mask = ids != -1 + ids, val = ids[mask], val[mask] + + if bins is None: + lab, lev = algorithms.factorize(val, sort=True) + llab = lambda lab, inc: lab[inc] else: - return self._transform_general(func, *args, **kwargs) - # a reduction transform - if not isinstance(result, DataFrame): - return self._transform_general(func, *args, **kwargs) + # lab is a Categorical with categories an IntervalIndex + lab = cut(Series(val), bins, include_lowest=True) + lev = lab.cat.categories + lab = lev.take(lab.cat.codes) + llab = lambda lab, inc: lab[inc]._multiindex.codes[-1] - obj = self._obj_with_exclusions + if is_interval_dtype(lab): + # TODO: should we do this inside II? + sorter = np.lexsort((lab.left, lab.right, ids)) + else: + sorter = np.lexsort((lab, ids)) - # nuisance columns - if not result.columns.equals(obj.columns): - return self._transform_general(func, *args, **kwargs) + ids, lab = ids[sorter], lab[sorter] - return self._transform_fast(result, obj, func) + # group boundaries are where group ids change + idx = np.r_[0, 1 + np.nonzero(ids[1:] != ids[:-1])[0]] - def _transform_fast(self, result, obj, func_nm): - """ - Fast transform path for aggregations - """ - # if there were groups with no observations (Categorical only?) - # try casting data to original dtype - cast = self._transform_should_cast(func_nm) + # new values are where sorted labels change + lchanges = llab(lab, slice(1, None)) != llab(lab, slice(None, -1)) + inc = np.r_[True, lchanges] + inc[idx] = True # group boundaries are also new values + out = np.diff(np.nonzero(np.r_[inc, True])[0]) # value counts - # for each col, reshape to to size of original frame - # by take operation - ids, _, ngroup = self.grouper.group_info - output = [] - for i, _ in enumerate(result.columns): - res = algorithms.take_1d(result.iloc[:, i].values, ids) - if cast: - res = self._try_cast(res, obj.iloc[:, i]) - output.append(res) + # num. 
of times each group should be repeated + rep = partial(np.repeat, repeats=np.add.reduceat(inc, idx)) - return DataFrame._from_arrays(output, columns=result.columns, index=obj.index) + # multi-index components + labels = list(map(rep, self.grouper.recons_labels)) + [llab(lab, inc)] + levels = [ping.group_index for ping in self.grouper.groupings] + [lev] + names = self.grouper.names + [self._selection_name] - def _define_paths(self, func, *args, **kwargs): - if isinstance(func, str): - fast_path = lambda group: getattr(group, func)(*args, **kwargs) - slow_path = lambda group: group.apply( - lambda x: getattr(x, func)(*args, **kwargs), axis=self.axis - ) - else: - fast_path = lambda group: func(group, *args, **kwargs) - slow_path = lambda group: group.apply( - lambda x: func(x, *args, **kwargs), axis=self.axis + if dropna: + mask = labels[-1] != -1 + if mask.all(): + dropna = False + else: + out, labels = out[mask], [label[mask] for label in labels] + + if normalize: + out = out.astype("float") + d = np.diff(np.r_[idx, len(ids)]) + if dropna: + m = ids[lab == -1] + np.add.at(d, m, -1) + acc = rep(d)[mask] + else: + acc = rep(d) + out /= acc + + if sort and bins is None: + cat = ids[inc][mask] if dropna else ids[inc] + sorter = np.lexsort((out if ascending else -out, cat)) + out, labels[-1] = out[sorter], labels[-1][sorter] + + if bins is None: + mi = MultiIndex( + levels=levels, codes=labels, names=names, verify_integrity=False ) - return fast_path, slow_path - def _choose_path(self, fast_path, slow_path, group): - path = slow_path - res = slow_path(group) + if is_integer_dtype(out): + out = ensure_int64(out) + return Series(out, index=mi, name=self._selection_name) - # if we make it here, test if we can use the fast path - try: - res_fast = fast_path(group) - except Exception: - # Hard to know ex-ante what exceptions `fast_path` might raise - return path, res + # for compat. 
with libgroupby.value_counts need to ensure every + # bin is present at every index level, null filled with zeros + diff = np.zeros(len(out), dtype="bool") + for lab in labels[:-1]: + diff |= np.r_[True, lab[1:] != lab[:-1]] - # verify fast path does not change columns (and names), otherwise - # its results cannot be joined with those of the slow path - if not isinstance(res_fast, DataFrame): - return path, res + ncat, nbin = diff.sum(), len(levels[-1]) - if not res_fast.columns.equals(group.columns): - return path, res + left = [np.repeat(np.arange(ncat), nbin), np.tile(np.arange(nbin), ncat)] - if res_fast.equals(res): - path = fast_path + right = [diff.cumsum() - 1, labels[-1]] - return path, res + _, idx = _get_join_indexers(left, right, sort=False, how="left") + out = np.where(idx != -1, out[idx], 0) - def _transform_item_by_item(self, obj, wrapper): - # iterate through columns - output = {} - inds = [] - for i, col in enumerate(obj): - try: - output[col] = self[col].transform(wrapper) - inds.append(i) - except Exception: - pass + if sort: + sorter = np.lexsort((out if ascending else -out, left[0])) + out, left[-1] = out[sorter], left[-1][sorter] - if len(output) == 0: - raise TypeError("Transform function invalid for data types") + # build the multi-index w/ full levels + codes = list(map(lambda lab: np.repeat(lab[diff], nbin), labels[:-1])) + codes.append(left[-1]) - columns = obj.columns - if len(output) < len(obj.columns): - columns = columns.take(inds) + mi = MultiIndex(levels=levels, codes=codes, names=names, verify_integrity=False) - return DataFrame(output, index=obj.index, columns=columns) + if is_integer_dtype(out): + out = ensure_int64(out) + return Series(out, index=mi, name=self._selection_name) - def filter(self, func, dropna=True, *args, **kwargs): + def count(self): """ - Return a copy of a DataFrame excluding elements from groups that - do not satisfy the boolean criterion specified by func. - - Parameters - ---------- - f : function - Function to apply to each subframe. Should return True or False. - dropna : Drop groups that do not pass the filter. True by default; - If False, groups that evaluate False are filled with NaNs. + Compute count of group, excluding missing values. Returns ------- - filtered : DataFrame - - Notes - ----- - Each subframe is endowed the attribute 'name' in case you need to know - which group you are working on. - - Examples - -------- - >>> df = pd.DataFrame({'A' : ['foo', 'bar', 'foo', 'bar', - ... 'foo', 'bar'], - ... 'B' : [1, 2, 3, 4, 5, 6], - ... 'C' : [2.0, 5., 8., 1., 2., 9.]}) - >>> grouped = df.groupby('A') - >>> grouped.filter(lambda x: x['B'].mean() > 3.) - A B C - 1 bar 2 5.0 - 3 bar 4 1.0 - 5 bar 6 9.0 + Series + Count of values within each group. 
""" + ids, _, ngroups = self.grouper.group_info + val = self.obj._internal_get_values() - indices = [] - - obj = self._selected_obj - gen = self.grouper.get_iterator(obj, axis=self.axis) - - for name, group in gen: - object.__setattr__(group, "name", name) + mask = (ids != -1) & ~isna(val) + ids = ensure_platform_int(ids) + minlength = ngroups or 0 + out = np.bincount(ids[mask], minlength=minlength) - res = func(group, *args, **kwargs) + return Series( + out, + index=self.grouper.result_index, + name=self._selection_name, + dtype="int64", + ) - try: - res = res.squeeze() - except AttributeError: # allow e.g., scalars and frames to pass - pass + def _apply_to_column_groupbys(self, func): + """ return a pass thru """ + return func(self) - # interpret the result of the filter - if is_bool(res) or (is_scalar(res) and isna(res)): - if res and notna(res): - indices.append(self._get_index(name)) - else: - # non scalars aren't allowed - raise TypeError( - "filter function returned a %s, " - "but expected a scalar bool" % type(res).__name__ + def pct_change(self, periods=1, fill_method="pad", limit=None, freq=None): + """Calculate pct_change of each value to previous entry in group""" + # TODO: Remove this conditional when #23918 is fixed + if freq: + return self.apply( + lambda x: x.pct_change( + periods=periods, fill_method=fill_method, limit=limit, freq=freq ) + ) + filled = getattr(self, fill_method)(limit=limit) + fill_grp = filled.groupby(self.grouper.labels) + shifted = fill_grp.shift(periods=periods, freq=freq) - return self._apply_filter(indices, dropna) + return (filled / shifted) - 1 -@pin_whitelisted_properties(Series, base.series_apply_whitelist) -class SeriesGroupBy(GroupBy): - _apply_whitelist = base.series_apply_whitelist +@pin_whitelisted_properties(DataFrame, base.dataframe_apply_whitelist) +class DataFrameGroupBy(NDFrameGroupBy): - @property - def _selection_name(self): - """ - since we are a series, we by definition only have - a single name, but may be the result of a selection or - the name of our object - """ - if self._selection is None: - return self.obj.name - else: - return self._selection + _apply_whitelist = base.dataframe_apply_whitelist + + _block_agg_axis = 1 _agg_see_also_doc = dedent( """ See Also -------- - pandas.Series.groupby.apply - pandas.Series.groupby.transform - pandas.Series.aggregate + pandas.DataFrame.groupby.apply + pandas.DataFrame.groupby.transform + pandas.DataFrame.aggregate """ ) @@ -793,694 +767,718 @@ def _selection_name(self): """ Examples -------- - >>> s = pd.Series([1, 2, 3, 4]) - >>> s - 0 1 - 1 2 - 2 3 - 3 4 - dtype: int64 + >>> df = pd.DataFrame({'A': [1, 1, 2, 2], + ... 'B': [1, 2, 3, 4], + ... 'C': np.random.randn(4)}) - >>> s.groupby([1, 1, 2, 2]).min() - 1 1 - 2 3 - dtype: int64 + >>> df + A B C + 0 1 1 0.362838 + 1 1 2 0.227877 + 2 2 3 1.267767 + 3 2 4 -0.562860 - >>> s.groupby([1, 1, 2, 2]).agg('min') - 1 1 - 2 3 - dtype: int64 + The aggregation is for each column. - >>> s.groupby([1, 1, 2, 2]).agg(['min', 'max']) - min max - 1 1 2 - 2 3 4 + >>> df.groupby('A').agg('min') + B C + A + 1 1 0.227877 + 2 3 -0.562860 - The output column names can be controlled by passing - the desired column names and aggregations as keyword arguments. + Multiple aggregations - >>> s.groupby([1, 1, 2, 2]).agg( - ... minimum='min', - ... maximum='max', - ... 
) - minimum maximum - 1 1 2 - 2 3 4 - """ - ) + >>> df.groupby('A').agg(['min', 'max']) + B C + min max min max + A + 1 1 2 0.227877 0.362838 + 2 3 4 -0.562860 1.267767 - @Appender( - _apply_docs["template"].format( - input="series", examples=_apply_docs["series_examples"] - ) - ) - def apply(self, func, *args, **kwargs): - return super().apply(func, *args, **kwargs) + Select a column for aggregation - @Substitution( - see_also=_agg_see_also_doc, - examples=_agg_examples_doc, - versionadded="", - klass="Series", - axis="", - ) - @Appender(_shared_docs["aggregate"]) - def aggregate(self, func=None, *args, **kwargs): - _level = kwargs.pop("_level", None) + >>> df.groupby('A').B.agg(['min', 'max']) + min max + A + 1 1 2 + 2 3 4 - relabeling = func is None - columns = None - no_arg_message = "Must provide 'func' or named aggregation **kwargs." - if relabeling: - columns = list(kwargs) - if not PY36: - # sort for 3.5 and earlier - columns = list(sorted(columns)) + Different aggregations per column - func = [kwargs[col] for col in columns] - kwargs = {} - if not columns: - raise TypeError(no_arg_message) + >>> df.groupby('A').agg({'B': ['min', 'max'], 'C': 'sum'}) + B C + min max sum + A + 1 1 2 0.590716 + 2 3 4 0.704907 - if isinstance(func, str): - return getattr(self, func)(*args, **kwargs) + To control the output names with different aggregations per column, + pandas supports "named aggregation" - if isinstance(func, abc.Iterable): - # Catch instances of lists / tuples - # but not the class list / tuple itself. - func = _maybe_mangle_lambdas(func) - ret = self._aggregate_multiple_funcs(func, (_level or 0) + 1) - if relabeling: - ret.columns = columns - else: - cyfunc = self._get_cython_func(func) - if cyfunc and not args and not kwargs: - return getattr(self, cyfunc)() + >>> df.groupby("A").agg( + ... b_min=pd.NamedAgg(column="B", aggfunc="min"), + ... c_sum=pd.NamedAgg(column="C", aggfunc="sum")) + b_min c_sum + A + 1 1 -1.956929 + 2 3 -0.322183 - if self.grouper.nkeys > 1: - return self._python_agg_general(func, *args, **kwargs) + - The keywords are the *output* column names + - The values are tuples whose first element is the column to select + and the second element is the aggregation to apply to that column. + Pandas provides the ``pandas.NamedAgg`` namedtuple with the fields + ``['column', 'aggfunc']`` to make it clearer what the arguments are. + As usual, the aggregation can be a callable or a string alias. - try: - return self._python_agg_general(func, *args, **kwargs) - except Exception: - result = self._aggregate_named(func, *args, **kwargs) + See :ref:`groupby.aggregate.named` for more. 
+ """ + ) - index = Index(sorted(result), name=self.grouper.names[0]) - ret = Series(result, index=index) + @Substitution( + see_also=_agg_see_also_doc, + examples=_agg_examples_doc, + versionadded="", + klass="DataFrame", + axis="", + ) + @Appender(_shared_docs["aggregate"]) + def aggregate(self, func=None, *args, **kwargs): + return super().aggregate(func, *args, **kwargs) - if not self.as_index: # pragma: no cover - print("Warning, ignoring as_index=True") + agg = aggregate - # _level handled at higher - if not _level and isinstance(ret, dict): - from pandas import concat + def _iterate_slices(self): + if self.axis == 0: + # kludge + if self._selection is None: + slice_axis = self.obj.columns + else: + slice_axis = self._selection_list + slicer = lambda x: self.obj[x] + else: + slice_axis = self.obj.index + slicer = self.obj.xs - ret = concat(ret, axis=1) - return ret + for val in slice_axis: + if val in self.exclusions: + continue + yield val, slicer(val) - agg = aggregate + def _cython_agg_general(self, how, alt=None, numeric_only=True, min_count=-1): + new_items, new_blocks = self._cython_agg_blocks( + how, alt=alt, numeric_only=numeric_only, min_count=min_count + ) + return self._wrap_agged_blocks(new_items, new_blocks) - def _aggregate_multiple_funcs(self, arg, _level): - if isinstance(arg, dict): + _block_agg_axis = 0 - # show the deprecation, but only if we - # have not shown a higher level one - # GH 15931 - if isinstance(self._selected_obj, Series) and _level <= 1: - msg = dedent( - """\ - using a dict on a Series for aggregation - is deprecated and will be removed in a future version. Use \ - named aggregation instead. + def _cython_agg_blocks(self, how, alt=None, numeric_only=True, min_count=-1): + # TODO: the actual managing of mgr_locs is a PITA + # here, it should happen via BlockManager.combine - >>> grouper.agg(name_1=func_1, name_2=func_2) - """ + data, agg_axis = self._get_data_to_aggregate() + + if numeric_only: + data = data.get_numeric_data(copy=False) + + new_blocks = [] + new_items = [] + deleted_items = [] + no_result = object() + for block in data.blocks: + # Avoid inheriting result from earlier in the loop + result = no_result + locs = block.mgr_locs.as_array + try: + result, _ = self.grouper.aggregate( + block.values, how, axis=agg_axis, min_count=min_count ) - warnings.warn(msg, FutureWarning, stacklevel=3) + except NotImplementedError: + # generally if we have numeric_only=False + # and non-applicable functions + # try to python agg - columns = list(arg.keys()) - arg = arg.items() - elif any(isinstance(x, (tuple, list)) for x in arg): - arg = [(x, x) if not isinstance(x, (tuple, list)) else x for x in arg] + if alt is None: + # we cannot perform the operation + # in an alternate way, exclude the block + deleted_items.append(locs) + continue - # indicated column order - columns = next(zip(*arg)) - else: - # list of functions / function names - columns = [] - for f in arg: - columns.append(com.get_callable_name(f) or f) + # call our grouper again with only this block + obj = self.obj[data.items[locs]] + s = groupby(obj, self.grouper) + try: + result = s.aggregate(lambda x: alt(x, axis=self.axis)) + except TypeError: + # we may have an exception in trying to aggregate + # continue and exclude the block + deleted_items.append(locs) + continue + finally: + if result is not no_result: + # see if we can cast the block back to the original dtype + result = maybe_downcast_numeric(result, block.dtype) + newb = block.make_block(result) - arg = zip(columns, arg) + 
new_items.append(locs) + new_blocks.append(newb) - results = OrderedDict() - for name, func in arg: - obj = self - if name in results: - raise SpecificationError( - "Function names must be unique, found multiple named " - "{}".format(name) - ) + if len(new_blocks) == 0: + raise DataError("No numeric types to aggregate") - # reset the cache so that we - # only include the named selection - if name in self._selected_obj: - obj = copy.copy(obj) - obj._reset_cache() - obj._selection = name - results[name] = obj.aggregate(func) + # reset the locs in the blocks to correspond to our + # current ordering + indexer = np.concatenate(new_items) + new_items = data.items.take(np.sort(indexer)) - if any(isinstance(x, DataFrame) for x in results.values()): - # let higher level handle - if _level: - return results + if len(deleted_items): - return DataFrame(results, columns=columns) + # we need to adjust the indexer to account for the + # items we have removed + # really should be done in internals :< - def _wrap_output(self, output, index, names=None): - """ common agg/transform wrapping logic """ - output = output[self._selection_name] + deleted = np.concatenate(deleted_items) + ai = np.arange(len(data)) + mask = np.zeros(len(data)) + mask[deleted] = 1 + indexer = (ai - mask.cumsum())[indexer] - if names is not None: - return DataFrame(output, index=index, columns=names) - else: - name = self._selection_name - if name is None: - name = self._selected_obj.name - return Series(output, index=index, name=name) + offset = 0 + for b in new_blocks: + loc = len(b.mgr_locs) + b.mgr_locs = indexer[offset : (offset + loc)] + offset += loc - def _wrap_aggregated_output(self, output, names=None): - result = self._wrap_output( - output=output, index=self.grouper.result_index, names=names - ) - return self._reindex_output(result)._convert(datetime=True) + return new_items, new_blocks - def _wrap_transformed_output(self, output, names=None): - return self._wrap_output(output=output, index=self.obj.index, names=names) + def aggregate(self, func, *args, **kwargs): + _level = kwargs.pop("_level", None) - def _wrap_applied_output(self, keys, values, not_indexed_same=False): - if len(keys) == 0: - # GH #6265 - return Series([], name=self._selection_name, index=keys) + relabeling = func is None and _is_multi_agg_with_relabel(**kwargs) + if relabeling: + func, columns, order = _normalize_keyword_aggregation(kwargs) - def _get_index(): - if self.grouper.nkeys > 1: - index = MultiIndex.from_tuples(keys, names=self.grouper.names) - else: - index = Index(keys, name=self.grouper.names[0]) - return index + kwargs = {} + elif func is None: + # nicer error message + raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).") - if isinstance(values[0], dict): - # GH #823 #24880 - index = _get_index() - result = self._reindex_output(DataFrame(values, index=index)) - # if self.observed is False, - # keep all-NaN rows created while re-indexing - result = result.stack(dropna=self.observed) - result.name = self._selection_name + func = _maybe_mangle_lambdas(func) + + result, how = self._aggregate(func, _level=_level, *args, **kwargs) + if how is None: return result - if isinstance(values[0], Series): - return self._concat_objects(keys, values, not_indexed_same=not_indexed_same) - elif isinstance(values[0], DataFrame): - # possible that Series -> DataFrame by applied function - return self._concat_objects(keys, values, not_indexed_same=not_indexed_same) - else: - # GH #6265 #24880 - result = Series(data=values, 
index=_get_index(), name=self._selection_name) - return self._reindex_output(result) + if result is None: - def _aggregate_named(self, func, *args, **kwargs): - result = OrderedDict() + # grouper specific aggregations + if self.grouper.nkeys > 1: + return self._python_agg_general(func, *args, **kwargs) + elif args or kwargs: + result = self._aggregate_generic(func, *args, **kwargs) + else: - for name, group in self: - group.name = name - output = func(group, *args, **kwargs) - if isinstance(output, (Series, Index, np.ndarray)): - raise ValueError("Must produce aggregated value") - result[name] = self._try_cast(output, group) + # try to treat as if we are passing a list + try: + result = self._aggregate_multiple_funcs( + [func], _level=_level, _axis=self.axis + ) + except Exception: + result = self._aggregate_generic(func) + else: + result.columns = Index( + result.columns.levels[0], name=self._selected_obj.columns.name + ) - return result + if not self.as_index: + self._insert_inaxis_grouper_inplace(result) + result.index = np.arange(len(result)) - @Substitution(klass="Series", selected="A.") - @Appender(_transform_template) - def transform(self, func, *args, **kwargs): - func = self._get_cython_func(func) or func + if relabeling: - if isinstance(func, str): - if not (func in base.transform_kernel_whitelist): - msg = "'{func}' is not a valid function name for transform(name)" - raise ValueError(msg.format(func=func)) - if func in base.cythonized_kernels: - # cythonized transform or canned "agg+broadcast" - return getattr(self, func)(*args, **kwargs) - else: - # If func is a reduction, we need to broadcast the - # result to the whole group. Compute func result - # and deal with possible broadcasting below. - return self._transform_fast( - lambda: getattr(self, func)(*args, **kwargs), func - ) + # used reordered index of columns + result = result.iloc[:, order] + result.columns = columns - # reg transform - klass = self._selected_obj.__class__ - results = [] - wrapper = lambda x: func(x, *args, **kwargs) - for name, group in self: - object.__setattr__(group, "name", name) - res = wrapper(group) + return result._convert(datetime=True) - if isinstance(res, (ABCDataFrame, ABCSeries)): - res = res._values + agg = aggregate - indexer = self._get_index(name) - s = klass(res, indexer) - results.append(s) + def _aggregate_generic(self, func, *args, **kwargs): + if self.grouper.nkeys != 1: + raise AssertionError("Number of keys must be 1") - # check for empty "results" to avoid concat ValueError - if results: - from pandas.core.reshape.concat import concat + axis = self.axis + obj = self._obj_with_exclusions - result = concat(results).sort_index() + result = OrderedDict() + if axis != obj._info_axis_number: + try: + for name, data in self: + result[name] = self._try_cast(func(data, *args, **kwargs), data) + except Exception: + return self._aggregate_item_by_item(func, *args, **kwargs) else: - result = Series() - - # we will only try to coerce the result type if - # we have a numeric dtype, as these are *always* udfs - # the cython take a different path (and casting) - dtype = self._selected_obj.dtype - if is_numeric_dtype(dtype): - result = maybe_downcast_to_dtype(result, dtype) + for name in self.indices: + try: + data = self.get_group(name, obj=obj) + result[name] = self._try_cast(func(data, *args, **kwargs), data) + except Exception: + wrapper = lambda x: func(x, *args, **kwargs) + result[name] = data.apply(wrapper, axis=axis) - result.name = self._selected_obj.name - result.index = 
self._selected_obj.index - return result + return self._wrap_generic_output(result, obj) - def _transform_fast(self, func, func_nm): - """ - fast version of transform, only applicable to - builtin/cythonizable functions - """ - if isinstance(func, str): - func = getattr(self, func) + def _wrap_aggregated_output(self, output, names=None): + raise AbstractMethodError(self) - ids, _, ngroup = self.grouper.group_info - cast = self._transform_should_cast(func_nm) - out = algorithms.take_1d(func()._values, ids) - if cast: - out = self._try_cast(out, self.obj) - return Series(out, index=self.obj.index, name=self.obj.name) + def _aggregate_item_by_item(self, func, *args, **kwargs): + # only for axis==0 - def filter(self, func, dropna=True, *args, **kwargs): # noqa - """ - Return a copy of a Series excluding elements from groups that - do not satisfy the boolean criterion specified by func. + obj = self._obj_with_exclusions + result = OrderedDict() + cannot_agg = [] + errors = None + for item in obj: + data = obj[item] + colg = SeriesGroupBy(data, selection=item, grouper=self.grouper) - Parameters - ---------- - func : function - To apply to each group. Should return True or False. - dropna : Drop groups that do not pass the filter. True by default; - if False, groups that evaluate False are filled with NaNs. + try: + cast = self._transform_should_cast(func) - Examples - -------- - >>> df = pd.DataFrame({'A' : ['foo', 'bar', 'foo', 'bar', - ... 'foo', 'bar'], - ... 'B' : [1, 2, 3, 4, 5, 6], - ... 'C' : [2.0, 5., 8., 1., 2., 9.]}) - >>> grouped = df.groupby('A') - >>> df.groupby('A').B.filter(lambda x: x.mean() > 3.) - 1 2 - 3 4 - 5 6 - Name: B, dtype: int64 + result[item] = colg.aggregate(func, *args, **kwargs) + if cast: + result[item] = self._try_cast(result[item], data) - Returns - ------- - filtered : Series - """ - if isinstance(func, str): - wrapper = lambda x: getattr(x, func)(*args, **kwargs) - else: - wrapper = lambda x: func(x, *args, **kwargs) + except ValueError as err: + if "Must produce aggregated value" in str(err): + # raised in _aggregate_named, handle at higher level + # see test_apply_with_mutated_index + raise + cannot_agg.append(item) + continue + except TypeError as e: + cannot_agg.append(item) + errors = e + continue - # Interpret np.nan as False. - def true_and_notna(x, *args, **kwargs): - b = wrapper(x, *args, **kwargs) - return b and notna(b) + result_columns = obj.columns + if cannot_agg: + result_columns = result_columns.drop(cannot_agg) - try: - indices = [ - self._get_index(name) for name, group in self if true_and_notna(group) - ] - except ValueError: - raise TypeError("the filter must return a boolean result") - except TypeError: - raise TypeError("the filter must return a boolean result") + # GH6337 + if not len(result_columns) and errors is not None: + raise errors - filtered = self._apply_filter(indices, dropna) - return filtered + return DataFrame(result, columns=result_columns) - def nunique(self, dropna=True): - """ - Return number of unique elements in the group. + def _decide_output_index(self, output, labels): + if len(output) == len(labels): + output_keys = labels + else: + output_keys = sorted(output) + try: + output_keys.sort() + except TypeError: + pass - Returns - ------- - Series - Number of unique values within each group. 
- """ - ids, _, _ = self.grouper.group_info + if isinstance(labels, MultiIndex): + output_keys = MultiIndex.from_tuples(output_keys, names=labels.names) - val = self.obj._internal_get_values() + return output_keys - # GH 27951 - # temporary fix while we wait for NumPy bug 12629 to be fixed - val[isna(val)] = np.datetime64("NaT") + def _wrap_applied_output(self, keys, values, not_indexed_same=False): + if len(keys) == 0: + return DataFrame(index=keys) - try: - sorter = np.lexsort((val, ids)) - except TypeError: # catches object dtypes - msg = "val.dtype must be object, got {}".format(val.dtype) - assert val.dtype == object, msg - val, _ = algorithms.factorize(val, sort=False) - sorter = np.lexsort((val, ids)) - _isna = lambda a: a == -1 - else: - _isna = isna + key_names = self.grouper.names - ids, val = ids[sorter], val[sorter] + # GH12824. + def first_not_none(values): + try: + return next(com.not_none(*values)) + except StopIteration: + return None - # group boundaries are where group ids change - # unique observations are where sorted values change - idx = np.r_[0, 1 + np.nonzero(ids[1:] != ids[:-1])[0]] - inc = np.r_[1, val[1:] != val[:-1]] + v = first_not_none(values) - # 1st item of each group is a new unique observation - mask = _isna(val) - if dropna: - inc[idx] = 1 - inc[mask] = 0 - else: - inc[mask & np.r_[False, mask[:-1]]] = 0 - inc[idx] = 1 + if v is None: + # GH9684. If all values are None, then this will throw an error. + # We'd prefer it return an empty dataframe. + return DataFrame() + elif isinstance(v, DataFrame): + return self._concat_objects(keys, values, not_indexed_same=not_indexed_same) + elif self.grouper.groupings is not None: + if len(self.grouper.groupings) > 1: + key_index = self.grouper.result_index - out = np.add.reduceat(inc, idx).astype("int64", copy=False) - if len(ids): - # NaN/NaT group exists if the head of ids is -1, - # so remove it from res and exclude its index from idx - if ids[0] == -1: - res = out[1:] - idx = idx[np.flatnonzero(idx)] else: - res = out - else: - res = out[1:] - ri = self.grouper.result_index + ping = self.grouper.groupings[0] + if len(keys) == ping.ngroups: + key_index = ping.group_index + key_index.name = key_names[0] - # we might have duplications among the bins - if len(res) != len(ri): - res, out = np.zeros(len(ri), dtype=out.dtype), res - res[ids[idx]] = out + key_lookup = Index(keys) + indexer = key_lookup.get_indexer(key_index) + + # reorder the values + values = [values[i] for i in indexer] + else: + + key_index = Index(keys, name=key_names[0]) - return Series(res, index=ri, name=self._selection_name) + # don't use the key indexer + if not self.as_index: + key_index = None - @Appender(Series.describe.__doc__) - def describe(self, **kwargs): - result = self.apply(lambda x: x.describe(**kwargs)) - if self.axis == 1: - return result.T - return result.unstack() + # make Nones an empty object + v = first_not_none(values) + if v is None: + return DataFrame() + elif isinstance(v, NDFrame): + values = [ + x if x is not None else v._constructor(**v._construct_axes_dict()) + for x in values + ] - def value_counts( - self, normalize=False, sort=True, ascending=False, bins=None, dropna=True - ): + v = values[0] - from pandas.core.reshape.tile import cut - from pandas.core.reshape.merge import _get_join_indexers + if isinstance(v, (np.ndarray, Index, Series)): + if isinstance(v, Series): + applied_index = self._selected_obj._get_axis(self.axis) + all_indexed_same = _all_indexes_same([x.index for x in values]) + singular_series = 
len(values) == 1 and applied_index.nlevels == 1 - if bins is not None and not np.iterable(bins): - # scalar bins cannot be done at top level - # in a backward compatible way - return self.apply( - Series.value_counts, - normalize=normalize, - sort=sort, - ascending=ascending, - bins=bins, - ) + # GH3596 + # provide a reduction (Frame -> Series) if groups are + # unique + if self.squeeze: + # assign the name to this series + if singular_series: + values[0].name = keys[0] - ids, _, _ = self.grouper.group_info - val = self.obj._internal_get_values() + # GH2893 + # we have series in the values array, we want to + # produce a series: + # if any of the sub-series are not indexed the same + # OR we don't have a multi-index and we have only a + # single values + return self._concat_objects( + keys, values, not_indexed_same=not_indexed_same + ) - # groupby removes null keys from groupings - mask = ids != -1 - ids, val = ids[mask], val[mask] + # still a series + # path added as of GH 5545 + elif all_indexed_same: + from pandas.core.reshape.concat import concat - if bins is None: - lab, lev = algorithms.factorize(val, sort=True) - llab = lambda lab, inc: lab[inc] - else: + return concat(values) - # lab is a Categorical with categories an IntervalIndex - lab = cut(Series(val), bins, include_lowest=True) - lev = lab.cat.categories - lab = lev.take(lab.cat.codes) - llab = lambda lab, inc: lab[inc]._multiindex.codes[-1] + if not all_indexed_same: + # GH 8467 + return self._concat_objects(keys, values, not_indexed_same=True) - if is_interval_dtype(lab): - # TODO: should we do this inside II? - sorter = np.lexsort((lab.left, lab.right, ids)) - else: - sorter = np.lexsort((lab, ids)) + try: + if self.axis == 0: + # GH6124 if the list of Series have a consistent name, + # then propagate that name to the result. + index = v.index.copy() + if index.name is None: + # Only propagate the series name to the result + # if all series have a consistent name. If the + # series do not have a consistent name, do + # nothing. + names = {v.name for v in values} + if len(names) == 1: + index.name = list(names)[0] - ids, lab = ids[sorter], lab[sorter] + # normally use vstack as its faster than concat + # and if we have mi-columns + if ( + isinstance(v.index, MultiIndex) + or key_index is None + or isinstance(key_index, MultiIndex) + ): + stacked_values = np.vstack([np.asarray(v) for v in values]) + result = DataFrame( + stacked_values, index=key_index, columns=index + ) + else: + # GH5788 instead of stacking; concat gets the + # dtypes correct + from pandas.core.reshape.concat import concat - # group boundaries are where group ids change - idx = np.r_[0, 1 + np.nonzero(ids[1:] != ids[:-1])[0]] + result = concat( + values, + keys=key_index, + names=key_index.names, + axis=self.axis, + ).unstack() + result.columns = index + else: + stacked_values = np.vstack([np.asarray(v) for v in values]) + result = DataFrame( + stacked_values.T, index=v.index, columns=key_index + ) - # new values are where sorted labels change - lchanges = llab(lab, slice(1, None)) != llab(lab, slice(None, -1)) - inc = np.r_[True, lchanges] - inc[idx] = True # group boundaries are also new values - out = np.diff(np.nonzero(np.r_[inc, True])[0]) # value counts + except (ValueError, AttributeError): + # GH1738: values is list of arrays of unequal lengths fall + # through to the outer else caluse + return Series(values, index=key_index, name=self._selection_name) - # num. 
of times each group should be repeated - rep = partial(np.repeat, repeats=np.add.reduceat(inc, idx)) + # if we have date/time like in the original, then coerce dates + # as we are stacking can easily have object dtypes here + so = self._selected_obj + if so.ndim == 2 and so.dtypes.apply(is_datetimelike).any(): + result = _recast_datetimelike_result(result) + else: + result = result._convert(datetime=True) - # multi-index components - labels = list(map(rep, self.grouper.recons_labels)) + [llab(lab, inc)] - levels = [ping.group_index for ping in self.grouper.groupings] + [lev] - names = self.grouper.names + [self._selection_name] + return self._reindex_output(result) - if dropna: - mask = labels[-1] != -1 - if mask.all(): - dropna = False + # values are not series or array-like but scalars else: - out, labels = out[mask], [label[mask] for label in labels] + # only coerce dates if we find at least 1 datetime + coerce = any(isinstance(x, Timestamp) for x in values) + # self._selection_name not passed through to Series as the + # result should not take the name of original selection + # of columns + return Series(values, index=key_index)._convert( + datetime=True, coerce=coerce + ) - if normalize: - out = out.astype("float") - d = np.diff(np.r_[idx, len(ids)]) - if dropna: - m = ids[lab == -1] - np.add.at(d, m, -1) - acc = rep(d)[mask] - else: - acc = rep(d) - out /= acc + else: + # Handle cases like BinGrouper + return self._concat_objects(keys, values, not_indexed_same=not_indexed_same) - if sort and bins is None: - cat = ids[inc][mask] if dropna else ids[inc] - sorter = np.lexsort((out if ascending else -out, cat)) - out, labels[-1] = out[sorter], labels[-1][sorter] + def _transform_general(self, func, *args, **kwargs): + from pandas.core.reshape.concat import concat - if bins is None: - mi = MultiIndex( - levels=levels, codes=labels, names=names, verify_integrity=False - ) + applied = [] + obj = self._obj_with_exclusions + gen = self.grouper.get_iterator(obj, axis=self.axis) + fast_path, slow_path = self._define_paths(func, *args, **kwargs) - if is_integer_dtype(out): - out = ensure_int64(out) - return Series(out, index=mi, name=self._selection_name) + path = None + for name, group in gen: + object.__setattr__(group, "name", name) - # for compat. with libgroupby.value_counts need to ensure every - # bin is present at every index level, null filled with zeros - diff = np.zeros(len(out), dtype="bool") - for lab in labels[:-1]: - diff |= np.r_[True, lab[1:] != lab[:-1]] + if path is None: + # Try slow path and fast path. 
+ try: + path, res = self._choose_path(fast_path, slow_path, group) + except TypeError: + return self._transform_item_by_item(obj, fast_path) + except ValueError: + msg = "transform must return a scalar value for each group" + raise ValueError(msg) + else: + res = path(group) - ncat, nbin = diff.sum(), len(levels[-1]) + if isinstance(res, Series): - left = [np.repeat(np.arange(ncat), nbin), np.tile(np.arange(nbin), ncat)] + # we need to broadcast across the + # other dimension; this will preserve dtypes + # GH14457 + if not np.prod(group.shape): + continue + elif res.index.is_(obj.index): + r = concat([res] * len(group.columns), axis=1) + r.columns = group.columns + r.index = group.index + else: + r = DataFrame( + np.concatenate([res.values] * len(group.index)).reshape( + group.shape + ), + columns=group.columns, + index=group.index, + ) - right = [diff.cumsum() - 1, labels[-1]] + applied.append(r) + else: + applied.append(res) - _, idx = _get_join_indexers(left, right, sort=False, how="left") - out = np.where(idx != -1, out[idx], 0) + concat_index = obj.columns if self.axis == 0 else obj.index + other_axis = 1 if self.axis == 0 else 0 # switches between 0 & 1 + concatenated = concat(applied, axis=self.axis, verify_integrity=False) + concatenated = concatenated.reindex(concat_index, axis=other_axis, copy=False) + return self._set_result_index_ordered(concatenated) + + @Substitution(klass="DataFrame", selected="") + @Appender(_transform_template) + def transform(self, func, *args, **kwargs): + + # optimized transforms + func = self._get_cython_func(func) or func + + if isinstance(func, str): + if not (func in base.transform_kernel_whitelist): + msg = "'{func}' is not a valid function name for transform(name)" + raise ValueError(msg.format(func=func)) + if func in base.cythonized_kernels: + # cythonized transformation or canned "reduction+broadcast" + return getattr(self, func)(*args, **kwargs) + else: + # If func is a reduction, we need to broadcast the + # result to the whole group. Compute func result + # and deal with possible broadcasting below. + result = getattr(self, func)(*args, **kwargs) + else: + return self._transform_general(func, *args, **kwargs) - if sort: - sorter = np.lexsort((out if ascending else -out, left[0])) - out, left[-1] = out[sorter], left[-1][sorter] + # a reduction transform + if not isinstance(result, DataFrame): + return self._transform_general(func, *args, **kwargs) - # build the multi-index w/ full levels - codes = list(map(lambda lab: np.repeat(lab[diff], nbin), labels[:-1])) - codes.append(left[-1]) + obj = self._obj_with_exclusions - mi = MultiIndex(levels=levels, codes=codes, names=names, verify_integrity=False) + # nuisance columns + if not result.columns.equals(obj.columns): + return self._transform_general(func, *args, **kwargs) - if is_integer_dtype(out): - out = ensure_int64(out) - return Series(out, index=mi, name=self._selection_name) + return self._transform_fast(result, obj, func) - def count(self): + def _transform_fast(self, result, obj, func_nm): """ - Compute count of group, excluding missing values. - - Returns - ------- - Series - Count of values within each group. + Fast transform path for aggregations """ - ids, _, ngroups = self.grouper.group_info - val = self.obj._internal_get_values() - - mask = (ids != -1) & ~isna(val) - ids = ensure_platform_int(ids) - minlength = ngroups or 0 - out = np.bincount(ids[mask], minlength=minlength) + # if there were groups with no observations (Categorical only?) 
+ # try casting data to original dtype + cast = self._transform_should_cast(func_nm) - return Series( - out, - index=self.grouper.result_index, - name=self._selection_name, - dtype="int64", - ) + # for each col, reshape to to size of original frame + # by take operation + ids, _, ngroup = self.grouper.group_info + output = [] + for i, _ in enumerate(result.columns): + res = algorithms.take_1d(result.iloc[:, i].values, ids) + if cast: + res = self._try_cast(res, obj.iloc[:, i]) + output.append(res) - def _apply_to_column_groupbys(self, func): - """ return a pass thru """ - return func(self) + return DataFrame._from_arrays(output, columns=result.columns, index=obj.index) - def pct_change(self, periods=1, fill_method="pad", limit=None, freq=None): - """Calculate pct_change of each value to previous entry in group""" - # TODO: Remove this conditional when #23918 is fixed - if freq: - return self.apply( - lambda x: x.pct_change( - periods=periods, fill_method=fill_method, limit=limit, freq=freq - ) + def _define_paths(self, func, *args, **kwargs): + if isinstance(func, str): + fast_path = lambda group: getattr(group, func)(*args, **kwargs) + slow_path = lambda group: group.apply( + lambda x: getattr(x, func)(*args, **kwargs), axis=self.axis ) - filled = getattr(self, fill_method)(limit=limit) - fill_grp = filled.groupby(self.grouper.labels) - shifted = fill_grp.shift(periods=periods, freq=freq) - - return (filled / shifted) - 1 + else: + fast_path = lambda group: func(group, *args, **kwargs) + slow_path = lambda group: group.apply( + lambda x: func(x, *args, **kwargs), axis=self.axis + ) + return fast_path, slow_path + def _choose_path(self, fast_path, slow_path, group): + path = slow_path + res = slow_path(group) -@pin_whitelisted_properties(DataFrame, base.dataframe_apply_whitelist) -class DataFrameGroupBy(NDFrameGroupBy): + # if we make it here, test if we can use the fast path + try: + res_fast = fast_path(group) + except Exception: + # Hard to know ex-ante what exceptions `fast_path` might raise + return path, res - _apply_whitelist = base.dataframe_apply_whitelist + # verify fast path does not change columns (and names), otherwise + # its results cannot be joined with those of the slow path + if not isinstance(res_fast, DataFrame): + return path, res - _block_agg_axis = 1 + if not res_fast.columns.equals(group.columns): + return path, res - _agg_see_also_doc = dedent( - """ - See Also - -------- - pandas.DataFrame.groupby.apply - pandas.DataFrame.groupby.transform - pandas.DataFrame.aggregate - """ - ) + if res_fast.equals(res): + path = fast_path - _agg_examples_doc = dedent( - """ - Examples - -------- + return path, res - >>> df = pd.DataFrame({'A': [1, 1, 2, 2], - ... 'B': [1, 2, 3, 4], - ... 'C': np.random.randn(4)}) + def _transform_item_by_item(self, obj, wrapper): + # iterate through columns + output = {} + inds = [] + for i, col in enumerate(obj): + try: + output[col] = self[col].transform(wrapper) + inds.append(i) + except Exception: + pass - >>> df - A B C - 0 1 1 0.362838 - 1 1 2 0.227877 - 2 2 3 1.267767 - 3 2 4 -0.562860 + if len(output) == 0: + raise TypeError("Transform function invalid for data types") - The aggregation is for each column. 
+ columns = obj.columns + if len(output) < len(obj.columns): + columns = columns.take(inds) - >>> df.groupby('A').agg('min') - B C - A - 1 1 0.227877 - 2 3 -0.562860 + return DataFrame(output, index=obj.index, columns=columns) - Multiple aggregations + def filter(self, func, dropna=True, *args, **kwargs): + """ + Return a copy of a DataFrame excluding elements from groups that + do not satisfy the boolean criterion specified by func. - >>> df.groupby('A').agg(['min', 'max']) - B C - min max min max - A - 1 1 2 0.227877 0.362838 - 2 3 4 -0.562860 1.267767 + Parameters + ---------- + f : function + Function to apply to each subframe. Should return True or False. + dropna : Drop groups that do not pass the filter. True by default; + If False, groups that evaluate False are filled with NaNs. - Select a column for aggregation + Returns + ------- + filtered : DataFrame - >>> df.groupby('A').B.agg(['min', 'max']) - min max - A - 1 1 2 - 2 3 4 + Notes + ----- + Each subframe is endowed the attribute 'name' in case you need to know + which group you are working on. - Different aggregations per column + Examples + -------- + >>> df = pd.DataFrame({'A' : ['foo', 'bar', 'foo', 'bar', + ... 'foo', 'bar'], + ... 'B' : [1, 2, 3, 4, 5, 6], + ... 'C' : [2.0, 5., 8., 1., 2., 9.]}) + >>> grouped = df.groupby('A') + >>> grouped.filter(lambda x: x['B'].mean() > 3.) + A B C + 1 bar 2 5.0 + 3 bar 4 1.0 + 5 bar 6 9.0 + """ - >>> df.groupby('A').agg({'B': ['min', 'max'], 'C': 'sum'}) - B C - min max sum - A - 1 1 2 0.590716 - 2 3 4 0.704907 + indices = [] - To control the output names with different aggregations per column, - pandas supports "named aggregation" + obj = self._selected_obj + gen = self.grouper.get_iterator(obj, axis=self.axis) - >>> df.groupby("A").agg( - ... b_min=pd.NamedAgg(column="B", aggfunc="min"), - ... c_sum=pd.NamedAgg(column="C", aggfunc="sum")) - b_min c_sum - A - 1 1 -1.956929 - 2 3 -0.322183 + for name, group in gen: + object.__setattr__(group, "name", name) - - The keywords are the *output* column names - - The values are tuples whose first element is the column to select - and the second element is the aggregation to apply to that column. - Pandas provides the ``pandas.NamedAgg`` namedtuple with the fields - ``['column', 'aggfunc']`` to make it clearer what the arguments are. - As usual, the aggregation can be a callable or a string alias. + res = func(group, *args, **kwargs) - See :ref:`groupby.aggregate.named` for more. 
- """ - ) + try: + res = res.squeeze() + except AttributeError: # allow e.g., scalars and frames to pass + pass - @Substitution( - see_also=_agg_see_also_doc, - examples=_agg_examples_doc, - versionadded="", - klass="DataFrame", - axis="", - ) - @Appender(_shared_docs["aggregate"]) - def aggregate(self, func=None, *args, **kwargs): - return super().aggregate(func, *args, **kwargs) + # interpret the result of the filter + if is_bool(res) or (is_scalar(res) and isna(res)): + if res and notna(res): + indices.append(self._get_index(name)) + else: + # non scalars aren't allowed + raise TypeError( + "filter function returned a %s, " + "but expected a scalar bool" % type(res).__name__ + ) - agg = aggregate + return self._apply_filter(indices, dropna) def _gotitem(self, key, ndim, subset=None): """ From ce580eae2716eb16e74e1ddb6ab6fc7ca9ce7ec4 Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Mon, 7 Oct 2019 22:30:36 -0700 Subject: [PATCH 2/4] Cleaned up aggregate hierarchy --- pandas/core/groupby/generic.py | 108 ++++++++++++++++----------------- 1 file changed, 52 insertions(+), 56 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 3eb66e5f4bf6c..2fad2a3f067eb 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -747,7 +747,7 @@ def pct_change(self, periods=1, fill_method="pad", limit=None, freq=None): @pin_whitelisted_properties(DataFrame, base.dataframe_apply_whitelist) -class DataFrameGroupBy(NDFrameGroupBy): +class DataFrameGroupBy(GroupBy): _apply_whitelist = base.dataframe_apply_whitelist @@ -844,7 +844,56 @@ class DataFrameGroupBy(NDFrameGroupBy): ) @Appender(_shared_docs["aggregate"]) def aggregate(self, func=None, *args, **kwargs): - return super().aggregate(func, *args, **kwargs) + _level = kwargs.pop("_level", None) + + relabeling = func is None and _is_multi_agg_with_relabel(**kwargs) + if relabeling: + func, columns, order = _normalize_keyword_aggregation(kwargs) + + kwargs = {} + elif func is None: + # nicer error message + raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).") + + func = _maybe_mangle_lambdas(func) + + result, how = self._aggregate(func, _level=_level, *args, **kwargs) + if how is None: + return result + + if result is None: + + # grouper specific aggregations + if self.grouper.nkeys > 1: + return self._python_agg_general(func, *args, **kwargs) + elif args or kwargs: + result = self._aggregate_generic(func, *args, **kwargs) + else: + + # try to treat as if we are passing a list + try: + result = self._aggregate_multiple_funcs( + [func], _level=_level, _axis=self.axis + ) + except Exception: + result = self._aggregate_generic(func) + else: + result.columns = Index( + result.columns.levels[0], name=self._selected_obj.columns.name + ) + + if not self.as_index: + self._insert_inaxis_grouper_inplace(result) + result.index = np.arange(len(result)) + + if relabeling: + + # used reordered index of columns + result = result.iloc[:, order] + result.columns = columns + + return result._convert(datetime=True) + agg = aggregate @@ -952,59 +1001,6 @@ def _cython_agg_blocks(self, how, alt=None, numeric_only=True, min_count=-1): return new_items, new_blocks - def aggregate(self, func, *args, **kwargs): - _level = kwargs.pop("_level", None) - - relabeling = func is None and _is_multi_agg_with_relabel(**kwargs) - if relabeling: - func, columns, order = _normalize_keyword_aggregation(kwargs) - - kwargs = {} - elif func is None: - # nicer error message - raise TypeError("Must provide 'func' or 
tuples of '(column, aggfunc).") - - func = _maybe_mangle_lambdas(func) - - result, how = self._aggregate(func, _level=_level, *args, **kwargs) - if how is None: - return result - - if result is None: - - # grouper specific aggregations - if self.grouper.nkeys > 1: - return self._python_agg_general(func, *args, **kwargs) - elif args or kwargs: - result = self._aggregate_generic(func, *args, **kwargs) - else: - - # try to treat as if we are passing a list - try: - result = self._aggregate_multiple_funcs( - [func], _level=_level, _axis=self.axis - ) - except Exception: - result = self._aggregate_generic(func) - else: - result.columns = Index( - result.columns.levels[0], name=self._selected_obj.columns.name - ) - - if not self.as_index: - self._insert_inaxis_grouper_inplace(result) - result.index = np.arange(len(result)) - - if relabeling: - - # used reordered index of columns - result = result.iloc[:, order] - result.columns = columns - - return result._convert(datetime=True) - - agg = aggregate - def _aggregate_generic(self, func, *args, **kwargs): if self.grouper.nkeys != 1: raise AssertionError("Number of keys must be 1") @@ -1852,7 +1848,7 @@ def _maybe_mangle_lambdas(agg_spec: Any) -> Any: Parameters ---------- agg_spec : Any - An argument to NDFrameGroupBy.agg. + An argument to GroupBy.agg. Non-dict-like `agg_spec` are pass through as is. For dict-like `agg_spec` a new spec is returned with name-mangled lambdas. From a28b8147f96aeea94f297d31c311ebd973c2e7b3 Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Mon, 7 Oct 2019 22:36:40 -0700 Subject: [PATCH 3/4] black --- pandas/core/groupby/generic.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 2fad2a3f067eb..d660a4721a1b2 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -894,7 +894,6 @@ def aggregate(self, func=None, *args, **kwargs): return result._convert(datetime=True) - agg = aggregate def _iterate_slices(self): From 29ace87d34f8215c1c13ce2fa88a15edf1f8264f Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Mon, 7 Oct 2019 23:05:28 -0700 Subject: [PATCH 4/4] lint fixup --- pandas/core/groupby/generic.py | 4 ---- pandas/core/groupby/groupby.py | 3 +++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index d660a4721a1b2..0bd6f746e4f3a 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -18,7 +18,6 @@ from pandas._libs import Timestamp, lib from pandas.compat import PY36 -from pandas.errors import AbstractMethodError from pandas.util._decorators import Appender, Substitution from pandas.core.dtypes.cast import ( @@ -1025,9 +1024,6 @@ def _aggregate_generic(self, func, *args, **kwargs): return self._wrap_generic_output(result, obj) - def _wrap_aggregated_output(self, output, names=None): - raise AbstractMethodError(self) - def _aggregate_item_by_item(self, func, *args, **kwargs): # only for axis==0 diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index e93ce3ce93164..cb56f7b8d535b 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -869,6 +869,9 @@ def _cython_transform(self, how, numeric_only=True, **kwargs): return self._wrap_transformed_output(output, names) + def _wrap_aggregated_output(self, output, names=None): + raise AbstractMethodError(self) + def _cython_agg_general(self, how, alt=None, numeric_only=True, min_count=-1): output = {} for name, obj in 
self._iterate_slices():
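
As an illustrative aside (not part of the patches above): everything these commits shuffle between NDFrameGroupBy and DataFrameGroupBy is reachable through the public DataFrameGroupBy API, so a small round trip through agg, transform and filter is a convenient way to sanity-check that the refactor is behaviour-preserving. The frame, column names and expected values below are made up purely for illustration; only documented pandas calls are used.

    import pandas as pd

    df = pd.DataFrame(
        {"A": [1, 1, 2, 2], "B": [1, 2, 3, 4], "C": [0.25, 0.5, 0.75, 1.0]}
    )
    gb = df.groupby("A")

    # dict-of-aggregations spelling (goes through DataFrameGroupBy.aggregate)
    agg_dict = gb.agg({"B": ["min", "max"], "C": "sum"})

    # named aggregation / relabeling spelling (_normalize_keyword_aggregation path)
    agg_named = gb.agg(
        b_min=pd.NamedAgg(column="B", aggfunc="min"),
        c_sum=pd.NamedAgg(column="C", aggfunc="sum"),
    )

    # reduction broadcast back to the original shape (transform / _transform_fast path)
    means = gb.transform("mean")

    # group filtering (DataFrameGroupBy.filter)
    kept = gb.filter(lambda g: g["B"].mean() > 2)

    assert list(agg_named.columns) == ["b_min", "c_sum"]
    assert means.shape == (4, 2)      # "A" is the group key, so only B and C remain
    assert set(kept["A"]) == {2}      # only the A == 2 group has mean(B) > 2

Both the dict-based and the keyword-based spellings exercise the aggregate() body that patch 2 moves onto DataFrameGroupBy, which makes them handy probes for the relabeling branch in particular.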