Skip to content

compiletest: Fix deadline bugs in new executor #140031

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/tools/compiletest/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ pub(crate) fn run_tests(config: &Config, tests: Vec<CollectedTest>) -> bool {
}

let completion = deadline_queue
.read_channel_while_checking_deadlines(&completion_rx, |_id, test| {
listener.test_timed_out(test);
})
.read_channel_while_checking_deadlines(
&completion_rx,
|id| running_tests.contains_key(&id),
|_id, test| listener.test_timed_out(test),
)
.expect("receive channel should never be closed early");

let RunningTest { test, join_handle } = running_tests.remove(&completion.id).unwrap();
Expand Down
58 changes: 40 additions & 18 deletions src/tools/compiletest/src/executor/deadline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@ impl<'a> DeadlineQueue<'a> {
Self { queue: VecDeque::with_capacity(capacity) }
}

fn now(&self) -> Instant {
Instant::now()
}

pub(crate) fn push(&mut self, id: TestId, test: &'a CollectedTest) {
let deadline = Instant::now() + Duration::from_secs(TEST_WARN_TIMEOUT_S);
let deadline = self.now() + Duration::from_secs(TEST_WARN_TIMEOUT_S);
if let Some(back) = self.queue.back() {
assert!(back.deadline <= deadline);
}
self.queue.push_back(DeadlineEntry { id, test, deadline });
}

/// Equivalent to `rx.read()`, except that if any test exceeds its deadline
/// Equivalent to `rx.recv()`, except that if a test exceeds its deadline
/// during the wait, the given callback will also be called for that test.
pub(crate) fn read_channel_while_checking_deadlines<T>(
&mut self,
rx: &mpsc::Receiver<T>,
is_running: impl Fn(TestId) -> bool,
mut on_deadline_passed: impl FnMut(TestId, &CollectedTest),
) -> Result<T, RecvError> {
loop {
Expand All @@ -39,18 +47,18 @@ impl<'a> DeadlineQueue<'a> {
// deadline, so do a normal receive.
return rx.recv();
};
let wait_duration = next_deadline.saturating_duration_since(Instant::now());
let next_deadline_timeout = next_deadline.saturating_duration_since(self.now());

let recv_result = rx.recv_timeout(next_deadline_timeout);
// Process deadlines after every receive attempt, regardless of
// outcome, so that we don't build up an unbounded backlog of stale
// entries due to a constant stream of tests finishing.
self.for_each_entry_past_deadline(&is_running, &mut on_deadline_passed);

let recv_result = rx.recv_timeout(wait_duration);
match recv_result {
Ok(value) => return Ok(value),
Err(RecvTimeoutError::Timeout) => {
// Notify the callback of tests that have exceeded their
// deadline, then loop and do annother channel read.
for DeadlineEntry { id, test, .. } in self.remove_tests_past_deadline() {
on_deadline_passed(id, test);
}
}
// Deadlines have already been processed, so loop and do another receive.
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => return Err(RecvError),
}
}
Expand All @@ -60,14 +68,28 @@ impl<'a> DeadlineQueue<'a> {
Some(self.queue.front()?.deadline)
}

fn remove_tests_past_deadline(&mut self) -> Vec<DeadlineEntry<'a>> {
let now = Instant::now();
let mut timed_out = vec![];
while let Some(deadline_entry) = pop_front_if(&mut self.queue, |entry| now < entry.deadline)
{
timed_out.push(deadline_entry);
fn for_each_entry_past_deadline(
&mut self,
is_running: impl Fn(TestId) -> bool,
mut on_deadline_passed: impl FnMut(TestId, &CollectedTest),
) {
let now = self.now();

// Clear out entries that are past their deadline, but only invoke the
// callback for tests that are still considered running.
while let Some(entry) = pop_front_if(&mut self.queue, |entry| entry.deadline <= now) {
if is_running(entry.id) {
on_deadline_passed(entry.id, entry.test);
}
}

// Also clear out any leading entries that are no longer running, even
// if their deadline hasn't been reached.
while let Some(_) = pop_front_if(&mut self.queue, |entry| !is_running(entry.id)) {}

if let Some(front) = self.queue.front() {
assert!(now < front.deadline);
}
timed_out
}
}

Expand Down
Loading