Skip to content

REF: concat on bm_axis==0 #43626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 17, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 77 additions & 100 deletions pandas/core/internals/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
)
from pandas.core.dtypes.dtypes import ExtensionDtype

import pandas.core.algorithms as algos
from pandas.core.arrays import (
DatetimeArray,
ExtensionArray,
Expand Down Expand Up @@ -189,19 +188,29 @@ def concatenate_managers(
if isinstance(mgrs_indexers[0][0], ArrayManager):
return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy)

# Assertions disabled for performance
# for tup in mgrs_indexers:
# # caller is responsible for ensuring this
# indexers = tup[1]
# assert concat_axis not in indexers

if concat_axis == 0:
return _concat_managers_axis0(mgrs_indexers, axes, copy)

mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)

concat_plans = [
_get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers
]
concat_plan = _combine_concat_plans(concat_plans, concat_axis)
# Assertion disabled for performance
# assert all(not x[1] for x in mgrs_indexers)

concat_plans = [_get_mgr_concatenation_plan(mgr) for mgr, _ in mgrs_indexers]
concat_plan = _combine_concat_plans(concat_plans)
blocks = []

for placement, join_units in concat_plan:
unit = join_units[0]
blk = unit.block

if len(join_units) == 1 and not join_units[0].indexers:
if len(join_units) == 1:
values = blk.values
if copy:
values = values.copy()
Expand All @@ -225,7 +234,7 @@ def concatenate_managers(

fastpath = blk.values.dtype == values.dtype
else:
values = _concatenate_join_units(join_units, concat_axis, copy=copy)
values = _concatenate_join_units(join_units, copy=copy)
fastpath = False

if fastpath:
Expand All @@ -238,6 +247,42 @@ def concatenate_managers(
return BlockManager(tuple(blocks), axes)


def _concat_managers_axis0(
    mgrs_indexers, axes: list[Index], copy: bool
) -> BlockManager:
    """
    concat_managers specialized to concat_axis=0.

    Any pending reindexing is applied here, up front, via
    _maybe_reindex_columns_na_proxy; afterwards the blocks of every manager
    are re-used with their placements shifted past the items of the
    managers that precede them.

    Parameters
    ----------
    mgrs_indexers : list of (BlockManager, dict of {axis: indexer}) tuples
    axes : list of Index
        Axes of the resulting BlockManager.
    copy : bool
        Whether to deep-copy the blocks of managers that did not need
        reindexing.

    Returns
    -------
    BlockManager
    """
    # Record which managers came with indexers *before* reindexing, since
    # the reindexing step below replaces mgrs_indexers.
    had_reindexers = [len(indexers) > 0 for _, indexers in mgrs_indexers]
    mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)

    mgrs = [x[0] for x in mgrs_indexers]

    offset = 0
    blocks = []
    # If we already reindexed, then we definitely don't need another copy
    for mgr, made_copy in zip(mgrs, had_reindexers):
        for blk in mgr.blocks:
            if made_copy:
                nb = blk.copy(deep=False)
            elif copy:
                nb = blk.copy()
            else:
                # by slicing instead of copy(deep=False), we get a new array
                # object, see test_concat_copy
                nb = blk.getitem_block(slice(None))
            # shift this block's placement past the preceding managers' items
            nb._mgr_locs = nb._mgr_locs.add(offset)
            blocks.append(nb)

        offset += len(mgr.items)
    return BlockManager(tuple(blocks), axes)


def _maybe_reindex_columns_na_proxy(
axes: list[Index], mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]]
) -> list[tuple[BlockManager, dict[int, np.ndarray]]]:
Expand All @@ -248,36 +293,33 @@ def _maybe_reindex_columns_na_proxy(
Columns added in this reindexing have dtype=np.void, indicating they
should be ignored when choosing a column's final dtype.
"""
new_mgrs_indexers = []
new_mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]] = []

for mgr, indexers in mgrs_indexers:
# We only reindex for axis=0 (i.e. columns), as this can be done cheaply
if 0 in indexers:
new_mgr = mgr.reindex_indexer(
axes[0],
indexers[0],
axis=0,
# For axis=0 (i.e. columns) we use_na_proxy and only_slice, so this
# is a cheap reindexing.
for i, indexer in indexers.items():
mgr = mgr.reindex_indexer(
axes[i],
indexers[i],
axis=i,
copy=False,
only_slice=True,
only_slice=True, # only relevant for i==0
allow_dups=True,
use_na_proxy=True,
use_na_proxy=True, # only relevant for i==0
)
new_indexers = indexers.copy()
del new_indexers[0]
new_mgrs_indexers.append((new_mgr, new_indexers))
else:
new_mgrs_indexers.append((mgr, indexers))
new_mgrs_indexers.append((mgr, {}))

return new_mgrs_indexers


def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarray]):
def _get_mgr_concatenation_plan(mgr: BlockManager):
"""
Construct concatenation plan for given block manager and indexers.
Construct concatenation plan for given block manager.

Parameters
----------
mgr : BlockManager
indexers : dict of {axis: indexer}

Returns
-------
Expand All @@ -287,27 +329,11 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra
# Calculate post-reindex shape , save for item axis which will be separate
# for each block anyway.
mgr_shape_list = list(mgr.shape)
for ax, indexer in indexers.items():
mgr_shape_list[ax] = len(indexer)
mgr_shape = tuple(mgr_shape_list)

assert 0 not in indexers

needs_filling = False
if 1 in indexers:
# indexers[1] is shared by all the JoinUnits, so we can save time
# by only doing this check once
if (indexers[1] == -1).any():
needs_filling = True

if mgr.is_single_block:
blk = mgr.blocks[0]
return [
(
blk.mgr_locs,
JoinUnit(blk, mgr_shape, indexers, needs_filling=needs_filling),
)
]
return [(blk.mgr_locs, JoinUnit(blk, mgr_shape))]

blknos = mgr.blknos
blklocs = mgr.blklocs
Expand All @@ -318,8 +344,6 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra
assert placements.is_slice_like
assert blkno != -1

join_unit_indexers = indexers.copy()

shape_list = list(mgr_shape)
shape_list[0] = len(placements)
shape = tuple(shape_list)
Expand Down Expand Up @@ -351,30 +375,21 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarra
# Assertions disabled for performance
# assert blk._mgr_locs.as_slice == placements.as_slice
# assert blk.shape[0] == shape[0]
unit = JoinUnit(blk, shape, join_unit_indexers, needs_filling=needs_filling)
unit = JoinUnit(blk, shape)

plan.append((placements, unit))

return plan


class JoinUnit:
def __init__(
self, block: Block, shape: Shape, indexers=None, *, needs_filling: bool = False
):
def __init__(self, block: Block, shape: Shape):
# Passing shape explicitly is required for cases when block is None.
# Note: block is None implies indexers is None, but not vice-versa
if indexers is None:
indexers = {}
# we should *never* have `0 in indexers`
self.block = block
self.indexers = indexers
self.shape = shape

self.needs_filling = needs_filling

def __repr__(self) -> str:
return f"{type(self).__name__}({repr(self.block)}, {self.indexers})"
return f"{type(self).__name__}({repr(self.block)})"

@cache_readonly
def is_na(self) -> bool:
Expand All @@ -391,24 +406,14 @@ def get_reindexed_values(self, empty_dtype: DtypeObj) -> ArrayLike:

else:

if (not self.indexers) and (not self.block._can_consolidate):
if not self.block._can_consolidate:
# preserve these for validation in concat_compat
return self.block.values

# No dtype upcasting is done here, it will be performed during
# concatenation itself.
values = self.block.values

if not self.indexers:
# If there's no indexing to be done, we want to signal outside
# code that this array must be copied explicitly. This is done
# by returning a view and checking `retval.base`.
values = values.view()

else:
for ax, indexer in self.indexers.items():
values = algos.take_nd(values, indexer, axis=ax)

return values


Expand Down Expand Up @@ -446,15 +451,10 @@ def make_na_array(dtype: DtypeObj, shape: Shape) -> ArrayLike:
return missing_arr


def _concatenate_join_units(
join_units: list[JoinUnit], concat_axis: int, copy: bool
) -> ArrayLike:
def _concatenate_join_units(join_units: list[JoinUnit], copy: bool) -> ArrayLike:
"""
Concatenate values from several join units along selected axis.
Concatenate values from several join units along axis=1.
"""
if concat_axis == 0 and len(join_units) > 1:
# Concatenating join units along ax0 is handled in _merge_blocks.
raise AssertionError("Concatenating join units along axis0")

empty_dtype = _get_empty_dtype(join_units)

Expand Down Expand Up @@ -488,7 +488,7 @@ def _concatenate_join_units(
concat_values = ensure_block_shape(concat_values, 2)

else:
concat_values = concat_compat(to_concat, axis=concat_axis)
concat_values = concat_compat(to_concat, axis=1)

return concat_values

Expand Down Expand Up @@ -532,7 +532,7 @@ def _get_empty_dtype(join_units: Sequence[JoinUnit]) -> DtypeObj:
empty_dtype = join_units[0].block.dtype
return empty_dtype

needs_can_hold_na = any(unit.is_na or unit.needs_filling for unit in join_units)
needs_can_hold_na = any(unit.is_na for unit in join_units)

dtypes = [unit.block.dtype for unit in join_units if not unit.is_na]

Expand Down Expand Up @@ -569,9 +569,6 @@ def _is_uniform_join_units(join_units: list[JoinUnit]) -> bool:
# unless we're an extension dtype.
all(not ju.is_na or ju.block.is_extension for ju in join_units)
and
# no blocks with indexers (as then the dimensions do not fit)
all(not ju.indexers for ju in join_units)
and
# only use this path when there is something to concatenate
len(join_units) > 1
)
Expand All @@ -591,25 +588,17 @@ def _trim_join_unit(join_unit: JoinUnit, length: int) -> JoinUnit:

Extra items that didn't fit are returned as a separate block.
"""
assert 0 not in join_unit.indexers
extra_indexers = join_unit.indexers

extra_block = join_unit.block.getitem_block(slice(length, None))
join_unit.block = join_unit.block.getitem_block(slice(length))

extra_shape = (join_unit.shape[0] - length,) + join_unit.shape[1:]
join_unit.shape = (length,) + join_unit.shape[1:]

# extra_indexers does not introduce any -1s, so we can inherit needs_filling
return JoinUnit(
block=extra_block,
indexers=extra_indexers,
shape=extra_shape,
needs_filling=join_unit.needs_filling,
)
return JoinUnit(block=extra_block, shape=extra_shape)


def _combine_concat_plans(plans, concat_axis: int):
def _combine_concat_plans(plans):
"""
Combine multiple concatenation plans into one.

Expand All @@ -619,18 +608,6 @@ def _combine_concat_plans(plans, concat_axis: int):
for p in plans[0]:
yield p[0], [p[1]]

elif concat_axis == 0:
offset = 0
for plan in plans:
last_plc = None

for plc, unit in plan:
yield plc.add(offset), [unit]
last_plc = plc

if last_plc is not None:
offset += last_plc.as_slice.stop

else:
# singleton list so we can modify it as a side-effect within _next_or_none
num_ended = [0]
Expand Down