@@ -12,6 +12,7 @@ use prelude::v1::*;
12
12
13
13
use sync:: atomic:: { AtomicUsize , Ordering , ATOMIC_USIZE_INIT } ;
14
14
use sync:: poison:: { self , LockResult } ;
15
+ use sys:: time:: SteadyTime ;
15
16
use sys_common:: condvar as sys;
16
17
use sys_common:: mutex as sys_mutex;
17
18
use time:: Duration ;
@@ -153,20 +154,34 @@ impl Condvar {
153
154
///
154
155
/// Like `wait`, the lock specified will be re-acquired when this function
155
156
/// returns, regardless of whether the timeout elapsed or not.
156
- // Note that this method is *not* public, and this is quite intentional
157
- // because we're not quite sure about the semantics of relative vs absolute
158
- // durations or how the timing guarantees play into what the system APIs
159
- // provide. There are also additional concerns about the unix-specific
160
- // implementation which may need to be addressed.
161
- #[ allow( dead_code) ]
162
- fn wait_timeout < ' a , T > ( & self , guard : MutexGuard < ' a , T > , dur : Duration )
157
+ #[ unstable]
158
+ pub fn wait_timeout < ' a , T > ( & self , guard : MutexGuard < ' a , T > , dur : Duration )
163
159
-> LockResult < ( MutexGuard < ' a , T > , bool ) > {
164
160
unsafe {
165
161
let me: & ' static Condvar = & * ( self as * const _ ) ;
166
162
me. inner . wait_timeout ( guard, dur)
167
163
}
168
164
}
169
165
166
+ /// Wait on this condition variable for a notification, timing out after a
167
+ /// specified duration.
168
+ ///
169
+ /// The semantics of this function are equivalent to `wait_timeout` except
170
+ /// that the implementation will repeatedly wait while the duration has not
171
+ /// passed and the provided function returns `false`.
172
+ #[ unstable]
173
+ pub fn wait_timeout_with < ' a , T , F > ( & self ,
174
+ guard : MutexGuard < ' a , T > ,
175
+ dur : Duration ,
176
+ f : F )
177
+ -> LockResult < ( MutexGuard < ' a , T > , bool ) >
178
+ where F : FnMut ( LockResult < & mut T > ) -> bool {
179
+ unsafe {
180
+ let me: & ' static Condvar = & * ( self as * const _ ) ;
181
+ me. inner . wait_timeout_with ( guard, dur, f)
182
+ }
183
+ }
184
+
170
185
/// Wake up one blocked thread on this condvar.
171
186
///
172
187
/// If there is a blocked thread on this condition variable, then it will
@@ -220,9 +235,9 @@ impl StaticCondvar {
220
235
/// specified duration.
221
236
///
222
237
/// See `Condvar::wait_timeout`.
223
- #[ allow ( dead_code ) ] // may want to stabilize this later, see wait_timeout above
224
- fn wait_timeout < ' a , T > ( & ' static self , guard : MutexGuard < ' a , T > , dur : Duration )
225
- -> LockResult < ( MutexGuard < ' a , T > , bool ) > {
238
+ #[ unstable = " may be merged with Condvar in the future" ]
239
+ pub fn wait_timeout < ' a , T > ( & ' static self , guard : MutexGuard < ' a , T > , dur : Duration )
240
+ -> LockResult < ( MutexGuard < ' a , T > , bool ) > {
226
241
let ( poisoned, success) = unsafe {
227
242
let lock = mutex:: guard_lock ( & guard) ;
228
243
self . verify ( lock) ;
@@ -236,6 +251,50 @@ impl StaticCondvar {
236
251
}
237
252
}
238
253
254
+ /// Wait on this condition variable for a notification, timing out after a
255
+ /// specified duration.
256
+ ///
257
+ /// The implementation will repeatedly wait while the duration has not
258
+ /// passed and the function returns `false`.
259
+ ///
260
+ /// See `Condvar::wait_timeout_with`.
261
+ #[ unstable = "may be merged with Condvar in the future" ]
262
+ pub fn wait_timeout_with < ' a , T , F > ( & ' static self ,
263
+ guard : MutexGuard < ' a , T > ,
264
+ dur : Duration ,
265
+ mut f : F )
266
+ -> LockResult < ( MutexGuard < ' a , T > , bool ) >
267
+ where F : FnMut ( LockResult < & mut T > ) -> bool {
268
+ // This could be made more efficient by pushing the implementation into sys::condvar
269
+ let start = SteadyTime :: now ( ) ;
270
+ let mut guard_result: LockResult < MutexGuard < ' a , T > > = Ok ( guard) ;
271
+ while !f ( guard_result
272
+ . as_mut ( )
273
+ . map ( |g| & mut * * g)
274
+ . map_err ( |e| poison:: new_poison_error ( & mut * * e. get_mut ( ) ) ) ) {
275
+ let now = SteadyTime :: now ( ) ;
276
+ let consumed = & now - & start;
277
+ let guard = guard_result. unwrap_or_else ( |e| e. into_inner ( ) ) ;
278
+ let ( new_guard_result, no_timeout) = match self . wait_timeout ( guard, dur - consumed) {
279
+ Ok ( ( new_guard, no_timeout) ) => ( Ok ( new_guard) , no_timeout) ,
280
+ Err ( err) => {
281
+ let ( new_guard, no_timeout) = err. into_inner ( ) ;
282
+ ( Err ( poison:: new_poison_error ( new_guard) ) , no_timeout)
283
+ }
284
+ } ;
285
+ guard_result = new_guard_result;
286
+ if !no_timeout {
287
+ let result = f ( guard_result
288
+ . as_mut ( )
289
+ . map ( |g| & mut * * g)
290
+ . map_err ( |e| poison:: new_poison_error ( & mut * * e. get_mut ( ) ) ) ) ;
291
+ return poison:: map_result ( guard_result, |g| ( g, result) ) ;
292
+ }
293
+ }
294
+
295
+ poison:: map_result ( guard_result, |g| ( g, true ) )
296
+ }
297
+
239
298
/// Wake up one blocked thread on this condvar.
240
299
///
241
300
/// See `Condvar::notify_one`.
@@ -285,6 +344,7 @@ mod tests {
285
344
use super :: { StaticCondvar , CONDVAR_INIT } ;
286
345
use sync:: mpsc:: channel;
287
346
use sync:: { StaticMutex , MUTEX_INIT , Condvar , Mutex , Arc } ;
347
+ use sync:: atomic:: { AtomicUsize , ATOMIC_USIZE_INIT , Ordering } ;
288
348
use thread:: Thread ;
289
349
use time:: Duration ;
290
350
@@ -372,6 +432,49 @@ mod tests {
372
432
unsafe { C . destroy ( ) ; M . destroy ( ) ; }
373
433
}
374
434
435
+ #[ test]
436
+ fn wait_timeout_with ( ) {
437
+ static C : StaticCondvar = CONDVAR_INIT ;
438
+ static M : StaticMutex = MUTEX_INIT ;
439
+ static S : AtomicUsize = ATOMIC_USIZE_INIT ;
440
+
441
+ let g = M . lock ( ) . unwrap ( ) ;
442
+ let ( g, success) = C . wait_timeout_with ( g, Duration :: nanoseconds ( 1000 ) , |_| false ) . unwrap ( ) ;
443
+ assert ! ( !success) ;
444
+
445
+ let ( tx, rx) = channel ( ) ;
446
+ let _t = Thread :: scoped ( move || {
447
+ rx. recv ( ) . unwrap ( ) ;
448
+ let g = M . lock ( ) . unwrap ( ) ;
449
+ S . store ( 1 , Ordering :: SeqCst ) ;
450
+ C . notify_one ( ) ;
451
+ drop ( g) ;
452
+
453
+ rx. recv ( ) . unwrap ( ) ;
454
+ let g = M . lock ( ) . unwrap ( ) ;
455
+ S . store ( 2 , Ordering :: SeqCst ) ;
456
+ C . notify_one ( ) ;
457
+ drop ( g) ;
458
+
459
+ rx. recv ( ) . unwrap ( ) ;
460
+ let _g = M . lock ( ) . unwrap ( ) ;
461
+ S . store ( 3 , Ordering :: SeqCst ) ;
462
+ C . notify_one ( ) ;
463
+ } ) ;
464
+
465
+ let mut state = 0 ;
466
+ let ( _g, success) = C . wait_timeout_with ( g, Duration :: days ( 1 ) , |_| {
467
+ assert_eq ! ( state, S . load( Ordering :: SeqCst ) ) ;
468
+ tx. send ( ( ) ) . unwrap ( ) ;
469
+ state += 1 ;
470
+ match state {
471
+ 1 |2 => false ,
472
+ _ => true ,
473
+ }
474
+ } ) . unwrap ( ) ;
475
+ assert ! ( success) ;
476
+ }
477
+
375
478
#[ test]
376
479
#[ should_fail]
377
480
fn two_mutexes ( ) {
0 commit comments