@@ -21,16 +21,24 @@ impl<'a> DeadlineQueue<'a> {
21
21
Self { queue : VecDeque :: with_capacity ( capacity) }
22
22
}
23
23
24
+ fn now ( & self ) -> Instant {
25
+ Instant :: now ( )
26
+ }
27
+
24
28
pub ( crate ) fn push ( & mut self , id : TestId , test : & ' a CollectedTest ) {
25
- let deadline = Instant :: now ( ) + Duration :: from_secs ( TEST_WARN_TIMEOUT_S ) ;
29
+ let deadline = self . now ( ) + Duration :: from_secs ( TEST_WARN_TIMEOUT_S ) ;
30
+ if let Some ( back) = self . queue . back ( ) {
31
+ assert ! ( back. deadline <= deadline) ;
32
+ }
26
33
self . queue . push_back ( DeadlineEntry { id, test, deadline } ) ;
27
34
}
28
35
29
- /// Equivalent to `rx.read ()`, except that if any test exceeds its deadline
36
+ /// Equivalent to `rx.recv ()`, except that if a test exceeds its deadline
30
37
/// during the wait, the given callback will also be called for that test.
31
38
pub ( crate ) fn read_channel_while_checking_deadlines < T > (
32
39
& mut self ,
33
40
rx : & mpsc:: Receiver < T > ,
41
+ is_running : impl Fn ( TestId ) -> bool ,
34
42
mut on_deadline_passed : impl FnMut ( TestId , & CollectedTest ) ,
35
43
) -> Result < T , RecvError > {
36
44
loop {
@@ -39,18 +47,18 @@ impl<'a> DeadlineQueue<'a> {
39
47
// deadline, so do a normal receive.
40
48
return rx. recv ( ) ;
41
49
} ;
42
- let wait_duration = next_deadline. saturating_duration_since ( Instant :: now ( ) ) ;
50
+ let next_deadline_timeout = next_deadline. saturating_duration_since ( self . now ( ) ) ;
51
+
52
+ let recv_result = rx. recv_timeout ( next_deadline_timeout) ;
53
+ // Process deadlines after every receive attempt, regardless of
54
+ // outcome, so that we don't build up an unbounded backlog of stale
55
+ // entries due to a constant stream of tests finishing.
56
+ self . for_each_entry_past_deadline ( & is_running, & mut on_deadline_passed) ;
43
57
44
- let recv_result = rx. recv_timeout ( wait_duration) ;
45
58
match recv_result {
46
59
Ok ( value) => return Ok ( value) ,
47
- Err ( RecvTimeoutError :: Timeout ) => {
48
- // Notify the callback of tests that have exceeded their
49
- // deadline, then loop and do annother channel read.
50
- for DeadlineEntry { id, test, .. } in self . remove_tests_past_deadline ( ) {
51
- on_deadline_passed ( id, test) ;
52
- }
53
- }
60
+ // Deadlines have already been processed, so loop and do another receive.
61
+ Err ( RecvTimeoutError :: Timeout ) => { }
54
62
Err ( RecvTimeoutError :: Disconnected ) => return Err ( RecvError ) ,
55
63
}
56
64
}
@@ -60,14 +68,28 @@ impl<'a> DeadlineQueue<'a> {
60
68
Some ( self . queue . front ( ) ?. deadline )
61
69
}
62
70
63
- fn remove_tests_past_deadline ( & mut self ) -> Vec < DeadlineEntry < ' a > > {
64
- let now = Instant :: now ( ) ;
65
- let mut timed_out = vec ! [ ] ;
66
- while let Some ( deadline_entry) = pop_front_if ( & mut self . queue , |entry| now < entry. deadline )
67
- {
68
- timed_out. push ( deadline_entry) ;
71
+ fn for_each_entry_past_deadline (
72
+ & mut self ,
73
+ is_running : impl Fn ( TestId ) -> bool ,
74
+ mut on_deadline_passed : impl FnMut ( TestId , & CollectedTest ) ,
75
+ ) {
76
+ let now = self . now ( ) ;
77
+
78
+ // Clear out entries that are past their deadline, but only invoke the
79
+ // callback for tests that are still considered running.
80
+ while let Some ( entry) = pop_front_if ( & mut self . queue , |entry| entry. deadline <= now) {
81
+ if is_running ( entry. id ) {
82
+ on_deadline_passed ( entry. id , entry. test ) ;
83
+ }
84
+ }
85
+
86
+ // Also clear out any leading entries that are no longer running, even
87
+ // if their deadline hasn't been reached.
88
+ while let Some ( _) = pop_front_if ( & mut self . queue , |entry| !is_running ( entry. id ) ) { }
89
+
90
+ if let Some ( front) = self . queue . front ( ) {
91
+ assert ! ( now < front. deadline) ;
69
92
}
70
- timed_out
71
93
}
72
94
}
73
95
0 commit comments