Skip to content

Commit df05d5a

Browse files
committed
---
yaml --- r: 108250 b: refs/heads/dist-snap c: 065e121 h: refs/heads/master v: v3
1 parent e239ec1 commit df05d5a

File tree

4 files changed

+123
-11
lines changed

4 files changed

+123
-11
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ refs/heads/try: f64fdf524a434f0e5cd0bc91d09c144723f3c90d
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
88
refs/heads/try2: 147ecfdd8221e4a4d4e090486829a06da1e0ca3c
9-
refs/heads/dist-snap: 411a01feb302dd64b669658562f2052decc80f19
9+
refs/heads/dist-snap: 065e121fc2675631e27b761d2ad5df3ee0c95976
1010
refs/tags/release-0.2: c870d2dffb391e14efb05aa27898f1f6333a9596
1111
refs/tags/release-0.3: b5f0d0f648d9a6153664837026ba1be43d3e2503
1212
refs/heads/try3: 9387340aab40a73e8424c48fd42f0c521a4875c0

branches/dist-snap/src/libstd/comm/select.rs

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ impl Select {
151151
/// event could either be that data is available or the corresponding
152152
/// channel has been closed.
153153
pub fn wait(&self) -> uint {
154+
self.wait2(false)
155+
}
156+
157+
/// Helper method for skipping the preflight checks during testing
158+
fn wait2(&self, do_preflight_checks: bool) -> uint {
154159
// Note that this is currently an inefficient implementation. We in
155160
// theory have knowledge about all ports in the set ahead of time, so
156161
// this method shouldn't really have to iterate over all of them yet
@@ -175,7 +180,7 @@ impl Select {
175180
let mut amt = 0;
176181
for p in self.iter() {
177182
amt += 1;
178-
if (*p).packet.can_recv() {
183+
if do_preflight_checks && (*p).packet.can_recv() {
179184
return (*p).id;
180185
}
181186
}
@@ -507,7 +512,7 @@ mod test {
507512
let (p2, c2) = Chan::<()>::new();
508513
let (p, c) = Chan::new();
509514
spawn(proc() {
510-
let mut s = Select::new();
515+
let s = Select::new();
511516
let mut h1 = s.handle(&p1);
512517
let mut h2 = s.handle(&p2);
513518
unsafe { h2.add(); }
@@ -521,4 +526,91 @@ mod test {
521526
c2.send(());
522527
p.recv();
523528
})
529+
530+
test!(fn preflight1() {
531+
let (p, c) = Chan::new();
532+
c.send(());
533+
select!(
534+
() = p.recv() => {},
535+
)
536+
})
537+
538+
test!(fn preflight2() {
539+
let (p, c) = Chan::new();
540+
c.send(());
541+
c.send(());
542+
select!(
543+
() = p.recv() => {},
544+
)
545+
})
546+
547+
test!(fn preflight3() {
548+
let (p, c) = Chan::new();
549+
drop(c.clone());
550+
c.send(());
551+
select!(
552+
() = p.recv() => {},
553+
)
554+
})
555+
556+
test!(fn preflight4() {
557+
let (p, c) = Chan::new();
558+
c.send(());
559+
let s = Select::new();
560+
let mut h = s.handle(&p);
561+
unsafe { h.add(); }
562+
assert_eq!(s.wait2(false), h.id);
563+
})
564+
565+
test!(fn preflight5() {
566+
let (p, c) = Chan::new();
567+
c.send(());
568+
c.send(());
569+
let s = Select::new();
570+
let mut h = s.handle(&p);
571+
unsafe { h.add(); }
572+
assert_eq!(s.wait2(false), h.id);
573+
})
574+
575+
test!(fn preflight6() {
576+
let (p, c) = Chan::new();
577+
drop(c.clone());
578+
c.send(());
579+
let s = Select::new();
580+
let mut h = s.handle(&p);
581+
unsafe { h.add(); }
582+
assert_eq!(s.wait2(false), h.id);
583+
})
584+
585+
test!(fn preflight7() {
586+
let (p, c) = Chan::<()>::new();
587+
drop(c);
588+
let s = Select::new();
589+
let mut h = s.handle(&p);
590+
unsafe { h.add(); }
591+
assert_eq!(s.wait2(false), h.id);
592+
})
593+
594+
test!(fn preflight8() {
595+
let (p, c) = Chan::new();
596+
c.send(());
597+
drop(c);
598+
p.recv();
599+
let s = Select::new();
600+
let mut h = s.handle(&p);
601+
unsafe { h.add(); }
602+
assert_eq!(s.wait2(false), h.id);
603+
})
604+
605+
test!(fn preflight9() {
606+
let (p, c) = Chan::new();
607+
drop(c.clone());
608+
c.send(());
609+
drop(c);
610+
p.recv();
611+
let s = Select::new();
612+
let mut h = s.handle(&p);
613+
unsafe { h.add(); }
614+
assert_eq!(s.wait2(false), h.id);
615+
})
524616
}

branches/dist-snap/src/libstd/comm/shared.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,17 @@ impl<T: Send> Packet<T> {
398398
cnt == DISCONNECTED || cnt - self.steals > 0
399399
}
400400

401+
// increment the count on the channel (used for selection)
402+
fn bump(&mut self, amt: int) -> int {
403+
match self.cnt.fetch_add(amt, atomics::SeqCst) {
404+
DISCONNECTED => {
405+
self.cnt.store(DISCONNECTED, atomics::SeqCst);
406+
DISCONNECTED
407+
}
408+
n => n
409+
}
410+
}
411+
401412
// Inserts the blocked task for selection on this port, returning it back if
402413
// the port already has data on it.
403414
//
@@ -408,8 +419,8 @@ impl<T: Send> Packet<T> {
408419
match self.decrement(task) {
409420
Ok(()) => Ok(()),
410421
Err(task) => {
411-
let prev = self.cnt.fetch_add(1, atomics::SeqCst);
412-
assert!(prev >= 0);
422+
let prev = self.bump(1);
423+
assert!(prev == DISCONNECTED || prev >= 0);
413424
return Err(task);
414425
}
415426
}
@@ -440,11 +451,10 @@ impl<T: Send> Packet<T> {
440451
let cnt = self.cnt.load(atomics::SeqCst);
441452
if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
442453
};
443-
let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
454+
let prev = self.bump(steals + 1);
444455

445456
if prev == DISCONNECTED {
446457
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
447-
self.cnt.store(DISCONNECTED, atomics::SeqCst);
448458
true
449459
} else {
450460
let cur = prev + steals + 1;

branches/dist-snap/src/libstd/comm/stream.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,17 @@ impl<T: Send> Packet<T> {
333333
}
334334
}
335335

336+
// increment the count on the channel (used for selection)
337+
fn bump(&mut self, amt: int) -> int {
338+
match self.cnt.fetch_add(amt, atomics::SeqCst) {
339+
DISCONNECTED => {
340+
self.cnt.store(DISCONNECTED, atomics::SeqCst);
341+
DISCONNECTED
342+
}
343+
n => n
344+
}
345+
}
346+
336347
// Attempts to start selecting on this port. Like a oneshot, this can fail
337348
// immediately because of an upgrade.
338349
pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
@@ -351,8 +362,8 @@ impl<T: Send> Packet<T> {
351362
};
352363
// Undo our decrement above, and we should be guaranteed that the
353364
// previous value is positive because we're not going to sleep
354-
let prev = self.cnt.fetch_add(1, atomics::SeqCst);
355-
assert!(prev >= 0);
365+
let prev = self.bump(1);
366+
assert!(prev == DISCONNECTED || prev >= 0);
356367
return ret;
357368
}
358369
}
@@ -384,13 +395,12 @@ impl<T: Send> Packet<T> {
384395
// and in the stream case we can have at most one steal, so just assume
385396
// that we had one steal.
386397
let steals = 1;
387-
let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
398+
let prev = self.bump(steals + 1);
388399

389400
// If we were previously disconnected, then we know for sure that there
390401
// is no task in to_wake, so just keep going
391402
let has_data = if prev == DISCONNECTED {
392403
assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
393-
self.cnt.store(DISCONNECTED, atomics::SeqCst);
394404
true // there is data, that data is that we're disconnected
395405
} else {
396406
let cur = prev + steals + 1;

0 commit comments

Comments
 (0)