17
17
* in std.
18
18
*/
19
19
20
-
20
+ use std :: cast ;
21
21
use std:: comm;
22
- use std:: unstable:: sync:: Exclusive ;
23
22
use std:: sync:: arc:: UnsafeArc ;
24
23
use std:: sync:: atomics;
25
24
use std:: unstable:: finally:: Finally ;
@@ -32,6 +31,10 @@ use arc::MutexArc;
32
31
* Internals
33
32
****************************************************************************/
34
33
34
+ pub mod mutex;
35
+ pub mod one;
36
+ mod mpsc_intrusive;
37
+
35
38
// Each waiting task receives on one of these.
36
39
#[ doc( hidden) ]
37
40
type WaitEnd = Port < ( ) > ;
@@ -54,7 +57,7 @@ impl WaitQueue {
54
57
comm:: Data ( ch) => {
55
58
// Send a wakeup signal. If the waiter was killed, its port will
56
59
// have closed. Keep trying until we get a live task.
57
- if ch. try_send_deferred ( ( ) ) {
60
+ if ch. try_send ( ( ) ) {
58
61
true
59
62
} else {
60
63
self . signal ( )
@@ -69,7 +72,7 @@ impl WaitQueue {
69
72
loop {
70
73
match self . head . try_recv ( ) {
71
74
comm:: Data ( ch) => {
72
- if ch. try_send_deferred ( ( ) ) {
75
+ if ch. try_send ( ( ) ) {
73
76
count += 1 ;
74
77
}
75
78
}
@@ -81,36 +84,45 @@ impl WaitQueue {
81
84
82
85
fn wait_end ( & self ) -> WaitEnd {
83
86
let ( wait_end, signal_end) = Chan :: new ( ) ;
84
- assert ! ( self . tail. try_send_deferred ( signal_end) ) ;
87
+ assert ! ( self . tail. try_send ( signal_end) ) ;
85
88
wait_end
86
89
}
87
90
}
88
91
89
92
// The building-block used to make semaphores, mutexes, and rwlocks.
90
- #[ doc( hidden) ]
91
93
struct SemInner < Q > {
94
+ lock : mutex:: Mutex ,
92
95
count : int ,
93
96
waiters : WaitQueue ,
94
97
// Can be either unit or another waitqueue. Some sems shouldn't come with
95
98
// a condition variable attached, others should.
96
99
blocked : Q
97
100
}
98
101
99
- #[ doc( hidden) ]
100
- struct Sem < Q > ( Exclusive < SemInner < Q > > ) ;
102
+ struct Sem < Q > ( UnsafeArc < SemInner < Q > > ) ;
101
103
102
104
#[ doc( hidden) ]
103
105
impl < Q : Send > Sem < Q > {
104
106
fn new ( count : int , q : Q ) -> Sem < Q > {
105
- Sem ( Exclusive :: new ( SemInner {
106
- count : count, waiters : WaitQueue :: new ( ) , blocked : q } ) )
107
+ Sem ( UnsafeArc :: new ( SemInner {
108
+ count : count,
109
+ waiters : WaitQueue :: new ( ) ,
110
+ blocked : q,
111
+ lock : mutex:: Mutex :: new ( ) ,
112
+ } ) )
113
+ }
114
+
115
+ unsafe fn with ( & self , f: |& mut SemInner < Q > |) {
116
+ let Sem ( ref arc) = * self ;
117
+ let state = arc. get ( ) ;
118
+ let _g = ( * state) . lock . lock ( ) ;
119
+ f ( cast:: transmute ( state) ) ;
107
120
}
108
121
109
122
pub fn acquire ( & self ) {
110
123
unsafe {
111
124
let mut waiter_nobe = None ;
112
- let Sem ( ref lock) = * self ;
113
- lock. with ( |state| {
125
+ self . with ( |state| {
114
126
state. count -= 1 ;
115
127
if state. count < 0 {
116
128
// Create waiter nobe, enqueue ourself, and tell
@@ -129,8 +141,7 @@ impl<Q:Send> Sem<Q> {
129
141
130
142
pub fn release ( & self ) {
131
143
unsafe {
132
- let Sem ( ref lock) = * self ;
133
- lock. with ( |state| {
144
+ self . with ( |state| {
134
145
state. count += 1 ;
135
146
if state. count <= 0 {
136
147
state. waiters . signal ( ) ;
@@ -210,8 +221,7 @@ impl<'a> Condvar<'a> {
210
221
let mut out_of_bounds = None ;
211
222
// Release lock, 'atomically' enqueuing ourselves in so doing.
212
223
unsafe {
213
- let Sem ( ref queue) = * self . sem ;
214
- queue. with ( |state| {
224
+ self . sem . with ( |state| {
215
225
if condvar_id < state. blocked . len ( ) {
216
226
// Drop the lock.
217
227
state. count += 1 ;
@@ -253,8 +263,7 @@ impl<'a> Condvar<'a> {
253
263
unsafe {
254
264
let mut out_of_bounds = None ;
255
265
let mut result = false ;
256
- let Sem ( ref lock) = * self . sem ;
257
- lock. with ( |state| {
266
+ self . sem . with ( |state| {
258
267
if condvar_id < state. blocked . len ( ) {
259
268
result = state. blocked [ condvar_id] . signal ( ) ;
260
269
} else {
@@ -276,8 +285,7 @@ impl<'a> Condvar<'a> {
276
285
let mut out_of_bounds = None ;
277
286
let mut queue = None ;
278
287
unsafe {
279
- let Sem ( ref lock) = * self . sem ;
280
- lock. with ( |state| {
288
+ self . sem . with ( |state| {
281
289
if condvar_id < state. blocked . len ( ) {
282
290
// To avoid :broadcast_heavy, we make a new waitqueue,
283
291
// swap it out with the old one, and broadcast on the
0 commit comments