Skip to content

Commit a0cd55a

Browse files
committed
core::rt: Add RemoteCallback trait and uv implementation
This is used for signalling the event loop from other threads.
1 parent 8f77a6f commit a0cd55a

File tree

3 files changed

+100
-3
lines changed

3 files changed

+100
-3
lines changed

src/libcore/rt/rtio.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use rt::uv::uvio;
1818
// XXX: ~object doesn't work currently so these are some placeholder
1919
// types to use instead
2020
pub type EventLoopObject = uvio::UvEventLoop;
21+
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
2122
pub type IoFactoryObject = uvio::UvIoFactory;
2223
pub type RtioTcpStreamObject = uvio::UvTcpStream;
2324
pub type RtioTcpListenerObject = uvio::UvTcpListener;
@@ -26,10 +27,20 @@ pub trait EventLoop {
2627
fn run(&mut self);
2728
fn callback(&mut self, ~fn());
2829
fn callback_ms(&mut self, ms: u64, ~fn());
30+
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
2931
/// The asynchronous I/O services. Not all event loops may provide one
3032
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
3133
}
3234

35+
pub trait RemoteCallback {
36+
/// Trigger the remote callback. Note that the number of times the callback
37+
/// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
38+
/// the callback will be called at least once, but multiple callbacks may be coalesced
39+
/// and callbacks may be called more often requested. Destruction also triggers the
40+
/// callback.
41+
fn fire(&mut self);
42+
}
43+
3344
pub trait IoFactory {
3445
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
3546
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;

src/libcore/rt/uv/async.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub struct AsyncWatcher(*uvll::uv_async_t);
2020
impl Watcher for AsyncWatcher { }
2121

2222
impl AsyncWatcher {
23-
fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher {
23+
pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher {
2424
unsafe {
2525
let handle = uvll::malloc_handle(UV_ASYNC);
2626
assert!(handle.is_not_null());
@@ -41,14 +41,14 @@ impl AsyncWatcher {
4141
}
4242
}
4343

44-
fn send(&mut self) {
44+
pub fn send(&mut self) {
4545
unsafe {
4646
let handle = self.native_handle();
4747
uvll::async_send(handle);
4848
}
4949
}
5050

51-
fn close(self, cb: NullCallback) {
51+
pub fn close(self, cb: NullCallback) {
5252
let mut this = self;
5353
let data = this.get_watcher_data();
5454
assert!(data.close_cb.is_none());

src/libcore/rt/uv/uvio.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use option::*;
1212
use result::*;
1313
use ops::Drop;
1414
use cell::{Cell, empty_cell};
15+
use cast;
1516
use cast::transmute;
1617
use clone::Clone;
1718
use rt::io::IoError;
@@ -23,6 +24,8 @@ use rt::sched::Scheduler;
2324
use rt::io::{standard_error, OtherIoError};
2425
use rt::tube::Tube;
2526
use rt::local::Local;
27+
use unstable::sync::{UnsafeAtomicRcBox, AtomicInt};
28+
use unstable::intrinsics;
2629

2730
#[cfg(test)] use container::Container;
2831
#[cfg(test)] use uint;
@@ -82,6 +85,10 @@ impl EventLoop for UvEventLoop {
8285
}
8386
}
8487

88+
fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
89+
~UvRemoteCallback::new(self.uvio.uv_loop(), f)
90+
}
91+
8592
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
8693
Some(&mut self.uvio)
8794
}
@@ -101,6 +108,85 @@ fn test_callback_run_once() {
101108
}
102109
}
103110

111+
pub struct UvRemoteCallback {
112+
// The uv async handle for triggering the callback
113+
async: AsyncWatcher,
114+
// An atomic flag to tell the callback to exit,
115+
// set from the dtor.
116+
exit_flag: UnsafeAtomicRcBox<AtomicInt>
117+
}
118+
119+
impl UvRemoteCallback {
120+
pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
121+
let exit_flag = UnsafeAtomicRcBox::new(AtomicInt::new(0));
122+
let exit_flag_clone = exit_flag.clone();
123+
let async = do AsyncWatcher::new(loop_) |watcher, status| {
124+
assert!(status.is_none());
125+
f();
126+
let exit_flag_ptr = exit_flag_clone.get();
127+
unsafe {
128+
if (*exit_flag_ptr).load() == 1 {
129+
watcher.close(||());
130+
}
131+
}
132+
};
133+
UvRemoteCallback {
134+
async: async,
135+
exit_flag: exit_flag
136+
}
137+
}
138+
}
139+
140+
impl RemoteCallback for UvRemoteCallback {
141+
fn fire(&mut self) { self.async.send() }
142+
}
143+
144+
impl Drop for UvRemoteCallback {
145+
fn finalize(&self) {
146+
unsafe {
147+
let mut this: &mut UvRemoteCallback = cast::transmute_mut(self);
148+
let exit_flag_ptr = this.exit_flag.get();
149+
(*exit_flag_ptr).store(1);
150+
this.async.send();
151+
}
152+
}
153+
}
154+
155+
#[cfg(test)]
156+
mod test_remote {
157+
use super::*;
158+
use cell;
159+
use cell::Cell;
160+
use rt::test::*;
161+
use rt::thread::Thread;
162+
use rt::tube::Tube;
163+
use rt::rtio::EventLoop;
164+
use rt::local::Local;
165+
use rt::sched::Scheduler;
166+
167+
#[test]
168+
fn test_uv_remote() {
169+
do run_in_newsched_task {
170+
let mut tube = Tube::new();
171+
let tube_clone = tube.clone();
172+
let remote_cell = cell::empty_cell();
173+
do Local::borrow::<Scheduler>() |sched| {
174+
let tube_clone = tube_clone.clone();
175+
let tube_clone_cell = Cell(tube_clone);
176+
let remote = do sched.event_loop.remote_callback {
177+
tube_clone_cell.take().send(1);
178+
};
179+
remote_cell.put_back(remote);
180+
}
181+
let _thread = do Thread::start {
182+
remote_cell.take().fire();
183+
};
184+
185+
assert!(tube.recv() == 1);
186+
}
187+
}
188+
}
189+
104190
pub struct UvIoFactory(Loop);
105191

106192
pub impl UvIoFactory {

0 commit comments

Comments
 (0)