Skip to content

Commit 81d7be9

Browse files
authored
Fix nasty compaction threshold bug (#6939)
The actual bug is this one-liner, everything else is just niceties that come on top (see the individual commits): ![image](https://github.com/user-attachments/assets/f3d8a1d0-4eab-41c3-bdc4-2f2d830469f7) The chunk happens to be wrapped in an `Arc` at this point in time, and `Arc`s also implement `SizeBytes`... by always returning zero (considered amortized). Also switched the default config to 4MiB per chunk instead of 8MiB, which has proven much nicer after an afternoon of empirical testing.
1 parent 6fa323d commit 81d7be9

File tree

3 files changed

+98
-50
lines changed

3 files changed

+98
-50
lines changed

crates/store/re_chunk/src/merge.rs

+81-45
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ impl Chunk {
2020
///
2121
/// [concatenable]: [`Chunk::concatenable`]
2222
pub fn concatenated(&self, rhs: &Self) -> ChunkResult<Self> {
23+
re_tracing::profile_function!(format!(
24+
"lhs={} rhs={}",
25+
re_format::format_uint(self.num_rows()),
26+
re_format::format_uint(rhs.num_rows())
27+
));
28+
2329
let cl = self;
2430
let cr = rhs;
2531

@@ -38,56 +44,76 @@ impl Chunk {
3844

3945
let is_sorted = cl.is_sorted && cr.is_sorted && cl1 <= cr0;
4046

41-
let row_ids = arrow2::compute::concatenate::concatenate(&[&cl.row_ids, &cr.row_ids])?;
42-
#[allow(clippy::unwrap_used)] // concatenating 2 RowId arrays must yield another RowId array
43-
let row_ids = row_ids
44-
.as_any()
45-
.downcast_ref::<ArrowStructArray>()
46-
.unwrap()
47-
.clone();
47+
let row_ids = {
48+
re_tracing::profile_scope!("row_ids");
49+
50+
let row_ids = arrow2::compute::concatenate::concatenate(&[&cl.row_ids, &cr.row_ids])?;
51+
#[allow(clippy::unwrap_used)]
52+
// concatenating 2 RowId arrays must yield another RowId array
53+
row_ids
54+
.as_any()
55+
.downcast_ref::<ArrowStructArray>()
56+
.unwrap()
57+
.clone()
58+
};
4859

4960
// NOTE: We know they are the same set, and they are in a btree => we can zip them.
50-
let timelines = izip!(self.timelines.iter(), rhs.timelines.iter())
51-
.filter_map(
52-
|((lhs_timeline, lhs_time_chunk), (rhs_timeline, rhs_time_chunk))| {
53-
debug_assert_eq!(lhs_timeline, rhs_timeline);
54-
lhs_time_chunk
55-
.concatenated(rhs_time_chunk)
56-
.map(|time_chunk| (*lhs_timeline, time_chunk))
57-
},
58-
)
59-
.collect();
61+
let timelines = {
62+
re_tracing::profile_scope!("timelines");
63+
izip!(self.timelines.iter(), rhs.timelines.iter())
64+
.filter_map(
65+
|((lhs_timeline, lhs_time_chunk), (rhs_timeline, rhs_time_chunk))| {
66+
debug_assert_eq!(lhs_timeline, rhs_timeline);
67+
lhs_time_chunk
68+
.concatenated(rhs_time_chunk)
69+
.map(|time_chunk| (*lhs_timeline, time_chunk))
70+
},
71+
)
72+
.collect()
73+
};
6074

6175
// First pass: concat right onto left.
62-
let mut components: BTreeMap<_, _> = self
63-
.components
64-
.iter()
65-
.filter_map(|(component_name, lhs_list_array)| {
66-
if let Some(rhs_list_array) = rhs.components.get(component_name) {
67-
let list_array = arrow2::compute::concatenate::concatenate(&[
68-
lhs_list_array,
69-
rhs_list_array,
70-
])
71-
.ok()?;
72-
let list_array = list_array
73-
.as_any()
74-
.downcast_ref::<ArrowListArray<i32>>()?
75-
.clone();
76-
Some((*component_name, list_array))
77-
} else {
78-
Some((
79-
*component_name,
80-
crate::util::pad_list_array_back(
76+
let mut components: BTreeMap<_, _> = {
77+
re_tracing::profile_scope!("components (r2l)");
78+
self.components
79+
.iter()
80+
.filter_map(|(component_name, lhs_list_array)| {
81+
re_tracing::profile_scope!(format!("{}", component_name.as_str()));
82+
if let Some(rhs_list_array) = rhs.components.get(component_name) {
83+
re_tracing::profile_scope!(format!(
84+
"concat (lhs={} rhs={})",
85+
re_format::format_uint(lhs_list_array.values().len()),
86+
re_format::format_uint(rhs_list_array.values().len()),
87+
));
88+
89+
let list_array = arrow2::compute::concatenate::concatenate(&[
8190
lhs_list_array,
82-
self.num_rows() + rhs.num_rows(),
83-
),
84-
))
85-
}
86-
})
87-
.collect();
91+
rhs_list_array,
92+
])
93+
.ok()?;
94+
let list_array = list_array
95+
.as_any()
96+
.downcast_ref::<ArrowListArray<i32>>()?
97+
.clone();
98+
99+
Some((*component_name, list_array))
100+
} else {
101+
re_tracing::profile_scope!("pad");
102+
Some((
103+
*component_name,
104+
crate::util::pad_list_array_back(
105+
lhs_list_array,
106+
self.num_rows() + rhs.num_rows(),
107+
),
108+
))
109+
}
110+
})
111+
.collect()
112+
};
88113

89114
// Second pass: concat left onto right, where necessary.
90-
components.extend(
115+
components.extend({
116+
re_tracing::profile_scope!("components (l2r)");
91117
rhs.components
92118
.iter()
93119
.filter_map(|(component_name, rhs_list_array)| {
@@ -96,7 +122,15 @@ impl Chunk {
96122
return None;
97123
}
98124

125+
re_tracing::profile_scope!(component_name.as_str());
126+
99127
if let Some(lhs_list_array) = self.components.get(component_name) {
128+
re_tracing::profile_scope!(format!(
129+
"concat (lhs={} rhs={})",
130+
re_format::format_uint(lhs_list_array.values().len()),
131+
re_format::format_uint(rhs_list_array.values().len()),
132+
));
133+
100134
let list_array = arrow2::compute::concatenate::concatenate(&[
101135
lhs_list_array,
102136
rhs_list_array,
@@ -106,8 +140,10 @@ impl Chunk {
106140
.as_any()
107141
.downcast_ref::<ArrowListArray<i32>>()?
108142
.clone();
143+
109144
Some((*component_name, list_array))
110145
} else {
146+
re_tracing::profile_scope!("pad");
111147
Some((
112148
*component_name,
113149
crate::util::pad_list_array_front(
@@ -117,8 +153,8 @@ impl Chunk {
117153
))
118154
}
119155
})
120-
.collect_vec(),
121-
);
156+
.collect_vec()
157+
});
122158

123159
let chunk = Self {
124160
id: ChunkId::new(),

crates/store/re_chunk_store/src/store.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,15 @@ impl ChunkStoreConfig {
9595
/// Default configuration, applicable to most use cases, according to empirical testing.
9696
pub const DEFAULT: Self = Self {
9797
enable_changelog: true,
98-
chunk_max_bytes: 8 * 1024 * 1024,
98+
99+
// Empirical testing shows that 4MiB is a good middle ground; big tensors and buffers can
100+
// become a bit too costly to concatenate beyond that.
101+
chunk_max_bytes: 4 * 1024 * 1024,
102+
103+
// Empirical testing shows that 1024 is the threshold after which we really start to get
104+
diminishing returns space-wise.
99105
chunk_max_rows: 1024,
106+
100107
chunk_max_rows_if_unsorted: 256,
101108
};
102109

crates/store/re_chunk_store/src/writes.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use arrow2::array::{Array as _, ListArray as ArrowListArray};
55
use itertools::Itertools as _;
66

77
use re_chunk::{Chunk, EntityPath, RowId};
8-
use re_types_core::SizeBytes as _;
8+
use re_types_core::SizeBytes;
99

1010
use crate::{
1111
store::ChunkIdSetPerTime, ChunkStore, ChunkStoreChunkStats, ChunkStoreConfig, ChunkStoreDiff,
@@ -113,12 +113,17 @@ impl ChunkStore {
113113
let elected_rowid_min = elected_chunk.row_id_range().map(|(min, _)| min);
114114

115115
let mut compacted = if elected_rowid_min < chunk_rowid_min {
116+
re_tracing::profile_scope!("concat");
116117
elected_chunk.concatenated(chunk)?
117118
} else {
119+
re_tracing::profile_scope!("concat");
118120
chunk.concatenated(elected_chunk)?
119121
};
120122

121-
compacted.sort_if_unsorted();
123+
{
124+
re_tracing::profile_scope!("sort");
125+
compacted.sort_if_unsorted();
126+
}
122127

123128
re_log::trace!(
124129
"compacted {} ({} rows) and {} ({} rows) together, resulting in {} ({} rows)",
@@ -294,8 +299,8 @@ impl ChunkStore {
294299
return false;
295300
}
296301

297-
let total_bytes =
298-
chunk.total_size_bytes() + candidate.total_size_bytes();
302+
let total_bytes = <Chunk as SizeBytes>::total_size_bytes(chunk)
303+
+ <Chunk as SizeBytes>::total_size_bytes(candidate);
299304
let is_below_bytes_threshold = total_bytes <= chunk_max_bytes;
300305

301306
let total_rows = (chunk.num_rows() + candidate.num_rows()) as u64;

0 commit comments

Comments
 (0)