@@ -12,6 +12,7 @@ use option::*;
12
12
use result:: * ;
13
13
use ops:: Drop ;
14
14
use cell:: { Cell , empty_cell} ;
15
+ use cast;
15
16
use cast:: transmute;
16
17
use clone:: Clone ;
17
18
use rt:: io:: IoError ;
@@ -23,6 +24,8 @@ use rt::sched::Scheduler;
23
24
use rt:: io:: { standard_error, OtherIoError } ;
24
25
use rt:: tube:: Tube ;
25
26
use rt:: local:: Local ;
27
+ use unstable:: sync:: { UnsafeAtomicRcBox , AtomicInt } ;
28
+ use unstable:: intrinsics;
26
29
27
30
#[ cfg( test) ] use container:: Container ;
28
31
#[ cfg( test) ] use uint;
@@ -82,6 +85,10 @@ impl EventLoop for UvEventLoop {
82
85
}
83
86
}
84
87
88
+ fn remote_callback ( & mut self , f : ~fn ( ) ) -> ~RemoteCallbackObject {
89
+ ~UvRemoteCallback :: new ( self . uvio . uv_loop ( ) , f)
90
+ }
91
+
85
92
fn io < ' a > ( & ' a mut self ) -> Option < & ' a mut IoFactoryObject > {
86
93
Some ( & mut self . uvio )
87
94
}
@@ -101,6 +108,85 @@ fn test_callback_run_once() {
101
108
}
102
109
}
103
110
111
+ pub struct UvRemoteCallback {
112
+ // The uv async handle for triggering the callback
113
+ async : AsyncWatcher ,
114
+ // An atomic flag to tell the callback to exit,
115
+ // set from the dtor.
116
+ exit_flag : UnsafeAtomicRcBox < AtomicInt >
117
+ }
118
+
119
+ impl UvRemoteCallback {
120
+ pub fn new ( loop_ : & mut Loop , f : ~fn ( ) ) -> UvRemoteCallback {
121
+ let exit_flag = UnsafeAtomicRcBox :: new ( AtomicInt :: new ( 0 ) ) ;
122
+ let exit_flag_clone = exit_flag. clone ( ) ;
123
+ let async = do AsyncWatcher :: new ( loop_) |watcher, status| {
124
+ assert ! ( status. is_none( ) ) ;
125
+ f ( ) ;
126
+ let exit_flag_ptr = exit_flag_clone. get ( ) ;
127
+ unsafe {
128
+ if ( * exit_flag_ptr) . load ( ) == 1 {
129
+ watcher. close ( ||( ) ) ;
130
+ }
131
+ }
132
+ } ;
133
+ UvRemoteCallback {
134
+ async: async ,
135
+ exit_flag : exit_flag
136
+ }
137
+ }
138
+ }
139
+
140
+ impl RemoteCallback for UvRemoteCallback {
141
+ fn fire ( & mut self ) { self . async . send ( ) }
142
+ }
143
+
144
+ impl Drop for UvRemoteCallback {
145
+ fn finalize ( & self ) {
146
+ unsafe {
147
+ let mut this: & mut UvRemoteCallback = cast:: transmute_mut ( self ) ;
148
+ let exit_flag_ptr = this. exit_flag . get ( ) ;
149
+ ( * exit_flag_ptr) . store ( 1 ) ;
150
+ this. async . send ( ) ;
151
+ }
152
+ }
153
+ }
154
+
155
+ #[ cfg( test) ]
156
+ mod test_remote {
157
+ use super :: * ;
158
+ use cell;
159
+ use cell:: Cell ;
160
+ use rt:: test:: * ;
161
+ use rt:: thread:: Thread ;
162
+ use rt:: tube:: Tube ;
163
+ use rt:: rtio:: EventLoop ;
164
+ use rt:: local:: Local ;
165
+ use rt:: sched:: Scheduler ;
166
+
167
+ #[ test]
168
+ fn test_uv_remote ( ) {
169
+ do run_in_newsched_task {
170
+ let mut tube = Tube :: new ( ) ;
171
+ let tube_clone = tube. clone ( ) ;
172
+ let remote_cell = cell:: empty_cell ( ) ;
173
+ do Local :: borrow :: < Scheduler > ( ) |sched| {
174
+ let tube_clone = tube_clone. clone ( ) ;
175
+ let tube_clone_cell = Cell ( tube_clone) ;
176
+ let remote = do sched. event_loop . remote_callback {
177
+ tube_clone_cell. take ( ) . send ( 1 ) ;
178
+ } ;
179
+ remote_cell. put_back ( remote) ;
180
+ }
181
+ let _thread = do Thread :: start {
182
+ remote_cell. take ( ) . fire ( ) ;
183
+ } ;
184
+
185
+ assert ! ( tube. recv( ) == 1 ) ;
186
+ }
187
+ }
188
+ }
189
+
104
190
pub struct UvIoFactory ( Loop ) ;
105
191
106
192
pub impl UvIoFactory {
0 commit comments