Skip to content

REF: GroupBy.any/all use WrappedCythonOp #52089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 12 additions & 152 deletions pandas/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ class providing the base-class of operations.
from pandas.core._numba import executor
from pandas.core.arrays import (
BaseMaskedArray,
BooleanArray,
Categorical,
ExtensionArray,
FloatingArray,
Expand Down Expand Up @@ -1545,6 +1544,8 @@ def array_func(values: ArrayLike) -> ArrayLike:
# and non-applicable functions
# try to python agg
# TODO: shouldn't min_count matter?
if how in ["any", "all"]:
raise # TODO: re-raise as TypeError?
result = self._agg_py_fallback(values, ndim=data.ndim, alt=alt)

return result
Expand Down Expand Up @@ -1694,45 +1695,6 @@ def _obj_1d_constructor(self) -> Callable:
assert isinstance(self.obj, Series)
return self.obj._constructor

@final
def _bool_agg(self, val_test: Literal["any", "all"], skipna: bool):
    """
    Run the Cython any / all GroupBy kernel for this groupby object.

    Parameters
    ----------
    val_test : {"any", "all"}
        Which truth test to perform; forwarded to the Cython function.
    skipna : bool
        Forwarded to the Cython function. When True, missing entries in
        object-dtype values are replaced by True before casting to bool.

    Returns
    -------
    Whatever ``_get_cythonized_result`` returns for ``group_any_all``.
    """

    def objs_to_bool(vals: ArrayLike) -> tuple[np.ndarray, type]:
        # Convert the incoming values to an int8 view of a boolean array,
        # which is the dtype handed to the Cython kernel.
        if skipna and is_object_dtype(vals.dtype):
            # GH#37501: don't raise on pd.NA when skipna=True
            na_positions = isna(vals)
            if na_positions.any():
                # the mask on the original values is computed separately,
                # so overwriting the missing slots on a copy is safe here
                vals = vals.copy()
                vals[na_positions] = True
        elif isinstance(vals, BaseMaskedArray):
            vals = vals._data
        as_bool = vals.astype(bool, copy=False)
        return as_bool.view(np.int8), bool

    def result_to_bool(
        result: np.ndarray,
        inference: type,
        result_mask,
    ) -> ArrayLike:
        # Without a result mask, cast to the inferred type; with one,
        # wrap the boolean result together with its mask.
        if result_mask is None:
            return result.astype(inference, copy=False)
        return BooleanArray(result.astype(bool, copy=False), result_mask)

    return self._get_cythonized_result(
        libgroupby.group_any_all,
        numeric_only=False,
        cython_dtype=np.dtype(np.int8),
        pre_processing=objs_to_bool,
        post_processing=result_to_bool,
        val_test=val_test,
        skipna=skipna,
    )

@final
@Substitution(name="groupby")
@Appender(_common_see_also)
Expand All @@ -1751,7 +1713,11 @@ def any(self, skipna: bool = True):
DataFrame or Series of boolean values, where a value is True if any element
is True within its respective group, False otherwise.
"""
return self._bool_agg("any", skipna)
return self._cython_agg_general(
"any",
alt=lambda x: Series(x).any(skipna=skipna),
skipna=skipna,
)

@final
@Substitution(name="groupby")
Expand All @@ -1771,7 +1737,11 @@ def all(self, skipna: bool = True):
DataFrame or Series of boolean values, where a value is True if all elements
are True within its respective group, False otherwise.
"""
return self._bool_agg("all", skipna)
return self._cython_agg_general(
"all",
alt=lambda x: Series(x).all(skipna=skipna),
skipna=skipna,
)

@final
@Substitution(name="groupby")
Expand Down Expand Up @@ -3702,116 +3672,6 @@ def cummax(
"cummax", numeric_only=numeric_only, skipna=skipna
)

@final
def _get_cythonized_result(
    self,
    base_func: Callable,
    cython_dtype: np.dtype,
    numeric_only: bool = False,
    pre_processing=None,
    post_processing=None,
    how: str = "any_all",
    **kwargs,
):
    """
    Apply a Cythonized function block-wise to the data being aggregated.

    Parameters
    ----------
    base_func : callable
        Cythonized function to be called. It is invoked with the keyword
        arguments ``labels``, ``out``, ``values``, ``mask`` and
        ``result_mask`` plus everything in ``**kwargs``, and is expected
        to fill ``out`` in place.
    cython_dtype : np.dtype
        Dtype of the output buffer, and the dtype the values are cast to
        before the Cython call.
    numeric_only : bool, default False
        Forwarded to ``_get_data_to_aggregate``.
    pre_processing : function, default None
        Applied to the values prior to passing them to Cython. Must return
        a tuple ``(values, inferences)``; ``inferences`` is handed on to
        `post_processing`.
    post_processing : function, default None
        Applied to the result of the Cython function. Called as
        ``post_processing(result, inferences, result_mask=result_mask)``,
        where ``result_mask`` is None unless the block values are a
        ``BaseMaskedArray``.
    how : str, default any_all
        Forwarded as ``name`` to ``_get_data_to_aggregate``.
    **kwargs : dict
        Extra arguments to be passed on to the Cython function.

    Returns
    -------
    The output of ``_wrap_aggregated_output``
    (documented upstream as a `Series` or `DataFrame`).

    Raises
    ------
    ValueError
        If `post_processing` or `pre_processing` is truthy but not callable.
    """
    if post_processing and not callable(post_processing):
        raise ValueError("'post_processing' must be a callable!")
    if pre_processing and not callable(pre_processing):
        raise ValueError("'pre_processing' must be a callable!")

    group_ids, _, n_groups = self.grouper.group_info

    def blk_func(values: ArrayLike) -> ArrayLike:
        values = values.T
        is_1d = values.ndim == 1
        n_cols = 1 if is_1d else values.shape[1]

        # Output buffer, one row per group and one column per input column.
        out: ArrayLike = np.zeros(n_groups * n_cols, dtype=cython_dtype).reshape(
            (n_groups, n_cols)
        )

        inferences = None
        prepared = values
        if pre_processing:
            prepared, inferences = pre_processing(prepared)

        prepared = prepared.astype(cython_dtype, copy=False)
        if prepared.ndim == 1:
            prepared = prepared.reshape((-1, 1))

        # The NA mask is computed from the values as they were before
        # pre_processing.
        na_mask = isna(values).view(np.uint8)
        if na_mask.ndim == 1:
            na_mask = na_mask.reshape(-1, 1)

        out_mask = (
            np.zeros(out.shape, dtype=np.bool_)
            if isinstance(values, BaseMaskedArray)
            else None
        )

        # Fixed keywords first, caller-supplied kwargs merged on top.
        call_kwargs = {
            "labels": group_ids,
            "out": out,
            "values": prepared,
            "mask": na_mask,
            "result_mask": out_mask,
        }
        call_kwargs.update(kwargs)

        # Call base_func to modify `out` in place
        base_func(**call_kwargs)

        if is_1d:
            assert out.shape[1] == 1, out.shape
            out = out[:, 0]
            if out_mask is not None:
                assert out_mask.shape[1] == 1, out_mask.shape
                out_mask = out_mask[:, 0]

        if post_processing:
            out = post_processing(out, inferences, result_mask=out_mask)

        return out.T

    # Operate block-wise instead of column-by-column
    data_mgr = self._get_data_to_aggregate(numeric_only=numeric_only, name=how)
    reduced_mgr = data_mgr.grouped_reduce(blk_func)

    return self._wrap_aggregated_output(self._wrap_agged_manager(reduced_mgr))

@final
@Substitution(name="groupby")
def shift(
Expand Down
40 changes: 35 additions & 5 deletions pandas/core/groupby/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ class WrappedCythonOp:

# Functions for which we do _not_ attempt to cast the cython result
# back to the original dtype.
cast_blocklist = frozenset(["rank", "count", "size", "idxmin", "idxmax"])
cast_blocklist = frozenset(
["any", "all", "rank", "count", "size", "idxmin", "idxmax"]
)

def __init__(self, kind: str, how: str, has_dropped_na: bool) -> None:
self.kind = kind
Expand All @@ -124,6 +126,8 @@ def __init__(self, kind: str, how: str, has_dropped_na: bool) -> None:

_CYTHON_FUNCTIONS: dict[str, dict] = {
"aggregate": {
"any": functools.partial(libgroupby.group_any_all, val_test="any"),
"all": functools.partial(libgroupby.group_any_all, val_test="all"),
"sum": "group_sum",
"prod": "group_prod",
"min": "group_min",
Expand Down Expand Up @@ -253,7 +257,7 @@ def _disallow_invalid_ops(self, dtype: DtypeObj, is_numeric: bool = False):
# don't go down a group-by-group path, since in the empty-groups
# case that would fail to raise
raise TypeError(f"Cannot perform {how} with non-ordered Categorical")
if how not in ["rank"]:
if how not in ["rank", "any", "all"]:
# only "rank", "any" and "all" are implemented in cython
raise NotImplementedError(f"{dtype} dtype not supported")

Expand Down Expand Up @@ -352,10 +356,13 @@ def _ea_wrap_cython_operation(
)

elif isinstance(values, Categorical):
assert self.how == "rank" # the only one implemented ATM
assert values.ordered # checked earlier
assert self.how in ["rank", "any", "all"]
mask = values.isna()
npvalues = values._ndarray
if self.how == "rank":
assert values.ordered # checked earlier
npvalues = values._ndarray
else:
npvalues = values.astype(bool)

res_values = self._cython_op_ndim_compat(
npvalues,
Expand Down Expand Up @@ -546,6 +553,19 @@ def _call_cython_op(
if values.dtype == "float16":
values = values.astype(np.float32)

if self.how in ["any", "all"]:
if mask is None:
mask = isna(values)
if dtype == object:
if kwargs["skipna"]:
# GH#37501: don't raise on pd.NA when skipna=True
if mask.any():
# mask on original values computed separately
values = values.copy()
values[mask] = True
values = values.astype(bool, copy=False).view(np.int8)
is_numeric = True

values = values.T
if mask is not None:
mask = mask.T
Expand Down Expand Up @@ -584,6 +604,16 @@ def _call_cython_op(
result_mask=result_mask,
**kwargs,
)
elif self.how in ["any", "all"]:
func(
out=result,
values=values,
labels=comp_ids,
mask=mask,
result_mask=result_mask,
**kwargs,
)
result = result.astype(bool, copy=False)
else:
raise NotImplementedError(f"{self.how} is not implemented")
else:
Expand Down