Skip to content

Commit fe03f33

Browse files
pzhan9facebook-github-bot
authored andcommitted
Introducer every-n-messages ReducePolicy
Summary: Publish to collect early feedback. The implementation details will change because the previous diffs in this stack might change. But the overall idea is the same: We only coalesce updates when the buffer size is bigger than the policy. Reviewed By: mariusae Differential Revision: D74691188 fbshipit-source-id: 11514551f8ec3bd286e5f89b7805560774987f20
1 parent 423c920 commit fe03f33

File tree

1 file changed

+71
-12
lines changed

1 file changed

+71
-12
lines changed

hyperactor/src/mailbox/mod.rs

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,30 @@ impl cap::sealed::CanOpenPort for Mailbox {
12771277
}
12781278
}
12791279

1280+
#[derive(Default)]
1281+
struct SplitPortBuffer(Vec<Serialized>);
1282+
1283+
impl SplitPortBuffer {
1284+
/// Push a new item to the buffer, and optionally return any items that should
1285+
/// be flushed.
1286+
fn push(&mut self, serialized: Serialized) -> Option<Vec<Serialized>> {
1287+
static HYPERACTOR_SPLIT_MAX_BUFFER_SIZE: OnceLock<usize> = OnceLock::new();
1288+
let limit = HYPERACTOR_SPLIT_MAX_BUFFER_SIZE.get_or_init(|| {
1289+
std::env::var("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE")
1290+
.ok()
1291+
.and_then(|val| val.parse::<usize>().ok())
1292+
.unwrap_or(5)
1293+
});
1294+
1295+
self.0.push(serialized);
1296+
if &self.0.len() >= limit {
1297+
Some(std::mem::take(&mut self.0))
1298+
} else {
1299+
None
1300+
}
1301+
}
1302+
}
1303+
12801304
impl cap::sealed::CanSplitPort for Mailbox {
12811305
fn split(&self, port_id: PortId, reducer_typehash: Option<u64>) -> PortId {
12821306
fn post(mailbox: &Mailbox, port_id: PortId, msg: Serialized) {
@@ -1302,23 +1326,22 @@ impl cap::sealed::CanSplitPort for Mailbox {
13021326
Ok(())
13031327
}),
13041328
Some(r) => {
1305-
let buffer = Arc::new(Mutex::new(Vec::<Serialized>::new()));
1329+
let buffer = Mutex::new(SplitPortBuffer::default());
13061330
Box::new(move |serialized: Serialized| {
13071331
// Hold the lock until messages are sent. This is to avoid another
13081332
// invocation of this method trying to send message concurrently and
13091333
// cause messages delivered out of order.
13101334
let mut buf = buffer.lock().unwrap();
1311-
buf.push(serialized);
1312-
// TODO(pzhang) add policy and use this buffer
1313-
let buffered = std::mem::take(&mut *buf);
1314-
let reduced = r.reduce_updates(buffered).map_err(|(e, mut b)| {
1315-
(
1316-
b.pop()
1317-
.expect("there should be at least one update from buffer"),
1318-
e,
1319-
)
1320-
})?;
1321-
post(&mailbox, port_id.clone(), reduced);
1335+
if let Some(buffered) = buf.push(serialized) {
1336+
let reduced = r.reduce_updates(buffered).map_err(|(e, mut b)| {
1337+
(
1338+
b.pop()
1339+
.expect("there should be at least one update from buffer"),
1340+
e,
1341+
)
1342+
})?;
1343+
post(&mailbox, port_id.clone(), reduced);
1344+
}
13221345
Ok(())
13231346
})
13241347
}
@@ -2208,6 +2231,7 @@ mod tests {
22082231
use std::time::Duration;
22092232

22102233
use timed_test::async_timed_test;
2234+
use tracing::Level;
22112235

22122236
use super::*;
22132237
use crate::Actor;
@@ -2224,6 +2248,7 @@ mod tests {
22242248
use crate::proc::Proc;
22252249
use crate::reference::ProcId;
22262250
use crate::reference::WorldId;
2251+
use crate::test_utils::tracing::set_tracing_env_filter;
22272252

22282253
#[test]
22292254
fn test_error() {
@@ -2937,6 +2962,9 @@ mod tests {
29372962

29382963
#[async_timed_test(timeout_secs = 30)]
29392964
async fn test_split_port_id_sum_reducer() {
2965+
std::env::set_var("HYPERACTOR_SPLIT_MAX_BUFFER_SIZE", "1");
2966+
set_tracing_env_filter(Level::INFO);
2967+
29402968
let sum_accumulator = accum::sum::<u64>();
29412969
let reducer_typehash = sum_accumulator.reducer_typehash();
29422970
let Setup {
@@ -2965,4 +2993,35 @@ mod tests {
29652993
let msg = receiver.try_recv().unwrap();
29662994
assert_eq!(msg, None);
29672995
}
2996+
2997+
#[async_timed_test(timeout_secs = 30)]
2998+
async fn test_split_port_id_every_n_messages() {
2999+
set_tracing_env_filter(Level::INFO);
3000+
let actor = Mailbox::new(
3001+
id!(test[0].actor),
3002+
BoxedMailboxSender::new(PanickingMailboxSender),
3003+
);
3004+
let (port_handle, mut receiver) = actor.open_port::<u64>();
3005+
let port_id = port_handle.bind().port_id().clone();
3006+
// Split it
3007+
let reducer_typehash = accum::sum::<u64>().reducer_typehash();
3008+
let split_port_id = port_id.split(&actor, Some(reducer_typehash));
3009+
3010+
// Send 9 messages.
3011+
for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {
3012+
post(&actor, split_port_id.clone(), msg);
3013+
}
3014+
// The first 5 should be batched and reduced once due
3015+
// to every_n_msgs = 5.
3016+
let messages = wait_for(&mut receiver, 1, Duration::from_secs(2))
3017+
.await
3018+
.unwrap();
3019+
assert_eq!(messages, vec![15]);
3020+
3021+
// the last message unfortranately will never come because they do not
3022+
// reach batch size.
3023+
RealClock.sleep(Duration::from_secs(2)).await;
3024+
let msg = receiver.try_recv().unwrap();
3025+
assert_eq!(msg, None);
3026+
}
29683027
}

0 commit comments

Comments
 (0)