Skip to content

Commit fe13789

Browse files
Aaron1011JohnTitor
andauthored
Use Pin<Box<S>> in BodyStream and SizedStream (#1328)
Fixes #1321 A better fix would be to change `MessageBody` to take a `Pin<&mut Self>`, rather than a `Pin<&mut Self>`. This will avoid requiring the use of `Box` for all consumers by allowing the caller to determine how to pin the `MessageBody` implementation (e.g. via stack pinning). However, doing so is a breaking change that will affect every user of `MessageBody`. By pinning the inner stream ourselves, we can fix the undefined behavior without breaking the API. I've included @sebzim4500's reproduction case as a new test case. However, due to the nature of undefined behavior, this could pass (and not segfault) even if underlying issue were to regress. Unfortunately, until rust-lang/unsafe-code-guidelines#148 is resolved, it's not even possible to write a Miri test that will pass when the bug is fixed. Co-authored-by: Yuki Okushi <[email protected]>
1 parent 3033f18 commit fe13789

File tree

2 files changed

+32
-10
lines changed

2 files changed

+32
-10
lines changed

actix-http/src/body.rs

+6-10
Original file line numberDiff line numberDiff line change
@@ -361,10 +361,8 @@ impl MessageBody for String {
361361

362362
/// Type represent streaming body.
363363
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
364-
#[pin_project]
365364
pub struct BodyStream<S, E> {
366-
#[pin]
367-
stream: S,
365+
stream: Pin<Box<S>>,
368366
_t: PhantomData<E>,
369367
}
370368

@@ -375,7 +373,7 @@ where
375373
{
376374
pub fn new(stream: S) -> Self {
377375
BodyStream {
378-
stream,
376+
stream: Box::pin(stream),
379377
_t: PhantomData,
380378
}
381379
}
@@ -396,7 +394,7 @@ where
396394
/// ended on a zero-length chunk, but rather proceed until the underlying
397395
/// [`Stream`] ends.
398396
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
399-
let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
397+
let mut stream = self.stream.as_mut();
400398
loop {
401399
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
402400
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
@@ -408,19 +406,17 @@ where
408406

409407
/// Type represent streaming body. This body implementation should be used
410408
/// if total size of stream is known. Data get sent as is without using transfer encoding.
411-
#[pin_project]
412409
pub struct SizedStream<S> {
413410
size: u64,
414-
#[pin]
415-
stream: S,
411+
stream: Pin<Box<S>>,
416412
}
417413

418414
impl<S> SizedStream<S>
419415
where
420416
S: Stream<Item = Result<Bytes, Error>>,
421417
{
422418
pub fn new(size: u64, stream: S) -> Self {
423-
SizedStream { size, stream }
419+
SizedStream { size, stream: Box::pin(stream) }
424420
}
425421
}
426422

@@ -438,7 +434,7 @@ where
438434
/// ended on a zero-length chunk, but rather proceed until the underlying
439435
/// [`Stream`] ends.
440436
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
441-
let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
437+
let mut stream = self.stream.as_mut();
442438
loop {
443439
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
444440
Some(Ok(ref bytes)) if bytes.is_empty() => continue,

tests/test_weird_poll.rs

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Regression test for #/1321
2+
3+
use futures::task::{noop_waker, Context};
4+
use futures::stream::once;
5+
use actix_http::body::{MessageBody, BodyStream};
6+
use bytes::Bytes;
7+
8+
#[test]
9+
fn weird_poll() {
10+
let (sender, receiver) = futures::channel::oneshot::channel();
11+
let mut body_stream = Ok(BodyStream::new(once(async {
12+
let x = Box::new(0);
13+
let y = &x;
14+
receiver.await.unwrap();
15+
let _z = **y;
16+
Ok::<_, ()>(Bytes::new())
17+
})));
18+
19+
let waker = noop_waker();
20+
let mut context = Context::from_waker(&waker);
21+
22+
let _ = body_stream.as_mut().unwrap().poll_next(&mut context);
23+
sender.send(()).unwrap();
24+
let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context);
25+
}
26+

0 commit comments

Comments
 (0)