29
29
30
30
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
31
31
32
+ use alloc:: arc:: Arc ;
33
+
32
34
use clone:: Clone ;
33
35
use kinds:: Send ;
34
36
use num:: next_power_of_two;
35
37
use option:: { Option , Some , None } ;
36
- use sync:: arc:: UnsafeArc ;
37
38
use sync:: atomics:: { AtomicUint , Relaxed , Release , Acquire } ;
38
39
use vec:: Vec ;
40
+ use ty:: Unsafe ;
39
41
40
42
struct Node < T > {
41
43
sequence : AtomicUint ,
@@ -44,7 +46,7 @@ struct Node<T> {
44
46
45
47
struct State < T > {
46
48
pad0 : [ u8 , ..64 ] ,
47
- buffer : Vec < Node < T > > ,
49
+ buffer : Vec < Unsafe < Node < T > > > ,
48
50
mask : uint ,
49
51
pad1 : [ u8 , ..64 ] ,
50
52
enqueue_pos : AtomicUint ,
@@ -54,7 +56,7 @@ struct State<T> {
54
56
}
55
57
56
58
pub struct Queue < T > {
57
- state : UnsafeArc < State < T > > ,
59
+ state : Arc < State < T > > ,
58
60
}
59
61
60
62
impl < T : Send > State < T > {
@@ -70,7 +72,7 @@ impl<T: Send> State<T> {
70
72
capacity
71
73
} ;
72
74
let buffer = Vec :: from_fn ( capacity, |i| {
73
- Node { sequence : AtomicUint :: new ( i) , value : None }
75
+ Unsafe :: new ( Node { sequence : AtomicUint :: new ( i) , value : None } )
74
76
} ) ;
75
77
State {
76
78
pad0 : [ 0 , ..64 ] ,
@@ -84,19 +86,21 @@ impl<T: Send> State<T> {
84
86
}
85
87
}
86
88
87
- fn push ( & mut self , value : T ) -> bool {
89
+ fn push ( & self , value : T ) -> bool {
88
90
let mask = self . mask ;
89
91
let mut pos = self . enqueue_pos . load ( Relaxed ) ;
90
92
loop {
91
- let node = self . buffer . get_mut ( pos & mask) ;
92
- let seq = node. sequence . load ( Acquire ) ;
93
+ let node = self . buffer . get ( pos & mask) ;
94
+ let seq = unsafe { ( * node. get ( ) ) . sequence . load ( Acquire ) } ;
93
95
let diff: int = seq as int - pos as int ;
94
96
95
97
if diff == 0 {
96
98
let enqueue_pos = self . enqueue_pos . compare_and_swap ( pos, pos+1 , Relaxed ) ;
97
99
if enqueue_pos == pos {
98
- node. value = Some ( value) ;
99
- node. sequence . store ( pos+1 , Release ) ;
100
+ unsafe {
101
+ ( * node. get ( ) ) . value = Some ( value) ;
102
+ ( * node. get ( ) ) . sequence . store ( pos+1 , Release ) ;
103
+ }
100
104
break
101
105
} else {
102
106
pos = enqueue_pos;
@@ -110,19 +114,21 @@ impl<T: Send> State<T> {
110
114
true
111
115
}
112
116
113
- fn pop ( & mut self ) -> Option < T > {
117
+ fn pop ( & self ) -> Option < T > {
114
118
let mask = self . mask ;
115
119
let mut pos = self . dequeue_pos . load ( Relaxed ) ;
116
120
loop {
117
- let node = self . buffer . get_mut ( pos & mask) ;
118
- let seq = node. sequence . load ( Acquire ) ;
121
+ let node = self . buffer . get ( pos & mask) ;
122
+ let seq = unsafe { ( * node. get ( ) ) . sequence . load ( Acquire ) } ;
119
123
let diff: int = seq as int - ( pos + 1 ) as int ;
120
124
if diff == 0 {
121
125
let dequeue_pos = self . dequeue_pos . compare_and_swap ( pos, pos+1 , Relaxed ) ;
122
126
if dequeue_pos == pos {
123
- let value = node. value . take ( ) ;
124
- node. sequence . store ( pos + mask + 1 , Release ) ;
125
- return value
127
+ unsafe {
128
+ let value = ( * node. get ( ) ) . value . take ( ) ;
129
+ ( * node. get ( ) ) . sequence . store ( pos + mask + 1 , Release ) ;
130
+ return value
131
+ }
126
132
} else {
127
133
pos = dequeue_pos;
128
134
}
@@ -138,24 +144,22 @@ impl<T: Send> State<T> {
138
144
impl < T : Send > Queue < T > {
139
145
pub fn with_capacity ( capacity : uint ) -> Queue < T > {
140
146
Queue {
141
- state : UnsafeArc :: new ( State :: with_capacity ( capacity) )
147
+ state : Arc :: new ( State :: with_capacity ( capacity) )
142
148
}
143
149
}
144
150
145
- pub fn push ( & mut self , value : T ) -> bool {
146
- unsafe { ( * self . state . get ( ) ) . push ( value) }
151
+ pub fn push ( & self , value : T ) -> bool {
152
+ self . state . push ( value)
147
153
}
148
154
149
- pub fn pop ( & mut self ) -> Option < T > {
150
- unsafe { ( * self . state . get ( ) ) . pop ( ) }
155
+ pub fn pop ( & self ) -> Option < T > {
156
+ self . state . pop ( )
151
157
}
152
158
}
153
159
154
160
impl < T : Send > Clone for Queue < T > {
155
161
fn clone ( & self ) -> Queue < T > {
156
- Queue {
157
- state : self . state . clone ( )
158
- }
162
+ Queue { state : self . state . clone ( ) }
159
163
}
160
164
}
161
165
0 commit comments