Skip to content

Commit f4ed554

Browse files
committed
Merge remote-tracking branch 'brson/io' into incoming
Conflicts: src/libstd/rt/sched.rs
2 parents bd30285 + 134bb0f commit f4ed554

File tree

14 files changed

+816
-200
lines changed

14 files changed

+816
-200
lines changed

src/libstd/rt/comm.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use ops::Drop;
2222
use kinds::Owned;
2323
use rt::sched::{Scheduler, Coroutine};
2424
use rt::local::Local;
25+
use rt::rtio::EventLoop;
2526
use unstable::intrinsics::{atomic_xchg, atomic_load};
2627
use util::Void;
2728
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@@ -158,7 +159,7 @@ impl<T> PortOne<T> {
158159

159160
// Switch to the scheduler to put the ~Task into the Packet state.
160161
let sched = Local::take::<Scheduler>();
161-
do sched.deschedule_running_task_and_then |task| {
162+
do sched.deschedule_running_task_and_then |sched, task| {
162163
unsafe {
163164
// Atomically swap the task pointer into the Packet state, issuing
164165
// an acquire barrier to prevent reordering of the subsequent read
@@ -172,9 +173,15 @@ impl<T> PortOne<T> {
172173
}
173174
STATE_ONE => {
174175
// Channel is closed. Switch back and check the data.
176+
// NB: We have to drop back into the scheduler event loop here
177+
// instead of switching immediately back or we could end up
178+
// triggering infinite recursion on the scheduler's stack.
175179
let task: ~Coroutine = cast::transmute(task_as_state);
176-
let sched = Local::take::<Scheduler>();
177-
sched.resume_task_immediately(task);
180+
let task = Cell(task);
181+
do sched.event_loop.callback {
182+
let sched = Local::take::<Scheduler>();
183+
sched.resume_task_immediately(task.take());
184+
}
178185
}
179186
_ => util::unreachable()
180187
}
@@ -614,5 +621,15 @@ mod test {
614621
}
615622
}
616623
}
624+
625+
#[test]
626+
fn recv_a_lot() {
627+
// Regression test that we don't run out of stack in scheduler context
628+
do run_in_newsched_task {
629+
let (port, chan) = stream();
630+
for 10000.times { chan.send(()) }
631+
for 10000.times { port.recv() }
632+
}
633+
}
617634
}
618635

src/libstd/rt/local.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,30 +85,31 @@ impl Local for IoFactoryObject {
8585

8686
#[cfg(test)]
8787
mod test {
88+
use rt::test::*;
8889
use rt::sched::Scheduler;
8990
use rt::uv::uvio::UvEventLoop;
9091
use super::*;
9192

9293
#[test]
9394
fn thread_local_scheduler_smoke_test() {
94-
let scheduler = ~UvEventLoop::new_scheduler();
95+
let scheduler = ~new_test_uv_sched();
9596
Local::put(scheduler);
9697
let _scheduler: ~Scheduler = Local::take();
9798
}
9899

99100
#[test]
100101
fn thread_local_scheduler_two_instances() {
101-
let scheduler = ~UvEventLoop::new_scheduler();
102+
let scheduler = ~new_test_uv_sched();
102103
Local::put(scheduler);
103104
let _scheduler: ~Scheduler = Local::take();
104-
let scheduler = ~UvEventLoop::new_scheduler();
105+
let scheduler = ~new_test_uv_sched();
105106
Local::put(scheduler);
106107
let _scheduler: ~Scheduler = Local::take();
107108
}
108109

109110
#[test]
110111
fn borrow_smoke_test() {
111-
let scheduler = ~UvEventLoop::new_scheduler();
112+
let scheduler = ~new_test_uv_sched();
112113
Local::put(scheduler);
113114
unsafe {
114115
let _scheduler: *mut Scheduler = Local::unsafe_borrow();

src/libstd/rt/message_queue.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
// option. This file may not be copied, modified, or distributed
99
// except according to those terms.
1010

11+
//! A concurrent queue that supports multiple producers and a
12+
//! single consumer.
13+
1114
use container::Container;
1215
use kinds::Owned;
1316
use vec::OwnedVector;

src/libstd/rt/mod.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ mod work_queue;
8888
/// A parallel queue.
8989
mod message_queue;
9090

91+
/// A parallel data structure for tracking sleeping schedulers.
92+
mod sleeper_list;
93+
9194
/// Stack segments and caching.
9295
mod stack;
9396

@@ -145,12 +148,17 @@ pub mod thread_local_storage;
145148
pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
146149

147150
use self::sched::{Scheduler, Coroutine};
151+
use self::work_queue::WorkQueue;
148152
use self::uv::uvio::UvEventLoop;
153+
use self::sleeper_list::SleeperList;
149154

150155
init(crate_map);
151156

152157
let loop_ = ~UvEventLoop::new();
153-
let mut sched = ~Scheduler::new(loop_);
158+
let work_queue = WorkQueue::new();
159+
let sleepers = SleeperList::new();
160+
let mut sched = ~Scheduler::new(loop_, work_queue, sleepers);
161+
sched.no_sleep = true;
154162
let main_task = ~Coroutine::new(&mut sched.stack_pool, main);
155163

156164
sched.enqueue_task(main_task);
@@ -221,20 +229,18 @@ fn test_context() {
221229
use rt::uv::uvio::UvEventLoop;
222230
use cell::Cell;
223231
use rt::local::Local;
232+
use rt::test::new_test_uv_sched;
224233

225234
assert_eq!(context(), OldTaskContext);
226235
do run_in_bare_thread {
227236
assert_eq!(context(), GlobalContext);
228-
let mut sched = ~UvEventLoop::new_scheduler();
237+
let mut sched = ~new_test_uv_sched();
229238
let task = ~do Coroutine::new(&mut sched.stack_pool) {
230239
assert_eq!(context(), TaskContext);
231240
let sched = Local::take::<Scheduler>();
232-
do sched.deschedule_running_task_and_then() |task| {
241+
do sched.deschedule_running_task_and_then() |sched, task| {
233242
assert_eq!(context(), SchedulerContext);
234-
let task = Cell(task);
235-
do Local::borrow::<Scheduler> |sched| {
236-
sched.enqueue_task(task.take());
237-
}
243+
sched.enqueue_task(task);
238244
}
239245
};
240246
sched.enqueue_task(task);

src/libstd/rt/rtio.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use rt::uv::uvio;
1818
// XXX: ~object doesn't work currently so these are some placeholder
1919
// types to use instead
2020
pub type EventLoopObject = uvio::UvEventLoop;
21+
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
2122
pub type IoFactoryObject = uvio::UvIoFactory;
2223
pub type RtioTcpStreamObject = uvio::UvTcpStream;
2324
pub type RtioTcpListenerObject = uvio::UvTcpListener;
@@ -26,10 +27,20 @@ pub trait EventLoop {
2627
fn run(&mut self);
2728
fn callback(&mut self, ~fn());
2829
fn callback_ms(&mut self, ms: u64, ~fn());
30+
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
2931
/// The asynchronous I/O services. Not all event loops may provide one
3032
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
3133
}
3234

35+
pub trait RemoteCallback {
36+
/// Trigger the remote callback. Note that the number of times the callback
37+
/// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
38+
/// the callback will be called at least once, but multiple callbacks may be coalesced
39+
/// and callbacks may be called more often requested. Destruction also triggers the
40+
/// callback.
41+
fn fire(&mut self);
42+
}
43+
3344
pub trait IoFactory {
3445
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
3546
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;

0 commit comments

Comments
 (0)