@@ -433,6 +433,86 @@ impl RtioTcpStream for UvTcpStream {
433
433
}
434
434
}
435
435
436
+ pub struct UvUdpStream {
437
+ watcher : UdpWatcher ,
438
+ address : IpAddr
439
+ }
440
+
441
+ impl UvUdpStream {
442
+ fn watcher ( & self ) -> UdpWatcher { self . watcher }
443
+ fn address ( & self ) -> IpAddr { self . address }
444
+ }
445
+
446
+ impl Drop for UvUdpStream {
447
+ fn finalize ( & self ) {
448
+ rtdebug ! ( "closing udp stream" ) ;
449
+ let watcher = self . watcher ( ) ;
450
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
451
+ do scheduler. deschedule_running_task_and_then |_, task| {
452
+ let task_cell = Cell ( task) ;
453
+ do watcher. close {
454
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
455
+ scheduler. resume_task_immediately ( task_cell. take ( ) ) ;
456
+ }
457
+ }
458
+ }
459
+ }
460
+
461
+ impl RtioUdpStream for UvUdpStream {
462
+ fn read ( & mut self , buf : & mut [ u8 ] ) -> Result < uint , IoError > {
463
+ let result_cell = empty_cell ( ) ;
464
+ let result_cell_ptr: * Cell < Result < uint , IoError > > = & result_cell;
465
+
466
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
467
+ assert ! ( scheduler. in_task_context( ) ) ;
468
+ let watcher = self . watcher ( ) ;
469
+ let connection_address = self . address ( ) ;
470
+ let buf_ptr: * & mut [ u8 ] = & buf;
471
+ do scheduler. deschedule_running_task_and_then |sched, task| {
472
+ rtdebug ! ( "read: entered scheduler context" ) ;
473
+ assert ! ( !sched. in_task_context( ) ) ;
474
+ let mut watcher = watcher;
475
+ let task_cell = Cell ( task) ;
476
+ // XXX: see note in RtioTcpStream implementation for UvTcpStream
477
+ let alloc: AllocCallback = |_| unsafe {
478
+ slice_to_uv_buf ( * buf_ptr)
479
+ } ;
480
+ do watcher. recv_start ( alloc) |watcher, nread, _buf, addr, flags, status| {
481
+ let _ = flags; // TODO actually use flags
482
+
483
+ // XXX: see note in RtioTcpStream implementation for UvTcpStream
484
+ let mut watcher = watcher;
485
+ watcher. recv_stop ( ) ;
486
+
487
+ let incoming_address = net:: uv_ip4_to_ip4 ( & addr) ;
488
+ let result = if status. is_none ( ) {
489
+ assert ! ( nread >= 0 ) ;
490
+ if incoming_address != connection_address {
491
+ Ok ( 0 u)
492
+ } else {
493
+ Ok ( nread as uint )
494
+ }
495
+ } else {
496
+ Err ( uv_error_to_io_error ( status. unwrap ( ) ) )
497
+ } ;
498
+
499
+ unsafe { ( * result_cell_ptr) . put_back ( result) ; }
500
+
501
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
502
+ scheduler. resume_task_immediately ( task_cell. take ( ) ) ;
503
+ }
504
+ }
505
+
506
+ assert!( !result_cell. is_empty ( ) ) ;
507
+ return result_cell. take ( ) ;
508
+ }
509
+
510
+ fn write ( & mut self , buf : & [ u8 ] ) -> Result < ( ) , IoError > {
511
+ let _ = buf;
512
+ fail ! ( )
513
+ }
514
+ }
515
+
436
516
#[ test]
437
517
fn test_simple_io_no_connect ( ) {
438
518
do run_in_newsched_task {
0 commit comments