
Remove blocks from GroupBy Code #28782


Closed
wants to merge 12 commits
243 changes: 86 additions & 157 deletions pandas/core/groupby/generic.py
@@ -11,21 +11,29 @@
from functools import partial
from textwrap import dedent
import typing
from typing import Any, Callable, FrozenSet, Sequence, Type, Union
from typing import (
Any,
Callable,
Dict,
FrozenSet,
Hashable,
List,
Optional,
Sequence,
Tuple,
Type,
Union,
)
import warnings

import numpy as np

from pandas._libs import Timestamp, lib
from pandas._libs import Timestamp
from pandas.compat import PY36
from pandas.errors import AbstractMethodError
from pandas.util._decorators import Appender, Substitution

from pandas.core.dtypes.cast import (
maybe_convert_objects,
maybe_downcast_numeric,
maybe_downcast_to_dtype,
)
from pandas.core.dtypes.cast import maybe_convert_objects, maybe_downcast_to_dtype
from pandas.core.dtypes.common import (
ensure_int64,
ensure_platform_int,
@@ -39,24 +47,18 @@
is_object_dtype,
is_scalar,
)
from pandas.core.dtypes.missing import _isna_ndarraylike, isna, notna
from pandas.core.dtypes.missing import isna, notna

from pandas._typing import FrameOrSeries
import pandas.core.algorithms as algorithms
from pandas.core.base import DataError, SpecificationError
from pandas.core.base import SpecificationError
import pandas.core.common as com
from pandas.core.frame import DataFrame
from pandas.core.generic import ABCDataFrame, ABCSeries, NDFrame, _shared_docs
from pandas.core.groupby import base
from pandas.core.groupby.groupby import (
GroupBy,
_apply_docs,
_transform_template,
groupby,
)
from pandas.core.groupby.groupby import GroupBy, _apply_docs, _transform_template
from pandas.core.index import Index, MultiIndex, _all_indexes_same
import pandas.core.indexes.base as ibase
from pandas.core.internals import BlockManager, make_block
from pandas.core.series import Series

from pandas.plotting import boxplot_frame_groupby
@@ -147,93 +149,6 @@ def _iterate_slices(self):
continue
yield val, slicer(val)

def _cython_agg_general(self, how, alt=None, numeric_only=True, min_count=-1):
new_items, new_blocks = self._cython_agg_blocks(
how, alt=alt, numeric_only=numeric_only, min_count=min_count
)
return self._wrap_agged_blocks(new_items, new_blocks)

_block_agg_axis = 0

def _cython_agg_blocks(self, how, alt=None, numeric_only=True, min_count=-1):
# TODO: the actual managing of mgr_locs is a PITA
# here, it should happen via BlockManager.combine

data, agg_axis = self._get_data_to_aggregate()

if numeric_only:
data = data.get_numeric_data(copy=False)

new_blocks = []
new_items = []
deleted_items = []
no_result = object()
for block in data.blocks:
# Avoid inheriting result from earlier in the loop
result = no_result
locs = block.mgr_locs.as_array
try:
result, _ = self.grouper.aggregate(
block.values, how, axis=agg_axis, min_count=min_count
)
except NotImplementedError:
# generally if we have numeric_only=False
# and non-applicable functions
# try to python agg

if alt is None:
# we cannot perform the operation
# in an alternate way, exclude the block
deleted_items.append(locs)
continue

# call our grouper again with only this block
obj = self.obj[data.items[locs]]
s = groupby(obj, self.grouper)
try:
result = s.aggregate(lambda x: alt(x, axis=self.axis))
except TypeError:
# we may have an exception in trying to aggregate
# continue and exclude the block
deleted_items.append(locs)
continue
finally:
if result is not no_result:
# see if we can cast the block back to the original dtype
result = maybe_downcast_numeric(result, block.dtype)
Review comment (Member):
Troubleshooting this chunk of code may be irrelevant if it's getting ripped out, but FWIW: there are cases, particularly via L194, where we get here with a result that is a DataFrame instead of an ndarray/EA. In those cases, the make_block call below raises.

newb = block.make_block(result)

new_items.append(locs)
new_blocks.append(newb)

if len(new_blocks) == 0:
raise DataError("No numeric types to aggregate")

# reset the locs in the blocks to correspond to our
# current ordering
indexer = np.concatenate(new_items)
new_items = data.items.take(np.sort(indexer))

if len(deleted_items):

# we need to adjust the indexer to account for the
# items we have removed
# really should be done in internals :<

deleted = np.concatenate(deleted_items)
ai = np.arange(len(data))
mask = np.zeros(len(data))
mask[deleted] = 1
indexer = (ai - mask.cumsum())[indexer]

offset = 0
for b in new_blocks:
loc = len(b.mgr_locs)
b.mgr_locs = indexer[offset : (offset + loc)]
offset += loc

return new_items, new_blocks

def aggregate(self, func, *args, **kwargs):
_level = kwargs.pop("_level", None)

@@ -355,18 +270,48 @@ def _aggregate_item_by_item(self, func, *args, **kwargs):

return DataFrame(result, columns=result_columns)

def _decide_output_index(self, output, labels):
if len(output) == len(labels):
output_keys = labels
def _decide_output_index(
self,
output: Dict,
labels: Index,
col_labels: Optional[List[Union[Hashable, Tuple[Hashable, ...]]]] = None,
) -> Index:
"""
Determine axis labels to use while wrapping aggregated values.

Parameters
----------
output : dict of ndarrays
Results of aggregating by-column.
labels : Index
Existing labels of selected object. Used to determine resulting
shape and name(s).
col_labels : list, optional
The ultimate column labels for the reshaped object. Each entry in
this list should correspond to a key in output. Entries must be
valid column labels, and any tuples contained within should map to
a MultiIndex.

Returns
-------
Index or MultiIndex

Notes
-----
Ideally output should always have integer keys and the col_labels
should be provided separately, but as of writing this is not the
case. When output does not use integer keys there is a risk of
duplicate column labels not being handled correctly.
"""

Review comment (Member Author):

Added this as a note for now because I didn't want duplicate labels to complicate the diff, but this should be bundled with more comprehensive handling of iteration, which I think will need to go column-by-column.

Here's an example where we iterate by "slice":

for name, obj in self._iterate_slices():

But a slice can still be a DataFrame in the case of duplicated column labels, and things can get especially messy if those duplicate labels don't share a contiguous dtype; see the sketch just below.

I think we need to comprehensively change the iteration in these modules and ensure consistency.
if col_labels:
keys = col_labels
else:
output_keys = sorted(output)
try:
output_keys.sort()
except TypeError:
pass
keys = list(output.keys())

if isinstance(labels, MultiIndex):
output_keys = MultiIndex.from_tuples(output_keys, names=labels.names)
if isinstance(labels, MultiIndex):
output_keys = MultiIndex.from_tuples(keys, names=labels.names)
else:
output_keys = Index(keys, name=labels.name)

return output_keys
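For intuition, a standalone sketch (toy inputs, not part of the diff) of what this helper produces:

import numpy as np
import pandas as pd

# Aggregation results keyed by integer position, plus explicit column labels.
output = {0: np.array([1.5, 1.5]), 1: np.array([20.0, 30.0])}
col_labels = [("x", "a"), ("y", "b")]  # tuples map to a MultiIndex

keys = col_labels if col_labels else list(output.keys())
labels = pd.MultiIndex.from_tuples([("x", "a"), ("y", "b")])  # labels of the selected object

if isinstance(labels, pd.MultiIndex):
    output_keys = pd.MultiIndex.from_tuples(keys, names=labels.names)
else:
    output_keys = pd.Index(keys, name=labels.name)

output_keys  # MultiIndex([('x', 'a'), ('y', 'b')])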

@@ -1385,8 +1330,6 @@ class DataFrameGroupBy(NDFrameGroupBy):

_apply_whitelist = base.dataframe_apply_whitelist

_block_agg_axis = 1

_agg_see_also_doc = dedent(
"""
See Also
@@ -1525,13 +1468,6 @@ def _wrap_generic_output(self, result, obj):
else:
return DataFrame(result, index=obj.index, columns=result_index)

def _get_data_to_aggregate(self):
obj = self._obj_with_exclusions
if self.axis == 1:
return obj.T._data, 1
else:
return obj._data, 1

def _insert_inaxis_grouper_inplace(self, result):
# zip in reverse so we can always insert at loc 0
izip = zip(
@@ -1549,19 +1485,23 @@ def _insert_inaxis_grouper_inplace(self, result):
if in_axis:
result.insert(0, name, lev)

def _wrap_aggregated_output(self, output, names=None):
def _wrap_aggregated_output(
self,
output: Dict[int, np.ndarray],
names: Optional[List[Union[Hashable, Tuple[Hashable, ...]]]] = None,
) -> DataFrame:
index = self.grouper.result_index
result = DataFrame(output, index)

agg_axis = 0 if self.axis == 1 else 1
agg_labels = self._obj_with_exclusions._get_axis(agg_axis)

output_keys = self._decide_output_index(output, agg_labels)
output_keys = self._decide_output_index(output, agg_labels, names)
result.columns = output_keys

if not self.as_index:
result = DataFrame(output, columns=output_keys)
self._insert_inaxis_grouper_inplace(result)
result = result._consolidate()
else:
index = self.grouper.result_index
result = DataFrame(output, index=index, columns=output_keys)
result = result.reset_index(drop=True)

if self.axis == 1:
result = result.T
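For reference, the not self.as_index branch in _wrap_aggregated_output above corresponds to user-facing behavior like this (illustrative):

In [1]: df = pd.DataFrame({"g": ["a", "a", "b"], "v": [1, 2, 3]})

In [2]: df.groupby("g", as_index=False).sum()
Out[2]:
   g  v
0  a  3
1  b  3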
@@ -1571,24 +1511,6 @@ def _wrap_transformed_output(self, output, names=None):
def _wrap_transformed_output(self, output, names=None):
return DataFrame(output, index=self.obj.index)

def _wrap_agged_blocks(self, items, blocks):
if not self.as_index:
index = np.arange(blocks[0].values.shape[-1])
mgr = BlockManager(blocks, [items, index])
result = DataFrame(mgr)

self._insert_inaxis_grouper_inplace(result)
result = result._consolidate()
else:
index = self.grouper.result_index
mgr = BlockManager(blocks, [items, index])
result = DataFrame(mgr)

if self.axis == 1:
result = result.T

return self._reindex_output(result)._convert(datetime=True)

def _iterate_column_groupbys(self):
for i, colname in enumerate(self._selected_obj.columns):
yield colname, SeriesGroupBy(
@@ -1616,20 +1538,27 @@ def count(self):
DataFrame
Count of values within each group.
"""
data, _ = self._get_data_to_aggregate()
output = OrderedDict()
names = []

# TODO: dispatch to _cython_agg_general instead of custom looping
# TODO: refactor with series logic
ids, _, ngroups = self.grouper.group_info
mask = ids != -1

val = (
(mask & ~_isna_ndarraylike(np.atleast_2d(blk.get_values())))
for blk in data.blocks
)
loc = (blk.mgr_locs for blk in data.blocks)
if self.axis == 0:
iter_obj = self._obj_with_exclusions
else:
iter_obj = self._obj_with_exclusions.T

counter = partial(lib.count_level_2d, labels=ids, max_bin=ngroups, axis=1)
blk = map(make_block, map(counter, val), loc)
for index, (name, obj) in enumerate(iter_obj.items()):
Review comment (Member Author):
This isn't the ideal way to do this, but to keep the scope of the PR minimal and get tests passing I put this iteration directly in this method. It really should be shared by a few aggregation functions, but I believe that is currently blocked by #25610 and #21668, amongst possibly others.

Would prefer to address this comprehensively in a follow-up.

mask = (ids != -1) & ~isna(obj)
ids = ensure_platform_int(ids)
minlength = ngroups or 0
out = np.bincount(ids[mask], minlength=minlength)
output[index] = out
names.append(name)

return self._wrap_agged_blocks(data.items, list(blk))
return self._wrap_aggregated_output(output, names=names)
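As a standalone illustration of the counting logic above (toy data, not pandas internals):

import numpy as np
import pandas as pd

values = pd.Series([1.0, np.nan, 3.0, 4.0])  # one column of the frame
ids = np.array([0, 0, 1, 1])                 # group id per row; -1 marks excluded rows
ngroups = 2

mask = (ids != -1) & values.notna().to_numpy()
np.bincount(ids[mask], minlength=ngroups)    # array([1, 2])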

def nunique(self, dropna=True):
"""
2 changes: 1 addition & 1 deletion pandas/core/groupby/groupby.py
@@ -692,7 +692,7 @@ def __iter__(self):
Generator yielding sequence of (name, subsetted object)
for each group
"""
return self.grouper.get_iterator(self.obj, axis=self.axis)
return self.grouper.get_iterator(self._selected_obj, axis=self.axis)
Review comment (Member Author):
It was rather surprising that I had to change this, but getting rid of the block code seemed to change the behavior of the following snippet:

In [14]: df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["g", "a", "a"])

In [15]: df.groupby("g").transform("first")
Out[15]:
   a  a
0  2  3
1  5  6

Without referencing self._selected_obj, this would still include the grouping as part of the output (this change might close some open issues; will check later).

Review comment (Member):

That is surprising. Is it possible that it is wrong now but untested? (i.e. could be fixed independently)


@Appender(
_apply_docs["template"].format(
2 changes: 1 addition & 1 deletion pandas/tests/groupby/test_categorical.py
@@ -300,7 +300,7 @@ def test_observed(observed):
exp_index = CategoricalIndex(
list("ab"), name="cat", categories=list("abc"), ordered=True
)
expected = DataFrame({"ints": [1.5, 1.5], "val": [20.0, 30]}, index=exp_index)
expected = DataFrame({"ints": [1.5, 1.5], "val": [20, 30]}, index=exp_index)
Review comment (Member Author):

This was coercing to float with the block code, but I don't think that was necessarily desired; the result fits in an int with the new implementation.

Review comment (Member Author):

Though we might not want this either. Here is code to reproduce:

In [7]: d = {
   ...:     "ints": [1, 1, 2, 2],
   ...:     "val": [10, 20, 30, 40],
   ...: }
   ...: df = pd.DataFrame(d)

In [8]: df.groupby(list("abab")).mean()
Out[8]:
   ints  val
a   1.5   20
b   1.5   30

A typical call to mean on a Series doesn't preserve the int type:
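For instance (illustrative):

In [9]: pd.Series([10, 20, 30, 40]).mean()
Out[9]: 25.0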

Review comment (Member):

My intuition is that small changes like this are inevitable when changing from block-wise to column-wise execution. This definitely happened with the arithmetic changeover.

if not observed:
index = CategoricalIndex(
list("abc"), name="cat", categories=list("abc"), ordered=True
8 changes: 7 additions & 1 deletion pandas/tests/groupby/test_function.py
@@ -179,7 +179,7 @@ def test_arg_passthru():
tm.assert_index_equal(result.columns, expected_columns_numeric)

result = f(numeric_only=False)
tm.assert_frame_equal(result.reindex_like(expected), expected)

# TODO: median isn't implemented for DTI but was working blockwise before?
if attr == "median":
new_expected = expected.drop(columns=["datetime", "datetimetz"])
tm.assert_frame_equal(result, new_expected)
else:
tm.assert_frame_equal(result.reindex_like(expected), expected)

# TODO: min, max *should* handle
# categorical (ordered) dtype