Skip to content

Commit db41dfc

Browse files
committed
---
yaml --- r: 147453 b: refs/heads/try2 c: 429313d h: refs/heads/master i: 147451: e1dbe9c v: v3
1 parent f775292 commit db41dfc

File tree

18 files changed

+645
-401
lines changed

18 files changed

+645
-401
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ refs/heads/snap-stage3: 78a7676898d9f80ab540c6df5d4c9ce35bb50463
55
refs/heads/try: 519addf6277dbafccbb4159db4b710c37eaa2ec5
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
8-
refs/heads/try2: 1ca77268d97b62e2fcaa1642aaf9313e164963b3
8+
refs/heads/try2: 429313de69cb2ddd1f076017968d1862ef02b455
99
refs/heads/dist-snap: ba4081a5a8573875fed17545846f6f6902c8ba8d
1010
refs/tags/release-0.2: c870d2dffb391e14efb05aa27898f1f6333a9596
1111
refs/tags/release-0.3: b5f0d0f648d9a6153664837026ba1be43d3e2503

branches/try2/src/librustuv/addrinfo.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@
1111
use ai = std::io::net::addrinfo;
1212
use std::libc::c_int;
1313
use std::ptr::null;
14-
use std::rt::BlockedTask;
15-
use std::rt::local::Local;
16-
use std::rt::sched::Scheduler;
14+
use std::rt::task::BlockedTask;
1715

1816
use net;
19-
use super::{Loop, UvError, Request, wait_until_woken_after};
17+
use super::{Loop, UvError, Request, wait_until_woken_after, wakeup};
2018
use uvll;
2119

2220
struct Addrinfo {
@@ -108,8 +106,7 @@ impl GetAddrInfoRequest {
108106
cx.status = status;
109107
cx.addrinfo = Some(Addrinfo { handle: res });
110108

111-
let sched: ~Scheduler = Local::take();
112-
sched.resume_blocked_task_immediately(cx.slot.take_unwrap());
109+
wakeup(&mut cx.slot);
113110
}
114111
}
115112
}
@@ -188,7 +185,6 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] {
188185
#[cfg(test, not(target_os="android"))]
189186
mod test {
190187
use std::io::net::ip::{SocketAddr, Ipv4Addr};
191-
use super::*;
192188
use super::super::local_loop;
193189

194190
#[test]

branches/try2/src/librustuv/async.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ mod test_remote {
129129
use std::rt::thread::Thread;
130130
use std::rt::tube::Tube;
131131

132-
use super::*;
133132
use super::super::local_loop;
134133

135134
// Make sure that we can fire watchers in remote threads and that they

branches/try2/src/librustuv/file.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ use std::cast::transmute;
1414
use std::cast;
1515
use std::libc::{c_int, c_char, c_void, size_t};
1616
use std::libc;
17-
use std::rt::BlockedTask;
17+
use std::rt::task::BlockedTask;
1818
use std::io::{FileStat, IoError};
1919
use std::io;
20-
use std::rt::local::Local;
2120
use std::rt::rtio;
22-
use std::rt::sched::{Scheduler, SchedHandle};
21+
use std::vec;
2322

24-
use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
25-
use uvio::HomingIO;
23+
use homing::{HomingIO, HomeHandle};
24+
use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after, wakeup};
25+
use uvio::UvIoFactory;
2626
use uvll;
2727

2828
pub struct FsRequest {
@@ -34,19 +34,19 @@ pub struct FileWatcher {
3434
priv loop_: Loop,
3535
priv fd: c_int,
3636
priv close: rtio::CloseBehavior,
37-
priv home: SchedHandle,
37+
priv home: HomeHandle,
3838
}
3939

4040
impl FsRequest {
41-
pub fn open(loop_: &Loop, path: &CString, flags: int, mode: int)
41+
pub fn open(io: &mut UvIoFactory, path: &CString, flags: int, mode: int)
4242
-> Result<FileWatcher, UvError>
4343
{
4444
execute(|req, cb| unsafe {
45-
uvll::uv_fs_open(loop_.handle,
45+
uvll::uv_fs_open(io.uv_loop(),
4646
req, path.with_ref(|p| p), flags as c_int,
4747
mode as c_int, cb)
4848
}).map(|req|
49-
FileWatcher::new(*loop_, req.get_result() as c_int,
49+
FileWatcher::new(io, req.get_result() as c_int,
5050
rtio::CloseSynchronously)
5151
)
5252
}
@@ -320,8 +320,7 @@ fn execute(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
320320
let slot: &mut Option<BlockedTask> = unsafe {
321321
cast::transmute(uvll::get_data_for_req(req))
322322
};
323-
let sched: ~Scheduler = Local::take();
324-
sched.resume_blocked_task_immediately(slot.take_unwrap());
323+
wakeup(slot);
325324
}
326325
}
327326

@@ -331,16 +330,17 @@ fn execute_nop(f: |*uvll::uv_fs_t, uvll::uv_fs_cb| -> c_int)
331330
}
332331

333332
impl HomingIO for FileWatcher {
334-
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
333+
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
335334
}
336335

337336
impl FileWatcher {
338-
pub fn new(loop_: Loop, fd: c_int, close: rtio::CloseBehavior) -> FileWatcher {
337+
pub fn new(io: &mut UvIoFactory, fd: c_int,
338+
close: rtio::CloseBehavior) -> FileWatcher {
339339
FileWatcher {
340-
loop_: loop_,
340+
loop_: Loop::wrap(io.uv_loop()),
341341
fd: fd,
342342
close: close,
343-
home: get_handle_to_current_scheduler!()
343+
home: io.make_handle(),
344344
}
345345
}
346346

@@ -448,7 +448,6 @@ mod test {
448448
use std::io;
449449
use std::str;
450450
use std::vec;
451-
use super::*;
452451
use l = super::super::local_loop;
453452

454453
#[test]

branches/try2/src/librustuv/homing.rs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
//! Homing I/O implementation
12+
//!
13+
//! In libuv, whenever a handle is created on an I/O loop it is illegal to use
14+
//! that handle outside of that I/O loop. We use libuv I/O with our green
15+
//! scheduler, and each green scheduler corresponds to a different I/O loop on a
16+
//! different OS thread. Green tasks are also free to roam among schedulers,
17+
//! which implies that it is possible to create an I/O handle on one event loop
18+
//! and then attempt to use it on another.
19+
//!
20+
//! In order to solve this problem, this module implements the notion of a
21+
//! "homing operation" which will transplant a task from its currently running
22+
//! scheduler back onto the original I/O loop. This is accomplished entirely at
23+
//! the librustuv layer with very little cooperation from the scheduler (which
24+
//! we don't even know exists technically).
25+
//!
26+
//! These homing operations are completed by first realizing that we're on the
27+
//! wrong I/O loop, then descheduling ourselves, sending ourselves to the
28+
//! correct I/O loop, and then waking up the I/O loop in order to process its
29+
//! local queue of tasks which need to run.
30+
//!
31+
//! This enqueueing is done with a concurrent queue from libstd, and the
32+
//! signalling is achieved with an async handle.
33+
34+
use std::rt::local::Local;
35+
use std::rt::rtio::LocalIo;
36+
use std::rt::task::{Task, BlockedTask};
37+
38+
use ForbidUnwind;
39+
use queue::{Queue, QueuePool};
40+
41+
/// A handle to a remote libuv event loop. This handle will keep the event loop
42+
/// alive while active in order to ensure that a homing operation can always be
43+
/// completed.
44+
///
45+
/// Handles are clone-able in order to derive new handles from existing handles
46+
/// (very useful for when accepting a socket from a server).
47+
pub struct HomeHandle {
48+
priv queue: Queue,
49+
priv id: uint,
50+
}
51+
52+
impl HomeHandle {
53+
pub fn new(id: uint, pool: &mut QueuePool) -> HomeHandle {
54+
HomeHandle { queue: pool.queue(), id: id }
55+
}
56+
57+
fn send(&mut self, task: BlockedTask) {
58+
self.queue.push(task);
59+
}
60+
}
61+
62+
impl Clone for HomeHandle {
63+
fn clone(&self) -> HomeHandle {
64+
HomeHandle {
65+
queue: self.queue.clone(),
66+
id: self.id,
67+
}
68+
}
69+
}
70+
71+
pub trait HomingIO {
72+
fn home<'r>(&'r mut self) -> &'r mut HomeHandle;
73+
74+
/// This function will move tasks to run on their home I/O scheduler. Note
75+
/// that this function does *not* pin the task to the I/O scheduler, but
76+
/// rather it simply moves it to running on the I/O scheduler.
77+
fn go_to_IO_home(&mut self) -> uint {
78+
let _f = ForbidUnwind::new("going home");
79+
80+
let mut cur_task: ~Task = Local::take();
81+
let cur_loop_id = {
82+
let mut io = cur_task.local_io().expect("libuv must have I/O");
83+
io.get().id()
84+
};
85+
86+
// Try at all costs to avoid the homing operation because it is quite
87+
// expensive. Hence, we only deschedule/send if we're not on the correct
88+
// event loop. If we're already on the home event loop, then we're good
89+
// to go (remember we have no preemption, so we're guaranteed to stay on
90+
// this event loop as long as we avoid the scheduler).
91+
if cur_loop_id != self.home().id {
92+
cur_task.deschedule(1, |task| {
93+
self.home().send(task);
94+
Ok(())
95+
});
96+
97+
// Once we wake up, assert that we're in the right location
98+
let cur_loop_id = {
99+
let mut io = LocalIo::borrow().expect("libuv must have I/O");
100+
io.get().id()
101+
};
102+
assert_eq!(cur_loop_id, self.home().id);
103+
104+
cur_loop_id
105+
} else {
106+
Local::put(cur_task);
107+
cur_loop_id
108+
}
109+
}
110+
111+
/// Fires a single homing missile, returning another missile targeted back
112+
/// at the original home of this task. In other words, this function will
113+
/// move the local task to its I/O scheduler and then return an RAII wrapper
114+
/// which will return the task home.
115+
fn fire_homing_missile(&mut self) -> HomingMissile {
116+
HomingMissile { io_home: self.go_to_IO_home() }
117+
}
118+
}
119+
120+
/// After a homing operation has been completed, this will return the current
121+
/// task back to its appropriate home (if applicable). The field is used to
122+
/// assert that we are where we think we are.
123+
struct HomingMissile {
124+
priv io_home: uint,
125+
}
126+
127+
impl HomingMissile {
128+
/// Check at runtime that the task has *not* transplanted itself to a
129+
/// different I/O loop while executing.
130+
pub fn check(&self, msg: &'static str) {
131+
let mut io = LocalIo::borrow().expect("libuv must have I/O");
132+
assert!(io.get().id() == self.io_home, "{}", msg);
133+
}
134+
}
135+
136+
impl Drop for HomingMissile {
137+
fn drop(&mut self) {
138+
let _f = ForbidUnwind::new("leaving home");
139+
140+
// It would truly be a sad day if we had moved off the home I/O
141+
// scheduler while we were doing I/O.
142+
self.check("task moved away from the home scheduler");
143+
}
144+
}

branches/try2/src/librustuv/idle.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ impl Drop for IdleWatcher {
9797

9898
#[cfg(test)]
9999
mod test {
100-
use super::*;
101100
use std::rt::tube::Tube;
102101
use std::rt::rtio::{Callback, PausableIdleCallback};
103102
use super::super::local_loop;

0 commit comments

Comments
 (0)