
Commit b661313

PERF/REF: concat (#52672)
* PERF/REF: concat
* Fix on 32bit
1 parent ac15528 commit b661313

3 files changed: +111 -133 lines

3 files changed

+111
-133
lines changed

pandas/_libs/internals.pyi (+3)

@@ -19,6 +19,9 @@ from pandas.core.arrays._mixins import NDArrayBackedExtensionArray
 from pandas.core.internals.blocks import Block as B
 
 def slice_len(slc: slice, objlen: int = ...) -> int: ...
+def get_concat_blkno_indexers(
+    blknos_list: list[npt.NDArray[np.intp]],
+) -> list[tuple[npt.NDArray[np.intp], BlockPlacement]]: ...
 def get_blkno_indexers(
     blknos: np.ndarray,  # int64_t[:]
     group: bool = ...,

pandas/_libs/internals.pyx (+58)

@@ -438,6 +438,64 @@ cdef slice indexer_as_slice(intp_t[:] vals):
     return slice(start, stop, d)
 
 
+@cython.boundscheck(False)
+@cython.wraparound(False)
+def get_concat_blkno_indexers(list blknos_list not None):
+    """
+    Given the mgr.blknos for a list of mgrs, break range(len(mgrs[0])) into
+    slices such that within each slice blknos_list[i] is constant for each i.
+
+    This is a multi-Manager analogue to get_blkno_indexers with group=False.
+    """
+    # we have the blknos for each of several BlockManagers
+    # list[np.ndarray[int64_t]]
+    cdef:
+        Py_ssize_t i, j, k, start, ncols
+        cnp.npy_intp n_mgrs
+        ndarray[intp_t] blknos, cur_blknos, run_blknos
+        BlockPlacement bp
+        list result = []
+
+    n_mgrs = len(blknos_list)
+    cur_blknos = cnp.PyArray_EMPTY(1, &n_mgrs, cnp.NPY_INTP, 0)
+
+    blknos = blknos_list[0]
+    ncols = len(blknos)
+    if ncols == 0:
+        return []
+
+    start = 0
+    for i in range(n_mgrs):
+        blknos = blknos_list[i]
+        cur_blknos[i] = blknos[0]
+        assert len(blknos) == ncols
+
+    for i in range(1, ncols):
+        # For each column, we check if the Block it is part of (i.e. blknos[i])
+        # is the same as for the previous column (i.e. blknos[i-1]) *and* if
+        # this is the case for each blknos in blknos_list. If not, start a new "run".
+        for k in range(n_mgrs):
+            blknos = blknos_list[k]
+            # assert cur_blknos[k] == blknos[i - 1]
+
+            if blknos[i] != blknos[i - 1]:
+                bp = BlockPlacement(slice(start, i))
+                run_blknos = cnp.PyArray_Copy(cur_blknos)
+                result.append((run_blknos, bp))
+
+                start = i
+                for j in range(n_mgrs):
+                    blknos = blknos_list[j]
+                    cur_blknos[j] = blknos[i]
+                break  # break out of `for k in range(n_mgrs)` loop
+
+    if start != ncols:
+        bp = BlockPlacement(slice(start, ncols))
+        run_blknos = cnp.PyArray_Copy(cur_blknos)
+        result.append((run_blknos, bp))
+    return result
+
+
 @cython.boundscheck(False)
 @cython.wraparound(False)
 def get_blkno_indexers(
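To make the behaviour of the new get_concat_blkno_indexers added above concrete, here is a rough pure-Python sketch of the same run detection, with a small worked example. This is an illustration only, not the committed Cython code: the helper name concat_blkno_runs_sketch and the sample blknos arrays are invented for the example, and the real function returns (ndarray-of-blknos, BlockPlacement) pairs rather than plain lists and slices.

import numpy as np

def concat_blkno_runs_sketch(blknos_list):
    # Split range(ncols) into runs over which every manager's blkno stays
    # constant; record the per-manager blknos for each run.
    ncols = len(blknos_list[0])
    if ncols == 0:
        return []
    runs = []
    start = 0
    for i in range(1, ncols):
        if any(blknos[i] != blknos[i - 1] for blknos in blknos_list):
            runs.append(([int(b[start]) for b in blknos_list], slice(start, i)))
            start = i
    runs.append(([int(b[start]) for b in blknos_list], slice(start, ncols)))
    return runs

# Two managers, four columns each: both keep columns 0-1 in one block,
# manager B switches blocks at column 2, manager A at column 3.
blknos_a = np.array([0, 0, 0, 1])
blknos_b = np.array([0, 0, 1, 1])
print(concat_blkno_runs_sketch([blknos_a, blknos_b]))
# [([0, 0], slice(0, 2)), ([0, 1], slice(2, 3)), ([1, 1], slice(3, 4))]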

pandas/core/internals/concat.py (+50 -133)
@@ -202,8 +202,8 @@ def concatenate_managers(
 
     mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)
 
-    concat_plans = [_get_mgr_concatenation_plan(mgr) for mgr, _ in mgrs_indexers]
-    concat_plan = _combine_concat_plans(concat_plans)
+    concat_plan = _get_combined_plan([mgr for mgr, _ in mgrs_indexers])
+
     blocks = []
 
     for placement, join_units in concat_plan:
@@ -224,7 +224,11 @@ def concatenate_managers(
                 # _is_uniform_join_units ensures a single dtype, so
                 # we can use np.concatenate, which is more performant
                 # than concat_compat
-                values = np.concatenate(vals, axis=1)
+                # error: Argument 1 to "concatenate" has incompatible type
+                # "List[Union[ndarray[Any, Any], ExtensionArray]]";
+                # expected "Union[_SupportsArray[dtype[Any]],
+                # _NestedSequence[_SupportsArray[dtype[Any]]]]"
+                values = np.concatenate(vals, axis=1)  # type: ignore[arg-type]
             elif is_1d_only_ea_dtype(blk.dtype):
                 # TODO(EA2D): special-casing not needed with 2D EAs
                 values = concat_compat(vals, axis=1, ea_compat_axis=True)
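As the comment in this hunk says, once _is_uniform_join_units has guaranteed a single non-extension dtype, plain np.concatenate is enough and is faster than the generic concat_compat path. A minimal, self-contained illustration of that fast path (the arrays and shapes are made up for the example):

import numpy as np

# Two blocks of identical dtype, stacked along the concat axis; because the
# dtype is uniform there is no dtype-resolution work left to do.
vals = [np.arange(6, dtype="int64").reshape(2, 3),
        np.arange(8, dtype="int64").reshape(2, 4)]
values = np.concatenate(vals, axis=1)
print(values.dtype, values.shape)  # int64 (2, 7)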
@@ -316,70 +320,57 @@ def _maybe_reindex_columns_na_proxy(
     return new_mgrs_indexers
 
 
-def _get_mgr_concatenation_plan(
-    mgr: BlockManager,
-) -> list[tuple[BlockPlacement, JoinUnit]]:
-    """
-    Construct concatenation plan for given block manager.
+def _get_combined_plan(
+    mgrs: list[BlockManager],
+) -> list[tuple[BlockPlacement, list[JoinUnit]]]:
+    plan = []
 
-    Parameters
-    ----------
-    mgr : BlockManager
+    max_len = mgrs[0].shape[0]
 
-    Returns
-    -------
-    plan : list of (BlockPlacement, JoinUnit) tuples
-    """
-    if mgr.is_single_block:
-        blk = mgr.blocks[0]
-        return [(blk.mgr_locs, JoinUnit(blk))]
-
-    blknos = mgr.blknos
-    blklocs = mgr.blklocs
-
-    plan: list[tuple[BlockPlacement, JoinUnit]] = []
-    for blkno, placements in libinternals.get_blkno_placements(blknos, group=False):
-        assert placements.is_slice_like
-        assert blkno != -1
-
-        blk = mgr.blocks[blkno]
-        ax0_blk_indexer = blklocs[placements.indexer]
-
-        unit_no_ax0_reindexing = (
-            len(placements) == len(blk.mgr_locs)
-            and
-            # Fastpath detection of join unit not
-            # needing to reindex its block: no ax0
-            # reindexing took place and block
-            # placement was sequential before.
-            (
-                (blk.mgr_locs.is_slice_like and blk.mgr_locs.as_slice.step == 1)
-                or
-                # Slow-ish detection: all indexer locs
-                # are sequential (and length match is
-                # checked above).
-                # TODO: check unnecessary? unique_deltas?
-                # can we shortcut other is_slice_like cases?
-                (np.diff(ax0_blk_indexer) == 1).all()
-            )
-        )
+    blknos_list = [mgr.blknos for mgr in mgrs]
+    pairs = libinternals.get_concat_blkno_indexers(blknos_list)
+    for ind, (blknos, bp) in enumerate(pairs):
+        # assert bp.is_slice_like
+        # assert len(bp) > 0
 
-        # Omit indexer if no item reindexing is required.
-        if not unit_no_ax0_reindexing:
-            # TODO: better max_len?
-            max_len = max(len(ax0_blk_indexer), ax0_blk_indexer.max() + 1)
-            slc = lib.maybe_indices_to_slice(ax0_blk_indexer, max_len)
-            # TODO: in all extant test cases 2023-04-08 we have a slice here.
-            # Will this always be the case?
-            blk = blk.getitem_block(slc)
+        units_for_bp = []
+        for k, mgr in enumerate(mgrs):
+            blkno = blknos[k]
 
-        unit = JoinUnit(blk)
+            nb = _get_block_for_concat_plan(mgr, bp, blkno, max_len=max_len)
+            unit = JoinUnit(nb)
+            units_for_bp.append(unit)
 
-        plan.append((placements, unit))
+        plan.append((bp, units_for_bp))
 
     return plan
 
 
+def _get_block_for_concat_plan(
+    mgr: BlockManager, bp: BlockPlacement, blkno: int, *, max_len: int
+) -> Block:
+    blk = mgr.blocks[blkno]
+    # Assertions disabled for performance:
+    # assert bp.is_slice_like
+    # assert blkno != -1
+    # assert (mgr.blknos[bp] == blkno).all()
+
+    if len(bp) == len(blk.mgr_locs) and (
+        blk.mgr_locs.is_slice_like and blk.mgr_locs.as_slice.step == 1
+    ):
+        nb = blk
+    else:
+        ax0_blk_indexer = mgr.blklocs[bp.indexer]
+
+        slc = lib.maybe_indices_to_slice(ax0_blk_indexer, max_len)
+        # TODO: in all extant test cases 2023-04-08 we have a slice here.
+        # Will this always be the case?
+        nb = blk.getitem_block(slc)
+
+    # assert nb.shape == (len(bp), mgr.shape[1])
+    return nb
+
+
 class JoinUnit:
     def __init__(self, block: Block) -> None:
         self.block = block
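Taken together, _get_combined_plan and _get_block_for_concat_plan build one plan covering every input manager at once: a list of (BlockPlacement, [JoinUnit, ...]) pairs with one JoinUnit per manager for each run of columns. As a rough end-to-end usage example (the frame contents are made up, and the assumption that row-wise concatenation of frames with shared columns reaches concatenate_managers follows from the "concat_axis == 1" note in the removed _combine_concat_plans docstring further down):

import numpy as np
import pandas as pd

# Mixed dtypes give each manager more than one block ("a"/"c" share a float
# block, "b" sits in an int block), so the combined plan is split into runs
# rather than a single slice.
df1 = pd.DataFrame({"a": np.arange(3.0), "b": np.arange(3), "c": np.arange(3.0)})
df2 = pd.DataFrame({"a": np.arange(3.0), "b": np.arange(3), "c": np.arange(3.0)})

out = pd.concat([df1, df2], axis=0, ignore_index=True)
print(out.shape)   # (6, 3)
print(out.dtypes)  # a/c float64, b int64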
@@ -575,7 +566,7 @@ def _get_empty_dtype(join_units: Sequence[JoinUnit]) -> DtypeObj:
         blk = join_units[0].block
         return blk.dtype
 
-    if _is_uniform_reindex(join_units):
+    if lib.dtypes_all_equal([ju.block.dtype for ju in join_units]):
         empty_dtype = join_units[0].block.dtype
         return empty_dtype
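The dtype-uniformity check now goes through lib.dtypes_all_equal instead of the removed _is_uniform_reindex helper. Roughly, it answers "do all join units share one dtype?"; the sketch below is a hedged pure-Python approximation for readers, not the real routine, which lives in pandas._libs.lib and may differ in detail (for example around non-comparable extension dtypes):

import numpy as np

def dtypes_all_equal_sketch(dtypes):
    # Approximation: every dtype compares equal to the first one; dtypes
    # that cannot even be compared are treated as unequal.
    first = dtypes[0]
    for d in dtypes[1:]:
        try:
            if d != first:
                return False
        except TypeError:
            return False
    return True

print(dtypes_all_equal_sketch([np.dtype("int64"), np.dtype("int64")]))    # True
print(dtypes_all_equal_sketch([np.dtype("int64"), np.dtype("float64")]))  # False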

@@ -623,77 +614,3 @@ def _is_uniform_join_units(join_units: list[JoinUnit]) -> bool:
         # only use this path when there is something to concatenate
         len(join_units) > 1
     )
-
-
-def _is_uniform_reindex(join_units) -> bool:
-    return (
-        # TODO: should this be ju.block._can_hold_na?
-        all(ju.block.is_extension for ju in join_units)
-        and lib.dtypes_all_equal([ju.block.dtype for ju in join_units])
-    )
-
-
-def _trim_join_unit(join_unit: JoinUnit, length: int) -> JoinUnit:
-    """
-    Reduce join_unit's shape along item axis to length.
-
-    Extra items that didn't fit are returned as a separate block.
-    """
-
-    extra_block = join_unit.block.getitem_block(slice(length, None))
-    join_unit.block = join_unit.block.getitem_block(slice(length))
-
-    return JoinUnit(block=extra_block)
-
-
-def _combine_concat_plans(plans):
-    """
-    Combine multiple concatenation plans into one.
-
-    existing_plan is updated in-place.
-
-    We only get here with concat_axis == 1.
-    """
-    if len(plans) == 1:
-        for p in plans[0]:
-            yield p[0], [p[1]]
-
-    else:
-        # singleton list so we can modify it as a side-effect within _next_or_none
-        num_ended = [0]
-
-        def _next_or_none(seq):
-            retval = next(seq, None)
-            if retval is None:
-                num_ended[0] += 1
-            return retval
-
-        plans = list(map(iter, plans))
-        next_items = list(map(_next_or_none, plans))
-
-        while num_ended[0] != len(next_items):
-            if num_ended[0] > 0:
-                raise ValueError("Plan shapes are not aligned")
-
-            placements, units = zip(*next_items)
-
-            lengths = list(map(len, placements))
-            min_len, max_len = min(lengths), max(lengths)
-
-            if min_len == max_len:
-                yield placements[0], units
-                next_items[:] = map(_next_or_none, plans)
-            else:
-                yielded_placement = None
-                yielded_units = [None] * len(next_items)
-                for i, (plc, unit) in enumerate(next_items):
-                    yielded_units[i] = unit
-                    if len(plc) > min_len:
-                        # _trim_join_unit updates unit in place, so only
-                        # placement needs to be sliced to skip min_len.
-                        next_items[i] = (plc[min_len:], _trim_join_unit(unit, min_len))
-                    else:
-                        yielded_placement = plc
-                        next_items[i] = _next_or_none(plans[i])
-
-                yield yielded_placement, yielded_units
