Skip to content

Commit 8852279

Browse files
committed
core: Add new weak task API
1 parent 1bf8e57 commit 8852279

File tree

7 files changed

+233
-8
lines changed

7 files changed

+233
-8
lines changed

src/libcore/pipes.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1234,6 +1234,16 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
12341234
(port, chan)
12351235
}
12361236

1237+
impl<T: Owned> PortOne<T> {
1238+
fn recv(self) -> T { recv_one(self) }
1239+
fn try_recv(self) -> Option<T> { try_recv_one(self) }
1240+
}
1241+
1242+
impl<T: Owned> ChanOne<T> {
1243+
fn send(self, data: T) { send_one(self, data) }
1244+
fn try_send(self, data: T) -> bool { try_send_one(self, data) }
1245+
}
1246+
12371247
/**
12381248
* Receive a message from a oneshot pipe, failing if the connection was
12391249
* closed.

src/libcore/private.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub mod at_exit;
3434
pub mod global;
3535
#[path = "private/finally.rs"]
3636
pub mod finally;
37+
#[path = "private/weak_task.rs"]
38+
pub mod weak_task;
3739

3840
extern mod rustrt {
3941
#[legacy_exports];

src/libcore/private/weak_task.rs

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*!
2+
Weak tasks
3+
4+
Weak tasks are a runtime feature for building global services that
5+
do not keep the runtime alive. Normally the runtime exits when all
6+
tasks exits, but if a task is weak then the runtime may exit while
7+
it is running, sending a notification to the task that the runtime
8+
is trying to shut down.
9+
*/
10+
11+
use option::{Some, None, swap_unwrap};
12+
use private::at_exit::at_exit;
13+
use private::global::global_data_clone_create;
14+
use private::finally::Finally;
15+
use pipes::{Port, Chan, SharedChan, stream};
16+
use task::{Task, task, spawn};
17+
use task::rt::{task_id, get_task_id};
18+
use send_map::linear::LinearMap;
19+
use ops::Drop;
20+
21+
type ShutdownMsg = ();
22+
23+
// XXX: This could be a PortOne but I've experienced bugginess
24+
// with oneshot pipes and try_send
25+
pub unsafe fn weaken_task(f: &fn(Port<ShutdownMsg>)) {
26+
let service = global_data_clone_create(global_data_key,
27+
create_global_service);
28+
let (shutdown_port, shutdown_chan) = stream::<ShutdownMsg>();
29+
let shutdown_port = ~mut Some(shutdown_port);
30+
let task = get_task_id();
31+
// Expect the weak task service to be alive
32+
assert service.try_send(RegisterWeakTask(task, shutdown_chan));
33+
unsafe { rust_inc_weak_task_count(); }
34+
do fn&() {
35+
let shutdown_port = swap_unwrap(&mut *shutdown_port);
36+
f(shutdown_port)
37+
}.finally || {
38+
unsafe { rust_dec_weak_task_count(); }
39+
// Service my have already exited
40+
service.send(UnregisterWeakTask(task));
41+
}
42+
}
43+
44+
type WeakTaskService = SharedChan<ServiceMsg>;
45+
type TaskHandle = task_id;
46+
47+
fn global_data_key(_v: WeakTaskService) { }
48+
49+
enum ServiceMsg {
50+
RegisterWeakTask(TaskHandle, Chan<ShutdownMsg>),
51+
UnregisterWeakTask(TaskHandle),
52+
Shutdown
53+
}
54+
55+
fn create_global_service() -> ~WeakTaskService {
56+
57+
debug!("creating global weak task service");
58+
let (port, chan) = stream::<ServiceMsg>();
59+
let port = ~mut Some(port);
60+
let chan = SharedChan(chan);
61+
let chan_clone = chan.clone();
62+
63+
do task().unlinked().spawn {
64+
debug!("running global weak task service");
65+
let port = swap_unwrap(&mut *port);
66+
let port = ~mut Some(port);
67+
do fn&() {
68+
let port = swap_unwrap(&mut *port);
69+
// The weak task service is itself a weak task
70+
debug!("weakening the weak service task");
71+
unsafe { rust_inc_weak_task_count(); }
72+
run_weak_task_service(port);
73+
}.finally {
74+
debug!("unweakening the weak service task");
75+
unsafe { rust_dec_weak_task_count(); }
76+
}
77+
}
78+
79+
do at_exit {
80+
debug!("shutting down weak task service");
81+
chan.send(Shutdown);
82+
}
83+
84+
return ~chan_clone;
85+
}
86+
87+
fn run_weak_task_service(port: Port<ServiceMsg>) {
88+
89+
let mut shutdown_map = LinearMap();
90+
91+
loop {
92+
match port.recv() {
93+
RegisterWeakTask(task, shutdown_chan) => {
94+
let previously_unregistered =
95+
shutdown_map.insert(task, shutdown_chan);
96+
assert previously_unregistered;
97+
}
98+
UnregisterWeakTask(task) => {
99+
match shutdown_map.pop(&task) {
100+
Some(shutdown_chan) => {
101+
// Oneshot pipes must send, even though
102+
// nobody will receive this
103+
shutdown_chan.send(());
104+
}
105+
None => fail
106+
}
107+
}
108+
Shutdown => break
109+
}
110+
}
111+
112+
do shutdown_map.consume |_, shutdown_chan| {
113+
// Weak task may have already exited
114+
shutdown_chan.send(());
115+
}
116+
}
117+
118+
extern {
119+
unsafe fn rust_inc_weak_task_count();
120+
unsafe fn rust_dec_weak_task_count();
121+
}
122+
123+
#[test]
124+
fn test_simple() unsafe {
125+
let (port, chan) = stream();
126+
do spawn unsafe {
127+
do weaken_task |_signal| {
128+
}
129+
chan.send(());
130+
}
131+
port.recv();
132+
}
133+
134+
#[test]
135+
fn test_weak_weak() unsafe {
136+
let (port, chan) = stream();
137+
do spawn unsafe {
138+
do weaken_task |_signal| {
139+
}
140+
do weaken_task |_signal| {
141+
}
142+
chan.send(());
143+
}
144+
port.recv();
145+
}
146+
147+
#[test]
148+
fn test_wait_for_signal() unsafe {
149+
do spawn unsafe {
150+
do weaken_task |signal| {
151+
signal.recv();
152+
}
153+
}
154+
}
155+
156+
#[test]
157+
fn test_wait_for_signal_many() unsafe {
158+
use uint;
159+
for uint::range(0, 100) |_| {
160+
do spawn unsafe {
161+
do weaken_task |signal| {
162+
signal.recv();
163+
}
164+
}
165+
}
166+
}
167+
168+
#[test]
169+
fn test_select_stream_and_oneshot() unsafe {
170+
use pipes::select2i;
171+
use either::{Left, Right};
172+
173+
let (port, chan) = stream();
174+
let (waitport, waitchan) = stream();
175+
do spawn unsafe {
176+
do weaken_task |signal| {
177+
match select2i(&port, &signal) {
178+
Left(*) => (),
179+
Right(*) => fail
180+
}
181+
}
182+
waitchan.send(());
183+
}
184+
chan.send(());
185+
waitport.recv();
186+
}
187+

src/rt/rust_builtin.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,6 +1038,18 @@ rust_get_global_data_ptr() {
10381038
return &task->kernel->global_data;
10391039
}
10401040

1041+
extern "C" void
1042+
rust_inc_weak_task_count() {
1043+
rust_task *task = rust_get_current_task();
1044+
task->kernel->inc_weak_task_count();
1045+
}
1046+
1047+
extern "C" void
1048+
rust_dec_weak_task_count() {
1049+
rust_task *task = rust_get_current_task();
1050+
task->kernel->dec_weak_task_count();
1051+
}
1052+
10411053
//
10421054
// Local Variables:
10431055
// mode: C++

src/rt/rust_kernel.cpp

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -377,17 +377,12 @@ rust_kernel::weaken_task(rust_port_id chan) {
377377
KLOG_("Weakening task with channel %" PRIdPTR, chan);
378378
weak_task_chans.push_back(chan);
379379
}
380-
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
381-
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
382-
if (new_non_weak_tasks == 0) {
383-
begin_shutdown();
384-
}
380+
inc_weak_task_count();
385381
}
386382

387383
void
388384
rust_kernel::unweaken_task(rust_port_id chan) {
389-
uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
390-
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
385+
dec_weak_task_count();
391386
{
392387
scoped_lock with(weak_task_lock);
393388
KLOG_("Unweakening task with channel %" PRIdPTR, chan);
@@ -399,6 +394,21 @@ rust_kernel::unweaken_task(rust_port_id chan) {
399394
}
400395
}
401396

397+
void
398+
rust_kernel::inc_weak_task_count() {
399+
uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
400+
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
401+
if (new_non_weak_tasks == 0) {
402+
begin_shutdown();
403+
}
404+
}
405+
406+
void
407+
rust_kernel::dec_weak_task_count() {
408+
uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
409+
KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
410+
}
411+
402412
void
403413
rust_kernel::end_weak_tasks() {
404414
std::vector<rust_port_id> chancopies;

src/rt/rust_kernel.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ class rust_kernel {
187187
void unregister_task();
188188
void weaken_task(rust_port_id chan);
189189
void unweaken_task(rust_port_id chan);
190+
void inc_weak_task_count();
191+
void dec_weak_task_count();
190192

191193
bool send_to_port(rust_port_id chan, void *sptr);
192194

src/rt/rustrt.def.in

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,6 @@ linenoiseHistoryLoad
211211
rust_raw_thread_start
212212
rust_raw_thread_join_delete
213213
rust_register_exit_function
214-
rust_get_global_data_ptr
214+
rust_get_global_data_ptr
215+
rust_inc_weak_task_count
216+
rust_dec_weak_task_count

0 commit comments

Comments
 (0)