From 18f35f67850080f179f5ffa5dd0203381bea74c5 Mon Sep 17 00:00:00 2001 From: Brock Date: Tue, 20 Apr 2021 17:50:39 -0700 Subject: [PATCH 1/4] REF: move __init__ from BaseGroupBy to GroupBy --- pandas/core/groupby/groupby.py | 116 ++++++++++++++++----------------- pandas/core/resample.py | 8 +-- 2 files changed, 61 insertions(+), 63 deletions(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index f2fffe4c3741c..be3cc4968973c 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -543,64 +543,6 @@ class BaseGroupBy(PandasObject, SelectionMixin, Generic[FrameOrSeries]): "squeeze", } - def __init__( - self, - obj: FrameOrSeries, - keys: _KeysArgType | None = None, - axis: int = 0, - level: IndexLabel | None = None, - grouper: ops.BaseGrouper | None = None, - exclusions: set[Hashable] | None = None, - selection: IndexLabel | None = None, - as_index: bool = True, - sort: bool = True, - group_keys: bool = True, - squeeze: bool = False, - observed: bool = False, - mutated: bool = False, - dropna: bool = True, - ): - - self._selection = selection - - assert isinstance(obj, NDFrame), type(obj) - - self.level = level - - if not as_index: - if not isinstance(obj, DataFrame): - raise TypeError("as_index=False only valid with DataFrame") - if axis != 0: - raise ValueError("as_index=False only valid for axis=0") - - self.as_index = as_index - self.keys = keys - self.sort = sort - self.group_keys = group_keys - self.squeeze = squeeze - self.observed = observed - self.mutated = mutated - self.dropna = dropna - - if grouper is None: - from pandas.core.groupby.grouper import get_grouper - - grouper, exclusions, obj = get_grouper( - obj, - keys, - axis=axis, - level=level, - sort=sort, - observed=observed, - mutated=self.mutated, - dropna=self.dropna, - ) - - self.obj = obj - self.axis = obj._get_axis_number(axis) - self.grouper = grouper - self.exclusions = exclusions or set() - @final def __len__(self) -> int: return len(self.groups) @@ -1400,6 +1342,64 @@ class GroupBy(BaseGroupBy[FrameOrSeries]): more """ + def __init__( + self, + obj: FrameOrSeries, + keys: _KeysArgType | None = None, + axis: int = 0, + level: IndexLabel | None = None, + grouper: ops.BaseGrouper | None = None, + exclusions: set[Hashable] | None = None, + selection: IndexLabel | None = None, + as_index: bool = True, + sort: bool = True, + group_keys: bool = True, + squeeze: bool = False, + observed: bool = False, + mutated: bool = False, + dropna: bool = True, + ): + + self._selection = selection + + assert isinstance(obj, NDFrame), type(obj) + + self.level = level + + if not as_index: + if not isinstance(obj, DataFrame): + raise TypeError("as_index=False only valid with DataFrame") + if axis != 0: + raise ValueError("as_index=False only valid for axis=0") + + self.as_index = as_index + self.keys = keys + self.sort = sort + self.group_keys = group_keys + self.squeeze = squeeze + self.observed = observed + self.mutated = mutated + self.dropna = dropna + + if grouper is None: + from pandas.core.groupby.grouper import get_grouper + + grouper, exclusions, obj = get_grouper( + obj, + keys, + axis=axis, + level=level, + sort=sort, + observed=observed, + mutated=self.mutated, + dropna=self.dropna, + ) + + self.obj = obj + self.axis = obj._get_axis_number(axis) + self.grouper = grouper + self.exclusions = exclusions or set() + @final @property def _obj_1d_constructor(self) -> type[Series]: diff --git a/pandas/core/resample.py b/pandas/core/resample.py index 58003c10db9e0..26546daacdfce 
100644 --- a/pandas/core/resample.py +++ b/pandas/core/resample.py @@ -134,9 +134,7 @@ def __init__(self, obj, groupby=None, axis=0, kind=None, **kwargs): self.as_index = True self.exclusions = set() self.binner = None - # error: Incompatible types in assignment (expression has type "None", variable - # has type "BaseGrouper") - self.grouper = None # type: ignore[assignment] + self.grouper = None if self.groupby is not None: self.groupby._set_grouper(self._convert_obj(obj), sort=True) @@ -1150,10 +1148,10 @@ def _downsample(self, how, **kwargs): # do we have a regular frequency - # error: "BaseGrouper" has no attribute "binlabels" + # error: Item "None" of "Optional[Any]" has no attribute "binlabels" if ( (ax.freq is not None or ax.inferred_freq is not None) - and len(self.grouper.binlabels) > len(ax) # type: ignore[attr-defined] + and len(self.grouper.binlabels) > len(ax) # type: ignore[union-attr] and how is None ): From 505fdae276bb72c65b1f8c7c62916d0035262ff6 Mon Sep 17 00:00:00 2001 From: Brock Date: Tue, 20 Apr 2021 19:24:03 -0700 Subject: [PATCH 2/4] REF: move would-raise attributes from BaseGroupBy to GroupBy --- pandas/core/groupby/groupby.py | 672 +++++++++++++++++---------------- pandas/core/resample.py | 5 + 2 files changed, 350 insertions(+), 327 deletions(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index be3cc4968973c..bf3a3cd2aa37b 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -504,7 +504,7 @@ def f(self): @contextmanager -def group_selection_context(groupby: BaseGroupBy) -> Iterator[BaseGroupBy]: +def group_selection_context(groupby: GroupBy) -> Iterator[GroupBy]: """ Set / reset the group_selection_context. """ @@ -543,6 +543,10 @@ class BaseGroupBy(PandasObject, SelectionMixin, Generic[FrameOrSeries]): "squeeze", } + axis: int + grouper: ops.BaseGrouper + obj: FrameOrSeries + @final def __len__(self) -> int: return len(self.groups) @@ -653,45 +657,6 @@ def _selected_obj(self): else: return self.obj[self._selection] - @final - def _reset_group_selection(self) -> None: - """ - Clear group based selection. - - Used for methods needing to return info on each group regardless of - whether a group selection was previously set. - """ - if self._group_selection is not None: - # GH12839 clear cached selection too when changing group selection - self._group_selection = None - self._reset_cache("_selected_obj") - - @final - def _set_group_selection(self) -> None: - """ - Create group based selection. - - Used when selection is not passed directly but instead via a grouper. 
- - NOTE: this should be paired with a call to _reset_group_selection - """ - grp = self.grouper - if not ( - self.as_index - and getattr(grp, "groupings", None) is not None - and self.obj.ndim > 1 - and self._group_selection is None - ): - return - - groupers = [g.name for g in grp.groupings if g.level is None and g.in_axis] - - if len(groupers): - # GH12839 clear selected obj cache when group selection changes - ax = self.obj._info_axis - self._group_selection = ax.difference(Index(groupers), sort=False).tolist() - self._reset_cache("_selected_obj") - @final def _set_result_index_ordered( self, result: OutputFrameOrSeries @@ -727,16 +692,6 @@ def _set_result_index_ordered( def _dir_additions(self) -> set[str]: return self.obj._dir_additions() | self._apply_allowlist - def __getattr__(self, attr: str): - if attr in self._internal_names_set: - return object.__getattribute__(self, attr) - if attr in self.obj: - return self[attr] - - raise AttributeError( - f"'{type(self).__name__}' object has no attribute '{attr}'" - ) - @Substitution( klass="GroupBy", examples=dedent( @@ -770,44 +725,6 @@ def pipe( plot = property(GroupByPlot) - @final - def _make_wrapper(self, name: str) -> Callable: - assert name in self._apply_allowlist - - with group_selection_context(self): - # need to setup the selection - # as are not passed directly but in the grouper - f = getattr(self._obj_with_exclusions, name) - if not isinstance(f, types.MethodType): - return self.apply(lambda self: getattr(self, name)) - - f = getattr(type(self._obj_with_exclusions), name) - sig = inspect.signature(f) - - def wrapper(*args, **kwargs): - # a little trickery for aggregation functions that need an axis - # argument - if "axis" in sig.parameters: - if kwargs.get("axis", None) is None: - kwargs["axis"] = self.axis - - def curried(x): - return f(x, *args, **kwargs) - - # preserve the name so we can detect it when calling plot methods, - # to avoid duplicates - curried.__name__ = name - - # special case otherwise extra plots are created when catching the - # exception below - if name in base.plotting_methods: - return self.apply(curried) - - return self._python_apply_general(curried, self._obj_with_exclusions) - - wrapper.__name__ = name - return wrapper - @final def get_group(self, name, obj=None): """ @@ -846,80 +763,6 @@ def __iter__(self) -> Iterator[tuple[Hashable, FrameOrSeries]]: """ return self.grouper.get_iterator(self.obj, axis=self.axis) - @Appender( - _apply_docs["template"].format( - input="dataframe", examples=_apply_docs["dataframe_examples"] - ) - ) - def apply(self, func, *args, **kwargs): - - func = com.is_builtin_func(func) - - # this is needed so we don't try and wrap strings. If we could - # resolve functions to their callable functions prior, this - # wouldn't be needed - if args or kwargs: - if callable(func): - - @wraps(func) - def f(g): - with np.errstate(all="ignore"): - return func(g, *args, **kwargs) - - elif hasattr(nanops, "nan" + func): - # TODO: should we wrap this in to e.g. _is_builtin_func? 
- f = getattr(nanops, "nan" + func) - - else: - raise ValueError( - "func must be a callable if args or kwargs are supplied" - ) - else: - f = func - - # ignore SettingWithCopy here in case the user mutates - with option_context("mode.chained_assignment", None): - try: - result = self._python_apply_general(f, self._selected_obj) - except TypeError: - # gh-20949 - # try again, with .apply acting as a filtering - # operation, by excluding the grouping column - # This would normally not be triggered - # except if the udf is trying an operation that - # fails on *some* columns, e.g. a numeric operation - # on a string grouper column - - with group_selection_context(self): - return self._python_apply_general(f, self._selected_obj) - - return result - - @final - def _python_apply_general( - self, f: F, data: FrameOrSeriesUnion - ) -> FrameOrSeriesUnion: - """ - Apply function f in python space - - Parameters - ---------- - f : callable - Function to apply - data : Series or DataFrame - Data to apply f to - - Returns - ------- - Series or DataFrame - data after applying f - """ - keys, values, mutated = self.grouper.apply(f, data, self.axis) - - return self._wrap_applied_output( - data, keys, values, not_indexed_same=mutated or self.mutated - ) - def _iterate_slices(self) -> Iterable[Series]: raise AbstractMethodError(self) @@ -997,42 +840,6 @@ def _wrap_transformed_output(self, output: Mapping[base.OutputKey, ArrayLike]): def _wrap_applied_output(self, data, keys, values, not_indexed_same: bool = False): raise AbstractMethodError(self) - @final - def _agg_general( - self, - numeric_only: bool = True, - min_count: int = -1, - *, - alias: str, - npfunc: Callable, - ): - with group_selection_context(self): - # try a cython aggregation if we can - result = None - try: - result = self._cython_agg_general( - how=alias, - alt=npfunc, - numeric_only=numeric_only, - min_count=min_count, - ) - except DataError: - pass - except NotImplementedError as err: - if "function is not implemented for this dtype" in str( - err - ) or "category dtype not supported" in str(err): - # raised in _get_cython_function, in some cases can - # be trimmed by implementing cython funcs for more dtypes - pass - else: - raise - - # apply a non-cython aggregation - if result is None: - result = self.aggregate(lambda x: npfunc(x, axis=self.axis)) - return result.__finalize__(self.obj, method="groupby") - def _cython_agg_general( self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 ): @@ -1148,143 +955,38 @@ def _aggregate_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs) return result, index @final - def _python_agg_general(self, func, *args, **kwargs): - func = com.is_builtin_func(func) - f = lambda x: func(x, *args, **kwargs) - - # iterate through "columns" ex exclusions to populate output dict - output: dict[base.OutputKey, np.ndarray] = {} - - for idx, obj in enumerate(self._iterate_slices()): - name = obj.name - if self.grouper.ngroups == 0: - # agg_series below assumes ngroups > 0 - continue - - try: - # if this function is invalid for this dtype, we will ignore it. 
- result, counts = self.grouper.agg_series(obj, f) - except TypeError: - continue + def _apply_filter(self, indices, dropna): + if len(indices) == 0: + indices = np.array([], dtype="int64") + else: + indices = np.sort(np.concatenate(indices)) + if dropna: + filtered = self._selected_obj.take(indices, axis=self.axis) + else: + mask = np.empty(len(self._selected_obj.index), dtype=bool) + mask.fill(False) + mask[indices.astype(int)] = True + # mask fails to broadcast when passed to where; broadcast manually. + mask = np.tile(mask, list(self._selected_obj.shape[1:]) + [1]).T + filtered = self._selected_obj.where(mask) # Fill with NaNs. + return filtered - assert result is not None - key = base.OutputKey(label=name, position=idx) - if is_numeric_dtype(obj.dtype): - result = maybe_downcast_numeric(result, obj.dtype) +# To track operations that expand dimensions, like ohlc +OutputFrameOrSeries = TypeVar("OutputFrameOrSeries", bound=NDFrame) - if self.grouper._filter_empty_groups: - mask = counts.ravel() > 0 - # since we are masking, make sure that we have a float object - values = result - if is_numeric_dtype(values.dtype): - values = ensure_float(values) +class GroupBy(BaseGroupBy[FrameOrSeries]): + """ + Class for grouping and aggregating relational data. - result = maybe_downcast_numeric(values[mask], result.dtype) + See aggregate, transform, and apply functions on this object. - output[key] = result + It's easiest to use obj.groupby(...) to use GroupBy, but you can also do: - if not output: - return self._python_apply_general(f, self._selected_obj) + :: - return self._wrap_aggregated_output(output, index=self.grouper.result_index) - - @final - def _concat_objects(self, keys, values, not_indexed_same: bool = False): - from pandas.core.reshape.concat import concat - - def reset_identity(values): - # reset the identities of the components - # of the values to prevent aliasing - for v in com.not_none(*values): - ax = v._get_axis(self.axis) - ax._reset_identity() - return values - - if not not_indexed_same: - result = concat(values, axis=self.axis) - ax = self.filter(lambda x: True).axes[self.axis] - - # this is a very unfortunate situation - # we can't use reindex to restore the original order - # when the ax has duplicates - # so we resort to this - # GH 14776, 30667 - if ax.has_duplicates and not result.axes[self.axis].equals(ax): - indexer, _ = result.index.get_indexer_non_unique(ax._values) - indexer = algorithms.unique1d(indexer) - result = result.take(indexer, axis=self.axis) - else: - result = result.reindex(ax, axis=self.axis, copy=False) - - elif self.group_keys: - - values = reset_identity(values) - if self.as_index: - - # possible MI return case - group_keys = keys - group_levels = self.grouper.levels - group_names = self.grouper.names - - result = concat( - values, - axis=self.axis, - keys=group_keys, - levels=group_levels, - names=group_names, - sort=False, - ) - else: - - # GH5610, returns a MI, with the first level being a - # range index - keys = list(range(len(values))) - result = concat(values, axis=self.axis, keys=keys) - else: - values = reset_identity(values) - result = concat(values, axis=self.axis) - - if isinstance(result, Series) and self._selection_name is not None: - - result.name = self._selection_name - - return result - - @final - def _apply_filter(self, indices, dropna): - if len(indices) == 0: - indices = np.array([], dtype="int64") - else: - indices = np.sort(np.concatenate(indices)) - if dropna: - filtered = self._selected_obj.take(indices, axis=self.axis) - 
else: - mask = np.empty(len(self._selected_obj.index), dtype=bool) - mask.fill(False) - mask[indices.astype(int)] = True - # mask fails to broadcast when passed to where; broadcast manually. - mask = np.tile(mask, list(self._selected_obj.shape[1:]) + [1]).T - filtered = self._selected_obj.where(mask) # Fill with NaNs. - return filtered - - -# To track operations that expand dimensions, like ohlc -OutputFrameOrSeries = TypeVar("OutputFrameOrSeries", bound=NDFrame) - - -class GroupBy(BaseGroupBy[FrameOrSeries]): - """ - Class for grouping and aggregating relational data. - - See aggregate, transform, and apply functions on this object. - - It's easiest to use obj.groupby(...) to use GroupBy, but you can also do: - - :: - - grouped = groupby(obj, ...) + grouped = groupby(obj, ...) Parameters ---------- @@ -1342,6 +1044,10 @@ class GroupBy(BaseGroupBy[FrameOrSeries]): more """ + obj: FrameOrSeries + grouper: ops.BaseGrouper + as_index: bool + def __init__( self, obj: FrameOrSeries, @@ -1400,6 +1106,318 @@ def __init__( self.grouper = grouper self.exclusions = exclusions or set() + def __getattr__(self, attr: str): + if attr in self._internal_names_set: + return object.__getattribute__(self, attr) + if attr in self.obj: + return self[attr] + + raise AttributeError( + f"'{type(self).__name__}' object has no attribute '{attr}'" + ) + + @final + def _make_wrapper(self, name: str) -> Callable: + assert name in self._apply_allowlist + + with group_selection_context(self): + # need to setup the selection + # as are not passed directly but in the grouper + f = getattr(self._obj_with_exclusions, name) + if not isinstance(f, types.MethodType): + return self.apply(lambda self: getattr(self, name)) + + f = getattr(type(self._obj_with_exclusions), name) + sig = inspect.signature(f) + + def wrapper(*args, **kwargs): + # a little trickery for aggregation functions that need an axis + # argument + if "axis" in sig.parameters: + if kwargs.get("axis", None) is None: + kwargs["axis"] = self.axis + + def curried(x): + return f(x, *args, **kwargs) + + # preserve the name so we can detect it when calling plot methods, + # to avoid duplicates + curried.__name__ = name + + # special case otherwise extra plots are created when catching the + # exception below + if name in base.plotting_methods: + return self.apply(curried) + + return self._python_apply_general(curried, self._obj_with_exclusions) + + wrapper.__name__ = name + return wrapper + + # ----------------------------------------------------------------- + # Selection + + @final + def _set_group_selection(self) -> None: + """ + Create group based selection. + + Used when selection is not passed directly but instead via a grouper. + + NOTE: this should be paired with a call to _reset_group_selection + """ + grp = self.grouper + if not ( + self.as_index + and getattr(grp, "groupings", None) is not None + and self.obj.ndim > 1 + and self._group_selection is None + ): + return + + groupers = [g.name for g in grp.groupings if g.level is None and g.in_axis] + + if len(groupers): + # GH12839 clear selected obj cache when group selection changes + ax = self.obj._info_axis + self._group_selection = ax.difference(Index(groupers), sort=False).tolist() + self._reset_cache("_selected_obj") + + @final + def _reset_group_selection(self) -> None: + """ + Clear group based selection. + + Used for methods needing to return info on each group regardless of + whether a group selection was previously set. 
+ """ + if self._group_selection is not None: + # GH12839 clear cached selection too when changing group selection + self._group_selection = None + self._reset_cache("_selected_obj") + + # ----------------------------------------------------------------- + # Dispatch/Wrapping + + @final + def _concat_objects(self, keys, values, not_indexed_same: bool = False): + from pandas.core.reshape.concat import concat + + def reset_identity(values): + # reset the identities of the components + # of the values to prevent aliasing + for v in com.not_none(*values): + ax = v._get_axis(self.axis) + ax._reset_identity() + return values + + if not not_indexed_same: + result = concat(values, axis=self.axis) + ax = self.filter(lambda x: True).axes[self.axis] + + # this is a very unfortunate situation + # we can't use reindex to restore the original order + # when the ax has duplicates + # so we resort to this + # GH 14776, 30667 + if ax.has_duplicates and not result.axes[self.axis].equals(ax): + indexer, _ = result.index.get_indexer_non_unique(ax._values) + indexer = algorithms.unique1d(indexer) + result = result.take(indexer, axis=self.axis) + else: + result = result.reindex(ax, axis=self.axis, copy=False) + + elif self.group_keys: + + values = reset_identity(values) + if self.as_index: + + # possible MI return case + group_keys = keys + group_levels = self.grouper.levels + group_names = self.grouper.names + + result = concat( + values, + axis=self.axis, + keys=group_keys, + levels=group_levels, + names=group_names, + sort=False, + ) + else: + + # GH5610, returns a MI, with the first level being a + # range index + keys = list(range(len(values))) + result = concat(values, axis=self.axis, keys=keys) + else: + values = reset_identity(values) + result = concat(values, axis=self.axis) + + if isinstance(result, Series) and self._selection_name is not None: + + result.name = self._selection_name + + return result + + # ----------------------------------------------------------------- + + @Appender( + _apply_docs["template"].format( + input="dataframe", examples=_apply_docs["dataframe_examples"] + ) + ) + def apply(self, func, *args, **kwargs): + + func = com.is_builtin_func(func) + + # this is needed so we don't try and wrap strings. If we could + # resolve functions to their callable functions prior, this + # wouldn't be needed + if args or kwargs: + if callable(func): + + @wraps(func) + def f(g): + with np.errstate(all="ignore"): + return func(g, *args, **kwargs) + + elif hasattr(nanops, "nan" + func): + # TODO: should we wrap this in to e.g. _is_builtin_func? + f = getattr(nanops, "nan" + func) + + else: + raise ValueError( + "func must be a callable if args or kwargs are supplied" + ) + else: + f = func + + # ignore SettingWithCopy here in case the user mutates + with option_context("mode.chained_assignment", None): + try: + result = self._python_apply_general(f, self._selected_obj) + except TypeError: + # gh-20949 + # try again, with .apply acting as a filtering + # operation, by excluding the grouping column + # This would normally not be triggered + # except if the udf is trying an operation that + # fails on *some* columns, e.g. 
a numeric operation + # on a string grouper column + + with group_selection_context(self): + return self._python_apply_general(f, self._selected_obj) + + return result + + @final + def _python_apply_general( + self, f: F, data: FrameOrSeriesUnion + ) -> FrameOrSeriesUnion: + """ + Apply function f in python space + + Parameters + ---------- + f : callable + Function to apply + data : Series or DataFrame + Data to apply f to + + Returns + ------- + Series or DataFrame + data after applying f + """ + keys, values, mutated = self.grouper.apply(f, data, self.axis) + + return self._wrap_applied_output( + data, keys, values, not_indexed_same=mutated or self.mutated + ) + + @final + def _python_agg_general(self, func, *args, **kwargs): + func = com.is_builtin_func(func) + f = lambda x: func(x, *args, **kwargs) + + # iterate through "columns" ex exclusions to populate output dict + output: dict[base.OutputKey, np.ndarray] = {} + + for idx, obj in enumerate(self._iterate_slices()): + name = obj.name + if self.grouper.ngroups == 0: + # agg_series below assumes ngroups > 0 + continue + + try: + # if this function is invalid for this dtype, we will ignore it. + result, counts = self.grouper.agg_series(obj, f) + except TypeError: + continue + + assert result is not None + key = base.OutputKey(label=name, position=idx) + + if is_numeric_dtype(obj.dtype): + result = maybe_downcast_numeric(result, obj.dtype) + + if self.grouper._filter_empty_groups: + mask = counts.ravel() > 0 + + # since we are masking, make sure that we have a float object + values = result + if is_numeric_dtype(values.dtype): + values = ensure_float(values) + + result = maybe_downcast_numeric(values[mask], result.dtype) + + output[key] = result + + if not output: + return self._python_apply_general(f, self._selected_obj) + + return self._wrap_aggregated_output(output, index=self.grouper.result_index) + + @final + def _agg_general( + self, + numeric_only: bool = True, + min_count: int = -1, + *, + alias: str, + npfunc: Callable, + ): + with group_selection_context(self): + # try a cython aggregation if we can + result = None + try: + result = self._cython_agg_general( + how=alias, + alt=npfunc, + numeric_only=numeric_only, + min_count=min_count, + ) + except DataError: + pass + except NotImplementedError as err: + if "function is not implemented for this dtype" in str( + err + ) or "category dtype not supported" in str(err): + # raised in _get_cython_function, in some cases can + # be trimmed by implementing cython funcs for more dtypes + pass + else: + raise + + # apply a non-cython aggregation + if result is None: + result = self.aggregate(lambda x: npfunc(x, axis=self.axis)) + return result.__finalize__(self.obj, method="groupby") + + # ----------------------------------------------------------------- + @final @property def _obj_1d_constructor(self) -> type[Series]: diff --git a/pandas/core/resample.py b/pandas/core/resample.py index 26546daacdfce..95690146e94a8 100644 --- a/pandas/core/resample.py +++ b/pandas/core/resample.py @@ -110,6 +110,11 @@ class Resampler(BaseGroupBy, PandasObject): After resampling, see aggregate, apply, and transform functions. 
""" + # error: Incompatible types in assignment (expression has type + # "Optional[BinGrouper]", base class "BaseGroupBy" defined the type as + # "BaseGrouper") + grouper: BinGrouper | None # type: ignore[assignment] + # to the groupby descriptor _attributes = [ "freq", From b2cee4a52ec320b6a71bf82a186e3a946179b973 Mon Sep 17 00:00:00 2001 From: Brock Date: Tue, 20 Apr 2021 20:01:46 -0700 Subject: [PATCH 3/4] REF: move would-raise methods from BaseGroupBy to GroupBy --- pandas/core/groupby/groupby.py | 485 +++++++++++++++++---------------- 1 file changed, 246 insertions(+), 239 deletions(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index bf3a3cd2aa37b..ec1a32ff6d95e 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -657,37 +657,6 @@ def _selected_obj(self): else: return self.obj[self._selection] - @final - def _set_result_index_ordered( - self, result: OutputFrameOrSeries - ) -> OutputFrameOrSeries: - # set the result index on the passed values object and - # return the new object, xref 8046 - - if self.grouper.is_monotonic: - # shortcut if we have an already ordered grouper - result.set_axis(self.obj._get_axis(self.axis), axis=self.axis, inplace=True) - return result - - # row order is scrambled => sort the rows by position in original index - original_positions = Index( - np.concatenate(self._get_indices(self.grouper.result_index)) - ) - result.set_axis(original_positions, axis=self.axis, inplace=True) - result = result.sort_index(axis=self.axis) - - dropped_rows = len(result.index) < len(self.obj.index) - - if dropped_rows: - # get index by slicing original index according to original positions - # slice drops attrs => use set_axis when no rows were dropped - sorted_indexer = result.index - result.index = self._selected_obj.index[sorted_indexer] - else: - result.set_axis(self.obj._get_axis(self.axis), axis=self.axis, inplace=True) - - return result - @final def _dir_additions(self) -> set[str]: return self.obj._dir_additions() | self._apply_allowlist @@ -763,214 +732,6 @@ def __iter__(self) -> Iterator[tuple[Hashable, FrameOrSeries]]: """ return self.grouper.get_iterator(self.obj, axis=self.axis) - def _iterate_slices(self) -> Iterable[Series]: - raise AbstractMethodError(self) - - def transform(self, func, *args, **kwargs): - raise AbstractMethodError(self) - - @final - def _cumcount_array(self, ascending: bool = True): - """ - Parameters - ---------- - ascending : bool, default True - If False, number in reverse, from length of group - 1 to 0. 
- - Notes - ----- - this is currently implementing sort=False - (though the default is sort=True) for groupby in general - """ - ids, _, ngroups = self.grouper.group_info - sorter = get_group_index_sorter(ids, ngroups) - ids, count = ids[sorter], len(ids) - - if count == 0: - return np.empty(0, dtype=np.int64) - - run = np.r_[True, ids[:-1] != ids[1:]] - rep = np.diff(np.r_[np.nonzero(run)[0], count]) - out = (~run).cumsum() - - if ascending: - out -= np.repeat(out[run], rep) - else: - out = np.repeat(out[np.r_[run[1:], True]], rep) - out - - rev = np.empty(count, dtype=np.intp) - rev[sorter] = np.arange(count, dtype=np.intp) - return out[rev].astype(np.int64, copy=False) - - @final - def _cython_transform( - self, how: str, numeric_only: bool = True, axis: int = 0, **kwargs - ): - output: dict[base.OutputKey, ArrayLike] = {} - - for idx, obj in enumerate(self._iterate_slices()): - name = obj.name - is_numeric = is_numeric_dtype(obj.dtype) - if numeric_only and not is_numeric: - continue - - try: - result = self.grouper._cython_operation( - "transform", obj._values, how, axis, **kwargs - ) - except (NotImplementedError, TypeError): - continue - - key = base.OutputKey(label=name, position=idx) - output[key] = result - - if not output: - raise DataError("No numeric types to aggregate") - - return self._wrap_transformed_output(output) - - def _wrap_aggregated_output( - self, output: Mapping[base.OutputKey, np.ndarray], index: Index | None - ): - raise AbstractMethodError(self) - - def _wrap_transformed_output(self, output: Mapping[base.OutputKey, ArrayLike]): - raise AbstractMethodError(self) - - def _wrap_applied_output(self, data, keys, values, not_indexed_same: bool = False): - raise AbstractMethodError(self) - - def _cython_agg_general( - self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 - ): - output: dict[base.OutputKey, ArrayLike] = {} - # Ideally we would be able to enumerate self._iterate_slices and use - # the index from enumeration as the key of output, but ohlc in particular - # returns a (n x 4) array. Output requires 1D ndarrays as values, so we - # need to slice that up into 1D arrays - idx = 0 - for obj in self._iterate_slices(): - name = obj.name - is_numeric = is_numeric_dtype(obj.dtype) - if numeric_only and not is_numeric: - continue - - result = self.grouper._cython_operation( - "aggregate", obj._values, how, axis=0, min_count=min_count - ) - - if how == "ohlc": - # e.g. ohlc - agg_names = ["open", "high", "low", "close"] - assert len(agg_names) == result.shape[1] - for result_column, result_name in zip(result.T, agg_names): - key = base.OutputKey(label=result_name, position=idx) - output[key] = result_column - idx += 1 - else: - assert result.ndim == 1 - key = base.OutputKey(label=name, position=idx) - output[key] = result - idx += 1 - - if not output: - raise DataError("No numeric types to aggregate") - - # error: Argument 1 to "_wrap_aggregated_output" of "BaseGroupBy" has - # incompatible type "Dict[OutputKey, Union[ndarray, DatetimeArray]]"; - # expected "Mapping[OutputKey, ndarray]" - return self._wrap_aggregated_output( - output, index=self.grouper.result_index # type: ignore[arg-type] - ) - - @final - def _numba_prep(self, func, data): - if not callable(func): - raise NotImplementedError( - "Numba engine can only be used with a single function." 
- ) - labels, _, n_groups = self.grouper.group_info - sorted_index = get_group_index_sorter(labels, n_groups) - sorted_labels = algorithms.take_nd(labels, sorted_index, allow_fill=False) - - sorted_data = data.take(sorted_index, axis=self.axis).to_numpy() - - starts, ends = lib.generate_slices(sorted_labels, n_groups) - return starts, ends, sorted_index, sorted_data - - @final - def _transform_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs): - """ - Perform groupby transform routine with the numba engine. - - This routine mimics the data splitting routine of the DataSplitter class - to generate the indices of each group in the sorted data and then passes the - data and indices into a Numba jitted function. - """ - starts, ends, sorted_index, sorted_data = self._numba_prep(func, data) - group_keys = self.grouper._get_group_keys() - - numba_transform_func = numba_.generate_numba_transform_func( - tuple(args), kwargs, func, engine_kwargs - ) - result = numba_transform_func( - sorted_data, sorted_index, starts, ends, len(group_keys), len(data.columns) - ) - - cache_key = (func, "groupby_transform") - if cache_key not in NUMBA_FUNC_CACHE: - NUMBA_FUNC_CACHE[cache_key] = numba_transform_func - - # result values needs to be resorted to their original positions since we - # evaluated the data sorted by group - return result.take(np.argsort(sorted_index), axis=0) - - @final - def _aggregate_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs): - """ - Perform groupby aggregation routine with the numba engine. - - This routine mimics the data splitting routine of the DataSplitter class - to generate the indices of each group in the sorted data and then passes the - data and indices into a Numba jitted function. - """ - starts, ends, sorted_index, sorted_data = self._numba_prep(func, data) - group_keys = self.grouper._get_group_keys() - - numba_agg_func = numba_.generate_numba_agg_func( - tuple(args), kwargs, func, engine_kwargs - ) - result = numba_agg_func( - sorted_data, sorted_index, starts, ends, len(group_keys), len(data.columns) - ) - - cache_key = (func, "groupby_agg") - if cache_key not in NUMBA_FUNC_CACHE: - NUMBA_FUNC_CACHE[cache_key] = numba_agg_func - - if self.grouper.nkeys > 1: - index = MultiIndex.from_tuples(group_keys, names=self.grouper.names) - else: - index = Index(group_keys, name=self.grouper.names[0]) - return result, index - - @final - def _apply_filter(self, indices, dropna): - if len(indices) == 0: - indices = np.array([], dtype="int64") - else: - indices = np.sort(np.concatenate(indices)) - if dropna: - filtered = self._selected_obj.take(indices, axis=self.axis) - else: - mask = np.empty(len(self._selected_obj.index), dtype=bool) - mask.fill(False) - mask[indices.astype(int)] = True - # mask fails to broadcast when passed to where; broadcast manually. - mask = np.tile(mask, list(self._selected_obj.shape[1:]) + [1]).T - filtered = self._selected_obj.where(mask) # Fill with NaNs. 
- return filtered - # To track operations that expand dimensions, like ohlc OutputFrameOrSeries = TypeVar("OutputFrameOrSeries", bound=NDFrame) @@ -1196,6 +957,9 @@ def _reset_group_selection(self) -> None: self._group_selection = None self._reset_cache("_selected_obj") + def _iterate_slices(self) -> Iterable[Series]: + raise AbstractMethodError(self) + # ----------------------------------------------------------------- # Dispatch/Wrapping @@ -1261,7 +1025,124 @@ def reset_identity(values): return result + @final + def _set_result_index_ordered( + self, result: OutputFrameOrSeries + ) -> OutputFrameOrSeries: + # set the result index on the passed values object and + # return the new object, xref 8046 + + if self.grouper.is_monotonic: + # shortcut if we have an already ordered grouper + result.set_axis(self.obj._get_axis(self.axis), axis=self.axis, inplace=True) + return result + + # row order is scrambled => sort the rows by position in original index + original_positions = Index( + np.concatenate(self._get_indices(self.grouper.result_index)) + ) + result.set_axis(original_positions, axis=self.axis, inplace=True) + result = result.sort_index(axis=self.axis) + + dropped_rows = len(result.index) < len(self.obj.index) + + if dropped_rows: + # get index by slicing original index according to original positions + # slice drops attrs => use set_axis when no rows were dropped + sorted_indexer = result.index + result.index = self._selected_obj.index[sorted_indexer] + else: + result.set_axis(self.obj._get_axis(self.axis), axis=self.axis, inplace=True) + + return result + + def _wrap_aggregated_output( + self, output: Mapping[base.OutputKey, np.ndarray], index: Index | None + ): + raise AbstractMethodError(self) + + def _wrap_transformed_output(self, output: Mapping[base.OutputKey, ArrayLike]): + raise AbstractMethodError(self) + + def _wrap_applied_output(self, data, keys, values, not_indexed_same: bool = False): + raise AbstractMethodError(self) + # ----------------------------------------------------------------- + # numba + + @final + def _numba_prep(self, func, data): + if not callable(func): + raise NotImplementedError( + "Numba engine can only be used with a single function." + ) + labels, _, n_groups = self.grouper.group_info + sorted_index = get_group_index_sorter(labels, n_groups) + sorted_labels = algorithms.take_nd(labels, sorted_index, allow_fill=False) + + sorted_data = data.take(sorted_index, axis=self.axis).to_numpy() + + starts, ends = lib.generate_slices(sorted_labels, n_groups) + return starts, ends, sorted_index, sorted_data + + @final + def _transform_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs): + """ + Perform groupby transform routine with the numba engine. + + This routine mimics the data splitting routine of the DataSplitter class + to generate the indices of each group in the sorted data and then passes the + data and indices into a Numba jitted function. 
+ """ + starts, ends, sorted_index, sorted_data = self._numba_prep(func, data) + group_keys = self.grouper._get_group_keys() + + numba_transform_func = numba_.generate_numba_transform_func( + tuple(args), kwargs, func, engine_kwargs + ) + result = numba_transform_func( + sorted_data, sorted_index, starts, ends, len(group_keys), len(data.columns) + ) + + cache_key = (func, "groupby_transform") + if cache_key not in NUMBA_FUNC_CACHE: + NUMBA_FUNC_CACHE[cache_key] = numba_transform_func + + # result values needs to be resorted to their original positions since we + # evaluated the data sorted by group + return result.take(np.argsort(sorted_index), axis=0) + + @final + def _aggregate_with_numba(self, data, func, *args, engine_kwargs=None, **kwargs): + """ + Perform groupby aggregation routine with the numba engine. + + This routine mimics the data splitting routine of the DataSplitter class + to generate the indices of each group in the sorted data and then passes the + data and indices into a Numba jitted function. + """ + starts, ends, sorted_index, sorted_data = self._numba_prep(func, data) + group_keys = self.grouper._get_group_keys() + + numba_agg_func = numba_.generate_numba_agg_func( + tuple(args), kwargs, func, engine_kwargs + ) + result = numba_agg_func( + sorted_data, sorted_index, starts, ends, len(group_keys), len(data.columns) + ) + + cache_key = (func, "groupby_agg") + if cache_key not in NUMBA_FUNC_CACHE: + NUMBA_FUNC_CACHE[cache_key] = numba_agg_func + + if self.grouper.nkeys > 1: + index = MultiIndex.from_tuples(group_keys, names=self.grouper.names) + else: + index = Index(group_keys, name=self.grouper.names[0]) + return result, index + + # ----------------------------------------------------------------- + # apply/agg/transform @Appender( _apply_docs["template"].format( @@ -1416,6 +1297,132 @@ def _agg_general( result = self.aggregate(lambda x: npfunc(x, axis=self.axis)) return result.__finalize__(self.obj, method="groupby") + def _cython_agg_general( + self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 + ): + output: dict[base.OutputKey, ArrayLike] = {} + # Ideally we would be able to enumerate self._iterate_slices and use + # the index from enumeration as the key of output, but ohlc in particular + # returns a (n x 4) array. Output requires 1D ndarrays as values, so we + # need to slice that up into 1D arrays + idx = 0 + for obj in self._iterate_slices(): + name = obj.name + is_numeric = is_numeric_dtype(obj.dtype) + if numeric_only and not is_numeric: + continue + + result = self.grouper._cython_operation( + "aggregate", obj._values, how, axis=0, min_count=min_count + ) + + if how == "ohlc": + # e.g. 
ohlc + agg_names = ["open", "high", "low", "close"] + assert len(agg_names) == result.shape[1] + for result_column, result_name in zip(result.T, agg_names): + key = base.OutputKey(label=result_name, position=idx) + output[key] = result_column + idx += 1 + else: + assert result.ndim == 1 + key = base.OutputKey(label=name, position=idx) + output[key] = result + idx += 1 + + if not output: + raise DataError("No numeric types to aggregate") + + # error: Argument 1 to "_wrap_aggregated_output" of "BaseGroupBy" has + # incompatible type "Dict[OutputKey, Union[ndarray, DatetimeArray]]"; + # expected "Mapping[OutputKey, ndarray]" + return self._wrap_aggregated_output( + output, index=self.grouper.result_index # type: ignore[arg-type] + ) + + @final + def _cython_transform( + self, how: str, numeric_only: bool = True, axis: int = 0, **kwargs + ): + output: dict[base.OutputKey, ArrayLike] = {} + + for idx, obj in enumerate(self._iterate_slices()): + name = obj.name + is_numeric = is_numeric_dtype(obj.dtype) + if numeric_only and not is_numeric: + continue + + try: + result = self.grouper._cython_operation( + "transform", obj._values, how, axis, **kwargs + ) + except (NotImplementedError, TypeError): + continue + + key = base.OutputKey(label=name, position=idx) + output[key] = result + + if not output: + raise DataError("No numeric types to aggregate") + + return self._wrap_transformed_output(output) + + def transform(self, func, *args, **kwargs): + raise AbstractMethodError(self) + + # ----------------------------------------------------------------- + # Utilities + + @final + def _apply_filter(self, indices, dropna): + if len(indices) == 0: + indices = np.array([], dtype="int64") + else: + indices = np.sort(np.concatenate(indices)) + if dropna: + filtered = self._selected_obj.take(indices, axis=self.axis) + else: + mask = np.empty(len(self._selected_obj.index), dtype=bool) + mask.fill(False) + mask[indices.astype(int)] = True + # mask fails to broadcast when passed to where; broadcast manually. + mask = np.tile(mask, list(self._selected_obj.shape[1:]) + [1]).T + filtered = self._selected_obj.where(mask) # Fill with NaNs. + return filtered + + @final + def _cumcount_array(self, ascending: bool = True): + """ + Parameters + ---------- + ascending : bool, default True + If False, number in reverse, from length of group - 1 to 0. 
+
+        Notes
+        -----
+        this is currently implementing sort=False
+        (though the default is sort=True) for groupby in general
+        """
+        ids, _, ngroups = self.grouper.group_info
+        sorter = get_group_index_sorter(ids, ngroups)
+        ids, count = ids[sorter], len(ids)
+
+        if count == 0:
+            return np.empty(0, dtype=np.int64)
+
+        run = np.r_[True, ids[:-1] != ids[1:]]
+        rep = np.diff(np.r_[np.nonzero(run)[0], count])
+        out = (~run).cumsum()
+
+        if ascending:
+            out -= np.repeat(out[run], rep)
+        else:
+            out = np.repeat(out[np.r_[run[1:], True]], rep) - out
+
+        rev = np.empty(count, dtype=np.intp)
+        rev[sorter] = np.arange(count, dtype=np.intp)
+        return out[rev].astype(np.int64, copy=False)
+
     # -----------------------------------------------------------------

     @final

From 769c89d7a0253a4077b293bbaa6596823318398e Mon Sep 17 00:00:00 2001
From: Brock
Date: Tue, 20 Apr 2021 20:04:09 -0700
Subject: [PATCH 4/4] REF: move SGB-specific method to SGB

---
 pandas/core/groupby/generic.py | 43 ++++++++++++++++++++++++++++++++++
 pandas/core/groupby/groupby.py | 40 +------------------------------
 2 files changed, 44 insertions(+), 39 deletions(-)

diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py
index 4a721ae0d4bf6..a059a90aa3a4e 100644
--- a/pandas/core/groupby/generic.py
+++ b/pandas/core/groupby/generic.py
@@ -345,6 +345,49 @@ def _aggregate_multiple_funcs(self, arg):
         )
         return self.obj._constructor_expanddim(output, columns=columns)

+    def _cython_agg_general(
+        self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
+    ):
+        output: dict[base.OutputKey, ArrayLike] = {}
+        # Ideally we would be able to enumerate self._iterate_slices and use
+        # the index from enumeration as the key of output, but ohlc in particular
+        # returns a (n x 4) array. Output requires 1D ndarrays as values, so we
+        # need to slice that up into 1D arrays
+        idx = 0
+        for obj in self._iterate_slices():
+            name = obj.name
+            is_numeric = is_numeric_dtype(obj.dtype)
+            if numeric_only and not is_numeric:
+                continue
+
+            result = self.grouper._cython_operation(
+                "aggregate", obj._values, how, axis=0, min_count=min_count
+            )
+
+            if how == "ohlc":
+                # e.g. ohlc
+                agg_names = ["open", "high", "low", "close"]
+                assert len(agg_names) == result.shape[1]
+                for result_column, result_name in zip(result.T, agg_names):
+                    key = base.OutputKey(label=result_name, position=idx)
+                    output[key] = result_column
+                    idx += 1
+            else:
+                assert result.ndim == 1
+                key = base.OutputKey(label=name, position=idx)
+                output[key] = result
+                idx += 1
+
+        if not output:
+            raise DataError("No numeric types to aggregate")
+
+        # error: Argument 1 to "_wrap_aggregated_output" of "BaseGroupBy" has
+        # incompatible type "Dict[OutputKey, Union[ndarray, DatetimeArray]]";
+        # expected "Mapping[OutputKey, ndarray]"
+        return self._wrap_aggregated_output(
+            output, index=self.grouper.result_index  # type: ignore[arg-type]
+        )
+
     # TODO: index should not be Optional - see GH 35490
     def _wrap_series_output(
         self,
diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py
index ec1a32ff6d95e..9a4f343ab3dc2 100644
--- a/pandas/core/groupby/groupby.py
+++ b/pandas/core/groupby/groupby.py
@@ -1300,45 +1300,7 @@ def _agg_general(
     def _cython_agg_general(
         self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
     ):
-        output: dict[base.OutputKey, ArrayLike] = {}
-        # Ideally we would be able to enumerate self._iterate_slices and use
-        # the index from enumeration as the key of output, but ohlc in particular
-        # returns a (n x 4) array. Output requires 1D ndarrays as values, so we
-        # need to slice that up into 1D arrays
-        idx = 0
-        for obj in self._iterate_slices():
-            name = obj.name
-            is_numeric = is_numeric_dtype(obj.dtype)
-            if numeric_only and not is_numeric:
-                continue
-
-            result = self.grouper._cython_operation(
-                "aggregate", obj._values, how, axis=0, min_count=min_count
-            )
-
-            if how == "ohlc":
-                # e.g. ohlc
-                agg_names = ["open", "high", "low", "close"]
-                assert len(agg_names) == result.shape[1]
-                for result_column, result_name in zip(result.T, agg_names):
-                    key = base.OutputKey(label=result_name, position=idx)
-                    output[key] = result_column
-                    idx += 1
-            else:
-                assert result.ndim == 1
-                key = base.OutputKey(label=name, position=idx)
-                output[key] = result
-                idx += 1
-
-        if not output:
-            raise DataError("No numeric types to aggregate")
-
-        # error: Argument 1 to "_wrap_aggregated_output" of "BaseGroupBy" has
-        # incompatible type "Dict[OutputKey, Union[ndarray, DatetimeArray]]";
-        # expected "Mapping[OutputKey, ndarray]"
-        return self._wrap_aggregated_output(
-            output, index=self.grouper.result_index  # type: ignore[arg-type]
-        )
+        raise AbstractMethodError(self)

     @final
     def _cython_transform(
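
Taken together, the four patches tighten the layering of the groupby classes. BaseGroupBy keeps only the surface that Resampler can safely share, with axis, grouper, and obj promised via class-level annotations; construction (__init__), attribute dispatch (__getattr__), the group-selection bookkeeping, and the python/cython/numba apply, agg, and transform internals move down to GroupBy; and _cython_agg_general ends up abstract on GroupBy, with the concrete implementation on SeriesGroupBy in generic.py. The following sketch illustrates the resulting hierarchy; the class and method names mirror the patches above, but the bodies are stand-in stubs for illustration, not the real pandas code::

    class AbstractMethodError(NotImplementedError):
        """Stand-in for pandas.errors.AbstractMethodError."""


    class BaseGroupBy:
        # Only what Resampler can share stays here; required attributes
        # are declared as annotations instead of being set in __init__.
        axis: int
        grouper: object  # ops.BaseGrouper in the real code

        def __len__(self) -> int:
            # Mirrors the real method; .groups is supplied by subclasses.
            return len(self.groups)  # type: ignore[attr-defined]


    class GroupBy(BaseGroupBy):
        # Construction and the methods that would raise on a Resampler
        # live here, so Resampler never inherits them by accident.
        def __init__(self, obj, keys=None, axis: int = 0) -> None:
            self.obj = obj
            self.keys = keys
            self.axis = axis

        def _cython_agg_general(self, how: str):
            # Abstract after PATCH 4/4; subclasses must override.
            raise AbstractMethodError(self)


    class SeriesGroupBy(GroupBy):
        def _cython_agg_general(self, how: str):
            # Stub standing in for the implementation moved to generic.py.
            return f"aggregated with {how}"


    class Resampler(BaseGroupBy):
        # PATCH 2/4 narrows grouper relative to the base class annotation,
        # which is why resample.py carries a type: ignore[assignment].
        grouper: "object | None"  # BinGrouper | None in the real code


    print(SeriesGroupBy(obj=[1, 2, 3])._cython_agg_general("mean"))

One apparent payoff of this arrangement: a Resampler instance simply does not have the GroupBy-only methods, so misuse fails at attribute lookup rather than through would-raise stubs, and the annotations let mypy check the shared BaseGroupBy surface directly.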