Skip to content

[ArrayManager] GroupBy cython aggregations (no fallback) #39885

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ jobs:
run: |
source activate pandas-dev
pytest pandas/tests/frame/methods --array-manager
pytest pandas/tests/groupby/aggregate/ --array-manager
pytest pandas/tests/arithmetic/ --array-manager

# indexing subset (temporary since other tests don't pass yet)
Expand Down
131 changes: 102 additions & 29 deletions pandas/core/groupby/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Mapping,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
Expand All @@ -43,6 +44,7 @@
ArrayLike,
FrameOrSeries,
FrameOrSeriesUnion,
Manager,
)
from pandas.util._decorators import (
Appender,
Expand Down Expand Up @@ -109,7 +111,10 @@
all_indexes_same,
)
import pandas.core.indexes.base as ibase
from pandas.core.internals import BlockManager
from pandas.core.internals import (
ArrayManager,
BlockManager,
)
from pandas.core.series import Series
from pandas.core.util.numba_ import maybe_use_numba

Expand Down Expand Up @@ -187,6 +192,31 @@ def pinner(cls):
return pinner


def _cast_agg_result(
    result, values: ArrayLike, how: str, reshape1d: bool = True
) -> ArrayLike:
    """
    Try to cast an aggregation result back toward the dtype implied by the
    original ``values`` and the aggregation method ``how``.

    The resulting dtype may differ from the original (e.g. a downcast of a
    float64 intermediate).  When ``reshape1d`` is True, a 1-dim ndarray
    result is reshaped to a single-row 2-dim array (the block layout).
    """
    # Reaching here with a DataFrame means an upstream path went wrong.
    assert not isinstance(result, DataFrame)

    target_dtype = maybe_cast_result_dtype(values.dtype, how)
    result = maybe_downcast_numeric(result, target_dtype)

    got_ndarray = isinstance(result, np.ndarray)
    if isinstance(values, Categorical) and got_ndarray:
        # The Categorical op did not raise, so it is dtype-preserving;
        # rebuild a Categorical with the original dtype.
        result = type(values)._from_sequence(result.ravel(), dtype=values.dtype)
    elif reshape1d and got_ndarray and result.ndim == 1:
        # Came through a SeriesGroupBy path and must be restored to 2D.
        # GH#32223 includes the case with IntegerArray values.
        # test_groupby_duplicate_columns gets here with
        # result.dtype == int64, values.dtype=object, how="min"
        result = result.reshape(1, -1)

    return result


@pin_allowlisted_properties(Series, base.series_apply_allowlist)
class SeriesGroupBy(GroupBy[Series]):
_apply_allowlist = base.series_apply_allowlist
Expand Down Expand Up @@ -1071,10 +1101,18 @@ def _iterate_slices(self) -> Iterable[Series]:
def _cython_agg_general(
    self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
) -> DataFrame:
    """
    Run a cython aggregation, dispatching on the type of the underlying
    data manager.

    Parameters
    ----------
    how : str
        Name of the cython aggregation (e.g. "mean", "min").
    alt : callable, optional
        Fallback aggregation used when the cython op is not implemented
        for a given dtype.
    numeric_only : bool, default True
        Restrict the aggregation to numeric data.
    min_count : int, default -1
        Minimum number of valid values required to produce a result.

    Returns
    -------
    DataFrame

    Raises
    ------
    TypeError
        If ``self.obj._mgr`` is neither a BlockManager nor an ArrayManager.
    """
    # NOTE: the old unconditional BlockManager-only body was left in place
    # alongside the new dispatch, making everything after its `return`
    # unreachable; only the dispatch version is kept here.
    if isinstance(self.obj._mgr, BlockManager):
        agg_mgr = self._cython_agg_blocks(
            how, alt=alt, numeric_only=numeric_only, min_count=min_count
        )
        return self._wrap_agged_blocks(agg_mgr.blocks, items=agg_mgr.items)
    elif isinstance(self.obj._mgr, ArrayManager):
        agg_arrays, columns = self._cython_agg_arrays(
            how, alt=alt, numeric_only=numeric_only, min_count=min_count
        )
        return self._wrap_agged_arrays(agg_arrays, columns)
    else:
        raise TypeError("Unknown manager type")

def _cython_agg_blocks(
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
Expand All @@ -1085,28 +1123,6 @@ def _cython_agg_blocks(
if numeric_only:
data = data.get_numeric_data(copy=False)

def cast_agg_result(result, values: ArrayLike, how: str) -> ArrayLike:
# see if we can cast the values to the desired dtype
# this may not be the original dtype
assert not isinstance(result, DataFrame)

dtype = maybe_cast_result_dtype(values.dtype, how)
result = maybe_downcast_numeric(result, dtype)

if isinstance(values, Categorical) and isinstance(result, np.ndarray):
# If the Categorical op didn't raise, it is dtype-preserving
result = type(values)._from_sequence(result.ravel(), dtype=values.dtype)
# Note this will have result.dtype == dtype from above

elif isinstance(result, np.ndarray) and result.ndim == 1:
# We went through a SeriesGroupByPath and need to reshape
# GH#32223 includes case with IntegerArray values
result = result.reshape(1, -1)
# test_groupby_duplicate_columns gets here with
# result.dtype == int64, values.dtype=object, how="min"

return result

def py_fallback(bvalues: ArrayLike) -> ArrayLike:
# if self.grouper.aggregate fails, we fall back to a pure-python
# solution
Expand Down Expand Up @@ -1169,7 +1185,7 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:

result = py_fallback(bvalues)

return cast_agg_result(result, bvalues, how)
return _cast_agg_result(result, bvalues, how)

# TypeError -> we may have an exception in trying to aggregate
# continue and exclude the block
Expand All @@ -1181,6 +1197,63 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:

return new_mgr

def _cython_agg_arrays(
    self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
) -> Tuple[List[ArrayLike], Index]:
    """
    Aggregate each array of an ArrayManager-backed frame with a cython op.

    Returns
    -------
    (list of aggregated arrays, the columns Index of the aggregated data)

    Raises
    ------
    DataError
        If no arrays remain to aggregate (e.g. nothing numeric).
    """
    mgr: ArrayManager = self._get_data_to_aggregate()

    if numeric_only:
        mgr = mgr.get_numeric_data(copy=False)

    def _python_fallback(arr_values: ArrayLike) -> ArrayLike:
        # TODO(ArrayManager)
        raise NotImplementedError

    def _aggregate_one(arr_values: ArrayLike) -> ArrayLike:
        try:
            agged = self.grouper._cython_operation(
                "aggregate", arr_values, how, axis=1, min_count=min_count
            )
        except NotImplementedError:
            # Generally hit with numeric_only=False and a non-applicable
            # function; try a pure-python aggregation instead.
            if alt is None:
                # No alternate way to perform the operation.
                assert how == "ohlc"
                raise
            agged = _python_fallback(arr_values)

        return _cast_agg_result(agged, arr_values, how, reshape1d=False)

    aggregated: List[ArrayLike] = []
    for arr in mgr.arrays:
        aggregated.append(_aggregate_one(arr))

    if not len(aggregated):
        raise DataError("No numeric types to aggregate")

    return aggregated, mgr.axes[0]

def _wrap_agged_arrays(self, arrays: List[ArrayLike], columns: Index) -> DataFrame:
    """
    Wrap aggregated arrays into a result DataFrame.

    Parameters
    ----------
    arrays : list of ArrayLike
        One aggregated array per output column.
    columns : Index
        Column labels for ``arrays``.

    Returns
    -------
    DataFrame
    """
    # NOTE: the original span had review-comment page text embedded inside
    # the method body; it is removed here so the method is valid Python.
    if not self.as_index:
        # Plain positional index; the group keys are re-inserted as columns.
        index = np.arange(arrays[0].shape[0])
        mgr = ArrayManager(arrays, axes=[index, columns])
        result = self.obj._constructor(mgr)
        self._insert_inaxis_grouper_inplace(result)
    else:
        index = self.grouper.result_index
        mgr = ArrayManager(arrays, axes=[index, columns])
        result = self.obj._constructor(mgr)

    if self.axis == 1:
        result = result.T

    return self._reindex_output(result)._convert(datetime=True)

def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame:
if self.grouper.nkeys != 1:
raise AssertionError("Number of keys must be 1")
Expand Down Expand Up @@ -1663,7 +1736,7 @@ def _wrap_frame_output(self, result, obj: DataFrame) -> DataFrame:
else:
return self.obj._constructor(result, index=obj.index, columns=result_index)

def _get_data_to_aggregate(self) -> BlockManager:
def _get_data_to_aggregate(self) -> Manager:
obj = self._obj_with_exclusions
if self.axis == 1:
return obj.T._mgr
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/internals/array_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def apply_with_block(self: T, f, align_keys=None, **kwargs) -> T:
if hasattr(arr, "tz") and arr.tz is None: # type: ignore[union-attr]
# DatetimeArray needs to be converted to ndarray for DatetimeBlock
arr = arr._data # type: ignore[union-attr]
elif arr.dtype.kind == "m":
elif arr.dtype.kind == "m" and not isinstance(arr, np.ndarray):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could combine this with previous check as

if arr.dtype.kind in ["m", "M"] and not isinstance(arr, np.ndarray):
    arr = arr._data

Copy link
Member Author

@jorisvandenbossche jorisvandenbossche Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be nice, but the problem is that we still need to keep DatetimeArray intact for DatetimeTZBlock. So we would still need the if hasattr(arr, "tz") and arr.tz is None check as well, in which case it doesn't necessarily become more readable to combine both checks.

Edit: the diff would be:

-            if hasattr(arr, "tz") and arr.tz is None:  # type: ignore[union-attr]
-                # DatetimeArray needs to be converted to ndarray for DatetimeBlock
-                arr = arr._data  # type: ignore[union-attr]
-            elif arr.dtype.kind == "m" and not isinstance(arr, np.ndarray):
-                # TimedeltaArray needs to be converted to ndarray for TimedeltaBlock
+            if (
+                arr.dtype.kind == "m"
+                and not isinstance(arr, np.ndarray)
+                and getattr(arr, "tz", None) is None
+            ):
+                # DatetimeArray/TimedeltaArray needs to be converted to ndarray
+                # for DatetimeBlock/TimedeltaBlock (except DatetimeArray with tz,
+                # which needs to be preserved for DatetimeTZBlock)
                 arr = arr._data  # type: ignore[union-attr]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of and getattr(arr, "tz", None) is None how about isinstance(arr.dtype, np.dtype). either way works i guess

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That still gives the same length of the if check as in my diff example above, which I don't find an improvement in readability

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yah the only possible difference is for mypy

# TimedeltaArray needs to be converted to ndarray for TimedeltaBlock
arr = arr._data # type: ignore[union-attr]
if isinstance(arr, np.ndarray):
Expand Down
7 changes: 7 additions & 0 deletions pandas/tests/groupby/aggregate/test_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pytest

from pandas.errors import PerformanceWarning
import pandas.util._test_decorators as td

from pandas.core.dtypes.common import is_integer_dtype

Expand Down Expand Up @@ -44,6 +45,7 @@ def test_agg_regression1(tsframe):
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) quantile/describe
def test_agg_must_agg(df):
grouped = df.groupby("A")["C"]

Expand Down Expand Up @@ -133,6 +135,7 @@ def test_groupby_aggregation_multi_level_column():
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) non-cython agg
def test_agg_apply_corner(ts, tsframe):
# nothing to group, all NA
grouped = ts.groupby(ts * np.nan)
Expand Down Expand Up @@ -209,6 +212,7 @@ def test_aggregate_str_func(tsframe, groupbyfunc):
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) non-cython agg
def test_agg_str_with_kwarg_axis_1_raises(df, reduction_func):
gb = df.groupby(level=0)
if reduction_func in ("idxmax", "idxmin"):
Expand Down Expand Up @@ -488,6 +492,7 @@ def test_agg_index_has_complex_internals(index):
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) agg py_fallback
def test_agg_split_block():
# https://github.com/pandas-dev/pandas/issues/31522
df = DataFrame(
Expand All @@ -505,6 +510,7 @@ def test_agg_split_block():
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) agg py_fallback
def test_agg_split_object_part_datetime():
# https://github.com/pandas-dev/pandas/pull/31616
df = DataFrame(
Expand Down Expand Up @@ -1195,6 +1201,7 @@ def test_aggregate_datetime_objects():
tm.assert_series_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) agg py_fallback
def test_aggregate_numeric_object_dtype():
# https://github.com/pandas-dev/pandas/issues/39329
# simplified case: multiple object columns where one is all-NaN
Expand Down
12 changes: 9 additions & 3 deletions pandas/tests/groupby/aggregate/test_cython.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
"max",
],
)
def test_cythonized_aggers(op_name):
def test_cythonized_aggers(op_name, using_array_manager):
if using_array_manager and op_name in {"count", "sem"}:
# TODO(ArrayManager) groupby count/sem
pytest.skip("ArrayManager groupby count/sem not yet implemented")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can the add_marker pattern be used here?

Copy link
Member Author

@jorisvandenbossche jorisvandenbossche Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the add_marker pattern for xfail (because just raising pytest.xfail wouldn't result in a strict xfail, unlike the skip here), so for skip there is no advantage using it AFAIK.

Now, that said, I should maybe actually start using xfail instead of skip for the "skip_array_manager_not_yet_implemented", so it's easier to notice if certain tests can be unskipped when more features get implemented.

So will already start using the xfail as you suggest here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, in the meantime I fixed count, so the skip/xfail could be removed altogether.

(but a sign that using xfail instead of skip is actually a good idea ;))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good

data = {
"A": [0, 0, 0, 0, 1, 1, 1, 1, 1, 1.0, np.nan, np.nan],
"B": ["A", "B"] * 6,
Expand Down Expand Up @@ -273,15 +276,18 @@ def test_cython_with_timestamp_and_nat(op, data):
"cummax",
],
)
def test_read_only_buffer_source_agg(agg):
def test_read_only_buffer_source_agg(agg, using_array_manager):
# https://github.com/pandas-dev/pandas/issues/36014
df = DataFrame(
{
"sepal_length": [5.1, 4.9, 4.7, 4.6, 5.0],
"species": ["setosa", "setosa", "setosa", "setosa", "setosa"],
}
)
df._mgr.blocks[0].values.flags.writeable = False
if using_array_manager:
df._mgr.arrays[0].flags.writeable = False
else:
df._mgr.blocks[0].values.flags.writeable = False

result = df.groupby(["species"]).agg({"sepal_length": agg})
expected = df.copy().groupby(["species"]).agg({"sepal_length": agg})
Expand Down
3 changes: 3 additions & 0 deletions pandas/tests/groupby/aggregate/test_other.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import numpy as np
import pytest

import pandas.util._test_decorators as td

import pandas as pd
from pandas import (
DataFrame,
Expand Down Expand Up @@ -412,6 +414,7 @@ def __call__(self, x):
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) columns with ndarrays
def test_agg_over_numpy_arrays():
# GH 3788
df = DataFrame(
Expand Down