Skip to content

Commit e9215fd

Browse files
authored
Support streaming response body in HTTP/3 (#2517)
Closes #2603
1 parent f63c631 commit e9215fd

File tree

3 files changed

+62
-24
lines changed

3 files changed

+62
-24
lines changed

Cargo.toml

+7-2
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ tokio-socks = { version = "0.5.2", optional = true }
161161
hickory-resolver = { version = "0.24", optional = true, features = ["tokio-runtime"] }
162162

163163
# HTTP/3 experimental support
164-
h3 = { version = "0.0.6", optional = true }
165-
h3-quinn = { version = "0.0.7", optional = true }
164+
h3 = { version = "0.0.7", optional = true }
165+
h3-quinn = { version = "0.0.8", optional = true }
166166
quinn = { version = "0.11.1", default-features = false, features = ["rustls", "runtime-tokio"], optional = true }
167167
slab = { version = "0.4.9", optional = true } # just to get minimal versions working with quinn
168168
futures-channel = { version = "0.3", optional = true }
@@ -255,6 +255,11 @@ path = "examples/form.rs"
255255
name = "simple"
256256
path = "examples/simple.rs"
257257

258+
[[example]]
259+
name = "h3_simple"
260+
path = "examples/h3_simple.rs"
261+
required-features = ["http3", "rustls-tls"]
262+
258263
[[example]]
259264
name = "connect_via_lower_priority_tokio_runtime"
260265
path = "examples/connect_via_lower_priority_tokio_runtime.rs"

examples/h3_simple.rs

+6-13
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,7 @@
77
#[cfg(not(target_arch = "wasm32"))]
88
#[tokio::main]
99
async fn main() -> Result<(), reqwest::Error> {
10-
use http::Version;
11-
use reqwest::{Client, IntoUrl, Response};
12-
13-
async fn get<T: IntoUrl + Clone>(url: T) -> reqwest::Result<Response> {
14-
Client::builder()
15-
.http3_prior_knowledge()
16-
.build()?
17-
.get(url)
18-
.version(Version::HTTP_3)
19-
.send()
20-
.await
21-
}
10+
let client = reqwest::Client::builder().http3_prior_knowledge().build()?;
2211

2312
// Some simple CLI args requirements...
2413
let url = match std::env::args().nth(1) {
@@ -31,7 +20,11 @@ async fn main() -> Result<(), reqwest::Error> {
3120

3221
eprintln!("Fetching {url:?}...");
3322

34-
let res = get(url).await?;
23+
let res = client
24+
.get(url)
25+
.version(http::Version::HTTP_3)
26+
.send()
27+
.await?;
3528

3629
eprintln!("Response: {:?} {}", res.version(), res.status());
3730
eprintln!("Headers: {:#?}\n", res.headers());

src/async_impl/h3_client/pool.rs

+49-9
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use bytes::Bytes;
22
use std::collections::HashMap;
33
use std::future;
4+
use std::pin::Pin;
45
use std::sync::mpsc::{Receiver, TryRecvError};
56
use std::sync::{Arc, Mutex};
7+
use std::task::{Context, Poll};
68
use std::time::Duration;
79
use tokio::sync::watch;
810
use tokio::time::Instant;
@@ -206,7 +208,6 @@ impl PoolClient {
206208
&mut self,
207209
req: Request<Body>,
208210
) -> Result<Response<ResponseBody>, BoxError> {
209-
use http_body_util::{BodyExt, Full};
210211
use hyper::body::Body as _;
211212

212213
let (head, req_body) = req.into_parts();
@@ -232,14 +233,7 @@ impl PoolClient {
232233

233234
let resp = stream.recv_response().await?;
234235

235-
let mut resp_body = Vec::new();
236-
while let Some(chunk) = stream.recv_data().await? {
237-
resp_body.extend(chunk.chunk())
238-
}
239-
240-
let resp_body = Full::new(resp_body.into())
241-
.map_err(|never| match never {})
242-
.boxed();
236+
let resp_body = crate::async_impl::body::boxed(Incoming::new(stream, resp.headers()));
243237

244238
Ok(resp.map(|_| resp_body))
245239
}
@@ -275,6 +269,52 @@ impl PoolConnection {
275269
}
276270
}
277271

272+
struct Incoming<S, B> {
273+
inner: h3::client::RequestStream<S, B>,
274+
content_length: Option<u64>,
275+
}
276+
277+
impl<S, B> Incoming<S, B> {
278+
fn new(stream: h3::client::RequestStream<S, B>, headers: &http::header::HeaderMap) -> Self {
279+
Self {
280+
inner: stream,
281+
content_length: headers
282+
.get(http::header::CONTENT_LENGTH)
283+
.and_then(|h| h.to_str().ok())
284+
.and_then(|v| v.parse().ok()),
285+
}
286+
}
287+
}
288+
289+
impl<S, B> http_body::Body for Incoming<S, B>
290+
where
291+
S: h3::quic::RecvStream,
292+
{
293+
type Data = Bytes;
294+
type Error = crate::error::Error;
295+
296+
fn poll_frame(
297+
mut self: Pin<&mut Self>,
298+
cx: &mut Context,
299+
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
300+
match futures_core::ready!(self.inner.poll_recv_data(cx)) {
301+
Ok(Some(mut b)) => Poll::Ready(Some(Ok(hyper::body::Frame::data(
302+
b.copy_to_bytes(b.remaining()),
303+
)))),
304+
Ok(None) => Poll::Ready(None),
305+
Err(e) => Poll::Ready(Some(Err(crate::error::body(e)))),
306+
}
307+
}
308+
309+
fn size_hint(&self) -> hyper::body::SizeHint {
310+
if let Some(content_length) = self.content_length {
311+
hyper::body::SizeHint::with_exact(content_length)
312+
} else {
313+
hyper::body::SizeHint::default()
314+
}
315+
}
316+
}
317+
278318
pub(crate) fn extract_domain(uri: &mut Uri) -> Result<Key, Error> {
279319
let uri_clone = uri.clone();
280320
match (uri_clone.scheme(), uri_clone.authority()) {

0 commit comments

Comments
 (0)