Skip to content

Commit 08b3ae6

Browse files
committed
add mpmc_bounded_queue
mpmc was removed from stdlib, so we just vendor it for now (rust-lang/rust#19274)
1 parent ff6d5af commit 08b3ae6

File tree

3 files changed

+218
-1
lines changed

3 files changed

+218
-1
lines changed

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,4 @@ mod notify;
6363
mod os;
6464
mod poll;
6565
mod timer;
66+
mod mpmc_bounded_queue;

src/mpmc_bounded_queue.rs

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2+
* Redistribution and use in source and binary forms, with or without
3+
* modification, are permitted provided that the following conditions are met:
4+
*
5+
* 1. Redistributions of source code must retain the above copyright notice,
6+
* this list of conditions and the following disclaimer.
7+
*
8+
* 2. Redistributions in binary form must reproduce the above copyright
9+
* notice, this list of conditions and the following disclaimer in the
10+
* documentation and/or other materials provided with the distribution.
11+
*
12+
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13+
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14+
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15+
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16+
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18+
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19+
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20+
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21+
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22+
*
23+
* The views and conclusions contained in the software and documentation are
24+
* those of the authors and should not be interpreted as representing official
25+
* policies, either expressed or implied, of Dmitry Vyukov.
26+
*/
27+
28+
#![allow(missing_docs, dead_code)]
29+
30+
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
31+
32+
// This queue is copy pasted from old rust stdlib.
33+
34+
use std::sync::Arc;
35+
use std::num::UnsignedInt;
36+
use std::cell::UnsafeCell;
37+
38+
use std::sync::atomic::{AtomicUint,Relaxed,Release,Acquire};
39+
40+
struct Node<T> {
41+
sequence: AtomicUint,
42+
value: Option<T>,
43+
}
44+
45+
struct State<T> {
46+
pad0: [u8, ..64],
47+
buffer: Vec<UnsafeCell<Node<T>>>,
48+
mask: uint,
49+
pad1: [u8, ..64],
50+
enqueue_pos: AtomicUint,
51+
pad2: [u8, ..64],
52+
dequeue_pos: AtomicUint,
53+
pad3: [u8, ..64],
54+
}
55+
56+
pub struct Queue<T> {
57+
state: Arc<State<T>>,
58+
}
59+
60+
impl<T: Send> State<T> {
61+
fn with_capacity(capacity: uint) -> State<T> {
62+
let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
63+
if capacity < 2 {
64+
2u
65+
} else {
66+
// use next power of 2 as capacity
67+
capacity.next_power_of_two()
68+
}
69+
} else {
70+
capacity
71+
};
72+
let buffer = Vec::from_fn(capacity, |i| {
73+
UnsafeCell::new(Node { sequence:AtomicUint::new(i), value: None })
74+
});
75+
State{
76+
pad0: [0, ..64],
77+
buffer: buffer,
78+
mask: capacity-1,
79+
pad1: [0, ..64],
80+
enqueue_pos: AtomicUint::new(0),
81+
pad2: [0, ..64],
82+
dequeue_pos: AtomicUint::new(0),
83+
pad3: [0, ..64],
84+
}
85+
}
86+
87+
fn push(&self, value: T) -> bool {
88+
let mask = self.mask;
89+
let mut pos = self.enqueue_pos.load(Relaxed);
90+
loop {
91+
let node = &self.buffer[pos & mask];
92+
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
93+
let diff: int = seq as int - pos as int;
94+
95+
if diff == 0 {
96+
let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
97+
if enqueue_pos == pos {
98+
unsafe {
99+
(*node.get()).value = Some(value);
100+
(*node.get()).sequence.store(pos+1, Release);
101+
}
102+
break
103+
} else {
104+
pos = enqueue_pos;
105+
}
106+
} else if diff < 0 {
107+
return false
108+
} else {
109+
pos = self.enqueue_pos.load(Relaxed);
110+
}
111+
}
112+
true
113+
}
114+
115+
fn pop(&self) -> Option<T> {
116+
let mask = self.mask;
117+
let mut pos = self.dequeue_pos.load(Relaxed);
118+
loop {
119+
let node = &self.buffer[pos & mask];
120+
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
121+
let diff: int = seq as int - (pos + 1) as int;
122+
if diff == 0 {
123+
let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
124+
if dequeue_pos == pos {
125+
unsafe {
126+
let value = (*node.get()).value.take();
127+
(*node.get()).sequence.store(pos + mask + 1, Release);
128+
return value
129+
}
130+
} else {
131+
pos = dequeue_pos;
132+
}
133+
} else if diff < 0 {
134+
return None
135+
} else {
136+
pos = self.dequeue_pos.load(Relaxed);
137+
}
138+
}
139+
}
140+
}
141+
142+
impl<T: Send> Queue<T> {
143+
pub fn with_capacity(capacity: uint) -> Queue<T> {
144+
Queue{
145+
state: Arc::new(State::with_capacity(capacity))
146+
}
147+
}
148+
149+
pub fn push(&self, value: T) -> bool {
150+
self.state.push(value)
151+
}
152+
153+
pub fn pop(&self) -> Option<T> {
154+
self.state.pop()
155+
}
156+
}
157+
158+
impl<T: Send> Clone for Queue<T> {
159+
fn clone(&self) -> Queue<T> {
160+
Queue { state: self.state.clone() }
161+
}
162+
}
163+
164+
#[cfg(test)]
165+
mod tests {
166+
use super::Queue;
167+
168+
#[test]
169+
fn test() {
170+
let nthreads = 8u;
171+
let nmsgs = 1000u;
172+
let q = Queue::with_capacity(nthreads*nmsgs);
173+
assert_eq!(None, q.pop());
174+
let (tx, rx) = channel();
175+
176+
for _ in range(0, nthreads) {
177+
let q = q.clone();
178+
let tx = tx.clone();
179+
spawn(proc() {
180+
let q = q;
181+
for i in range(0, nmsgs) {
182+
assert!(q.push(i));
183+
}
184+
tx.send(());
185+
});
186+
}
187+
188+
let mut completion_rxs = vec![];
189+
for _ in range(0, nthreads) {
190+
let (tx, rx) = channel();
191+
completion_rxs.push(rx);
192+
let q = q.clone();
193+
spawn(proc() {
194+
let q = q;
195+
let mut i = 0u;
196+
loop {
197+
match q.pop() {
198+
None => {},
199+
Some(_) => {
200+
i += 1;
201+
if i == nmsgs { break }
202+
}
203+
}
204+
}
205+
tx.send(i);
206+
});
207+
}
208+
209+
for rx in completion_rxs.iter_mut() {
210+
assert_eq!(nmsgs, rx.recv());
211+
}
212+
for _ in range(0, nthreads) {
213+
rx.recv();
214+
}
215+
}
216+
}

src/notify.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::sync::Arc;
22
use std::sync::atomic::{AtomicInt, Relaxed};
3-
use std::sync::mpmc_bounded_queue::Queue;
3+
use mpmc_bounded_queue::Queue;
44
use error::MioResult;
55
use io::IoHandle;
66
use os;

0 commit comments

Comments
 (0)