@@ -13,12 +13,13 @@ use std::cast;
13
13
use std:: io:: net:: ip;
14
14
use std:: io;
15
15
use std:: mem;
16
+ use std:: os;
17
+ use std:: ptr;
16
18
use std:: rt:: rtio;
17
19
use std:: sync:: arc:: UnsafeArc ;
18
20
19
21
use super :: { IoResult , retry, keep_going} ;
20
22
use super :: c;
21
- use super :: util;
22
23
23
24
////////////////////////////////////////////////////////////////////////////////
24
25
// sockaddr and misc bindings
@@ -117,8 +118,8 @@ fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
117
118
}
118
119
}
119
120
120
- pub fn getsockopt < T : Copy > ( fd : sock_t , opt : libc:: c_int ,
121
- val : libc:: c_int ) -> IoResult < T > {
121
+ fn getsockopt < T : Copy > ( fd : sock_t , opt : libc:: c_int ,
122
+ val : libc:: c_int ) -> IoResult < T > {
122
123
unsafe {
123
124
let mut slot: T = mem:: init ( ) ;
124
125
let mut len = mem:: size_of :: < T > ( ) as libc:: socklen_t ;
@@ -144,6 +145,21 @@ fn last_error() -> io::IoError {
144
145
super :: last_error ( )
145
146
}
146
147
148
+ fn ms_to_timeval ( ms : u64 ) -> libc:: timeval {
149
+ libc:: timeval {
150
+ tv_sec : ( ms / 1000 ) as libc:: time_t ,
151
+ tv_usec : ( ( ms % 1000 ) * 1000 ) as libc:: suseconds_t ,
152
+ }
153
+ }
154
+
155
+ fn timeout ( desc : & ' static str ) -> io:: IoError {
156
+ io:: IoError {
157
+ kind : io:: TimedOut ,
158
+ desc : desc,
159
+ detail : None ,
160
+ }
161
+ }
162
+
147
163
#[ cfg( windows) ] unsafe fn close ( sock : sock_t ) { let _ = libc:: closesocket ( sock) ; }
148
164
#[ cfg( unix) ] unsafe fn close ( sock : sock_t ) { let _ = libc:: close ( sock) ; }
149
165
@@ -254,7 +270,7 @@ impl TcpStream {
254
270
let addrp = & addr as * _ as * libc:: sockaddr ;
255
271
match timeout {
256
272
Some ( timeout) => {
257
- try!( util :: connect_timeout ( fd, addrp, len, timeout) ) ;
273
+ try!( TcpStream :: connect_timeout ( fd, addrp, len, timeout) ) ;
258
274
Ok ( ret)
259
275
} ,
260
276
None => {
@@ -266,6 +282,84 @@ impl TcpStream {
266
282
}
267
283
}
268
284
285
+ // See http://developerweb.net/viewtopic.php?id=3196 for where this is
286
+ // derived from.
287
+ fn connect_timeout ( fd : sock_t ,
288
+ addrp : * libc:: sockaddr ,
289
+ len : libc:: socklen_t ,
290
+ timeout_ms : u64 ) -> IoResult < ( ) > {
291
+ #[ cfg( unix) ] use INPROGRESS = libc:: EINPROGRESS ;
292
+ #[ cfg( windows) ] use INPROGRESS = libc:: WSAEINPROGRESS ;
293
+ #[ cfg( unix) ] use WOULDBLOCK = libc:: EWOULDBLOCK ;
294
+ #[ cfg( windows) ] use WOULDBLOCK = libc:: WSAEWOULDBLOCK ;
295
+
296
+ // Make sure the call to connect() doesn't block
297
+ try!( set_nonblocking ( fd, true ) ) ;
298
+
299
+ let ret = match unsafe { libc:: connect ( fd, addrp, len) } {
300
+ // If the connection is in progress, then we need to wait for it to
301
+ // finish (with a timeout). The current strategy for doing this is
302
+ // to use select() with a timeout.
303
+ -1 if os:: errno ( ) as int == INPROGRESS as int ||
304
+ os:: errno ( ) as int == WOULDBLOCK as int => {
305
+ let mut set: c:: fd_set = unsafe { mem:: init ( ) } ;
306
+ c:: fd_set ( & mut set, fd) ;
307
+ match await ( fd, & mut set, timeout_ms) {
308
+ 0 => Err ( timeout ( "connection timed out" ) ) ,
309
+ -1 => Err ( last_error ( ) ) ,
310
+ _ => {
311
+ let err: libc:: c_int = try!(
312
+ getsockopt ( fd, libc:: SOL_SOCKET , libc:: SO_ERROR ) ) ;
313
+ if err == 0 {
314
+ Ok ( ( ) )
315
+ } else {
316
+ Err ( io:: IoError :: from_errno ( err as uint , true ) )
317
+ }
318
+ }
319
+ }
320
+ }
321
+
322
+ -1 => Err ( last_error ( ) ) ,
323
+ _ => Ok ( ( ) ) ,
324
+ } ;
325
+
326
+ // be sure to turn blocking I/O back on
327
+ try!( set_nonblocking ( fd, false ) ) ;
328
+ return ret;
329
+
330
+ #[ cfg( unix) ]
331
+ fn set_nonblocking ( fd : sock_t , nb : bool ) -> IoResult < ( ) > {
332
+ let set = nb as libc:: c_int ;
333
+ super :: mkerr_libc ( retry ( || unsafe { c:: ioctl ( fd, c:: FIONBIO , & set) } ) )
334
+ }
335
+ #[ cfg( windows) ]
336
+ fn set_nonblocking ( fd : sock_t , nb : bool ) -> IoResult < ( ) > {
337
+ let mut set = nb as libc:: c_ulong ;
338
+ if unsafe { c:: ioctlsocket ( fd, c:: FIONBIO , & mut set) != 0 } {
339
+ Err ( last_error ( ) )
340
+ } else {
341
+ Ok ( ( ) )
342
+ }
343
+ }
344
+
345
+ #[ cfg( unix) ]
346
+ fn await ( fd : sock_t , set : & mut c:: fd_set , timeout : u64 ) -> libc:: c_int {
347
+ let start = :: io:: timer:: now ( ) ;
348
+ retry ( || unsafe {
349
+ // Recalculate the timeout each iteration (it is generally
350
+ // undefined what the value of the 'tv' is after select
351
+ // returns EINTR).
352
+ let tv = ms_to_timeval ( timeout - ( :: io:: timer:: now ( ) - start) ) ;
353
+ c:: select ( fd + 1 , ptr:: null ( ) , & * set, ptr:: null ( ) , & tv)
354
+ } )
355
+ }
356
+ #[ cfg( windows) ]
357
+ fn await ( _fd : sock_t , set : & mut c:: fd_set , timeout : u64 ) -> libc:: c_int {
358
+ let tv = ms_to_timeval ( timeout) ;
359
+ unsafe { c:: select ( 1 , ptr:: null ( ) , & * set, ptr:: null ( ) , & tv) }
360
+ }
361
+ }
362
+
269
363
pub fn fd ( & self ) -> sock_t {
270
364
// This unsafety is fine because it's just a read-only arc
271
365
unsafe { ( * self . inner . get ( ) ) . fd }
@@ -439,7 +533,7 @@ impl TcpAcceptor {
439
533
440
534
pub fn native_accept ( & mut self ) -> IoResult < TcpStream > {
441
535
if self . deadline != 0 {
442
- try!( util :: accept_deadline ( self . fd ( ) , self . deadline ) ) ;
536
+ try!( self . accept_deadline ( ) ) ;
443
537
}
444
538
unsafe {
445
539
let mut storage: libc:: sockaddr_storage = mem:: init ( ) ;
@@ -456,6 +550,25 @@ impl TcpAcceptor {
456
550
}
457
551
}
458
552
}
553
+
554
+ fn accept_deadline ( & mut self ) -> IoResult < ( ) > {
555
+ let mut set: c:: fd_set = unsafe { mem:: init ( ) } ;
556
+ c:: fd_set ( & mut set, self . fd ( ) ) ;
557
+
558
+ match retry ( || {
559
+ // If we're past the deadline, then pass a 0 timeout to select() so
560
+ // we can poll the status of the socket.
561
+ let now = :: io:: timer:: now ( ) ;
562
+ let ms = if self . deadline > now { 0 } else { self . deadline - now} ;
563
+ let tv = ms_to_timeval ( ms) ;
564
+ let n = if cfg ! ( windows) { 1 } else { self . fd ( ) as libc:: c_int + 1 } ;
565
+ unsafe { c:: select ( n, & set, ptr:: null ( ) , ptr:: null ( ) , & tv) }
566
+ } ) {
567
+ -1 => Err ( last_error ( ) ) ,
568
+ 0 => Err ( timeout ( "accept timed out" ) ) ,
569
+ _ => return Ok ( ( ) ) ,
570
+ }
571
+ }
459
572
}
460
573
461
574
impl rtio:: RtioSocket for TcpAcceptor {
@@ -472,7 +585,10 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
472
585
fn accept_simultaneously ( & mut self ) -> IoResult < ( ) > { Ok ( ( ) ) }
473
586
fn dont_accept_simultaneously ( & mut self ) -> IoResult < ( ) > { Ok ( ( ) ) }
474
587
fn set_timeout ( & mut self , timeout : Option < u64 > ) {
475
- self . deadline = timeout. map ( |a| :: io:: timer:: now ( ) + a) . unwrap_or ( 0 ) ;
588
+ self . deadline = match timeout {
589
+ None => 0 ,
590
+ Some ( t) => :: io:: timer:: now ( ) + t,
591
+ } ;
476
592
}
477
593
}
478
594
0 commit comments