1
- fn sender_terminate < T : send > ( p : * packet < T > ) {
2
- }
1
+ mod pipes {
2
+ import unsafe:: { forget, reinterpret_cast} ;
3
+
4
+ enum state {
5
+ empty,
6
+ full,
7
+ blocked,
8
+ terminated
9
+ }
10
+
11
+ type packet < T : send > = {
12
+ mut state : state ,
13
+ mut blocked_task : option < task:: task > ,
14
+ mut payload : option < T >
15
+ } ;
16
+
17
+ fn packet < T : send > ( ) -> * packet < T > unsafe {
18
+ let p: * packet < T > = unsafe :: transmute ( ~{
19
+ mut state: empty,
20
+ mut blocked_task: none :: < task:: task > ,
21
+ mut payload: none :: < T >
22
+ } ) ;
23
+ p
24
+ }
25
+
26
+ #[ abi = "rust-intrinsic" ]
27
+ mod rusti {
28
+ fn atomic_xchng ( & dst: int , src : int ) -> int { fail; }
29
+ fn atomic_xchng_acq ( & dst: int , src : int ) -> int { fail; }
30
+ fn atomic_xchng_rel ( & dst: int , src : int ) -> int { fail; }
31
+ }
32
+
33
+ // We should consider moving this to core::unsafe, although I
34
+ // suspect graydon would want us to use void pointers instead.
35
+ unsafe fn uniquify < T > ( x : * T ) -> ~T {
36
+ unsafe { unsafe :: reinterpret_cast ( x) }
37
+ }
38
+
39
+ fn swap_state_acq ( & dst: state , src : state ) -> state {
40
+ unsafe {
41
+ reinterpret_cast ( rusti:: atomic_xchng_acq (
42
+ * ( ptr:: mut_addr_of ( dst) as * mut int ) ,
43
+ src as int ) )
44
+ }
45
+ }
46
+
47
+ fn swap_state_rel ( & dst: state , src : state ) -> state {
48
+ unsafe {
49
+ reinterpret_cast ( rusti:: atomic_xchng_rel (
50
+ * ( ptr:: mut_addr_of ( dst) as * mut int ) ,
51
+ src as int ) )
52
+ }
53
+ }
54
+
55
+ fn send < T : send > ( -p : send_packet < T > , -payload : T ) {
56
+ let p = p. unwrap ( ) ;
57
+ let p = unsafe { uniquify ( p) } ;
58
+ assert ( * p) . payload == none;
59
+ ( * p) . payload <- some ( payload) ;
60
+ let old_state = swap_state_rel ( ( * p) . state , full) ;
61
+ alt old_state {
62
+ empty {
63
+ // Yay, fastpath.
3
64
4
- class send_packet<T : send> {
65
+ // The receiver will eventually clean this up.
66
+ unsafe { forget( p) ; }
67
+ }
68
+ full { fail "duplicate send" }
69
+ blocked {
70
+ // FIXME: once the target will actually block, tell the
71
+ // scheduler to wake it up.
72
+
73
+ // The receiver will eventually clean this up.
74
+ unsafe { forget( p) ; }
75
+ }
76
+ terminated {
77
+ // The receiver will never receive this. Rely on drop_glue
78
+ // to clean everything up.
79
+ }
80
+ }
81
+ }
82
+
83
+ fn recv < T : send > ( -p : recv_packet < T > ) -> option < T > {
84
+ let p = p. unwrap ( ) ;
85
+ let p = unsafe { uniquify ( p) } ;
86
+ loop {
87
+ let old_state = swap_state_acq ( ( * p) . state ,
88
+ blocked) ;
89
+ alt old_state {
90
+ empty | blocked { task : : yield ( ) ; }
91
+ full {
92
+ let mut payload = none;
93
+ payload <-> ( * p) . payload ;
94
+ ret some( option:: unwrap ( payload) )
95
+ }
96
+ terminated {
97
+ assert old_state == terminated;
98
+ ret none;
99
+ }
100
+ }
101
+ }
102
+ }
103
+
104
+ fn sender_terminate < T : send > ( p : * packet < T > ) {
105
+ let p = unsafe { uniquify ( p) } ;
106
+ alt swap_state_rel ( ( * p) . state , terminated) {
107
+ empty | blocked {
108
+ // The receiver will eventually clean up.
109
+ unsafe { forget( p) }
110
+ }
111
+ full {
112
+ // This is impossible
113
+ fail "you dun goofed"
114
+ }
115
+ terminated {
116
+ // I have to clean up, use drop_glue
117
+ }
118
+ }
119
+ }
120
+
121
+ fn receiver_terminate < T : send > ( p : * packet < T > ) {
122
+ let p = unsafe { uniquify ( p) } ;
123
+ alt swap_state_rel ( ( * p) . state , terminated) {
124
+ empty {
125
+ // the sender will clean up
126
+ unsafe { forget( p) }
127
+ }
128
+ blocked {
129
+ // this shouldn't happen.
130
+ fail "terminating a blocked packet"
131
+ }
132
+ terminated | full {
133
+ // I have to clean up, use drop_glue
134
+ }
135
+ }
136
+ }
137
+
138
+ class send_packet<T : send> {
5
139
let mut p: option < * packet < T > > ;
6
140
new ( p: * packet<T >) { self . p = some ( p) ; }
7
141
drop {
@@ -16,23 +150,128 @@ class send_packet<T: send> {
16
150
p <-> self . p ;
17
151
option:: unwrap ( p)
18
152
}
153
+ }
154
+
155
+ class recv_packet<T : send> {
156
+ let mut p: option < * packet < T > > ;
157
+ new ( p: * packet<T >) { self . p = some ( p) ; }
158
+ drop {
159
+ if self. p != none {
160
+ let mut p = none;
161
+ p <-> self . p ;
162
+ receiver_terminate ( option:: unwrap ( p) )
163
+ }
164
+ }
165
+ fn unwrap( ) -> * packet<T > {
166
+ let mut p = none;
167
+ p <-> self . p ;
168
+ option:: unwrap ( p)
169
+ }
170
+ }
171
+
172
+ fn entangle<T : send>( ) -> ( send_packet<T >, recv_packet<T >) {
173
+ let p = packet ( ) ;
174
+ ( send_packet ( p) , recv_packet ( p) )
175
+ }
19
176
}
20
177
21
- enum state {
22
- empty,
23
- full,
24
- blocked,
25
- terminated
178
+ mod pingpong {
179
+ enum ping = pipes:: send_packet<pong>;
180
+ enum pong = pipes:: send_packet<ping>;
181
+
182
+ fn liberate_ping ( -p: ping) -> pipes:: send_packet<pong> unsafe {
183
+ let addr : * pipes:: send_packet < pong > = alt p {
184
+ ping( x) { unsafe :: reinterpret_cast ( ptr:: addr_of ( x) ) }
185
+ } ;
186
+ let liberated_value <- * addr;
187
+ unsafe :: forget ( p ) ;
188
+ liberated_value
189
+ }
190
+
191
+ fn liberate_pong ( -p : pong ) -> pipes:: send_packet < ping > unsafe {
192
+ let addr : * pipes:: send_packet < ping > = alt p {
193
+ pong ( x) { unsafe :: reinterpret_cast ( ptr:: addr_of ( x) ) }
194
+ } ;
195
+ let liberated_value <- * addr;
196
+ unsafe :: forget ( p ) ;
197
+ liberated_value
198
+ }
199
+
200
+ fn init ( ) -> ( client:: ping , server:: ping ) {
201
+ pipes:: entangle ( )
202
+ }
203
+
204
+ mod client {
205
+ type ping = pipes:: send_packet < pingpong:: ping > ;
206
+ type pong = pipes:: recv_packet < pingpong:: pong > ;
207
+
208
+ fn do_ping ( -c : ping ) -> pong {
209
+ let ( sp, rp) = pipes:: entangle ( ) ;
210
+
211
+ pipes:: send ( c, ping ( sp) ) ;
212
+ rp
213
+ }
214
+
215
+ fn do_pong ( -c : pong ) -> ( ping , ( ) ) {
216
+ let packet = pipes:: recv ( c) ;
217
+ if packet == none {
218
+ fail "sender closed the connection"
219
+ }
220
+ ( liberate_pong ( option:: unwrap ( packet) ) , ( ) )
221
+ }
222
+ }
223
+
224
+ mod server {
225
+ type ping = pipes:: recv_packet < pingpong:: ping > ;
226
+ type pong = pipes:: send_packet < pingpong:: pong > ;
227
+
228
+ fn do_ping ( -c : ping ) -> ( pong , ( ) ) {
229
+ let packet = pipes:: recv ( c) ;
230
+ if packet == none {
231
+ fail "sender closed the connection"
232
+ }
233
+ ( liberate_ping ( option:: unwrap ( packet) ) , ( ) )
234
+ }
235
+
236
+ fn do_pong ( -c : pong ) -> ping {
237
+ let ( sp, rp) = pipes:: entangle ( ) ;
238
+ pipes:: send ( c, pong ( sp) ) ;
239
+ rp
240
+ }
241
+ }
26
242
}
27
243
28
- type packet<T : send> = {
29
- mut state: state,
30
- mut blocked_task: option<task:: task>,
31
- mut payload: option<T >
32
- } ;
244
+ fn client ( -chan: pingpong:: client:: ping) {
245
+ let chan = pingpong:: client:: do_ping ( chan) ;
246
+ log ( error, "Sent ping" ) ;
247
+ let ( chan, _data) = pingpong:: client:: do_pong ( chan) ;
248
+ log ( error, "Received pong" ) ;
249
+ }
250
+
251
+ fn server ( -chan: pingpong:: server:: ping) {
252
+ let ( chan, _data) = pingpong:: server:: do_ping ( chan) ;
253
+ log ( error, "Received ping" ) ;
254
+ let chan = pingpong:: server:: do_pong ( chan) ;
255
+ log ( error, "Sent pong" ) ;
256
+ }
33
257
34
258
fn main ( ) {
35
- let _s: send_packet < int > = send_packet ( ptr:: addr_of ( { mut state: empty,
36
- mut blocked_task: none,
37
- mut payload: some ( 42 ) } ) ) ;
259
+ /*
260
+ // Commented out because of option::get error
261
+
262
+ let (client_, server_) = pingpong::init();
263
+ let client_ = ~mut some(client_);
264
+ let server_ = ~mut some(server_);
265
+
266
+ task::spawn {|move client_|
267
+ let mut client__ = none;
268
+ *client_ <-> client__;
269
+ client(option::unwrap(client__));
270
+ };
271
+ task::spawn {|move server_|
272
+ let mut server_ˊ = none;
273
+ *server_ <-> server_ˊ;
274
+ server(option::unwrap(server_ˊ));
275
+ };
276
+ */
38
277
}
0 commit comments