Skip to content

Commit 97d6342

Browse files
committed
Synthesize a flush_chan upcall right before a channel's ref_count drops to zero. This should only happen in the Rust code and not in the drop glue, or on the unwind path. This change allows the task owning the channel to block on a flush and delete its own channel. This change also cleans up some code around rust_port and rust_chan.
1 parent 5917ca3 commit 97d6342

File tree

9 files changed

+113
-55
lines changed

9 files changed

+113
-55
lines changed

src/boot/me/trans.ml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2932,6 +2932,7 @@ let trans_visitor
29322932
(slot:Ast.slot)
29332933
(curr_iso:Ast.ty_iso option)
29342934
: unit =
2935+
check_and_flush_chan cell slot;
29352936
drop_slot (get_ty_params_of_current_frame()) cell slot curr_iso
29362937

29372938
and drop_ty_in_current_frame
@@ -4188,6 +4189,25 @@ let trans_visitor
41884189
let last_jumps = Array.map trans_arm at.Ast.alt_tag_arms in
41894190
Array.iter patch last_jumps
41904191

4192+
(* If we're about to drop a channel, synthesize an upcall_flush_chan.
4193+
* TODO: This should rather appear in a chan dtor when chans become
4194+
* objects. *)
4195+
and check_and_flush_chan
4196+
(cell:Il.cell)
4197+
(slot:Ast.slot)
4198+
: unit =
4199+
let ty = strip_mutable_or_constrained_ty (slot_ty slot) in
4200+
match simplified_ty ty with
4201+
Ast.TY_chan _ ->
4202+
annotate "check_and_flush_chan, flush_chan";
4203+
let rc = box_rc_cell cell in
4204+
emit (Il.cmp (Il.Cell rc) one);
4205+
let jump = mark () in
4206+
emit (Il.jmp Il.JNE Il.CodeNone);
4207+
trans_void_upcall "upcall_flush_chan" [| Il.Cell cell |];
4208+
patch jump;
4209+
| _ -> ()
4210+
41914211
and drop_slots_at_curr_stmt _ : unit =
41924212
let stmt = Stack.top curr_stmt in
41934213
match htab_search cx.ctxt_post_stmt_slot_drops stmt with

src/rt/rust_chan.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,22 @@ rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
1818
}
1919

2020
rust_chan::~rust_chan() {
21-
if (port && !port->is_proxy()) {
22-
port->delegate()->chans.swap_delete(this);
23-
}
21+
task->log(rust_log::MEM | rust_log::COMM,
22+
"del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this);
23+
24+
A(task->dom, is_associated() == false,
25+
"Channel must be disassociated before being freed.");
2426
}
2527

2628
/**
2729
* Link this channel with the specified port.
2830
*/
2931
void rust_chan::associate(maybe_proxy<rust_port> *port) {
3032
this->port = port;
31-
if (!port->is_proxy()) {
33+
if (port->is_proxy() == false) {
34+
task->log(rust_log::TASK,
35+
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
36+
this, port);
3237
this->port->delegate()->chans.push(this);
3338
}
3439
}
@@ -43,14 +48,23 @@ bool rust_chan::is_associated() {
4348
void rust_chan::disassociate() {
4449
A(task->dom, is_associated(), "Channel must be associated with a port.");
4550

51+
if (port->is_proxy() == false) {
52+
task->log(rust_log::TASK,
53+
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
54+
this, port->delegate());
55+
port->delegate()->chans.swap_delete(this);
56+
}
57+
4658
// Delete reference to the port.
4759
port = NULL;
4860
}
4961

5062
/**
51-
* Attempt to transmit channel data to the associated port.
63+
* Attempt to send data to the associated port.
5264
*/
53-
void rust_chan::transmit() {
65+
void rust_chan::send(void *sptr) {
66+
buffer.enqueue(sptr);
67+
5468
rust_dom *dom = task->dom;
5569
if (!is_associated()) {
5670
W(dom, is_associated(),
@@ -81,7 +95,6 @@ void rust_chan::transmit() {
8195

8296
return;
8397
}
84-
8598
//
8699
// Local Variables:
87100
// mode: C++

src/rt/rust_chan.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class rust_chan : public rc_base<rust_chan>,
1717
void disassociate();
1818
bool is_associated();
1919

20-
void transmit();
20+
void send(void *sptr);
2121
};
2222

2323
//

src/rt/rust_dom.cpp

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,6 @@ rust_dom::reap_dead_tasks() {
237237
rust_task *task = dead_tasks[i];
238238
if (task->ref_count == 0) {
239239
I(this, task->tasks_waiting_to_join.is_empty());
240-
241240
dead_tasks.swap_delete(task);
242241
log(rust_log::TASK,
243242
"deleting unreferenced dead task 0x%" PRIxPTR, task);
@@ -392,10 +391,9 @@ rust_dom::start_main_loop()
392391
// if progress is made in other domains.
393392

394393
if (scheduled_task == NULL) {
395-
log(rust_log::TASK,
396-
"all tasks are blocked, waiting for progress ...");
397-
if (_log.is_tracing(rust_log::TASK))
394+
if (_log.is_tracing(rust_log::TASK)) {
398395
log_state();
396+
}
399397
log(rust_log::TASK,
400398
"all tasks are blocked, scheduler yielding ...");
401399
sync::yield();
@@ -437,18 +435,6 @@ rust_dom::start_main_loop()
437435
log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
438436

439437
while (dead_tasks.length() > 0) {
440-
log(rust_log::DOM,
441-
"waiting for %d dead tasks to become dereferenced ...",
442-
dead_tasks.length());
443-
444-
if (_log.is_tracing(rust_log::DOM)) {
445-
for (size_t i = 0; i < dead_tasks.length(); i++) {
446-
log(rust_log::DOM,
447-
"task: 0x%" PRIxPTR ", index: %d, ref_count: %d",
448-
dead_tasks[i], i, dead_tasks[i]->ref_count);
449-
}
450-
}
451-
452438
if (_incoming_message_queue.is_empty()) {
453439
log(rust_log::DOM,
454440
"waiting for %d dead tasks to become dereferenced, "

src/rt/rust_message.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
9090
}
9191

9292
void data_message::process() {
93-
_port->remote_channel->buffer.enqueue(_buffer);
94-
_port->remote_channel->transmit();
93+
_port->remote_channel->send(_buffer);
9594
_target->log(rust_log::COMM, "<=== received data via message ===");
9695
}
9796

src/rt/rust_port.cpp

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,32 @@ rust_port::~rust_port() {
2121

2222
// Disassociate channels from this port.
2323
while (chans.is_empty() == false) {
24-
chans.pop()->disassociate();
24+
rust_chan *chan = chans.peek();
25+
chan->disassociate();
2526
}
2627

27-
// We're the only ones holding a reference to the remote channel, so
28-
// clean it up.
2928
delete remote_channel;
3029
}
3130

31+
bool rust_port::receive(void *dptr) {
32+
for (uint32_t i = 0; i < chans.length(); i++) {
33+
rust_chan *chan = chans[i];
34+
if (chan->buffer.is_empty() == false) {
35+
chan->buffer.dequeue(dptr);
36+
if (chan->buffer.is_empty() && chan->task->blocked()) {
37+
task->log(rust_log::COMM,
38+
"chan: 0x%" PRIxPTR
39+
" is flushing, wakeup task: 0x%" PRIxPTR,
40+
chan, chan->task);
41+
chan->task->wakeup(this);
42+
}
43+
task->log(rust_log::COMM, "<=== read data ===");
44+
return true;
45+
}
46+
}
47+
return false;
48+
}
49+
3250
void rust_port::log_state() {
3351
task->log(rust_log::COMM,
3452
"rust_port: 0x%" PRIxPTR ", associated channel(s): %d",

src/rt/rust_port.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class rust_port : public maybe_proxy<rust_port>,
1616
rust_port(rust_task *task, size_t unit_sz);
1717
~rust_port();
1818
void log_state();
19+
bool receive(void *dptr);
1920
};
2021

2122
//

src/rt/rust_task.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,11 @@ get_callee_save_fp(uintptr_t *top_of_callee_saves)
323323

324324
void
325325
rust_task::kill() {
326+
if (dead()) {
327+
// Task is already dead, can't kill what's already dead.
328+
return;
329+
}
330+
326331
// Note the distinction here: kill() is when you're in an upcall
327332
// from task A and want to force-fail task B, you do B->kill().
328333
// If you want to fail yourself you do self->fail(upcall_nargs).

src/rt/rust_upcall.cpp

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -97,27 +97,51 @@ upcall_new_chan(rust_task *task, rust_port *port) {
9797
return new (dom) rust_chan(task, port);
9898
}
9999

100+
/**
101+
* Called whenever this channel needs to be flushed. This can happen due to a
102+
* flush statement, or automatically whenever a channel's ref count is
103+
* about to drop to zero.
104+
*/
105+
extern "C" CDECL void
106+
upcall_flush_chan(rust_task *task, rust_chan *chan) {
107+
LOG_UPCALL_ENTRY(task);
108+
rust_dom *dom = task->dom;
109+
task->log(rust_log::UPCALL | rust_log::COMM,
110+
"flush chan: 0x%" PRIxPTR, chan);
111+
112+
if (chan->buffer.is_empty()) {
113+
return;
114+
}
115+
116+
A(dom, chan->port->is_proxy() == false,
117+
"Channels to remote ports should be flushed automatically.");
118+
119+
// Block on the port until this channel has been completely drained
120+
// by the port.
121+
task->block(chan->port);
122+
task->yield(2);
123+
}
124+
100125
/**
101126
* Called whenever the channel's ref count drops to zero.
127+
*
128+
* Cannot Yield: If the task were to unwind, the dropped ref would still
129+
* appear to be live, causing modify-after-free errors.
102130
*/
103131
extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
104132
LOG_UPCALL_ENTRY(task);
105-
rust_dom *dom = task->dom;
133+
106134
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
107135
"upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
108-
I(dom, !chan->ref_count);
109-
110-
if (!chan->buffer.is_empty() && chan->is_associated()) {
111-
A(dom, !chan->port->is_proxy(),
112-
"Channels to remote ports should be flushed automatically.");
113-
// A target port may still be reading from this channel.
114-
// Block on this channel until it has been completely drained
115-
// by the port.
116-
task->block(chan);
117-
task->yield(2);
118-
return;
119-
}
120136

137+
A(task->dom, chan->ref_count == 0,
138+
"Channel's ref count should be zero.");
139+
140+
if (chan->is_associated()) {
141+
A(task->dom, chan->buffer.is_empty(),
142+
"Channel's buffer should be empty.");
143+
chan->disassociate();
144+
}
121145
delete chan;
122146
}
123147

@@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
183207
"chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
184208
(uintptr_t) chan, (uintptr_t) sptr,
185209
chan->port->delegate()->unit_sz);
186-
chan->buffer.enqueue(sptr);
187-
chan->transmit();
210+
chan->send(sptr);
188211
task->log(rust_log::COMM, "=== sent data ===>");
189212
}
190213

@@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
197220
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
198221
port->chans.length());
199222

200-
for (uint32_t i = 0; i < port->chans.length(); i++) {
201-
rust_chan *chan = port->chans[i];
202-
if (chan->buffer.is_empty() == false) {
203-
chan->buffer.dequeue(dptr);
204-
if (chan->buffer.is_empty() && chan->task->blocked()) {
205-
chan->task->wakeup(chan);
206-
delete chan;
207-
}
208-
task->log(rust_log::COMM, "<=== read data ===");
209-
return;
210-
}
223+
if (port->receive(dptr)) {
224+
return;
211225
}
212226

213227
// No data was buffered on any incoming channel, so block this task
@@ -260,6 +274,8 @@ upcall_exit(rust_task *task) {
260274
LOG_UPCALL_ENTRY(task);
261275
task->log(rust_log::UPCALL | rust_log::TASK,
262276
"task ref_count: %d", task->ref_count);
277+
A(task->dom, task->ref_count >= 0,
278+
"Task ref_count should not be negative on exit!");
263279
task->die();
264280
task->notify_tasks_waiting_to_join();
265281
task->yield(1);

0 commit comments

Comments
 (0)