@@ -22,11 +22,13 @@ use rt::select::{SelectInner, SelectPortInner};
22
22
use select:: { Select , SelectPort } ;
23
23
use unstable:: atomics:: { AtomicUint , AtomicOption , Acquire , Relaxed , SeqCst } ;
24
24
use unstable:: sync:: UnsafeArc ;
25
+ use util;
25
26
use util:: Void ;
26
27
use comm:: { GenericChan , GenericSmartChan , GenericPort , Peekable , SendDeferred } ;
27
28
use cell:: Cell ;
28
29
use clone:: Clone ;
29
30
use tuple:: ImmutableTuple ;
31
+ use mutable:: Mut ;
30
32
31
33
/// A combined refcount / BlockedTask-as-uint pointer.
32
34
///
@@ -431,28 +433,28 @@ type StreamPortOne<T> = PortOne<StreamPayload<T>>;
431
433
432
434
/// A channel with unbounded size.
433
435
pub struct Chan < T > {
434
- // FIXME #5372. Using Cell because we don't take &mut self
435
- next : Cell < StreamChanOne < T > >
436
+ // FIXME #5372. Using Mut because we don't take &mut self
437
+ next : Mut < StreamChanOne < T > >
436
438
}
437
439
438
440
/// An port with unbounded size.
439
441
pub struct Port < T > {
440
- // FIXME #5372. Using Cell because we don't take &mut self
441
- next : Cell < StreamPortOne < T > >
442
+ // FIXME #5372. Using Mut because we don't take &mut self
443
+ next : Mut < Option < StreamPortOne < T > > >
442
444
}
443
445
444
446
pub fn stream < T : Send > ( ) -> ( Port < T > , Chan < T > ) {
445
447
let ( pone, cone) = oneshot ( ) ;
446
- let port = Port { next : Cell :: new ( pone) } ;
447
- let chan = Chan { next : Cell :: new ( cone) } ;
448
+ let port = Port { next : Mut :: new ( Some ( pone) ) } ;
449
+ let chan = Chan { next : Mut :: new ( cone) } ;
448
450
return ( port, chan) ;
449
451
}
450
452
451
453
impl < T : Send > Chan < T > {
452
454
fn try_send_inner ( & self , val : T , do_resched : bool ) -> bool {
453
- let ( next_pone, next_cone ) = oneshot ( ) ;
454
- let cone = self . next . take ( ) ;
455
- self . next . put_back ( next_cone ) ;
455
+ let ( next_pone, mut cone ) = oneshot ( ) ;
456
+ let mut b = self . next . borrow_mut ( ) ;
457
+ util :: swap ( & mut cone , b . get ( ) ) ;
456
458
cone. try_send_inner ( StreamPayload { val : val, next : next_pone } , do_resched)
457
459
}
458
460
}
@@ -489,10 +491,11 @@ impl<T: Send> GenericPort<T> for Port<T> {
489
491
}
490
492
491
493
fn try_recv ( & self ) -> Option < T > {
492
- do self . next . take_opt ( ) . map_default ( None ) |pone| {
494
+ let mut b = self . next . borrow_mut ( ) ;
495
+ do b. get ( ) . take ( ) . map_default ( None ) |pone| {
493
496
match pone. try_recv ( ) {
494
497
Some ( StreamPayload { val, next } ) => {
495
- self . next . put_back ( next) ;
498
+ * b . get ( ) = Some ( next) ;
496
499
Some ( val)
497
500
}
498
501
None => None
@@ -503,7 +506,7 @@ impl<T: Send> GenericPort<T> for Port<T> {
503
506
504
507
impl < T : Send > Peekable < T > for Port < T > {
505
508
fn peek ( & self ) -> bool {
506
- self . next . with_mut_ref ( |p| p. peek ( ) )
509
+ self . next . map_mut ( |p| p. get_mut_ref ( ) . peek ( ) )
507
510
}
508
511
}
509
512
@@ -514,18 +517,18 @@ impl<T: Send> Peekable<T> for Port<T> {
514
517
impl < ' self , T : Send > SelectInner for & ' self Port < T > {
515
518
#[ inline]
516
519
fn optimistic_check ( & mut self ) -> bool {
517
- do self . next . with_mut_ref |pone| { pone. optimistic_check ( ) }
520
+ do self . next . map_mut |pone| { pone. get_mut_ref ( ) . optimistic_check ( ) }
518
521
}
519
522
520
523
#[ inline]
521
524
fn block_on ( & mut self , sched : & mut Scheduler , task : BlockedTask ) -> bool {
522
- let task = Cell :: new ( task ) ;
523
- do self . next . with_mut_ref |pone| { pone . block_on ( sched, task. take ( ) ) }
525
+ let mut b = self . next . borrow_mut ( ) ;
526
+ b . get ( ) . get_mut_ref ( ) . block_on ( sched, task)
524
527
}
525
528
526
529
#[ inline]
527
530
fn unblock_from ( & mut self ) -> bool {
528
- do self . next . with_mut_ref |pone| { pone. unblock_from ( ) }
531
+ do self . next . map_mut |pone| { pone. get_mut_ref ( ) . unblock_from ( ) }
529
532
}
530
533
}
531
534
@@ -552,9 +555,10 @@ impl<T: Send> Select for Port<T> { }
552
555
553
556
impl < ' self , T : Send > SelectPortInner < T > for & ' self Port < T > {
554
557
fn recv_ready ( self ) -> Option < T > {
555
- match self . next . take ( ) . recv_ready ( ) {
558
+ let mut b = self . next . borrow_mut ( ) ;
559
+ match b. get ( ) . take_unwrap ( ) . recv_ready ( ) {
556
560
Some ( StreamPayload { val, next } ) => {
557
- self . next . put_back ( next) ;
561
+ * b . get ( ) = Some ( next) ;
558
562
Some ( val)
559
563
}
560
564
None => None
@@ -571,7 +575,7 @@ pub struct SharedChan<T> {
571
575
572
576
impl < T : Send > SharedChan < T > {
573
577
pub fn new ( chan : Chan < T > ) -> SharedChan < T > {
574
- let next = chan. next . take ( ) ;
578
+ let next = chan. next . unwrap ( ) ;
575
579
let next = AtomicOption :: new ( ~next) ;
576
580
SharedChan { next : UnsafeArc :: new ( next) }
577
581
}
@@ -625,7 +629,7 @@ pub struct SharedPort<T> {
625
629
impl < T : Send > SharedPort < T > {
626
630
pub fn new ( port : Port < T > ) -> SharedPort < T > {
627
631
// Put the data port into a new link pipe
628
- let next_data_port = port. next . take ( ) ;
632
+ let next_data_port = port. next . unwrap ( ) . unwrap ( ) ;
629
633
let ( next_link_port, next_link_chan) = oneshot ( ) ;
630
634
next_link_chan. send ( next_data_port) ;
631
635
let next_link = AtomicOption :: new ( ~next_link_port) ;
0 commit comments