Skip to content

Commit 05accaa

Browse files
committed
implement arc::unwrap and add exclusive_unwrap tests
1 parent 842db0b commit 05accaa

File tree

1 file changed

+200
-20
lines changed

1 file changed

+200
-20
lines changed

src/libcore/unsafe.rs

Lines changed: 200 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export transmute_mut, transmute_immut, transmute_region, transmute_mut_region;
55

66
export SharedMutableState, shared_mutable_state, clone_shared_mutable_state;
77
export get_shared_mutable_state, get_shared_immutable_state;
8+
export unwrap_shared_mutable_state;
89
export Exclusive, exclusive;
910

1011
import task::atomically;
@@ -69,24 +70,120 @@ unsafe fn transmute_mut_region<T>(+ptr: &a/mut T) -> &b/mut T {
6970
* Shared state & exclusive ARC
7071
****************************************************************************/
7172

72-
type ArcData<T> = {
73-
mut count: libc::intptr_t,
74-
data: T
75-
};
73+
// An unwrapper uses this protocol to communicate with the "other" task that
74+
// drops the last refcount on an arc. Unfortunately this can't be a proper
75+
// pipe protocol because the unwrapper has to access both stages at once.
76+
type UnwrapProto = ~mut option<(pipes::chan_one<()>, pipes::port_one<bool>)>;
77+
78+
struct ArcData<T> {
79+
mut count: libc::intptr_t;
80+
mut unwrapper: libc::uintptr_t; // either a UnwrapProto or 0
81+
// FIXME(#3224) should be able to make this non-option to save memory, and
82+
// in unwrap() use "let ~ArcData { data: result, _ } = thing" to unwrap it
83+
mut data: option<T>;
84+
}
7685

7786
struct ArcDestruct<T> {
78-
let data: *libc::c_void;
79-
new(data: *libc::c_void) { self.data = data; }
80-
drop unsafe {
81-
let data: ~ArcData<T> = unsafe::reinterpret_cast(self.data);
82-
let new_count = rustrt::rust_atomic_decrement(&mut data.count);
83-
assert new_count >= 0;
84-
if new_count == 0 {
85-
// drop glue takes over.
86-
} else {
87-
unsafe::forget(data);
88-
}
89-
}
87+
mut data: *libc::c_void;
88+
new(data: *libc::c_void) { self.data = data; }
89+
drop unsafe {
90+
if self.data.is_null() {
91+
return; // Happens when destructing an unwrapper's handle.
92+
}
93+
do task::unkillable {
94+
let data: ~ArcData<T> = unsafe::reinterpret_cast(self.data);
95+
let new_count = rustrt::rust_atomic_decrement(&mut data.count);
96+
assert new_count >= 0;
97+
if new_count == 0 {
98+
// Were we really last, or should we hand off to an unwrapper?
99+
// It's safe to not xchg because the unwrapper will set the
100+
// unwrap lock *before* dropping his/her reference. In effect,
101+
// being here means we're the only *awake* task with the data.
102+
if data.unwrapper != 0 {
103+
let p: UnwrapProto =
104+
unsafe::reinterpret_cast(data.unwrapper);
105+
let (message, response) = option::swap_unwrap(p);
106+
// Send 'ready' and wait for a response.
107+
pipes::send_one(message, ());
108+
// Unkillable wait. Message guaranteed to come.
109+
if pipes::recv_one(response) {
110+
// Other task got the data.
111+
unsafe::forget(data);
112+
} else {
113+
// Other task was killed. drop glue takes over.
114+
}
115+
} else {
116+
// drop glue takes over.
117+
}
118+
} else {
119+
unsafe::forget(data);
120+
}
121+
}
122+
}
123+
}
124+
125+
unsafe fn unwrap_shared_mutable_state<T: send>(+rc: SharedMutableState<T>)
126+
-> T {
127+
struct DeathThroes<T> {
128+
mut ptr: option<~ArcData<T>>;
129+
mut response: option<pipes::chan_one<bool>>;
130+
drop unsafe {
131+
let response = option::swap_unwrap(&mut self.response);
132+
// In case we get killed early, we need to tell the person who
133+
// tried to wake us whether they should hand-off the data to us.
134+
if task::failing() {
135+
pipes::send_one(response, false);
136+
// Either this swap_unwrap or the one below (at "Got here")
137+
// ought to run.
138+
unsafe::forget(option::swap_unwrap(&mut self.ptr));
139+
} else {
140+
assert self.ptr.is_none();
141+
pipes::send_one(response, true);
142+
}
143+
}
144+
}
145+
146+
do task::unkillable {
147+
let ptr: ~ArcData<T> = unsafe::reinterpret_cast(rc.data);
148+
let (c1,p1) = pipes::oneshot(); // ()
149+
let (c2,p2) = pipes::oneshot(); // bool
150+
let server: UnwrapProto = ~mut some((c1,p2));
151+
let serverp: libc::uintptr_t = unsafe::transmute(server);
152+
// Try to put our server end in the unwrapper slot.
153+
if rustrt::rust_compare_and_swap_ptr(&mut ptr.unwrapper, 0, serverp) {
154+
// Got in. Step 0: Tell destructor not to run. We are now it.
155+
rc.data = ptr::null();
156+
// Step 1 - drop our own reference.
157+
let new_count = rustrt::rust_atomic_decrement(&mut ptr.count);
158+
assert new_count >= 0;
159+
if new_count == 0 {
160+
// We were the last owner. Can unwrap immediately.
161+
// Also we have to free the server endpoints.
162+
let _server: UnwrapProto = unsafe::transmute(serverp);
163+
option::swap_unwrap(&mut ptr.data)
164+
// drop glue takes over.
165+
} else {
166+
// The *next* person who sees the refcount hit 0 will wake us.
167+
let end_result =
168+
DeathThroes { ptr: some(ptr), response: some(c2) };
169+
let mut p1 = some(p1); // argh
170+
do task::rekillable {
171+
pipes::recv_one(option::swap_unwrap(&mut p1));
172+
}
173+
// Got here. Back in the 'unkillable' without getting killed.
174+
// Recover ownership of ptr, then take the data out.
175+
let ptr = option::swap_unwrap(&mut end_result.ptr);
176+
option::swap_unwrap(&mut ptr.data)
177+
// drop glue takes over.
178+
}
179+
} else {
180+
// Somebody else was trying to unwrap. Avoid guaranteed deadlock.
181+
unsafe::forget(ptr);
182+
// Also we have to free the (rejected) server endpoints.
183+
let _server: UnwrapProto = unsafe::transmute(serverp);
184+
fail ~"Another task is already unwrapping this ARC!";
185+
}
186+
}
90187
}
91188
92189
/**
@@ -98,7 +195,7 @@ struct ArcDestruct<T> {
98195
type SharedMutableState<T: send> = ArcDestruct<T>;
99196
100197
unsafe fn shared_mutable_state<T: send>(+data: T) -> SharedMutableState<T> {
101-
let data = ~{mut count: 1, data: data};
198+
let data = ~ArcData { count: 1, unwrapper: 0, data: some(data) };
102199
unsafe {
103200
let ptr = unsafe::transmute(data);
104201
ArcDestruct(ptr)
@@ -112,9 +209,9 @@ unsafe fn get_shared_mutable_state<T: send>(rc: &SharedMutableState<T>)
112209
let ptr: ~ArcData<T> = unsafe::reinterpret_cast((*rc).data);
113210
assert ptr.count > 0;
114211
// Cast us back into the correct region
115-
let r = unsafe::reinterpret_cast(&ptr.data);
212+
let r = unsafe::transmute_region(option::get_ref(&ptr.data));
116213
unsafe::forget(ptr);
117-
return r;
214+
return unsafe::transmute_mut(r);
118215
}
119216
}
120217
#[inline(always)]
@@ -124,7 +221,7 @@ unsafe fn get_shared_immutable_state<T: send>(rc: &SharedMutableState<T>)
124221
let ptr: ~ArcData<T> = unsafe::reinterpret_cast((*rc).data);
125222
assert ptr.count > 0;
126223
// Cast us back into the correct region
127-
let r = unsafe::reinterpret_cast(&ptr.data);
224+
let r = unsafe::transmute_region(option::get_ref(&ptr.data));
128225
unsafe::forget(ptr);
129226
return r;
130227
}
@@ -156,6 +253,11 @@ extern mod rustrt {
156253
fn rust_atomic_decrement(p: &mut libc::intptr_t)
157254
-> libc::intptr_t;
158255

256+
#[rust_stack]
257+
fn rust_compare_and_swap_ptr(address: &mut libc::uintptr_t,
258+
oldval: libc::uintptr_t,
259+
newval: libc::uintptr_t) -> bool;
260+
159261
fn rust_create_little_lock() -> rust_little_lock;
160262
fn rust_destroy_little_lock(lock: rust_little_lock);
161263
fn rust_lock_little_lock(lock: rust_little_lock);
@@ -227,6 +329,15 @@ impl<T: send> Exclusive<T> {
227329
}
228330
}
229331
332+
// FIXME(#2585) make this a by-move method on the exclusive
333+
#[cfg(stage1)]
334+
fn unwrap_exclusive<T: send>(+arc: Exclusive<T>) -> T {
335+
let Exclusive { x: x } = arc;
336+
let inner = unsafe { unwrap_shared_mutable_state(x) };
337+
let ExData { data: data, _ } = inner;
338+
data
339+
}
340+
230341
/****************************************************************************
231342
* Tests
232343
****************************************************************************/
@@ -313,4 +424,73 @@ mod tests {
313424
assert *one == 1;
314425
}
315426
}
427+
428+
#[test]
429+
#[cfg(stage1)]
430+
fn exclusive_unwrap_basic() {
431+
let x = exclusive(~~"hello");
432+
assert unwrap_exclusive(x) == ~~"hello";
433+
}
434+
435+
#[test]
436+
#[cfg(stage1)]
437+
fn exclusive_unwrap_contended() {
438+
let x = exclusive(~~"hello");
439+
let x2 = ~mut some(x.clone());
440+
do task::spawn {
441+
let x2 = option::swap_unwrap(x2);
442+
do x2.with |_hello| { }
443+
task::yield();
444+
}
445+
assert unwrap_exclusive(x) == ~~"hello";
446+
447+
// Now try the same thing, but with the child task blocking.
448+
let x = exclusive(~~"hello");
449+
let x2 = ~mut some(x.clone());
450+
let mut res = none;
451+
do task::task().future_result(|+r| res = some(r)).spawn {
452+
let x2 = option::swap_unwrap(x2);
453+
assert unwrap_exclusive(x2) == ~~"hello";
454+
}
455+
// Have to get rid of our reference before blocking.
456+
{ let _x = move x; } // FIXME(#3161) util::ignore doesn't work here
457+
let res = option::swap_unwrap(&mut res);
458+
future::get(&res);
459+
}
460+
461+
#[test] #[should_fail] #[ignore(cfg(windows))]
462+
#[cfg(stage1)]
463+
fn exclusive_unwrap_conflict() {
464+
let x = exclusive(~~"hello");
465+
let x2 = ~mut some(x.clone());
466+
let mut res = none;
467+
do task::task().future_result(|+r| res = some(r)).spawn {
468+
let x2 = option::swap_unwrap(x2);
469+
assert unwrap_exclusive(x2) == ~~"hello";
470+
}
471+
assert unwrap_exclusive(x) == ~~"hello";
472+
let res = option::swap_unwrap(&mut res);
473+
future::get(&res);
474+
}
475+
476+
#[test] #[ignore(cfg(windows))]
477+
#[cfg(stage1)]
478+
fn exclusive_unwrap_deadlock() {
479+
// This is not guaranteed to get to the deadlock before being killed,
480+
// but it will show up sometimes, and if the deadlock were not there,
481+
// the test would nondeterministically fail.
482+
let result = do task::try {
483+
// a task that has two references to the same exclusive will
484+
// deadlock when it unwraps. nothing to be done about that.
485+
let x = exclusive(~~"hello");
486+
let x2 = x.clone();
487+
do task::spawn {
488+
for 10.times { task::yield(); } // try to let the unwrapper go
489+
fail; // punt it awake from its deadlock
490+
}
491+
let _z = unwrap_exclusive(x);
492+
do x2.with |_hello| { }
493+
};
494+
assert result.is_err();
495+
}
316496
}

0 commit comments

Comments
 (0)