Skip to content

Align cython and python reduction code paths #36459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions pandas/_libs/reduction.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ from pandas._libs cimport util
from pandas._libs.lib import is_scalar, maybe_convert_objects


cdef _check_result_array(object obj, Py_ssize_t cnt):
cpdef check_result_array(object obj, Py_ssize_t cnt):

if (util.is_array(obj) or
(isinstance(obj, list) and len(obj) == cnt) or
getattr(obj, 'shape', None) == (cnt,)):
raise ValueError('Function does not reduce')
raise ValueError('Must produce aggregated value')


cdef class _BaseGrouper:
Expand Down Expand Up @@ -74,12 +74,14 @@ cdef class _BaseGrouper:
cached_ityp._engine.clear_mapping()
cached_ityp._cache.clear() # e.g. inferred_freq must go
res = self.f(cached_typ)
res = _extract_result(res)
res = extract_result(res)
if not initialized:
# On the first pass, we check the output shape to see
# if this looks like a reduction.
initialized = True
_check_result_array(res, len(self.dummy_arr))
# In all tests other than test_series_grouper and
# test_series_bin_grouper, we have len(self.dummy_arr) == 0
check_result_array(res, len(self.dummy_arr))

return res, initialized

Expand Down Expand Up @@ -278,9 +280,14 @@ cdef class SeriesGrouper(_BaseGrouper):
return result, counts


cdef inline _extract_result(object res, bint squeeze=True):
cpdef inline extract_result(object res, bint squeeze=True):
""" extract the result object, it might be a 0-dim ndarray
or a len-1 0-dim, or a scalar """
if hasattr(res, "_values"):
# Preserve EA
res = res._values
if squeeze and res.ndim == 1 and len(res) == 1:
res = res[0]
if hasattr(res, 'values') and util.is_array(res.values):
res = res.values
if util.is_array(res):
Expand Down
15 changes: 11 additions & 4 deletions pandas/core/groupby/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import numpy as np

from pandas._libs import lib
from pandas._libs import lib, reduction as libreduction
from pandas._typing import ArrayLike, FrameOrSeries, FrameOrSeriesUnion
from pandas.util._decorators import Appender, Substitution, doc

Expand Down Expand Up @@ -471,12 +471,19 @@ def _get_index() -> Index:

def _aggregate_named(self, func, *args, **kwargs):
result = {}
initialized = False

for name, group in self:
group.name = name
# Each step of this loop corresponds to
# libreduction._BaseGrouper._apply_to_group
group.name = name # NB: libreduction does not pin name

output = func(group, *args, **kwargs)
if isinstance(output, (Series, Index, np.ndarray)):
raise ValueError("Must produce aggregated value")
output = libreduction.extract_result(output)
if not initialized:
# We only do this validation on the first iteration
libreduction.check_result_array(output, 0)
initialized = True
result[name] = output

return result
Expand Down
23 changes: 11 additions & 12 deletions pandas/core/groupby/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ def agg_series(self, obj: Series, func: F):
try:
return self._aggregate_series_fast(obj, func)
except ValueError as err:
if "Function does not reduce" in str(err):
if "Must produce aggregated value" in str(err):
# raised in libreduction
pass
else:
Expand Down Expand Up @@ -653,27 +653,26 @@ def _aggregate_series_pure_python(self, obj: Series, func: F):
group_index, _, ngroups = self.group_info

counts = np.zeros(ngroups, dtype=int)
result = None
result = np.empty(ngroups, dtype="O")
initialized = False

splitter = get_splitter(obj, group_index, ngroups, axis=0)

for label, group in splitter:

# Each step of this loop corresponds to
# libreduction._BaseGrouper._apply_to_group
res = func(group)
res = libreduction.extract_result(res)

if result is None:
if isinstance(res, (Series, Index, np.ndarray)):
if len(res) == 1:
# e.g. test_agg_lambda_with_timezone lambda e: e.head(1)
# FIXME: are we potentially losing important res.index info?
res = res.item()
else:
raise ValueError("Function does not reduce")
result = np.empty(ngroups, dtype="O")
if not initialized:
# We only do this validation on the first iteration
libreduction.check_result_array(res, 0)
initialized = True

counts[label] = group.shape[0]
result[label] = res

assert result is not None
result = lib.maybe_convert_objects(result, try_float=0)
# TODO: maybe_cast_to_extension_array?

Expand Down