REF: restore _concat_managers_axis0 #50401

Merged (2 commits) on Dec 27, 2022
pandas/core/internals/concat.py: 108 changes (68 additions, 40 deletions)
@@ -193,12 +193,21 @@ def concatenate_managers(
     if isinstance(mgrs_indexers[0][0], ArrayManager):
         return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy)
 
+    # Assertions disabled for performance
+    # for tup in mgrs_indexers:
+    #    # caller is responsible for ensuring this
+    #    indexers = tup[1]
+    #    assert concat_axis not in indexers
+
+    if concat_axis == 0:
+        return _concat_managers_axis0(mgrs_indexers, axes, copy)
+
     mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)
 
     concat_plans = [
         _get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers
     ]
-    concat_plan = _combine_concat_plans(concat_plans, concat_axis)
+    concat_plan = _combine_concat_plans(concat_plans)
     blocks = []
 
     for placement, join_units in concat_plan:
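
Note: for a DataFrame's BlockManager, axis 0 is the items/columns axis, so the new fast path above should correspond to pd.concat(..., axis=1) at the public API level (that mapping is my reading of the dispatch, not something stated in the diff). A minimal sketch of the case this path serves:

```python
# Horizontal concatenation: each input manager keeps its blocks as-is and
# only the blocks' column placements need shifting, so no dtype coercion
# is required.
import pandas as pd

df1 = pd.DataFrame({"a": [1, 2]})
df2 = pd.DataFrame({"b": [3.0, 4.0]})

out = pd.concat([df1, df2], axis=1)
print(out.dtypes)  # a: int64, b: float64 -- per-block dtypes preserved
```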
@@ -229,7 +238,7 @@ def concatenate_managers(
 
             fastpath = blk.values.dtype == values.dtype
         else:
-            values = _concatenate_join_units(join_units, concat_axis, copy=copy)
+            values = _concatenate_join_units(join_units, copy=copy)
             fastpath = False
 
         if fastpath:
@@ -242,6 +251,42 @@ def concatenate_managers(
     return BlockManager(tuple(blocks), axes)
 
 
+def _concat_managers_axis0(
+    mgrs_indexers, axes: list[Index], copy: bool
+) -> BlockManager:
+    """
+    concat_managers specialized to concat_axis=0, with reindexing already
+    having been done in _maybe_reindex_columns_na_proxy.
+    """
+    had_reindexers = {
+        i: len(mgrs_indexers[i][1]) > 0 for i in range(len(mgrs_indexers))
+    }
+    mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)
+
+    mgrs = [x[0] for x in mgrs_indexers]
+
+    offset = 0
+    blocks = []
+    for i, mgr in enumerate(mgrs):
+        # If we already reindexed, then we definitely don't need another copy
+        made_copy = had_reindexers[i]
+
+        for blk in mgr.blocks:
+            if made_copy:
+                nb = blk.copy(deep=False)
+            elif copy:
+                nb = blk.copy()
+            else:
+                # by slicing instead of copy(deep=False), we get a new array
+                # object, see test_concat_copy
+                nb = blk.getitem_block(slice(None))
+            nb._mgr_locs = nb._mgr_locs.add(offset)
+            blocks.append(nb)
+
+        offset += len(mgr.items)
+    return BlockManager(tuple(blocks), axes)
+
+
 def _maybe_reindex_columns_na_proxy(
     axes: list[Index], mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]]
 ) -> list[tuple[BlockManager, dict[int, np.ndarray]]]:
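
To make the offset bookkeeping in the restored function concrete, here is a small self-contained model of the same idea. SimpleBlock and stack_managers are hypothetical stand-ins for illustration, not pandas internals:

```python
from dataclasses import dataclass

import numpy as np


@dataclass
class SimpleBlock:
    # Hypothetical stand-in for pandas' internal Block class.
    values: np.ndarray  # 2D, shape (n_items, n_rows)
    locs: np.ndarray    # positions of these items within their manager


def stack_managers(managers: list[list[SimpleBlock]]) -> list[SimpleBlock]:
    """Collect every block, shifting its item locations by a running offset,
    mirroring how _concat_managers_axis0 adds `offset` to nb._mgr_locs."""
    out: list[SimpleBlock] = []
    offset = 0
    for blocks in managers:
        n_items = sum(len(b.locs) for b in blocks)
        for b in blocks:
            # b.values[:] returns a new ndarray object that shares data,
            # analogous to blk.getitem_block(slice(None)) in the diff
            out.append(SimpleBlock(b.values[:], b.locs + offset))
        offset += n_items
    return out


# two "managers" contributing one single-item block each
m1 = [SimpleBlock(np.array([[1, 2, 3]]), np.array([0]))]
m2 = [SimpleBlock(np.array([[4.0, 5.0, 6.0]]), np.array([0]))]
print([b.locs.tolist() for b in stack_managers([m1, m2])])  # [[0], [1]]
```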
@@ -252,25 +297,22 @@ def _maybe_reindex_columns_na_proxy(
     Columns added in this reindexing have dtype=np.void, indicating they
     should be ignored when choosing a column's final dtype.
     """
-    new_mgrs_indexers = []
+    new_mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]] = []
 
     for mgr, indexers in mgrs_indexers:
-        # We only reindex for axis=0 (i.e. columns), as this can be done cheaply
-        if 0 in indexers:
-            new_mgr = mgr.reindex_indexer(
-                axes[0],
-                indexers[0],
-                axis=0,
+        # For axis=0 (i.e. columns) we use_na_proxy and only_slice, so this
+        # is a cheap reindexing.
+        for i, indexer in indexers.items():
+            mgr = mgr.reindex_indexer(
+                axes[i],
+                indexers[i],
+                axis=i,
                 copy=False,
-                only_slice=True,
+                only_slice=True,  # only relevant for i==0
                 allow_dups=True,
-                use_na_proxy=True,
+                use_na_proxy=True,  # only relevant for i==0
             )
-            new_indexers = indexers.copy()
-            del new_indexers[0]
-            new_mgrs_indexers.append((new_mgr, new_indexers))
-        else:
-            new_mgrs_indexers.append((mgr, indexers))
-
+        new_mgrs_indexers.append((mgr, {}))
     return new_mgrs_indexers
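
A quick illustration of the np.void "na proxy" mentioned in the docstring. This mimics the placeholder that use_na_proxy=True produces; the dtype-filtering line is a simplified sketch of the resolution logic, not the actual pandas code:

```python
import numpy as np

# A column that exists only to pad a reindexed manager can be represented
# by an empty placeholder array whose dtype kind is "V" (void).
proxy = np.empty((1, 3), dtype=np.void)
print(proxy.dtype.kind)  # "V"

# Dtype resolution can then skip kind == "V" entries, so a float column
# concatenated with an all-missing column stays float64 (filled with NaN)
# instead of being upcast to object.
candidate_dtypes = [np.dtype("float64"), proxy.dtype]
print([dt for dt in candidate_dtypes if dt.kind != "V"])  # [dtype('float64')]
```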


@@ -288,7 +330,9 @@ def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarray]):
     plan : list of (BlockPlacement, JoinUnit) tuples
 
     """
-    # Calculate post-reindex shape , save for item axis which will be separate
+    assert len(indexers) == 0
+
+    # Calculate post-reindex shape, save for item axis which will be separate
     # for each block anyway.
     mgr_shape_list = list(mgr.shape)
     for ax, indexer in indexers.items():
@@ -523,16 +567,10 @@ def get_reindexed_values(self, empty_dtype: DtypeObj, upcasted_na) -> ArrayLike:
         return values
 
 
-def _concatenate_join_units(
-    join_units: list[JoinUnit], concat_axis: AxisInt, copy: bool
-) -> ArrayLike:
+def _concatenate_join_units(join_units: list[JoinUnit], copy: bool) -> ArrayLike:
     """
-    Concatenate values from several join units along selected axis.
+    Concatenate values from several join units along axis=1.
     """
-    if concat_axis == 0 and len(join_units) > 1:
-        # Concatenating join units along ax0 is handled in _merge_blocks.
-        raise AssertionError("Concatenating join units along axis0")
-
     empty_dtype = _get_empty_dtype(join_units)
 
     has_none_blocks = any(unit.block.dtype.kind == "V" for unit in join_units)
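
Since block values are laid out as (n_items, n_rows), joining frames row-wise means concatenating block values along axis=1, which is why the signature above no longer needs a concat_axis argument. A plain-NumPy analogue of that operation for same-dtype inputs:

```python
import numpy as np

# one column stored as a 2D block: shape (n_items, n_rows)
top = np.array([[1, 2, 3]])    # 3 rows from the first manager
bottom = np.array([[4, 5]])    # 2 more rows from the second manager

# row-wise concatenation at the DataFrame level is axis=1 at the block level
combined = np.concatenate([top, bottom], axis=1)
print(combined.shape)  # (1, 5): one column, five rows
```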
@@ -573,7 +611,7 @@ def _concatenate_join_units(
             concat_values = ensure_block_shape(concat_values, 2)
 
     else:
-        concat_values = concat_compat(to_concat, axis=concat_axis)
+        concat_values = concat_compat(to_concat, axis=1)
 
     return concat_values
@@ -701,28 +739,18 @@ def _trim_join_unit(join_unit: JoinUnit, length: int) -> JoinUnit:
     return JoinUnit(block=extra_block, indexers=extra_indexers, shape=extra_shape)
 
 
-def _combine_concat_plans(plans, concat_axis: AxisInt):
+def _combine_concat_plans(plans):
     """
     Combine multiple concatenation plans into one.
 
     existing_plan is updated in-place.
+
+    We only get here with concat_axis == 1.
     """
     if len(plans) == 1:
         for p in plans[0]:
             yield p[0], [p[1]]
 
-    elif concat_axis == 0:
-        offset = 0
-        for plan in plans:
-            last_plc = None
-
-            for plc, unit in plan:
-                yield plc.add(offset), [unit]
-                last_plc = plc
-
-            if last_plc is not None:
-                offset += last_plc.as_slice.stop
-
     else:
         # singleton list so we can modify it as a side-effect within _next_or_none
         num_ended = [0]
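
One last note on the copy semantics in _concat_managers_axis0 above: slicing is preferred over copy(deep=False) because slicing yields a distinct ndarray object while still sharing the underlying buffer, which is what test_concat_copy checks for. A small demonstration of that identity distinction:

```python
import numpy as np

arr = np.arange(4)
sliced = arr[slice(None)]  # analogue of blk.getitem_block(slice(None))

print(sliced is arr)                  # False: a brand-new ndarray object
print(sliced.base is arr)             # True: it is a view, no data copied
print(np.shares_memory(arr, sliced))  # True
```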