Remove blocks from GroupBy Code #28782
```diff
@@ -11,21 +11,29 @@
 from functools import partial
 from textwrap import dedent
 import typing
-from typing import Any, Callable, FrozenSet, Sequence, Type, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    FrozenSet,
+    Hashable,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Type,
+    Union,
+)
 import warnings

 import numpy as np

-from pandas._libs import Timestamp, lib
+from pandas._libs import Timestamp
 from pandas.compat import PY36
 from pandas.errors import AbstractMethodError
 from pandas.util._decorators import Appender, Substitution

-from pandas.core.dtypes.cast import (
-    maybe_convert_objects,
-    maybe_downcast_numeric,
-    maybe_downcast_to_dtype,
-)
+from pandas.core.dtypes.cast import maybe_convert_objects, maybe_downcast_to_dtype
 from pandas.core.dtypes.common import (
     ensure_int64,
     ensure_platform_int,
```
```diff
@@ -39,24 +47,18 @@
     is_object_dtype,
     is_scalar,
 )
-from pandas.core.dtypes.missing import _isna_ndarraylike, isna, notna
+from pandas.core.dtypes.missing import isna, notna

 from pandas._typing import FrameOrSeries
 import pandas.core.algorithms as algorithms
-from pandas.core.base import DataError, SpecificationError
+from pandas.core.base import SpecificationError
 import pandas.core.common as com
 from pandas.core.frame import DataFrame
 from pandas.core.generic import ABCDataFrame, ABCSeries, NDFrame, _shared_docs
 from pandas.core.groupby import base
-from pandas.core.groupby.groupby import (
-    GroupBy,
-    _apply_docs,
-    _transform_template,
-    groupby,
-)
+from pandas.core.groupby.groupby import GroupBy, _apply_docs, _transform_template
 from pandas.core.index import Index, MultiIndex, _all_indexes_same
 import pandas.core.indexes.base as ibase
-from pandas.core.internals import BlockManager, make_block
 from pandas.core.series import Series

 from pandas.plotting import boxplot_frame_groupby
```
```diff
@@ -147,93 +149,6 @@ def _iterate_slices(self):
                 continue
             yield val, slicer(val)

-    def _cython_agg_general(self, how, alt=None, numeric_only=True, min_count=-1):
-        new_items, new_blocks = self._cython_agg_blocks(
-            how, alt=alt, numeric_only=numeric_only, min_count=min_count
-        )
-        return self._wrap_agged_blocks(new_items, new_blocks)
-
-    _block_agg_axis = 0
-
-    def _cython_agg_blocks(self, how, alt=None, numeric_only=True, min_count=-1):
-        # TODO: the actual managing of mgr_locs is a PITA
-        # here, it should happen via BlockManager.combine
-
-        data, agg_axis = self._get_data_to_aggregate()
-
-        if numeric_only:
-            data = data.get_numeric_data(copy=False)
-
-        new_blocks = []
-        new_items = []
-        deleted_items = []
-        no_result = object()
-        for block in data.blocks:
-            # Avoid inheriting result from earlier in the loop
-            result = no_result
-            locs = block.mgr_locs.as_array
-            try:
-                result, _ = self.grouper.aggregate(
-                    block.values, how, axis=agg_axis, min_count=min_count
-                )
-            except NotImplementedError:
-                # generally if we have numeric_only=False
-                # and non-applicable functions
-                # try to python agg
-
-                if alt is None:
-                    # we cannot perform the operation
-                    # in an alternate way, exclude the block
-                    deleted_items.append(locs)
-                    continue
-
-                # call our grouper again with only this block
-                obj = self.obj[data.items[locs]]
-                s = groupby(obj, self.grouper)
-                try:
-                    result = s.aggregate(lambda x: alt(x, axis=self.axis))
-                except TypeError:
-                    # we may have an exception in trying to aggregate
-                    # continue and exclude the block
-                    deleted_items.append(locs)
-                    continue
-            finally:
-                if result is not no_result:
-                    # see if we can cast the block back to the original dtype
-                    result = maybe_downcast_numeric(result, block.dtype)
-                    newb = block.make_block(result)
-
-            new_items.append(locs)
-            new_blocks.append(newb)
-
-        if len(new_blocks) == 0:
-            raise DataError("No numeric types to aggregate")
-
-        # reset the locs in the blocks to correspond to our
-        # current ordering
-        indexer = np.concatenate(new_items)
-        new_items = data.items.take(np.sort(indexer))
-
-        if len(deleted_items):
-
-            # we need to adjust the indexer to account for the
-            # items we have removed
-            # really should be done in internals :<
-
-            deleted = np.concatenate(deleted_items)
-            ai = np.arange(len(data))
-            mask = np.zeros(len(data))
-            mask[deleted] = 1
-            indexer = (ai - mask.cumsum())[indexer]
-
-        offset = 0
-        for b in new_blocks:
-            loc = len(b.mgr_locs)
-            b.mgr_locs = indexer[offset : (offset + loc)]
-            offset += loc
-
-        return new_items, new_blocks
-
     def aggregate(self, func, *args, **kwargs):
         _level = kwargs.pop("_level", None)
```
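For context on what the deleted `_cython_agg_blocks` was juggling: a mixed-dtype DataFrame is stored internally as one 2-D block per dtype, and each block only knows which column positions it covers via `mgr_locs`. A minimal sketch of that layout, using the `._data` BlockManager accessor of this era of pandas (an internal API, subject to change):

```python
import pandas as pd

# One block per dtype; mgr_locs records which column positions each block holds.
df = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [1.5, 2.5]})
for blk in df._data.blocks:  # ._data is the internal BlockManager
    print(blk.dtype, blk.mgr_locs.as_array)
# Expected output (block order may vary):
#   int64 [0 1]    -> "a" and "b" share one block
#   float64 [2]    -> "c" sits in its own block
```

This is why the removed code had to concatenate, sort, and re-shift `mgr_locs` by hand whenever a block was excluded, which is exactly the bookkeeping the PR eliminates by iterating column-by-column.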
```diff
@@ -355,18 +270,48 @@ def _aggregate_item_by_item(self, func, *args, **kwargs):

         return DataFrame(result, columns=result_columns)

-    def _decide_output_index(self, output, labels):
-        if len(output) == len(labels):
-            output_keys = labels
+    def _decide_output_index(
+        self,
+        output: Dict,
+        labels: Index,
+        col_labels: Optional[List[Union[Hashable, Tuple[Hashable, ...]]]] = None,
+    ) -> Index:
+        """
+        Determine axis labels to use while wrapping aggregated values.
+
+        Parameters
+        ----------
+        output : dict of ndarrays
+            Results of aggregating by-column.
+        labels : Index
+            Existing labels of the selected object. Used to determine the
+            resulting shape and name(s).
+        col_labels : list, optional
+            The ultimate column labels for the reshaped object. Each entry in
+            this list should correspond to a key in output. Entries must be
+            valid column labels, and tuples contained within should map to a
+            MultiIndex.
+
+        Returns
+        -------
+        Index or MultiIndex
+
+        Notes
+        -----
+        Ideally output should always have integers as keys and the col_labels
+        should be provided separately, but as of writing this is not the case.
+        When output is not keyed by integers there is a risk of duplicate
+        column labels not being handled correctly.
+        """
+        if col_labels:
+            keys = col_labels
         else:
-            output_keys = sorted(output)
-            try:
-                output_keys.sort()
-            except TypeError:
-                pass
+            keys = list(output.keys())

-            if isinstance(labels, MultiIndex):
-                output_keys = MultiIndex.from_tuples(output_keys, names=labels.names)
+        if isinstance(labels, MultiIndex):
+            output_keys = MultiIndex.from_tuples(keys, names=labels.names)
+        else:
+            output_keys = Index(keys, name=labels.name)

         return output_keys
```

> **Review comment** (on the Notes section): Added this as a note for now because I didn't want duplicate labels to complicate the diff, but this should be bundled with a more comprehensive handling of iteration, which I think will need to go column-by-column. Here's an example where we iterate by "slice": `pandas/core/groupby/groupby.py`, line 2263 in 558f754. But a slice can still be a DataFrame in the case of duplicated column labels, and things can get especially messy if those duplicate labels don't have a contiguous dtype. I think we comprehensively need to change the iteration in these modules and ensure consistency.
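A quick illustration of the duplicate-label hazard that comment describes: selecting a duplicated column label yields a DataFrame rather than a Series, so any per-column iteration that slices by label can silently receive a 2-D object.

```python
import pandas as pd

# With duplicated column labels, a single-label selection is a DataFrame,
# not a Series -- the problematic "slice" the comment warns about.
df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["g", "a", "a"])
print(type(df["a"]))  # <class 'pandas.core.frame.DataFrame'>
print(type(df["g"]))  # <class 'pandas.core.series.Series'>
```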
```diff
@@ -1385,8 +1330,6 @@ class DataFrameGroupBy(NDFrameGroupBy):

     _apply_whitelist = base.dataframe_apply_whitelist

-    _block_agg_axis = 1
-
     _agg_see_also_doc = dedent(
         """
         See Also
```
```diff
@@ -1525,13 +1468,6 @@ def _wrap_generic_output(self, result, obj):
         else:
             return DataFrame(result, index=obj.index, columns=result_index)

-    def _get_data_to_aggregate(self):
-        obj = self._obj_with_exclusions
-        if self.axis == 1:
-            return obj.T._data, 1
-        else:
-            return obj._data, 1
-
     def _insert_inaxis_grouper_inplace(self, result):
         # zip in reverse so we can always insert at loc 0
         izip = zip(
```
```diff
@@ -1549,19 +1485,23 @@ def _insert_inaxis_grouper_inplace(self, result):
             if in_axis:
                 result.insert(0, name, lev)

-    def _wrap_aggregated_output(self, output, names=None):
+    def _wrap_aggregated_output(
+        self,
+        output: Dict[int, np.ndarray],
+        names: Optional[List[Union[Hashable, Tuple[Hashable, ...]]]] = None,
+    ) -> DataFrame:
+        index = self.grouper.result_index
+
+        result = DataFrame(output, index)
+
         agg_axis = 0 if self.axis == 1 else 1
         agg_labels = self._obj_with_exclusions._get_axis(agg_axis)

-        output_keys = self._decide_output_index(output, agg_labels)
+        output_keys = self._decide_output_index(output, agg_labels, names)
+        result.columns = output_keys

         if not self.as_index:
-            result = DataFrame(output, columns=output_keys)
             self._insert_inaxis_grouper_inplace(result)
-            result = result._consolidate()
-        else:
-            index = self.grouper.result_index
-            result = DataFrame(output, index=index, columns=output_keys)
+            result = result.reset_index(drop=True)

         if self.axis == 1:
             result = result.T
```
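The new wrapping path is easy to see in isolation: the column-by-column aggregation produces a dict of 1-D arrays keyed by column position, which gets reassembled into a DataFrame against the grouper's result index. A standalone sketch mirroring the added lines, with illustrative values borrowed from the test change further down (the index and column names here are stand-ins, not pandas internals):

```python
import numpy as np
import pandas as pd

# Dict of per-column results keyed by position, as produced column-by-column.
output = {0: np.array([1.5, 1.5]), 1: np.array([20.0, 30.0])}
index = pd.Index(["a", "b"], name="g")      # stands in for grouper.result_index

result = pd.DataFrame(output, index)
result.columns = pd.Index(["ints", "val"])  # stands in for _decide_output_index
print(result)
#    ints   val
# a   1.5  20.0
# b   1.5  30.0
```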
```diff
@@ -1571,24 +1511,6 @@ def _wrap_aggregated_output(self, output, names=None):
     def _wrap_transformed_output(self, output, names=None):
         return DataFrame(output, index=self.obj.index)

-    def _wrap_agged_blocks(self, items, blocks):
-        if not self.as_index:
-            index = np.arange(blocks[0].values.shape[-1])
-            mgr = BlockManager(blocks, [items, index])
-            result = DataFrame(mgr)
-
-            self._insert_inaxis_grouper_inplace(result)
-            result = result._consolidate()
-        else:
-            index = self.grouper.result_index
-            mgr = BlockManager(blocks, [items, index])
-            result = DataFrame(mgr)
-
-        if self.axis == 1:
-            result = result.T
-
-        return self._reindex_output(result)._convert(datetime=True)
-
     def _iterate_column_groupbys(self):
         for i, colname in enumerate(self._selected_obj.columns):
             yield colname, SeriesGroupBy(
```
```diff
@@ -1616,20 +1538,27 @@ def count(self):
         DataFrame
             Count of values within each group.
         """
-        data, _ = self._get_data_to_aggregate()
+        output = OrderedDict()
+        names = []
+
+        # TODO: dispatch to _cython_agg_general instead of custom looping
+        # TODO: refactor with series logic
         ids, _, ngroups = self.grouper.group_info
-        mask = ids != -1

-        val = (
-            (mask & ~_isna_ndarraylike(np.atleast_2d(blk.get_values())))
-            for blk in data.blocks
-        )
-        loc = (blk.mgr_locs for blk in data.blocks)
+        if self.axis == 0:
+            iter_obj = self._obj_with_exclusions
+        else:
+            iter_obj = self._obj_with_exclusions.T

-        counter = partial(lib.count_level_2d, labels=ids, max_bin=ngroups, axis=1)
-        blk = map(make_block, map(counter, val), loc)
+        for index, (name, obj) in enumerate(iter_obj.items()):
+            mask = (ids != -1) & ~isna(obj)
+            ids = ensure_platform_int(ids)
+            minlength = ngroups or 0
+            out = np.bincount(ids[mask], minlength=minlength)
+            output[index] = out
+            names.append(name)

-        return self._wrap_agged_blocks(data.items, list(blk))
+        return self._wrap_aggregated_output(output, names=names)

     def nunique(self, dropna=True):
         """
```

> **Review comment** (on the new loop): This isn't the ideal way to do this, but to keep the scope of the PR minimal and get tests passing I put the iteration directly in this method. It really should be shared by a few aggregation functions, but I believe that is currently blocked by #25610 and #21668, among possibly others. I would prefer to address this comprehensively in a follow-up.
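The counting technique in that loop stands on its own: per column, mask out rows that are unassigned to any group (id of -1) or null, then histogram the surviving group ids. A minimal plain-NumPy sketch of the same idea, outside of any groupby machinery (all names here are illustrative):

```python
import numpy as np

ids = np.array([0, 0, 1, 1, 1, -1])              # group id per row; -1 = unassigned
values = np.array([1.0, np.nan, 2.0, 3.0, np.nan, 4.0])
ngroups = 2

mask = (ids != -1) & ~np.isnan(values)           # keep grouped, non-null rows
counts = np.bincount(ids[mask], minlength=ngroups)
print(counts)  # [1 2] -> one non-null value in group 0, two in group 1
```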
```diff
@@ -692,7 +692,7 @@ def __iter__(self):
         Generator yielding sequence of (name, subsetted object)
         for each group
         """
-        return self.grouper.get_iterator(self.obj, axis=self.axis)
+        return self.grouper.get_iterator(self._selected_obj, axis=self.axis)

     @Appender(
         _apply_docs["template"].format(
```

> **Review comment:** It was rather surprising that I had to make a change here, but getting rid of the block code seemed to change the behavior of the following snippet:
>
> ```python
> In [14]: df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["g", "a", "a"])
>
> In [15]: df.groupby("g").transform("first")
> Out[15]:
>    a  a
> 0  2  3
> 1  5  6
> ```
>
> Without referencing […]

> **Reply:** That is surprising. Is it possible that it is wrong now but untested? (i.e. it could be fixed independently)
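A regression-test sketch for the duplicate-column case in that snippet (a hypothetical test, not part of the PR): since each row of `df` forms its own group, `transform("first")` should hand back the two "a" columns unchanged.

```python
import pandas as pd

df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["g", "a", "a"])
result = df.groupby("g").transform("first")

# Every row is a singleton group, so "first" acts as the identity here.
expected = pd.DataFrame([[2, 3], [5, 6]], columns=["a", "a"])
assert result.equals(expected)
```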
```diff
@@ -300,7 +300,7 @@ def test_observed(observed):
     exp_index = CategoricalIndex(
         list("ab"), name="cat", categories=list("abc"), ordered=True
     )
-    expected = DataFrame({"ints": [1.5, 1.5], "val": [20.0, 30]}, index=exp_index)
+    expected = DataFrame({"ints": [1.5, 1.5], "val": [20, 30]}, index=exp_index)

     if not observed:
         index = CategoricalIndex(
             list("abc"), name="cat", categories=list("abc"), ordered=True
```

> **Review comment:** This was coercing to float with the block code, but I don't think that was necessarily desired; it fits in an int with the new implementation.

> **Reply:** Though we might not want this either. Here is code to reproduce:
>
> ```python
> In [8]: d = {
>    ...:     "ints": [1, 1, 2, 2],
>    ...:     "val": [10, 20, 30, 40],
>    ...: }
>    ...: df = pd.DataFrame(d)
>
> In [8]: df.groupby(list("abab")).mean()
> Out[7]:
>    ints  val
> a   1.5   20
> b   1.5   30
> ```
>
> A typical call to […]

> **Reply:** My intuition is that small changes like this are inevitable when changing from block-wise to column-wise. Definitely happened with the arithmetic changeover.
> **Review comment** (on the removed `_cython_agg_blocks`): Troubleshooting this chunk of code may be irrelevant if it's getting ripped out, but FWIW: there are cases, particularly via L194, where we get here with a `result` that is a DataFrame instead of an ndarray/EA. In those cases, the `make_block` call below raises.