// out when the mutex needs to be deallocated because it's not after the closure
// finishes, but after the first successful closure finishes.
//
- // All in all, this is instead implemented with atomics and lock-free
- // operations! Whee! Each `Once` has one word of atomic state, and this state is
+ // All in all, this is instead implemented with atomic operations and
+ // spin-locks! Whee! Each `Once` has one word of atomic state, and this state is
// CAS'd on to determine what to do. There are four possible states of a `Once`:
//
// * Incomplete - no initialization has run yet, and no thread is currently
// immediately.
//
// With 4 states we need 2 bits to encode this, and we use the remaining bits
- // in the word we have allocated as a queue of threads waiting for the thread
- // responsible for entering the RUNNING state. This queue is just a linked list
- // of Waiter nodes which is monotonically increasing in size. Each node is
- // allocated on the stack, and whenever the running closure finishes it will
- // consume the entire queue and notify all waiters they should try again.
+ // in the word we have allocated to point to a queue of threads waiting for the
+ // thread responsible for entering the RUNNING state. These bits are also used
+ // to ensure that at most one thread can be dealing with the queue. If all payload
+ // bits are set to zero, it means the queue is being worked on and the current
+ // thread should spin.
+ //
+ // This queue simply contains a linked list of Waiter nodes which is monotonically
+ // increasing in size. Each node is allocated on the stack, and whenever the
+ // running closure finishes it will consume the entire queue and notify all waiters
+ // they should try again.
//
// You'll find a few more details in the implementation, but that's the gist of
// it!
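To make the encoding above concrete, here is a standalone sketch (not part of the patch) of how the packed word splits into the two state bits and the queue pointer. `STATE_MASK = 0x3` matches the constant referenced in a hunk header below; the other state values use the conventional 0x0–0x3 encoding and, like the `split` helper and addresses, are illustrative only.

```rust
const INCOMPLETE: usize = 0x0;
const POISONED: usize = 0x1;
const RUNNING: usize = 0x2;
const COMPLETE: usize = 0x3;
const STATE_MASK: usize = 0x3; // the two low bits hold the state

// Split a packed word into (state bits, queue pointer bits).
fn split(state_and_queue: usize) -> (usize, usize) {
    (state_and_queue & STATE_MASK, state_and_queue & !STATE_MASK)
}

fn main() {
    // A 4-byte aligned address leaves the two low bits free for the state.
    let queue_addr = 0x1000usize;
    assert_eq!(split(queue_addr | RUNNING), (RUNNING, queue_addr));
    // All payload bits zero while RUNNING means "the queue is locked, spin".
    assert_eq!(split(RUNNING), (RUNNING, 0));
    // The other states never carry a queue pointer.
    assert_eq!(split(INCOMPLETE), (INCOMPLETE, 0));
    assert_eq!(split(POISONED), (POISONED, 0));
    assert_eq!(split(COMPLETE), (COMPLETE, 0));
}
```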
//   of the `Once` is acquired. So every load which can be the only one to
//   load COMPLETED must have at least Acquire ordering, which means all
//   three of them.
- // - `WaiterQueue::Drop` is the only place that may store COMPLETED, and
+ // - `WaiterQueueGuard::Drop` is the only place that may store COMPLETED, and
//   must do so with Release ordering to make the result available.
- // - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and
- //   needs to make the nodes available with Release ordering. The load in
- //   its `compare_and_swap` can be Relaxed because it only has to compare
- //   the atomic, not to read other data.
- // - `WaiterQueue::Drop` must see the `Waiter` nodes, so it must load
+ // - `wait` must acquire the spin-lock with Acquire ordering and release it
+ //   with Release ordering. The load before spinning can be Relaxed because
+ //   it only has to inspect the atomic, not to read other data.
+ // - `WaiterQueueGuard::Drop` also needs to obtain the spin-lock, so it must load
//   `state_and_queue` with Acquire ordering.
// - There is just one store where `state_and_queue` is used only as a
//   state flag, without having to synchronize data: switching the state
use crate::cell::Cell;
use crate::fmt;
use crate::marker;
- use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+ use crate::ptr;
+ use crate::sync::atomic::{spin_loop_hint, AtomicBool, AtomicUsize, Ordering};
use crate::thread::{self, Thread};
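The Release/Acquire pairing that the ordering comments above rely on can be exercised in isolation. A minimal standalone sketch using the public `std` atomics (the `STATE` and `DATA` names are illustrative, not from this file): the writer publishes its result and then stores the completion flag with Release, so a reader that observes the flag with an Acquire load is guaranteed to see the result.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// 0 = incomplete, 3 = complete (mirroring the COMPLETE encoding above).
static STATE: AtomicUsize = AtomicUsize::new(0);
// Stands in for the data produced by the initialization closure; Relaxed is
// enough here because the Release/Acquire pair on STATE orders it.
static DATA: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let writer = thread::spawn(|| {
        DATA.store(42, Ordering::Relaxed);
        // Release store: publishes the write to DATA to any thread that
        // later observes STATE == 3 with an Acquire load.
        STATE.store(3, Ordering::Release);
    });
    let reader = thread::spawn(|| {
        // Acquire load: once we see 3, the Relaxed write to DATA is visible.
        while STATE.load(Ordering::Acquire) != 3 {
            std::hint::spin_loop();
        }
        assert_eq!(DATA.load(Ordering::Relaxed), 42);
    });
    writer.join().unwrap();
    reader.join().unwrap();
}
```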

/// A synchronization primitive which can be used to run a one-time global
@@ -171,18 +176,29 @@ const STATE_MASK: usize = 0x3;
// `wait` would both hand out a mutable reference to its `Waiter` node, and keep
// a shared reference to check `signaled`. Instead we hold shared references and
// use interior mutability.
- #[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
struct Waiter {
    thread: Cell<Option<Thread>>,
    signaled: AtomicBool,
-     next: *const Waiter,
+     next: Cell<*const Waiter>,
}

// Head of a linked list of waiters.
// Every node is a struct on the stack of a waiting thread.
- // Will wake up the waiters when it gets dropped, i.e. also on panic.
- struct WaiterQueue<'a> {
+ // Note: Similar to `Waiter`, because a shared reference to `WaiterQueue` can be
+ // obtained by other threads, we cannot hold a mutable reference to it.
+ // For the same reason, Drop cannot be implemented on it directly.
+ #[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
+ struct WaiterQueue {
+     head: Cell<*const Waiter>,
+ }
+
+ // A guard that will wake up the waiters when it gets dropped, i.e. also on panic.
+ // A separate guard is used rather than implementing Drop on WaiterQueue to avoid
+ // a mutable reference to WaiterQueue from being implicitly created
+ // during drop.
+ struct WaiterQueueGuard<'a> {
    state_and_queue: &'a AtomicUsize,
+     queue: &'a WaiterQueue,
    set_state_on_drop_to: usize,
}

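Both `wait` and `WaiterQueueGuard::drop` below follow the same lock protocol on `state_and_queue`: swap the pointer bits out (leaving only RUNNING) to lock, spin while the payload bits are zero, and store the pointer (or the final state) back to unlock. A hedged, standalone sketch of that protocol, using a plain `AtomicUsize` and an illustrative `Queue` type in place of the real `WaiterQueue`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const RUNNING: usize = 0x2;
const STATE_MASK: usize = 0x3;

// Illustrative stand-in for `WaiterQueue`; the real one holds the list head.
#[repr(align(4))] // keep the two low bits free for the state
struct Queue {
    name: &'static str,
}

// Lock: take the queue pointer out of the word, spinning while another
// thread holds the lock (payload bits all zero).
fn lock(word: &AtomicUsize) -> *const Queue {
    loop {
        let old = word.swap(RUNNING, Ordering::Acquire);
        if (old & !STATE_MASK) != 0 {
            return (old & !STATE_MASK) as *const Queue;
        }
        std::hint::spin_loop();
    }
}

// Unlock: publish the queue pointer again; Release pairs with the Acquire above.
fn unlock(word: &AtomicUsize, queue: *const Queue) {
    word.store(queue as usize | RUNNING, Ordering::Release);
}

fn main() {
    let queue = Queue { name: "waiters" };
    let word = AtomicUsize::new(&queue as *const Queue as usize | RUNNING);

    let locked = lock(&word);
    // While we hold the lock, other threads spin; we may touch the queue freely.
    assert_eq!(unsafe { (*locked).name }, "waiters");
    unlock(&word, locked);

    assert_eq!(word.load(Ordering::Relaxed) & !STATE_MASK, &queue as *const Queue as usize);
}
```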
@@ -397,6 +413,7 @@ impl Once {
                }
                POISONED | INCOMPLETE => {
                    // Try to register this thread as the one RUNNING.
+                     // This simultaneously acquires the queue lock (payload bits are still zero).
                    let old = self.state_and_queue.compare_and_swap(
                        state_and_queue,
                        RUNNING,
@@ -406,20 +423,28 @@ impl Once {
                        state_and_queue = old;
                        continue;
                    }
-                     // `waiter_queue` will manage other waiting threads, and
-                     // wake them up on drop.
-                     let mut waiter_queue = WaiterQueue {
+
+                     // `waiter_queue` will manage other waiting threads, and `queue_guard`
+                     // will wake them up on drop.
+                     let waiter_queue = WaiterQueue { head: Cell::new(ptr::null()) };
+                     let mut queue_guard = WaiterQueueGuard {
                        state_and_queue: &self.state_and_queue,
+                         queue: &waiter_queue,
                        set_state_on_drop_to: POISONED,
                    };
+                     let queue = &waiter_queue as *const WaiterQueue as usize;
+                     // Release the lock to make the WaiterQueue available for
+                     // other threads to join.
+                     self.state_and_queue.store(queue | RUNNING, Ordering::Release);
+
                    // Run the initialization function, letting it know if we're
                    // poisoned or not.
                    let init_state = OnceState {
                        poisoned: state_and_queue == POISONED,
                        set_state_on_drop_to: Cell::new(COMPLETE),
                    };
                    init(&init_state);
-                     waiter_queue.set_state_on_drop_to = init_state.set_state_on_drop_to.get();
+                     queue_guard.set_state_on_drop_to = init_state.set_state_on_drop_to.get();
                    break;
                }
                _ => {
@@ -437,43 +462,64 @@ impl Once {
fn wait(state_and_queue: &AtomicUsize, mut current_state: usize) {
    // Note: the following code was carefully written to avoid creating a
    // mutable reference to `node` that gets aliased.
+
+     // Create the node upfront to reduce the time spent inside the spin-lock.
+     let node = Waiter {
+         thread: Cell::new(Some(thread::current())),
+         signaled: AtomicBool::new(false),
+         next: Cell::new(ptr::null()),
+     };
+
+     // Use the spin-lock to lock the waiter queue.
    loop {
        // Don't queue this thread if the status is no longer running,
        // otherwise we will not be woken up.
        if current_state & STATE_MASK != RUNNING {
            return;
        }

-         // Create the node for our current thread.
-         let node = Waiter {
-             thread: Cell::new(Some(thread::current())),
-             signaled: AtomicBool::new(false),
-             next: (current_state & !STATE_MASK) as *const Waiter,
-         };
-         let me = &node as *const Waiter as usize;
-
-         // Try to slide in the node at the head of the linked list, making sure
-         // that another thread didn't just replace the head of the linked list.
-         let old = state_and_queue.compare_and_swap(current_state, me | RUNNING, Ordering::Release);
+         // Currently locked, spin.
+         if current_state & !STATE_MASK == 0 {
+             current_state = state_and_queue.load(Ordering::Relaxed);
+             spin_loop_hint();
+             continue;
+         }
+
+         // Try to lock the WaiterQueue.
+         let old = state_and_queue.compare_and_swap(current_state, RUNNING, Ordering::Acquire);
        if old != current_state {
            current_state = old;
            continue;
        }

-         // We have enqueued ourselves, now let's wait.
-         // It is important not to return before being signaled, otherwise we
-         // would drop our `Waiter` node and leave a hole in the linked list
-         // (and a dangling reference). Guard against spurious wakeups by
-         // reparking ourselves until we are signaled.
-         while !node.signaled.load(Ordering::Acquire) {
-             // If the managing thread happens to signal and unpark us before we
-             // can park ourselves, the result could be this thread never gets
-             // unparked. Luckily `park` comes with the guarantee that if it got
-             // an `unpark` just before on an unparked thread it does not park.
-             thread::park();
-         }
        break;
    }
+
+     // Insert our node into the linked list.
+     {
+         // SAFETY: This is okay because we have just "lock"ed it. Even the thread
+         // that created this WaiterQueue would need to lock it before dropping it,
+         // so the reference is definitely not dangling.
+         let queue = unsafe { &*((current_state & !STATE_MASK) as *const WaiterQueue) };
+         node.next.set(queue.head.get());
+         queue.head.set(&node as *const Waiter);
+     }
+
+     // Unlock the WaiterQueue.
+     state_and_queue.store(current_state, Ordering::Release);
+
+     // We have enqueued ourselves, now let's wait.
+     // It is important not to return before being signaled, otherwise we
+     // would drop our `Waiter` node and leave a hole in the linked list
+     // (and a dangling reference). Guard against spurious wakeups by
+     // reparking ourselves until we are signaled.
+     while !node.signaled.load(Ordering::Acquire) {
+         // If the managing thread happens to signal and unpark us before we
+         // can park ourselves, the result could be this thread never gets
+         // unparked. Luckily `park` comes with the guarantee that if it got
+         // an `unpark` just before on an unparked thread it does not park.
+         thread::park();
+     }
}

#[stable(feature = "std_debug", since = "1.16.0")]
@@ -483,14 +529,21 @@ impl fmt::Debug for Once {
    }
}

- impl Drop for WaiterQueue<'_> {
+ impl Drop for WaiterQueueGuard<'_> {
    fn drop(&mut self) {
-         // Swap out our state with however we finished.
-         let state_and_queue =
-             self.state_and_queue.swap(self.set_state_on_drop_to, Ordering::AcqRel);
+         // Lock the queue before accessing it.
+         loop {
+             let state_and_queue = self.state_and_queue.swap(RUNNING, Ordering::Acquire);
+             if state_and_queue != RUNNING {
+                 // Sanity check: we should get back the queue we originally put in.
+                 assert_eq!(state_and_queue, self.queue as *const WaiterQueue as usize | RUNNING);
+                 break;
+             }
+             spin_loop_hint();
+         }

-         // We should only ever see an old state which was RUNNING.
-         assert_eq!(state_and_queue & STATE_MASK, RUNNING);
+         // Set the state to however we finished.
+         self.state_and_queue.store(self.set_state_on_drop_to, Ordering::Release);

        // Walk the entire linked list of waiters and wake them up (in lifo
        // order, last to register is first to wake up).
@@ -499,9 +552,9 @@ impl Drop for WaiterQueue<'_> {
            // free `node` if there happens to be a spurious wakeup.
            // So we have to take out the `thread` field and copy the pointer to
            // `next` first.
-             let mut queue = (state_and_queue & !STATE_MASK) as *const Waiter;
+             let mut queue = self.queue.head.get();
            while !queue.is_null() {
-                 let next = (*queue).next;
+                 let next = (*queue).next.get();
                let thread = (*queue).thread.take().unwrap();
                (*queue).signaled.store(true, Ordering::Release);
                // ^- FIXME (maybe): This is another case of issue #55005
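For reference, the user-visible contract that this machinery upholds is unchanged by the patch: `call_once` runs the initialization closure only once, and every other caller blocks (parking in `wait` above) until the first successful call finishes. A small usage sketch against the public API:

```rust
use std::sync::Once;
use std::thread;

static INIT: Once = Once::new();

fn main() {
    let handles: Vec<_> = (0..4)
        .map(|_| {
            thread::spawn(|| {
                // Only one of these closures ever runs; the losing threads
                // block until the winner finishes, then continue.
                INIT.call_once(|| println!("initializing once"));
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    assert!(INIT.is_completed());
}
```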