From 4831ebae334fec2a6dc29cb7df6ef02f76f1c625 Mon Sep 17 00:00:00 2001 From: Brock Date: Wed, 15 Sep 2021 19:51:04 -0700 Subject: [PATCH 1/4] REF: simplify concat_axis==0 case --- pandas/core/internals/concat.py | 51 ++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index 90b1c03d29d34..8137a27e05fe3 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -189,8 +189,36 @@ def concatenate_managers( if isinstance(mgrs_indexers[0][0], ArrayManager): return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy) + # Assertions disabled for performance + # for tup in mgrs_indexers: + # # caller is responsible for ensuring this + # indexers = tup[1] + # assert concat_axis not in indexers + mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers) + # Assertion disabled for performance + # assert all(not x[1] for x in mgrs_indexers) + + if concat_axis == 0: + mgrs = [x[0] for x in mgrs_indexers] + + offset = 0 + blocks = [] + for mgr in mgrs: + for blk in mgr.blocks: + if copy: + nb = blk.copy() + else: + # by slicing instead of copy(deep=False), we get a new array + # object, see test_concat_copy + nb = blk.getitem_block(slice(None)) + nb._mgr_locs = nb._mgr_locs.add(offset) + blocks.append(nb) + + offset += len(mgr.items) + return BlockManager(tuple(blocks), axes) + concat_plans = [ _get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers ] @@ -250,22 +278,19 @@ def _maybe_reindex_columns_na_proxy( """ new_mgrs_indexers = [] for mgr, indexers in mgrs_indexers: - # We only reindex for axis=0 (i.e. columns), as this can be done cheaply - if 0 in indexers: - new_mgr = mgr.reindex_indexer( - axes[0], - indexers[0], - axis=0, + # For axis=0 (i.e. columns) we use_na_proxy and only_slice, so this + # is a cheap reindexing. 
+ for i, indexer in indexers.items(): + mgr = mgr.reindex_indexer( + axes[i], + indexers[i], + axis=i, copy=False, - only_slice=True, + only_slice=True, # only relevant for i==0 allow_dups=True, - use_na_proxy=True, + use_na_proxy=True, # only relevant for i==0 ) - new_indexers = indexers.copy() - del new_indexers[0] - new_mgrs_indexers.append((new_mgr, new_indexers)) - else: - new_mgrs_indexers.append((mgr, indexers)) + new_mgrs_indexers.append((mgr, {})) return new_mgrs_indexers From 2c9f2ad80b4d75eed20516087354d2b43d642012 Mon Sep 17 00:00:00 2001 From: Brock Date: Fri, 17 Sep 2021 11:13:19 -0700 Subject: [PATCH 2/4] REF: remove JoinUnit.indexers --- pandas/core/internals/concat.py | 147 ++++++++++---------------------- 1 file changed, 44 insertions(+), 103 deletions(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index 8137a27e05fe3..a576efe936e10 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -37,7 +37,6 @@ ) from pandas.core.dtypes.dtypes import ExtensionDtype -import pandas.core.algorithms as algos from pandas.core.arrays import ( DatetimeArray, ExtensionArray, @@ -201,35 +200,17 @@ def concatenate_managers( # assert all(not x[1] for x in mgrs_indexers) if concat_axis == 0: - mgrs = [x[0] for x in mgrs_indexers] - - offset = 0 - blocks = [] - for mgr in mgrs: - for blk in mgr.blocks: - if copy: - nb = blk.copy() - else: - # by slicing instead of copy(deep=False), we get a new array - # object, see test_concat_copy - nb = blk.getitem_block(slice(None)) - nb._mgr_locs = nb._mgr_locs.add(offset) - blocks.append(nb) - - offset += len(mgr.items) - return BlockManager(tuple(blocks), axes) - - concat_plans = [ - _get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers - ] - concat_plan = _combine_concat_plans(concat_plans, concat_axis) + return _concat_managers_axis0(mgrs_indexers, axes, copy) + + concat_plans = [_get_mgr_concatenation_plan(mgr) for mgr, _ in 
mgrs_indexers] + concat_plan = _combine_concat_plans(concat_plans) blocks = [] for placement, join_units in concat_plan: unit = join_units[0] blk = unit.block - if len(join_units) == 1 and not join_units[0].indexers: + if len(join_units) == 1: values = blk.values if copy: values = values.copy() @@ -253,7 +234,7 @@ def concatenate_managers( fastpath = blk.values.dtype == values.dtype else: - values = _concatenate_join_units(join_units, concat_axis, copy=copy) + values = _concatenate_join_units(join_units, copy=copy) fastpath = False if fastpath: @@ -266,6 +247,32 @@ def concatenate_managers( return BlockManager(tuple(blocks), axes) +def _concat_managers_axis0( + mgrs_indexers, axes: list[Index], copy: bool +) -> BlockManager: + """ + concat_managers specialized to concat_axis=0, with reindexing already + having been done in _maybe_reindex_columns_na_proxy. + """ + mgrs = [x[0] for x in mgrs_indexers] + + offset = 0 + blocks = [] + for mgr in mgrs: + for blk in mgr.blocks: + if copy: + nb = blk.copy() + else: + # by slicing instead of copy(deep=False), we get a new array + # object, see test_concat_copy + nb = blk.getitem_block(slice(None)) + nb._mgr_locs = nb._mgr_locs.add(offset) + blocks.append(nb) + + offset += len(mgr.items) + return BlockManager(tuple(blocks), axes) + + def _maybe_reindex_columns_na_proxy( axes: list[Index], mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]] ) -> list[tuple[BlockManager, dict[int, np.ndarray]]]: @@ -295,14 +302,13 @@ def _maybe_reindex_columns_na_proxy( return new_mgrs_indexers -def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarray]): +def _get_mgr_concatenation_plan(mgr: BlockManager): """ - Construct concatenation plan for given block manager and indexers. + Construct concatenation plan for given block manager. 
Parameters ---------- mgr : BlockManager - indexers : dict of {axis: indexer} Returns ------- @@ -312,27 +318,11 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra # Calculate post-reindex shape , save for item axis which will be separate # for each block anyway. mgr_shape_list = list(mgr.shape) - for ax, indexer in indexers.items(): - mgr_shape_list[ax] = len(indexer) mgr_shape = tuple(mgr_shape_list) - assert 0 not in indexers - - needs_filling = False - if 1 in indexers: - # indexers[1] is shared by all the JoinUnits, so we can save time - # by only doing this check once - if (indexers[1] == -1).any(): - needs_filling = True - if mgr.is_single_block: blk = mgr.blocks[0] - return [ - ( - blk.mgr_locs, - JoinUnit(blk, mgr_shape, indexers, needs_filling=needs_filling), - ) - ] + return [(blk.mgr_locs, JoinUnit(blk, mgr_shape))] blknos = mgr.blknos blklocs = mgr.blklocs @@ -343,8 +333,6 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra assert placements.is_slice_like assert blkno != -1 - join_unit_indexers = indexers.copy() - shape_list = list(mgr_shape) shape_list[0] = len(placements) shape = tuple(shape_list) @@ -376,7 +364,7 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra # Assertions disabled for performance # assert blk._mgr_locs.as_slice == placements.as_slice # assert blk.shape[0] == shape[0] - unit = JoinUnit(blk, shape, join_unit_indexers, needs_filling=needs_filling) + unit = JoinUnit(blk, shape) plan.append((placements, unit)) @@ -384,22 +372,13 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra class JoinUnit: - def __init__( - self, block: Block, shape: Shape, indexers=None, *, needs_filling: bool = False - ): + def __init__(self, block: Block, shape: Shape): # Passing shape explicitly is required for cases when block is None. 
- # Note: block is None implies indexers is None, but not vice-versa - if indexers is None: - indexers = {} - # we should *never* have `0 in indexers` self.block = block - self.indexers = indexers self.shape = shape - self.needs_filling = needs_filling - def __repr__(self) -> str: - return f"{type(self).__name__}({repr(self.block)}, {self.indexers})" + return f"{type(self).__name__}({repr(self.block)})" @cache_readonly def is_na(self) -> bool: @@ -416,7 +395,7 @@ def get_reindexed_values(self, empty_dtype: DtypeObj) -> ArrayLike: else: - if (not self.indexers) and (not self.block._can_consolidate): + if not self.block._can_consolidate: # preserve these for validation in concat_compat return self.block.values @@ -424,16 +403,6 @@ def get_reindexed_values(self, empty_dtype: DtypeObj) -> ArrayLike: # concatenation itself. values = self.block.values - if not self.indexers: - # If there's no indexing to be done, we want to signal outside - # code that this array must be copied explicitly. This is done - # by returning a view and checking `retval.base`. - values = values.view() - - else: - for ax, indexer in self.indexers.items(): - values = algos.take_nd(values, indexer, axis=ax) - return values @@ -471,15 +440,10 @@ def make_na_array(dtype: DtypeObj, shape: Shape) -> ArrayLike: return missing_arr -def _concatenate_join_units( - join_units: list[JoinUnit], concat_axis: int, copy: bool -) -> ArrayLike: +def _concatenate_join_units(join_units: list[JoinUnit], copy: bool) -> ArrayLike: """ - Concatenate values from several join units along selected axis. + Concatenate values from several join units along axis=1. """ - if concat_axis == 0 and len(join_units) > 1: - # Concatenating join units along ax0 is handled in _merge_blocks. 
- raise AssertionError("Concatenating join units along axis0") empty_dtype = _get_empty_dtype(join_units) @@ -513,7 +477,7 @@ def _concatenate_join_units( concat_values = ensure_block_shape(concat_values, 2) else: - concat_values = concat_compat(to_concat, axis=concat_axis) + concat_values = concat_compat(to_concat, axis=1) return concat_values @@ -594,9 +558,6 @@ def _is_uniform_join_units(join_units: list[JoinUnit]) -> bool: # unless we're an extension dtype. all(not ju.is_na or ju.block.is_extension for ju in join_units) and - # no blocks with indexers (as then the dimensions do not fit) - all(not ju.indexers for ju in join_units) - and # only use this path when there is something to concatenate len(join_units) > 1 ) @@ -616,8 +577,6 @@ def _trim_join_unit(join_unit: JoinUnit, length: int) -> JoinUnit: Extra items that didn't fit are returned as a separate block. """ - assert 0 not in join_unit.indexers - extra_indexers = join_unit.indexers extra_block = join_unit.block.getitem_block(slice(length, None)) join_unit.block = join_unit.block.getitem_block(slice(length)) @@ -625,16 +584,10 @@ def _trim_join_unit(join_unit: JoinUnit, length: int) -> JoinUnit: extra_shape = (join_unit.shape[0] - length,) + join_unit.shape[1:] join_unit.shape = (length,) + join_unit.shape[1:] - # extra_indexers does not introduce any -1s, so we can inherit needs_filling - return JoinUnit( - block=extra_block, - indexers=extra_indexers, - shape=extra_shape, - needs_filling=join_unit.needs_filling, - ) + return JoinUnit(block=extra_block, shape=extra_shape) -def _combine_concat_plans(plans, concat_axis: int): +def _combine_concat_plans(plans): """ Combine multiple concatenation plans into one. 
@@ -644,18 +597,6 @@ def _combine_concat_plans(plans, concat_axis: int): for p in plans[0]: yield p[0], [p[1]] - elif concat_axis == 0: - offset = 0 - for plan in plans: - last_plc = None - - for plc, unit in plan: - yield plc.add(offset), [unit] - last_plc = plc - - if last_plc is not None: - offset += last_plc.as_slice.stop - else: # singleton list so we can modify it as a side-effect within _next_or_none num_ended = [0] From 5dedf17066c903127f4a91f60b19003b934dba6c Mon Sep 17 00:00:00 2001 From: Brock Date: Fri, 17 Sep 2021 12:07:31 -0700 Subject: [PATCH 3/4] Avoid extra copy --- pandas/core/internals/concat.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index a576efe936e10..3db594e039e9e 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -194,14 +194,14 @@ def concatenate_managers( # indexers = tup[1] # assert concat_axis not in indexers + if concat_axis == 0: + return _concat_managers_axis0(mgrs_indexers, axes, copy) + mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers) # Assertion disabled for performance # assert all(not x[1] for x in mgrs_indexers) - if concat_axis == 0: - return _concat_managers_axis0(mgrs_indexers, axes, copy) - concat_plans = [_get_mgr_concatenation_plan(mgr) for mgr, _ in mgrs_indexers] concat_plan = _combine_concat_plans(concat_plans) blocks = [] @@ -254,13 +254,23 @@ def _concat_managers_axis0( concat_managers specialized to concat_axis=0, with reindexing already having been done in _maybe_reindex_columns_na_proxy. 
""" + had_reindexers = { + i: len(mgrs_indexers[i][1]) > 0 for i in range(len(mgrs_indexers)) + } + mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers) + mgrs = [x[0] for x in mgrs_indexers] offset = 0 blocks = [] - for mgr in mgrs: + for i, mgr in enumerate(mgrs): + # If we already reindexed, then we definitely don't need another copy + made_copy = had_reindexers[i] + for blk in mgr.blocks: - if copy: + if made_copy: + nb = blk.copy(deep=False) + elif copy: nb = blk.copy() else: # by slicing instead of copy(deep=False), we get a new array @@ -521,7 +531,7 @@ def _get_empty_dtype(join_units: Sequence[JoinUnit]) -> DtypeObj: empty_dtype = join_units[0].block.dtype return empty_dtype - needs_can_hold_na = any(unit.is_na or unit.needs_filling for unit in join_units) + needs_can_hold_na = any(unit.is_na for unit in join_units) dtypes = [unit.block.dtype for unit in join_units if not unit.is_na] From b54f56718bc53acc29060a8e7ccaedb5e658b418 Mon Sep 17 00:00:00 2001 From: Brock Date: Fri, 17 Sep 2021 15:43:04 -0700 Subject: [PATCH 4/4] mypy fixup --- pandas/core/internals/concat.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index 3db594e039e9e..e1a51e5783cb8 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -293,7 +293,8 @@ def _maybe_reindex_columns_na_proxy( Columns added in this reindexing have dtype=np.void, indicating they should be ignored when choosing a column's final dtype. """ - new_mgrs_indexers = [] + new_mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]] = [] + for mgr, indexers in mgrs_indexers: # For axis=0 (i.e. columns) we use_na_proxy and only_slice, so this # is a cheap reindexing.