Skip to content

Work on removing comm #4190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 14, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/tutorial-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ and child both need to exchange messages with each other. The
function `std::comm::DuplexStream()` supports this pattern. We'll
look briefly at how to use it.

To see how `spawn_conversation()` works, we will create a child task
To see how `DuplexStream()` works, we will create a child task
that repeatedly receives a `uint` message, converts it to a string, and sends
the string in response. The child terminates when it receives `0`.
Here is the function that implements the child task:
Expand Down
2 changes: 1 addition & 1 deletion src/libcore/core.rc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub mod send_map;

/* Tasks and communication */

pub mod comm;
pub mod oldcomm;
#[path = "task/mod.rs"]
pub mod task;
pub mod pipes;
Expand Down
6 changes: 3 additions & 3 deletions src/libcore/comm.rs → src/libcore/oldcomm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn PortPtr<T: Owned>(po: *rust_port) -> PortPtr<T> {
* Fails if the port is detached or dead. Fails if the port
* is owned by a different task.
*/
fn as_raw_port<T: Owned, U>(ch: comm::Chan<T>, f: fn(*rust_port) -> U) -> U {
fn as_raw_port<T: Owned, U>(ch: Chan<T>, f: fn(*rust_port) -> U) -> U {

struct PortRef {
p: *rust_port,
Expand Down Expand Up @@ -205,11 +205,11 @@ pub fn recv<T: Owned>(p: Port<T>) -> T { recv_((**p).po) }
pub fn peek<T: Owned>(p: Port<T>) -> bool { peek_((**p).po) }

#[doc(hidden)]
pub fn recv_chan<T: Owned>(ch: comm::Chan<T>) -> T {
pub fn recv_chan<T: Owned>(ch: Chan<T>) -> T {
as_raw_port(ch, |x|recv_(x))
}

fn peek_chan<T: Owned>(ch: comm::Chan<T>) -> bool {
fn peek_chan<T: Owned>(ch: Chan<T>) -> bool {
as_raw_port(ch, |x|peek_(x))
}

Expand Down
40 changes: 20 additions & 20 deletions src/libcore/os.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,36 +133,36 @@ mod global_env {
}

enum Msg {
MsgGetEnv(~str, comm::Chan<Option<~str>>),
MsgSetEnv(~str, ~str, comm::Chan<()>),
MsgEnv(comm::Chan<~[(~str,~str)]>)
MsgGetEnv(~str, oldcomm::Chan<Option<~str>>),
MsgSetEnv(~str, ~str, oldcomm::Chan<()>),
MsgEnv(oldcomm::Chan<~[(~str,~str)]>)
}

pub fn getenv(n: &str) -> Option<~str> {
let env_ch = get_global_env_chan();
let po = comm::Port();
comm::send(env_ch, MsgGetEnv(str::from_slice(n),
comm::Chan(&po)));
comm::recv(po)
let po = oldcomm::Port();
oldcomm::send(env_ch, MsgGetEnv(str::from_slice(n),
oldcomm::Chan(&po)));
oldcomm::recv(po)
}

pub fn setenv(n: &str, v: &str) {
let env_ch = get_global_env_chan();
let po = comm::Port();
comm::send(env_ch, MsgSetEnv(str::from_slice(n),
let po = oldcomm::Port();
oldcomm::send(env_ch, MsgSetEnv(str::from_slice(n),
str::from_slice(v),
comm::Chan(&po)));
comm::recv(po)
oldcomm::Chan(&po)));
oldcomm::recv(po)
}

pub fn env() -> ~[(~str,~str)] {
let env_ch = get_global_env_chan();
let po = comm::Port();
comm::send(env_ch, MsgEnv(comm::Chan(&po)));
comm::recv(po)
let po = oldcomm::Port();
oldcomm::send(env_ch, MsgEnv(oldcomm::Chan(&po)));
oldcomm::recv(po)
}

fn get_global_env_chan() -> comm::Chan<Msg> {
fn get_global_env_chan() -> oldcomm::Chan<Msg> {
let global_ptr = rustrt::rust_global_env_chan_ptr();
unsafe {
private::chan_from_global_ptr(global_ptr, || {
Expand All @@ -173,19 +173,19 @@ mod global_env {
}
}

fn global_env_task(msg_po: comm::Port<Msg>) {
fn global_env_task(msg_po: oldcomm::Port<Msg>) {
unsafe {
do private::weaken_task |weak_po| {
loop {
match comm::select2(msg_po, weak_po) {
match oldcomm::select2(msg_po, weak_po) {
either::Left(MsgGetEnv(ref n, resp_ch)) => {
comm::send(resp_ch, impl_::getenv(*n))
oldcomm::send(resp_ch, impl_::getenv(*n))
}
either::Left(MsgSetEnv(ref n, ref v, resp_ch)) => {
comm::send(resp_ch, impl_::setenv(*n, *v))
oldcomm::send(resp_ch, impl_::setenv(*n, *v))
}
either::Left(MsgEnv(resp_ch)) => {
comm::send(resp_ch, impl_::env())
oldcomm::send(resp_ch, impl_::env())
}
either::Right(_) => break
}
Expand Down
84 changes: 45 additions & 39 deletions src/libcore/private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
pub unsafe fn chan_from_global_ptr<T: Owned>(
global: GlobalPtr,
task_fn: fn() -> task::TaskBuilder,
f: fn~(comm::Port<T>)
) -> comm::Chan<T> {
f: fn~(oldcomm::Port<T>)
) -> oldcomm::Chan<T> {

enum Msg {
Proceed,
Expand All @@ -70,23 +70,29 @@ pub unsafe fn chan_from_global_ptr<T: Owned>(
log(debug,~"is probably zero...");
// There's no global channel. We must make it

let (setup_po, setup_ch) = do task_fn().spawn_conversation
|move f, setup_po, setup_ch| {
let po = comm::Port::<T>();
let ch = comm::Chan(&po);
comm::send(setup_ch, ch);
let (setup1_po, setup1_ch) = pipes::stream();
let (setup2_po, setup2_ch) = pipes::stream();

// XXX: Ugly type inference hints
let setup1_po: pipes::Port<oldcomm::Chan<T>> = setup1_po;
let setup2_po: pipes::Port<Msg> = setup2_po;

do task_fn().spawn |move f, move setup1_ch, move setup2_po| {
let po = oldcomm::Port::<T>();
let ch = oldcomm::Chan(&po);
setup1_ch.send(ch);

// Wait to hear if we are the official instance of
// this global task
match comm::recv::<Msg>(setup_po) {
match setup2_po.recv() {
Proceed => f(move po),
Abort => ()
}
};

log(debug,~"before setup recv..");
// This is the proposed global channel
let ch = comm::recv(setup_po);
let ch = setup1_po.recv();
// 0 is our sentinal value. It is not a valid channel
assert *ch != 0;

Expand All @@ -99,11 +105,11 @@ pub unsafe fn chan_from_global_ptr<T: Owned>(

if swapped {
// Success!
comm::send(setup_ch, Proceed);
setup2_ch.send(Proceed);
ch
} else {
// Somebody else got in before we did
comm::send(setup_ch, Abort);
setup2_ch.send(Abort);
cast::reinterpret_cast(&*global)
}
} else {
Expand All @@ -124,29 +130,29 @@ pub fn test_from_global_chan1() {
// Create the global channel, attached to a new task
let ch = unsafe {
do chan_from_global_ptr(globchanp, task::task) |po| {
let ch = comm::recv(po);
comm::send(ch, true);
let ch = comm::recv(po);
comm::send(ch, true);
let ch = oldcomm::recv(po);
oldcomm::send(ch, true);
let ch = oldcomm::recv(po);
oldcomm::send(ch, true);
}
};
// Talk to it
let po = comm::Port();
comm::send(ch, comm::Chan(&po));
assert comm::recv(po) == true;
let po = oldcomm::Port();
oldcomm::send(ch, oldcomm::Chan(&po));
assert oldcomm::recv(po) == true;

// This one just reuses the previous channel
let ch = unsafe {
do chan_from_global_ptr(globchanp, task::task) |po| {
let ch = comm::recv(po);
comm::send(ch, false);
let ch = oldcomm::recv(po);
oldcomm::send(ch, false);
}
};

// Talk to the original global task
let po = comm::Port();
comm::send(ch, comm::Chan(&po));
assert comm::recv(po) == true;
let po = oldcomm::Port();
oldcomm::send(ch, oldcomm::Chan(&po));
assert oldcomm::recv(po) == true;
}

#[test]
Expand All @@ -157,8 +163,8 @@ pub fn test_from_global_chan2() {
let globchan = 0;
let globchanp = ptr::addr_of(&globchan);

let resultpo = comm::Port();
let resultch = comm::Chan(&resultpo);
let resultpo = oldcomm::Port();
let resultch = oldcomm::Chan(&resultpo);

// Spawn a bunch of tasks that all want to compete to
// create the global channel
Expand All @@ -169,23 +175,23 @@ pub fn test_from_global_chan2() {
globchanp, task::task) |po| {

for uint::range(0, 10) |_j| {
let ch = comm::recv(po);
comm::send(ch, {i});
let ch = oldcomm::recv(po);
oldcomm::send(ch, {i});
}
}
};
let po = comm::Port();
comm::send(ch, comm::Chan(&po));
let po = oldcomm::Port();
oldcomm::send(ch, oldcomm::Chan(&po));
// We are The winner if our version of the
// task was installed
let winner = comm::recv(po);
comm::send(resultch, winner == i);
let winner = oldcomm::recv(po);
oldcomm::send(resultch, winner == i);
}
}
// There should be only one winner
let mut winners = 0u;
for uint::range(0u, 10u) |_i| {
let res = comm::recv(resultpo);
let res = oldcomm::recv(resultpo);
if res { winners += 1u };
}
assert winners == 1u;
Expand All @@ -211,23 +217,23 @@ pub fn test_from_global_chan2() {
* * Weak tasks must not be supervised. A supervised task keeps
* a reference to its parent, so the parent will not die.
*/
pub unsafe fn weaken_task(f: fn(comm::Port<()>)) {
let po = comm::Port();
let ch = comm::Chan(&po);
pub unsafe fn weaken_task(f: fn(oldcomm::Port<()>)) {
let po = oldcomm::Port();
let ch = oldcomm::Chan(&po);
unsafe {
rustrt::rust_task_weaken(cast::reinterpret_cast(&ch));
}
let _unweaken = Unweaken(ch);
f(po);

struct Unweaken {
ch: comm::Chan<()>,
ch: oldcomm::Chan<()>,
drop unsafe {
rustrt::rust_task_unweaken(cast::reinterpret_cast(&self.ch));
}
}

fn Unweaken(ch: comm::Chan<()>) -> Unweaken {
fn Unweaken(ch: oldcomm::Chan<()>) -> Unweaken {
Unweaken {
ch: ch
}
Expand All @@ -249,7 +255,7 @@ pub fn test_weaken_task_wait() {
do task::spawn_unlinked {
unsafe {
do weaken_task |po| {
comm::recv(po);
oldcomm::recv(po);
}
}
}
Expand All @@ -269,7 +275,7 @@ pub fn test_weaken_task_stress() {
unsafe {
do weaken_task |po| {
// Wait for it to tell us to die
comm::recv(po);
oldcomm::recv(po);
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/libcore/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,22 +307,22 @@ pub fn program_output(prog: &str, args: &[~str]) ->
// in parallel so we don't deadlock while blocking on one
// or the other. FIXME (#2625): Surely there's a much more
// clever way to do this.
let p = comm::Port();
let ch = comm::Chan(&p);
let p = oldcomm::Port();
let ch = oldcomm::Chan(&p);
do task::spawn_sched(task::SingleThreaded) {
let errput = readclose(pipe_err.in);
comm::send(ch, (2, move errput));
oldcomm::send(ch, (2, move errput));
};
do task::spawn_sched(task::SingleThreaded) {
let output = readclose(pipe_out.in);
comm::send(ch, (1, move output));
oldcomm::send(ch, (1, move output));
};
let status = run::waitpid(pid);
let mut errs = ~"";
let mut outs = ~"";
let mut count = 2;
while count > 0 {
let stream = comm::recv(p);
let stream = oldcomm::recv(p);
match stream {
(1, copy s) => {
outs = move s;
Expand Down
Loading