@@ -5,6 +5,7 @@ use std::{fmt, mem};
5
5
6
6
use bytes:: { Bytes , BytesMut } ;
7
7
use futures_core:: Stream ;
8
+ use futures_util:: ready;
8
9
use pin_project:: { pin_project, project} ;
9
10
10
11
use crate :: error:: Error ;
@@ -389,12 +390,19 @@ where
389
390
BodySize :: Stream
390
391
}
391
392
393
+ /// Attempts to pull out the next value of the underlying [`Stream`].
394
+ ///
395
+ /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
396
+ /// ended on a zero-length chunk, but rather proceed until the underlying
397
+ /// [`Stream`] ends.
392
398
fn poll_next ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Option < Result < Bytes , Error > > > {
393
- unsafe { Pin :: new_unchecked ( self ) }
394
- . project ( )
395
- . stream
396
- . poll_next ( cx)
397
- . map ( |res| res. map ( |res| res. map_err ( std:: convert:: Into :: into) ) )
399
+ let mut stream = unsafe { Pin :: new_unchecked ( self ) } . project ( ) . stream ;
400
+ loop {
401
+ return Poll :: Ready ( match ready ! ( stream. as_mut( ) . poll_next( cx) ) {
402
+ Some ( Ok ( ref bytes) ) if bytes. is_empty ( ) => continue ,
403
+ opt => opt. map ( |res| res. map_err ( Into :: into) ) ,
404
+ } ) ;
405
+ }
398
406
}
399
407
}
400
408
@@ -424,17 +432,26 @@ where
424
432
BodySize :: Sized64 ( self . size )
425
433
}
426
434
435
+ /// Attempts to pull out the next value of the underlying [`Stream`].
436
+ ///
437
+ /// Empty values are skipped to prevent [`SizedStream`]'s transmission being
438
+ /// ended on a zero-length chunk, but rather proceed until the underlying
439
+ /// [`Stream`] ends.
427
440
fn poll_next ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Option < Result < Bytes , Error > > > {
428
- unsafe { Pin :: new_unchecked ( self ) }
429
- . project ( )
430
- . stream
431
- . poll_next ( cx)
441
+ let mut stream = unsafe { Pin :: new_unchecked ( self ) } . project ( ) . stream ;
442
+ loop {
443
+ return Poll :: Ready ( match ready ! ( stream. as_mut( ) . poll_next( cx) ) {
444
+ Some ( Ok ( ref bytes) ) if bytes. is_empty ( ) => continue ,
445
+ val => val,
446
+ } ) ;
447
+ }
432
448
}
433
449
}
434
450
435
451
#[ cfg( test) ]
436
452
mod tests {
437
453
use super :: * ;
454
+ use futures:: stream;
438
455
use futures_util:: future:: poll_fn;
439
456
440
457
impl Body {
@@ -589,4 +606,45 @@ mod tests {
589
606
BodySize :: Sized ( 25 )
590
607
) ;
591
608
}
609
+
610
+ mod body_stream {
611
+ use super :: * ;
612
+
613
+ #[ actix_rt:: test]
614
+ async fn skips_empty_chunks ( ) {
615
+ let mut body = BodyStream :: new ( stream:: iter (
616
+ [ "1" , "" , "2" ]
617
+ . iter ( )
618
+ . map ( |& v| Ok ( Bytes :: from ( v) ) as Result < Bytes , ( ) > ) ,
619
+ ) ) ;
620
+ assert_eq ! (
621
+ poll_fn( |cx| body. poll_next( cx) ) . await . unwrap( ) . ok( ) ,
622
+ Some ( Bytes :: from( "1" ) ) ,
623
+ ) ;
624
+ assert_eq ! (
625
+ poll_fn( |cx| body. poll_next( cx) ) . await . unwrap( ) . ok( ) ,
626
+ Some ( Bytes :: from( "2" ) ) ,
627
+ ) ;
628
+ }
629
+ }
630
+
631
+ mod sized_stream {
632
+ use super :: * ;
633
+
634
+ #[ actix_rt:: test]
635
+ async fn skips_empty_chunks ( ) {
636
+ let mut body = SizedStream :: new (
637
+ 2 ,
638
+ stream:: iter ( [ "1" , "" , "2" ] . iter ( ) . map ( |& v| Ok ( Bytes :: from ( v) ) ) ) ,
639
+ ) ;
640
+ assert_eq ! (
641
+ poll_fn( |cx| body. poll_next( cx) ) . await . unwrap( ) . ok( ) ,
642
+ Some ( Bytes :: from( "1" ) ) ,
643
+ ) ;
644
+ assert_eq ! (
645
+ poll_fn( |cx| body. poll_next( cx) ) . await . unwrap( ) . ok( ) ,
646
+ Some ( Bytes :: from( "2" ) ) ,
647
+ ) ;
648
+ }
649
+ }
592
650
}
0 commit comments