From 57f418ee6641b557ccbbb069f81f09253cd99f54 Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Wed, 20 Nov 2019 08:43:40 -0800 Subject: [PATCH 1/9] Removed block management --- pandas/core/groupby/generic.py | 120 --------------------------------- 1 file changed, 120 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 7d3bf3d3dcd2f..9037d121f09f4 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -979,126 +979,6 @@ def _iterate_slices(self) -> Iterable[Series]: yield values - def _cython_agg_general( - self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 - ): - new_items, new_blocks = self._cython_agg_blocks( - how, alt=alt, numeric_only=numeric_only, min_count=min_count - ) - return self._wrap_agged_blocks(new_items, new_blocks) - - def _cython_agg_blocks( - self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 - ): - # TODO: the actual managing of mgr_locs is a PITA - # here, it should happen via BlockManager.combine - - data = self._get_data_to_aggregate() - - if numeric_only: - data = data.get_numeric_data(copy=False) - - new_blocks = [] - new_items = [] - deleted_items = [] - no_result = object() - for block in data.blocks: - # Avoid inheriting result from earlier in the loop - result = no_result - locs = block.mgr_locs.as_array - try: - result, _ = self.grouper.aggregate( - block.values, how, axis=1, min_count=min_count - ) - except NotImplementedError: - # generally if we have numeric_only=False - # and non-applicable functions - # try to python agg - - if alt is None: - # we cannot perform the operation - # in an alternate way, exclude the block - assert how == "ohlc" - deleted_items.append(locs) - continue - - # call our grouper again with only this block - obj = self.obj[data.items[locs]] - if obj.shape[1] == 1: - # Avoid call to self.values that can occur in DataFrame - # reductions; see GH#28949 - obj = obj.iloc[:, 0] - - s = get_groupby(obj, self.grouper) - try: - result = s.aggregate(lambda x: alt(x, axis=self.axis)) - except TypeError: - # we may have an exception in trying to aggregate - # continue and exclude the block - deleted_items.append(locs) - continue - else: - result = cast(DataFrame, result) - # unwrap DataFrame to get array - assert len(result._data.blocks) == 1 - result = result._data.blocks[0].values - if isinstance(result, np.ndarray) and result.ndim == 1: - result = result.reshape(1, -1) - - finally: - assert not isinstance(result, DataFrame) - - if result is not no_result: - # see if we can cast the block back to the original dtype - result = maybe_downcast_numeric(result, block.dtype) - - if block.is_extension and isinstance(result, np.ndarray): - # e.g. block.values was an IntegerArray - # (1, N) case can occur if block.values was Categorical - # and result is ndarray[object] - assert result.ndim == 1 or result.shape[0] == 1 - try: - # Cast back if feasible - result = type(block.values)._from_sequence( - result.ravel(), dtype=block.values.dtype - ) - except ValueError: - # reshape to be valid for non-Extension Block - result = result.reshape(1, -1) - - newb = block.make_block(result) - - new_items.append(locs) - new_blocks.append(newb) - - if len(new_blocks) == 0: - raise DataError("No numeric types to aggregate") - - # reset the locs in the blocks to correspond to our - # current ordering - indexer = np.concatenate(new_items) - new_items = data.items.take(np.sort(indexer)) - - if len(deleted_items): - - # we need to adjust the indexer to account for the - # items we have removed - # really should be done in internals :< - - deleted = np.concatenate(deleted_items) - ai = np.arange(len(data)) - mask = np.zeros(len(data)) - mask[deleted] = 1 - indexer = (ai - mask.cumsum())[indexer] - - offset = 0 - for b in new_blocks: - loc = len(b.mgr_locs) - b.mgr_locs = indexer[offset : (offset + loc)] - offset += loc - - return new_items, new_blocks - def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame: if self.grouper.nkeys != 1: raise AssertionError("Number of keys must be 1") From 615351f623019848721852e65c18e97725de0b39 Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Wed, 20 Nov 2019 08:45:04 -0800 Subject: [PATCH 2/9] updated test expectation for float -> int inference --- pandas/tests/groupby/test_categorical.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/tests/groupby/test_categorical.py b/pandas/tests/groupby/test_categorical.py index 5f78e4860f1e9..b16dfdc829c93 100644 --- a/pandas/tests/groupby/test_categorical.py +++ b/pandas/tests/groupby/test_categorical.py @@ -301,7 +301,7 @@ def test_observed(observed): exp_index = CategoricalIndex( list("ab"), name="cat", categories=list("abc"), ordered=True ) - expected = DataFrame({"ints": [1.5, 1.5], "val": [20.0, 30]}, index=exp_index) + expected = DataFrame({"ints": [1.5, 1.5], "val": [20, 30]}, index=exp_index) if not observed: index = CategoricalIndex( list("abc"), name="cat", categories=list("abc"), ordered=True From 4460f81b9653c6527725c5b799d68578502989d5 Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Wed, 20 Nov 2019 09:12:07 -0800 Subject: [PATCH 3/9] Removed raising on Categorical (???) --- pandas/core/groupby/ops.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index 7fd9fb8f53134..29bcdb91b3630 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -451,11 +451,7 @@ def _cython_operation( # categoricals are only 1d, so we # are not setup for dim transforming - if is_categorical_dtype(values) or is_sparse(values): - raise NotImplementedError( - "{dtype} dtype not supported".format(dtype=values.dtype) - ) - elif is_datetime64_any_dtype(values): + if is_datetime64_any_dtype(values): if how in ["add", "prod", "cumsum", "cumprod"]: raise NotImplementedError( "datetime64 type does not support {how} operations".format(how=how) From 124a135a1f067ccd03512e792dfef932294091d9 Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Wed, 20 Nov 2019 09:23:56 -0800 Subject: [PATCH 4/9] Added benchmark for wide frame --- asv_bench/benchmarks/groupby.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index d51c53e2264f1..724c8a68d9063 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -420,6 +420,7 @@ def setup(self, dtype, method, application): key = date_range("1/1/2011", periods=size, freq="s") df = DataFrame({"values": values, "key": key}) + self.wide_grp = DataFrame(np.ones((1, 10_000))).groupby([1]) if application == "transform": if method == "describe": @@ -437,6 +438,9 @@ def time_dtype_as_group(self, dtype, method, application): def time_dtype_as_field(self, dtype, method, application): self.as_field_method() + def time_wide(self): + self.wide_grp.sum() + class RankWithTies: # GH 21237 From 658ea0ff720499058819de38986866232a55a88b Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Wed, 20 Nov 2019 11:03:15 -0800 Subject: [PATCH 5/9] Wide benchmarks --- asv_bench/benchmarks/groupby.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 724c8a68d9063..f8013b73c4860 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -420,7 +420,6 @@ def setup(self, dtype, method, application): key = date_range("1/1/2011", periods=size, freq="s") df = DataFrame({"values": values, "key": key}) - self.wide_grp = DataFrame(np.ones((1, 10_000))).groupby([1]) if application == "transform": if method == "describe": @@ -438,7 +437,16 @@ def time_dtype_as_group(self, dtype, method, application): def time_dtype_as_field(self, dtype, method, application): self.as_field_method() - def time_wide(self): + +class GroupByWide: + + params = [100, 1_000, 10_000] + param_names = ["nrows"] + + def setup(self, nrows): + self.wide_grp = DataFrame(np.ones((nrows, 10_000))).groupby([1]) + + def time_wide(self, nrows): self.wide_grp.sum() From 2eed553ae48e1df278d7985ec07ce8ff43fcd62a Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Wed, 20 Nov 2019 11:12:20 -0800 Subject: [PATCH 6/9] black --- asv_bench/benchmarks/groupby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index f8013b73c4860..b666b4574384a 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -445,7 +445,7 @@ class GroupByWide: def setup(self, nrows): self.wide_grp = DataFrame(np.ones((nrows, 10_000))).groupby([1]) - + def time_wide(self, nrows): self.wide_grp.sum() From d04a0890ee0be6ee09d1d3c75fc10134327b1b4e Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Wed, 20 Nov 2019 20:50:15 -0800 Subject: [PATCH 7/9] touching internals --- pandas/core/groupby/generic.py | 123 ++----------------------- pandas/core/internals/managers.py | 21 ++++- pandas/tests/groupby/test_whitelist.py | 3 + 3 files changed, 27 insertions(+), 120 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 7d3bf3d3dcd2f..7e60dba8c3167 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -982,122 +982,15 @@ def _iterate_slices(self) -> Iterable[Series]: def _cython_agg_general( self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 ): - new_items, new_blocks = self._cython_agg_blocks( - how, alt=alt, numeric_only=numeric_only, min_count=min_count - ) - return self._wrap_agged_blocks(new_items, new_blocks) - - def _cython_agg_blocks( - self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 - ): - # TODO: the actual managing of mgr_locs is a PITA - # here, it should happen via BlockManager.combine - - data = self._get_data_to_aggregate() - - if numeric_only: - data = data.get_numeric_data(copy=False) - - new_blocks = [] - new_items = [] - deleted_items = [] - no_result = object() - for block in data.blocks: - # Avoid inheriting result from earlier in the loop - result = no_result - locs = block.mgr_locs.as_array - try: - result, _ = self.grouper.aggregate( - block.values, how, axis=1, min_count=min_count - ) - except NotImplementedError: - # generally if we have numeric_only=False - # and non-applicable functions - # try to python agg - - if alt is None: - # we cannot perform the operation - # in an alternate way, exclude the block - assert how == "ohlc" - deleted_items.append(locs) - continue - - # call our grouper again with only this block - obj = self.obj[data.items[locs]] - if obj.shape[1] == 1: - # Avoid call to self.values that can occur in DataFrame - # reductions; see GH#28949 - obj = obj.iloc[:, 0] - - s = get_groupby(obj, self.grouper) - try: - result = s.aggregate(lambda x: alt(x, axis=self.axis)) - except TypeError: - # we may have an exception in trying to aggregate - # continue and exclude the block - deleted_items.append(locs) - continue - else: - result = cast(DataFrame, result) - # unwrap DataFrame to get array - assert len(result._data.blocks) == 1 - result = result._data.blocks[0].values - if isinstance(result, np.ndarray) and result.ndim == 1: - result = result.reshape(1, -1) - - finally: - assert not isinstance(result, DataFrame) - - if result is not no_result: - # see if we can cast the block back to the original dtype - result = maybe_downcast_numeric(result, block.dtype) - - if block.is_extension and isinstance(result, np.ndarray): - # e.g. block.values was an IntegerArray - # (1, N) case can occur if block.values was Categorical - # and result is ndarray[object] - assert result.ndim == 1 or result.shape[0] == 1 - try: - # Cast back if feasible - result = type(block.values)._from_sequence( - result.ravel(), dtype=block.values.dtype - ) - except ValueError: - # reshape to be valid for non-Extension Block - result = result.reshape(1, -1) - - newb = block.make_block(result) - - new_items.append(locs) - new_blocks.append(newb) - - if len(new_blocks) == 0: - raise DataError("No numeric types to aggregate") - - # reset the locs in the blocks to correspond to our - # current ordering - indexer = np.concatenate(new_items) - new_items = data.items.take(np.sort(indexer)) - - if len(deleted_items): - - # we need to adjust the indexer to account for the - # items we have removed - # really should be done in internals :< - - deleted = np.concatenate(deleted_items) - ai = np.arange(len(data)) - mask = np.zeros(len(data)) - mask[deleted] = 1 - indexer = (ai - mask.cumsum())[indexer] - - offset = 0 - for b in new_blocks: - loc = len(b.mgr_locs) - b.mgr_locs = indexer[offset : (offset + loc)] - offset += loc + func = partial(self.grouper.aggregate, how=how, axis=1, min_count=min_count) + results = self._selected_obj._data.apply(func) + df = DataFrame(results) + if self.as_index: + df.index = self.grouper.result_index + else: + df.index = np.arange(result[0].values.shape[1]) - return new_items, new_blocks + return df def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame: if self.grouper.nkeys != 1: diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index c36dd9463c61d..7e712783eb6f7 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -419,20 +419,31 @@ def apply( and hasattr(kwargs[k], "values") } - for b in self.blocks: + for blk in self.blocks: if filter is not None: - if not b.mgr_locs.isin(filter_locs).any(): - result_blocks.append(b) + if not blk.mgr_locs.isin(filter_locs).any(): + result_blocks.append(blk) continue if aligned_args: - b_items = self.items[b.mgr_locs.indexer] + b_items = self.items[blk.mgr_locs.indexer] for k, obj in aligned_args.items(): axis = obj._info_axis_number kwargs[k] = obj.reindex(b_items, axis=axis, copy=align_copy) - applied = getattr(b, f)(**kwargs) + if isinstance(f, str): + applied = getattr(blk, f)(**kwargs) + else: # partial; specific to groupby + # TODO: func should only return one value; need to remove + # ohlc from groupby semantics to accomplish generically + result, _ = f(blk.values) # better way? + if result.ndim != 2: # hmm this is hacky + result = result.reshape(-1, 1) + + applied = type(blk)(result, placement=blk.mgr_locs, ndim=2) + axes = [self.axes[0], np.arange(result.shape[1])] + result_blocks = _extend_blocks(applied, result_blocks) if len(result_blocks) == 0: diff --git a/pandas/tests/groupby/test_whitelist.py b/pandas/tests/groupby/test_whitelist.py index 58407d90a2cc8..117e6b8ddb389 100644 --- a/pandas/tests/groupby/test_whitelist.py +++ b/pandas/tests/groupby/test_whitelist.py @@ -192,6 +192,9 @@ def test_regression_whitelist_methods(raw_frame, op, level, axis, skipna, sort): # GH 17537 # explicitly test the whitelist methods + if op == "median": + pytest.skip("Currently segfaulting...") + if axis == 0: frame = raw_frame else: From 3e33072545ba6fbf92d664812a3e283164489b9c Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Wed, 20 Nov 2019 20:51:35 -0800 Subject: [PATCH 8/9] Reverted categorical test for now --- pandas/tests/groupby/test_categorical.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/tests/groupby/test_categorical.py b/pandas/tests/groupby/test_categorical.py index b16dfdc829c93..5f78e4860f1e9 100644 --- a/pandas/tests/groupby/test_categorical.py +++ b/pandas/tests/groupby/test_categorical.py @@ -301,7 +301,7 @@ def test_observed(observed): exp_index = CategoricalIndex( list("ab"), name="cat", categories=list("abc"), ordered=True ) - expected = DataFrame({"ints": [1.5, 1.5], "val": [20, 30]}, index=exp_index) + expected = DataFrame({"ints": [1.5, 1.5], "val": [20.0, 30]}, index=exp_index) if not observed: index = CategoricalIndex( list("abc"), name="cat", categories=list("abc"), ordered=True From 553c96122075d874c1656e3c95c5002862697a66 Mon Sep 17 00:00:00 2001 From: Will Ayd Date: Wed, 20 Nov 2019 20:54:26 -0800 Subject: [PATCH 9/9] reverted categorical file for now --- pandas/core/groupby/ops.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index 29bcdb91b3630..7fd9fb8f53134 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -451,7 +451,11 @@ def _cython_operation( # categoricals are only 1d, so we # are not setup for dim transforming - if is_datetime64_any_dtype(values): + if is_categorical_dtype(values) or is_sparse(values): + raise NotImplementedError( + "{dtype} dtype not supported".format(dtype=values.dtype) + ) + elif is_datetime64_any_dtype(values): if how in ["add", "prod", "cumsum", "cumprod"]: raise NotImplementedError( "datetime64 type does not support {how} operations".format(how=how)