Remove blocks from GroupBy Code #28782

Closed
wants to merge 12 commits into from
160 changes: 29 additions & 131 deletions pandas/core/groupby/generic.py
@@ -56,7 +56,6 @@
)
from pandas.core.index import Index, MultiIndex, _all_indexes_same
import pandas.core.indexes.base as ibase
from pandas.core.internals import BlockManager, make_block
from pandas.core.series import Series

from pandas.plotting import boxplot_frame_groupby
@@ -147,93 +146,6 @@ def _iterate_slices(self):
continue
yield val, slicer(val)

def _cython_agg_general(self, how, alt=None, numeric_only=True, min_count=-1):
new_items, new_blocks = self._cython_agg_blocks(
how, alt=alt, numeric_only=numeric_only, min_count=min_count
)
return self._wrap_agged_blocks(new_items, new_blocks)

_block_agg_axis = 0

def _cython_agg_blocks(self, how, alt=None, numeric_only=True, min_count=-1):
# TODO: the actual managing of mgr_locs is a PITA
# here, it should happen via BlockManager.combine

data, agg_axis = self._get_data_to_aggregate()

if numeric_only:
data = data.get_numeric_data(copy=False)

new_blocks = []
new_items = []
deleted_items = []
no_result = object()
for block in data.blocks:
# Avoid inheriting result from earlier in the loop
result = no_result
locs = block.mgr_locs.as_array
try:
result, _ = self.grouper.aggregate(
block.values, how, axis=agg_axis, min_count=min_count
)
except NotImplementedError:
# generally if we have numeric_only=False
# and non-applicable functions
# try to python agg

if alt is None:
# we cannot perform the operation
# in an alternate way, exclude the block
deleted_items.append(locs)
continue

# call our grouper again with only this block
obj = self.obj[data.items[locs]]
s = groupby(obj, self.grouper)
try:
result = s.aggregate(lambda x: alt(x, axis=self.axis))
except TypeError:
# we may have an exception in trying to aggregate
# continue and exclude the block
deleted_items.append(locs)
continue
finally:
if result is not no_result:
# see if we can cast the block back to the original dtype
result = maybe_downcast_numeric(result, block.dtype)
Member:

Troubleshooting this chunk of code may be irrelevant if it's getting ripped out, but FWIW: there are cases, particularly via L194, where we get here with a result that is a DataFrame instead of an ndarray/EA. In those cases, the make_block call below raises.
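
A minimal sketch of that failure mode, assuming the pandas 0.25-era internals API (the values and placement below are made up for illustration):

import numpy as np
import pandas as pd
from pandas.core.internals import make_block

# A 2D ndarray is the kind of value make_block expects; this constructs a block.
blk = make_block(np.array([[1.5]]), placement=[0])

# A DataFrame result (as can come out of the alt fallback above) is not an
# ndarray/EA, so block construction fails.
try:
    make_block(pd.DataFrame({"a": [1.5]}), placement=[0])
except Exception as err:  # exact exception type depends on the pandas version
    print(type(err).__name__)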

newb = block.make_block(result)

new_items.append(locs)
new_blocks.append(newb)

if len(new_blocks) == 0:
raise DataError("No numeric types to aggregate")

# reset the locs in the blocks to correspond to our
# current ordering
indexer = np.concatenate(new_items)
new_items = data.items.take(np.sort(indexer))

if len(deleted_items):

# we need to adjust the indexer to account for the
# items we have removed
# really should be done in internals :<

deleted = np.concatenate(deleted_items)
ai = np.arange(len(data))
mask = np.zeros(len(data))
mask[deleted] = 1
indexer = (ai - mask.cumsum())[indexer]

offset = 0
for b in new_blocks:
loc = len(b.mgr_locs)
b.mgr_locs = indexer[offset : (offset + loc)]
offset += loc

return new_items, new_blocks

def aggregate(self, func, *args, **kwargs):
_level = kwargs.pop("_level", None)

@@ -1385,7 +1297,6 @@ class DataFrameGroupBy(NDFrameGroupBy):

_apply_whitelist = base.dataframe_apply_whitelist

_block_agg_axis = 1

_agg_see_also_doc = dedent(
"""
@@ -1525,13 +1436,6 @@ def _wrap_generic_output(self, result, obj):
else:
return DataFrame(result, index=obj.index, columns=result_index)

def _get_data_to_aggregate(self):
obj = self._obj_with_exclusions
if self.axis == 1:
return obj.T._data, 1
else:
return obj._data, 1

def _insert_inaxis_grouper_inplace(self, result):
# zip in reverse so we can always insert at loc 0
izip = zip(
@@ -1550,18 +1454,23 @@ def _insert_inaxis_grouper_inplace(self, result):
result.insert(0, name, lev)

def _wrap_aggregated_output(self, output, names=None):
agg_axis = 0 if self.axis == 1 else 1
agg_labels = self._obj_with_exclusions._get_axis(agg_axis)
index = self.grouper.result_index

if isinstance(output, dict):
result = DataFrame(output, index=index)
else:
agg_axis = 0 if self.axis == 1 else 1
agg_labels = self._obj_with_exclusions._get_axis(agg_axis)
output_keys = self._decide_output_index(output, index=index, columns=agg_labels)
result = DataFrame(output, columns=output_keys)

output_keys = self._decide_output_index(output, agg_labels)
if names:
result.columns = names

if not self.as_index:
result = DataFrame(output, columns=output_keys)
self._insert_inaxis_grouper_inplace(result)
result = result._consolidate()
else:
index = self.grouper.result_index
result = DataFrame(output, index=index, columns=output_keys)
result = result.reset_index(drop=True)

if self.axis == 1:
result = result.T
@@ -1571,24 +1480,6 @@ def _wrap_transformed_output(self, output, names=None):
def _wrap_transformed_output(self, output, names=None):
return DataFrame(output, index=self.obj.index)

def _wrap_agged_blocks(self, items, blocks):
if not self.as_index:
index = np.arange(blocks[0].values.shape[-1])
mgr = BlockManager(blocks, [items, index])
result = DataFrame(mgr)

self._insert_inaxis_grouper_inplace(result)
result = result._consolidate()
else:
index = self.grouper.result_index
mgr = BlockManager(blocks, [items, index])
result = DataFrame(mgr)

if self.axis == 1:
result = result.T

return self._reindex_output(result)._convert(datetime=True)
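
For contrast with the BlockManager plumbing removed above, a minimal sketch of the column-wise construction that _wrap_aggregated_output performs instead (the column names and values here are illustrative, not taken from the PR):

from collections import OrderedDict

import numpy as np
import pandas as pd

# Per-column aggregation results, keyed the way the groupby loop collects them.
output = OrderedDict([("x", np.array([1.5, 2.5])), ("y", np.array([20.0, 30.0]))])
index = pd.Index(["a", "b"], name="g")  # stand-in for self.grouper.result_index

# DataFrame assembles its own blocks from the dict; no manual mgr_locs bookkeeping.
result = pd.DataFrame(output, index=index)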

def _iterate_column_groupbys(self):
for i, colname in enumerate(self._selected_obj.columns):
yield colname, SeriesGroupBy(
@@ -1616,20 +1507,27 @@ def count(self):
DataFrame
Count of values within each group.
"""
data, _ = self._get_data_to_aggregate()
output = OrderedDict()
names = []

# TODO: dispatch to _cython_agg_general instead of custom looping
# TODO: refactor with series logic
ids, _, ngroups = self.grouper.group_info
mask = ids != -1

val = (
(mask & ~_isna_ndarraylike(np.atleast_2d(blk.get_values())))
for blk in data.blocks
)
loc = (blk.mgr_locs for blk in data.blocks)
if self.axis == 0:
iter_obj = self._obj_with_exclusions
else:
iter_obj = self._obj_with_exclusions.T

counter = partial(lib.count_level_2d, labels=ids, max_bin=ngroups, axis=1)
blk = map(make_block, map(counter, val), loc)
for index, (name, obj) in enumerate(iter_obj.items()):
Member Author:

This isn't the ideal way to do this, but to keep the scope of the PR minimal and get tests passing I put this iteration directly in this method. This really should be shared by a few aggregation functions, but I believe it is currently blocked by #25610 and #21668, among possibly others.

I would prefer to address this comprehensively in a follow-up.

mask = (ids != -1) & ~isna(obj)
ids = ensure_platform_int(ids)
minlength = ngroups or 0
out = np.bincount(ids[mask], minlength=minlength)
output[index] = out
names.append(name)

return self._wrap_agged_blocks(data.items, list(blk))
return self._wrap_aggregated_output(output, names=names)
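
A standalone sketch of the counting logic in that loop, with hypothetical inputs (ids and ngroups stand in for what self.grouper.group_info returns):

import numpy as np
import pandas as pd

ids = np.array([0, 0, 1, 1, -1])  # group id per row; -1 means not in any group
ngroups = 2
col = pd.Series([1.0, np.nan, 3.0, 4.0, 5.0])  # one column of the frame

# Count only rows that belong to a group and are non-missing.
mask = (ids != -1) & ~pd.isna(col.values)
out = np.bincount(ids[mask], minlength=ngroups or 0)
print(out)  # [1 2] -> group 0 has one non-NaN value, group 1 has two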

def nunique(self, dropna=True):
"""
2 changes: 1 addition & 1 deletion pandas/core/groupby/groupby.py
@@ -692,7 +692,7 @@ def __iter__(self):
Generator yielding sequence of (name, subsetted object)
for each group
"""
return self.grouper.get_iterator(self.obj, axis=self.axis)
return self.grouper.get_iterator(self._selected_obj, axis=self.axis)
Member Author:

It was rather surprising that I had to make this change, but getting rid of the block code seemed to change the behavior of the following snippet:

In [14]: df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["g", "a", "a"])

In [15]: df.groupby("g").transform("first")
Out[15]:
   a  a
0  2  3
1  5  6

Without referencing self._selected_obj, this would still include the grouping as part of the output (this change might close some open issues; I will check later).
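
For comparison, a reconstruction of the pre-change behavior described here (output inferred from the comment, not captured from a real session): iterating self.obj instead of self._selected_obj left the grouping column in the result, i.e. something like

In [16]: df.groupby("g").transform("first")  # pre-change, reconstructed
Out[16]:
   g  a  a
0  1  2  3
1  4  5  6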

Member:

That is surprising. Is it possible that the behavior is wrong now but untested (i.e., it could be fixed independently)?


@Appender(
_apply_docs["template"].format(
2 changes: 1 addition & 1 deletion pandas/tests/groupby/test_categorical.py
@@ -300,7 +300,7 @@ def test_observed(observed):
exp_index = CategoricalIndex(
list("ab"), name="cat", categories=list("abc"), ordered=True
)
expected = DataFrame({"ints": [1.5, 1.5], "val": [20.0, 30]}, index=exp_index)
expected = DataFrame({"ints": [1.5, 1.5], "val": [20, 30]}, index=exp_index)
Member Author:

This was coercing to float with the block code, but I don't think that was necessarily desired; the result fits in an int with the new implementation.

Member Author:

Though we might not want this either. Here is code to reproduce:

In [8]:     d = {
   ...:         "ints": [1, 1, 2, 2],
   ...:         "val": [10, 20, 30, 40],
   ...:     }
   ...:     df = pd.DataFrame(d)
In [9]: df.groupby(list("abab")).mean()
Out[9]:
   ints  val
a   1.5   20
b   1.5   30

A typical call to mean on a Series doesn't preserve the int type; for example, with the frame above:
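
Illustrative session continuing the snippet above (the result is computed from those values and comes back as a float):

In [10]: df["val"].mean()
Out[10]: 25.0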

Member:

My intuition is that small changes like this are inevitable when changing from block-wise to column-wise; this definitely happened with the arithmetic changeover.

if not observed:
index = CategoricalIndex(
list("abc"), name="cat", categories=list("abc"), ordered=True
8 changes: 7 additions & 1 deletion pandas/tests/groupby/test_function.py
@@ -179,7 +179,13 @@ def test_arg_passthru():
tm.assert_index_equal(result.columns, expected_columns_numeric)

result = f(numeric_only=False)
tm.assert_frame_equal(result.reindex_like(expected), expected)

# TODO: median isn't implemented for DTI but was working blockwise before?
Member Author:

Surprised by this as well. I guess the by-block iteration would bypass some of the checks that were put into a DTA, so while this doesn't work on master:

>>> df = pd.DataFrame([pd.Timestamp("2000-01-01"), pd.Timestamp("2000-01-01")])
>>> df[0].median()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-18-c894fad0307b> in <module>
----> 1 df[0].median()

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/core/generic.py in stat_func(self, axis, skipna, level, numeric_only, **kwargs)
  11616             return self._agg_by_level(name, axis=axis, level=level, skipna=skipna)
  11617         return self._reduce(
> 11618             f, name, axis=axis, skipna=skipna, numeric_only=numeric_only
  11619         )
  11620

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/core/series.py in _reduce(self, op, name, axis, skipna, numeric_only, filter_type, **kwds)
   4097             numeric_only=numeric_only,
   4098             filter_type=filter_type,
-> 4099             **kwds
   4100         )
   4101

/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/core/base.py in _reduce(self, op, name, axis, skipna, numeric_only, filter_type, **kwds)
   1218             raise TypeError(
   1219                 "{klass} cannot perform the operation {op}".format(
-> 1220                     klass=self.__class__.__name__, op=name
   1221                 )
   1222             )

TypeError: DatetimeIndex cannot perform the operation median

This somehow did:

>>> df.groupby([1, 1]).median(numeric_only=False)
           0
1 2000-01-01

Whether or not that is intentional I'm not sure, so cc @jbrockmendel.

Member:

Not sure off the top of my head what is going on here, but I can say that DTA/DTI should have median. #28165 implements reductions for TDA; I'm waiting to get that merged before doing DTA and other reductions.

Member Author:

Yeah, I just saw the update to that come through. I'll try to give it another review pass tomorrow. It might be the precursor we need for this.

Member:

> Might be the precursor we need for this

Any thoughts on how this interacts with the ongoing catch-less project? I'm pretty sure the block-wise method this rips out is involved in a lot of that catching.

Member Author:

Tough to say, but I think it can only help. Generally, a lot of the blanket Exception catching and state management in GroupBy might make it one of the harder parts of the code base to digest, so limiting the ways in which we construct results here should make things easier in the future (a lot more to be done).

if attr == "median":
new_expected = expected.drop(columns=["datetime", "datetimetz"])
tm.assert_frame_equal(result, new_expected)
else:
tm.assert_frame_equal(result.reindex_like(expected), expected)

# TODO: min, max *should* handle
# categorical (ordered) dtype