-
-
Notifications
You must be signed in to change notification settings - Fork 18.4k
Remove blocks from GroupBy Code #28782
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
b694b09
ee85e5a
32d1b6b
6115323
2324c0c
5de118f
e6b5fd1
44a0c6a
9efdab6
654ccf2
2d3c5dd
6c521f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,7 +56,6 @@ | |
) | ||
from pandas.core.index import Index, MultiIndex, _all_indexes_same | ||
import pandas.core.indexes.base as ibase | ||
from pandas.core.internals import BlockManager, make_block | ||
from pandas.core.series import Series | ||
|
||
from pandas.plotting import boxplot_frame_groupby | ||
|
@@ -147,93 +146,6 @@ def _iterate_slices(self): | |
continue | ||
yield val, slicer(val) | ||
|
||
def _cython_agg_general(self, how, alt=None, numeric_only=True, min_count=-1): | ||
new_items, new_blocks = self._cython_agg_blocks( | ||
how, alt=alt, numeric_only=numeric_only, min_count=min_count | ||
) | ||
return self._wrap_agged_blocks(new_items, new_blocks) | ||
|
||
_block_agg_axis = 0 | ||
|
||
def _cython_agg_blocks(self, how, alt=None, numeric_only=True, min_count=-1): | ||
# TODO: the actual managing of mgr_locs is a PITA | ||
# here, it should happen via BlockManager.combine | ||
|
||
data, agg_axis = self._get_data_to_aggregate() | ||
|
||
if numeric_only: | ||
data = data.get_numeric_data(copy=False) | ||
|
||
new_blocks = [] | ||
new_items = [] | ||
deleted_items = [] | ||
no_result = object() | ||
for block in data.blocks: | ||
# Avoid inheriting result from earlier in the loop | ||
result = no_result | ||
locs = block.mgr_locs.as_array | ||
try: | ||
result, _ = self.grouper.aggregate( | ||
block.values, how, axis=agg_axis, min_count=min_count | ||
) | ||
except NotImplementedError: | ||
# generally if we have numeric_only=False | ||
# and non-applicable functions | ||
# try to python agg | ||
|
||
if alt is None: | ||
# we cannot perform the operation | ||
# in an alternate way, exclude the block | ||
deleted_items.append(locs) | ||
continue | ||
|
||
# call our grouper again with only this block | ||
obj = self.obj[data.items[locs]] | ||
s = groupby(obj, self.grouper) | ||
try: | ||
result = s.aggregate(lambda x: alt(x, axis=self.axis)) | ||
except TypeError: | ||
# we may have an exception in trying to aggregate | ||
# continue and exclude the block | ||
deleted_items.append(locs) | ||
continue | ||
finally: | ||
if result is not no_result: | ||
# see if we can cast the block back to the original dtype | ||
result = maybe_downcast_numeric(result, block.dtype) | ||
newb = block.make_block(result) | ||
|
||
new_items.append(locs) | ||
new_blocks.append(newb) | ||
|
||
if len(new_blocks) == 0: | ||
raise DataError("No numeric types to aggregate") | ||
|
||
# reset the locs in the blocks to correspond to our | ||
# current ordering | ||
indexer = np.concatenate(new_items) | ||
new_items = data.items.take(np.sort(indexer)) | ||
|
||
if len(deleted_items): | ||
|
||
# we need to adjust the indexer to account for the | ||
# items we have removed | ||
# really should be done in internals :< | ||
|
||
deleted = np.concatenate(deleted_items) | ||
ai = np.arange(len(data)) | ||
mask = np.zeros(len(data)) | ||
mask[deleted] = 1 | ||
indexer = (ai - mask.cumsum())[indexer] | ||
|
||
offset = 0 | ||
for b in new_blocks: | ||
loc = len(b.mgr_locs) | ||
b.mgr_locs = indexer[offset : (offset + loc)] | ||
offset += loc | ||
|
||
return new_items, new_blocks | ||
|
||
def aggregate(self, func, *args, **kwargs): | ||
_level = kwargs.pop("_level", None) | ||
|
||
|
@@ -1385,7 +1297,6 @@ class DataFrameGroupBy(NDFrameGroupBy): | |
|
||
_apply_whitelist = base.dataframe_apply_whitelist | ||
|
||
_block_agg_axis = 1 | ||
|
||
_agg_see_also_doc = dedent( | ||
""" | ||
|
@@ -1525,13 +1436,6 @@ def _wrap_generic_output(self, result, obj): | |
else: | ||
return DataFrame(result, index=obj.index, columns=result_index) | ||
|
||
def _get_data_to_aggregate(self): | ||
obj = self._obj_with_exclusions | ||
if self.axis == 1: | ||
return obj.T._data, 1 | ||
else: | ||
return obj._data, 1 | ||
|
||
def _insert_inaxis_grouper_inplace(self, result): | ||
# zip in reverse so we can always insert at loc 0 | ||
izip = zip( | ||
|
@@ -1550,18 +1454,23 @@ def _insert_inaxis_grouper_inplace(self, result): | |
result.insert(0, name, lev) | ||
|
||
def _wrap_aggregated_output(self, output, names=None): | ||
agg_axis = 0 if self.axis == 1 else 1 | ||
agg_labels = self._obj_with_exclusions._get_axis(agg_axis) | ||
index = self.grouper.result_index | ||
WillAyd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if isinstance(output, dict): | ||
result = DataFrame(output, index=index) | ||
else: | ||
agg_axis = 0 if self.axis == 1 else 1 | ||
agg_labels = self._obj_with_exclusions._get_axis(agg_axis) | ||
output_keys = self._decide_output_index(output, index=index, columns=agg_labels) | ||
result = DataFrame(output, columns=output_keys) | ||
|
||
output_keys = self._decide_output_index(output, agg_labels) | ||
if names: | ||
result.columns = names | ||
|
||
if not self.as_index: | ||
result = DataFrame(output, columns=output_keys) | ||
self._insert_inaxis_grouper_inplace(result) | ||
result = result._consolidate() | ||
else: | ||
index = self.grouper.result_index | ||
result = DataFrame(output, index=index, columns=output_keys) | ||
result = result.reset_index(drop=True) | ||
|
||
if self.axis == 1: | ||
result = result.T | ||
|
@@ -1571,24 +1480,6 @@ def _wrap_aggregated_output(self, output, names=None): | |
def _wrap_transformed_output(self, output, names=None): | ||
return DataFrame(output, index=self.obj.index) | ||
|
||
def _wrap_agged_blocks(self, items, blocks): | ||
if not self.as_index: | ||
index = np.arange(blocks[0].values.shape[-1]) | ||
mgr = BlockManager(blocks, [items, index]) | ||
result = DataFrame(mgr) | ||
|
||
self._insert_inaxis_grouper_inplace(result) | ||
result = result._consolidate() | ||
else: | ||
index = self.grouper.result_index | ||
mgr = BlockManager(blocks, [items, index]) | ||
result = DataFrame(mgr) | ||
|
||
if self.axis == 1: | ||
result = result.T | ||
|
||
return self._reindex_output(result)._convert(datetime=True) | ||
|
||
def _iterate_column_groupbys(self): | ||
for i, colname in enumerate(self._selected_obj.columns): | ||
yield colname, SeriesGroupBy( | ||
|
@@ -1616,20 +1507,27 @@ def count(self): | |
DataFrame | ||
Count of values within each group. | ||
""" | ||
data, _ = self._get_data_to_aggregate() | ||
output = OrderedDict() | ||
names = [] | ||
|
||
# TODO: dispatch to _cython_agg_general instead of custom looping | ||
# TODO: refactor with series logic | ||
ids, _, ngroups = self.grouper.group_info | ||
mask = ids != -1 | ||
|
||
val = ( | ||
(mask & ~_isna_ndarraylike(np.atleast_2d(blk.get_values()))) | ||
for blk in data.blocks | ||
) | ||
loc = (blk.mgr_locs for blk in data.blocks) | ||
if self.axis == 0: | ||
iter_obj = self._obj_with_exclusions | ||
else: | ||
iter_obj = self._obj_with_exclusions.T | ||
|
||
counter = partial(lib.count_level_2d, labels=ids, max_bin=ngroups, axis=1) | ||
blk = map(make_block, map(counter, val), loc) | ||
for index, (name, obj) in enumerate(iter_obj.items()): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't the ideal way to do this, but to keep the scope of the PR minimum and get tests passing I put this iteration directly in this method. This really should be shared by a few aggregation functions, but I believe is currently blocked by #25610 and #21668, amongst possibly others Would prefer to address comprehensively in a follow up |
||
mask = (ids != -1) & ~isna(obj) | ||
ids = ensure_platform_int(ids) | ||
minlength = ngroups or 0 | ||
out = np.bincount(ids[mask], minlength=minlength) | ||
output[index] = out | ||
names.append(name) | ||
|
||
return self._wrap_agged_blocks(data.items, list(blk)) | ||
return self._wrap_aggregated_output(output, names=names) | ||
|
||
def nunique(self, dropna=True): | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -692,7 +692,7 @@ def __iter__(self): | |
Generator yielding sequence of (name, subsetted object) | ||
for each group | ||
""" | ||
return self.grouper.get_iterator(self.obj, axis=self.axis) | ||
return self.grouper.get_iterator(self._selected_obj, axis=self.axis) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was rather surprising that I had to change here, but getting rid of the block code seemed to change the behavior of the following snippet: In [14]: df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["g", "a", "a"])
In [15]: df.groupby("g").transform("first")
Out[15]:
a a
0 2 3
1 5 6 Without referencing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is surprising. Is it possible that it is wrong now but untested? (i.e. could be fixed independently) |
||
|
||
@Appender( | ||
_apply_docs["template"].format( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -300,7 +300,7 @@ def test_observed(observed): | |
exp_index = CategoricalIndex( | ||
list("ab"), name="cat", categories=list("abc"), ordered=True | ||
) | ||
expected = DataFrame({"ints": [1.5, 1.5], "val": [20.0, 30]}, index=exp_index) | ||
expected = DataFrame({"ints": [1.5, 1.5], "val": [20, 30]}, index=exp_index) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was coercing to float with the block code but I don't think that was necessarily desired; fits in an int with new impl There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Though we might not want this either. Here is code to reproduce: In [8]: d = {
...: "ints": [1, 1, 2, 2],
...: "val": [10, 20, 30, 40],
...: }
...: df = pd.DataFrame(d)
In [8]: df.groupby(list("abab")).mean()
Out[7]:
ints val
a 1.5 20
b 1.5 30 A typical call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My intuition is that small changes like this are inevitable when changing from block-wise to column-wise. Definitely happened with arithmetic changeover. |
||
if not observed: | ||
index = CategoricalIndex( | ||
list("abc"), name="cat", categories=list("abc"), ordered=True | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -179,7 +179,13 @@ def test_arg_passthru(): | |
tm.assert_index_equal(result.columns, expected_columns_numeric) | ||
|
||
result = f(numeric_only=False) | ||
tm.assert_frame_equal(result.reindex_like(expected), expected) | ||
|
||
# TODO: median isn't implemented for DTI but was working blockwise before? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Surprised by this as well. I guess the by block iteration would bypass some of the checks that were put into a DTA, so while this doesn't work on master: >>> df = pd.DataFrame([pd.Timestamp("2000-01-01"), pd.Timestamp("2000-01-01")])
>>> df[0].median()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-18-c894fad0307b> in <module>
----> 1 df[0].median()
/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/core/generic.py in stat_func(self, axis, skipna, level, numeric_only, **kwargs)
11616 return self._agg_by_level(name, axis=axis, level=level, skipna=skipna)
11617 return self._reduce(
> 11618 f, name, axis=axis, skipna=skipna, numeric_only=numeric_only
11619 )
11620
/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/core/series.py in _reduce(self, op, name, axis, skipna, numeric_only, filter_type, **kwds)
4097 numeric_only=numeric_only,
4098 filter_type=filter_type,
-> 4099 **kwds
4100 )
4101
/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/core/base.py in _reduce(self, op, name, axis, skipna, numeric_only, filter_type, **kwds)
1218 raise TypeError(
1219 "{klass} cannot perform the operation {op}".format(
-> 1220 klass=self.__class__.__name__, op=name
1221 )
1222 )
TypeError: DatetimeIndex cannot perform the operation median This somehow did >>> df.groupby([1, 1]).median(numeric_only=False)
0
1 2000-01-01 Whether or not that is intentional I'm not sure so cc @jbrockmendel There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure off the top of my head what is going on here, but I can say that DTA/DTI should have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea I just saw the update to that come through. I'll try to give it another look review wise tomorrow. Might be the precursor we need for this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Any thoughts on how this interacts with the ongoing catch-less project? I'm pretty sure the block-wise method this rips out is involved in a lot of that catching. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tough to say but I think it can only help. I think generally a lot of the blanket Exception catching and state management in GroupBy might make it one of the harder parts of the code base to digest, so limiting the ways in which we construct results here should make things easier in the future (lot more to be done) |
||
if attr == "median": | ||
new_expected = expected.drop(columns=["datetime", "datetimetz"]) | ||
tm.assert_frame_equal(result, new_expected) | ||
else: | ||
tm.assert_frame_equal(result.reindex_like(expected), expected) | ||
|
||
# TODO: min, max *should* handle | ||
# categorical (ordered) dtype | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
troubleshooting this chunk of code may be irrelevant if its getting ripped out, but FWIW: there are cases, particularly via L194, where we get here with
result
that is a DataFrame instead of an ndarray/EA. In those cases, the make_block call below raises.