Skip to content

Commit baf9712

Browse files
authored
fix: pass pool_timer to hyper_util to enable the idle cleanup task (#2434)
* fix: pass pool_timer to hyper_util to enable the idle cleanup task * tests: integration test for pool idle timeout
1 parent d85f44b commit baf9712

File tree

3 files changed

+45
-1
lines changed

3 files changed

+45
-1
lines changed

src/async_impl/client.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -726,8 +726,8 @@ impl ClientBuilder {
726726
}
727727
}
728728

729-
#[cfg(not(target_arch = "wasm32"))]
730729
builder.timer(hyper_util::rt::TokioTimer::new());
730+
builder.pool_timer(hyper_util::rt::TokioTimer::new());
731731
builder.pool_idle_timeout(config.pool_idle_timeout);
732732
builder.pool_max_idle_per_host(config.pool_max_idle_per_host);
733733
connector.set_keepalive(config.tcp_keepalive);

tests/client.rs

+21
Original file line numberDiff line numberDiff line change
@@ -572,3 +572,24 @@ async fn highly_concurrent_requests_to_slow_http2_server_with_low_max_concurrent
572572

573573
server.shutdown().await;
574574
}
575+
576+
#[tokio::test]
577+
async fn close_connection_after_idle_timeout() {
578+
let mut server = server::http(move |_| async move { http::Response::default() });
579+
580+
let client = reqwest::Client::builder()
581+
.pool_idle_timeout(std::time::Duration::from_secs(1))
582+
.build()
583+
.unwrap();
584+
585+
let url = format!("http://{}", server.addr());
586+
587+
client.get(&url).send().await.unwrap();
588+
589+
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
590+
591+
assert!(server
592+
.events()
593+
.iter()
594+
.any(|e| matches!(e, server::Event::ConnectionClosed)));
595+
}

tests/support/server.rs

+23
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,27 @@ use tokio::sync::oneshot;
1212
pub struct Server {
1313
addr: net::SocketAddr,
1414
panic_rx: std_mpsc::Receiver<()>,
15+
events_rx: std_mpsc::Receiver<Event>,
1516
shutdown_tx: Option<oneshot::Sender<()>>,
1617
}
1718

19+
#[non_exhaustive]
20+
pub enum Event {
21+
ConnectionClosed,
22+
}
23+
1824
impl Server {
1925
pub fn addr(&self) -> net::SocketAddr {
2026
self.addr
2127
}
28+
29+
pub fn events(&mut self) -> Vec<Event> {
30+
let mut events = Vec::new();
31+
while let Ok(event) = self.events_rx.try_recv() {
32+
events.push(event);
33+
}
34+
events
35+
}
2236
}
2337

2438
impl Drop for Server {
@@ -67,6 +81,7 @@ where
6781

6882
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
6983
let (panic_tx, panic_rx) = std_mpsc::channel();
84+
let (events_tx, events_rx) = std_mpsc::channel();
7085
let tname = format!(
7186
"test({})-support-server",
7287
test_name,
@@ -92,8 +107,10 @@ where
92107
async move { Ok::<_, Infallible>(fut.await) }
93108
});
94109
let builder = builder.clone();
110+
let events_tx = events_tx.clone();
95111
tokio::spawn(async move {
96112
let _ = builder.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(io), svc).await;
113+
let _ = events_tx.send(Event::ConnectionClosed);
97114
});
98115
}
99116
}
@@ -105,6 +122,7 @@ where
105122
Server {
106123
addr,
107124
panic_rx,
125+
events_rx,
108126
shutdown_tx: Some(shutdown_tx),
109127
}
110128
})
@@ -152,6 +170,7 @@ where
152170

153171
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
154172
let (panic_tx, panic_rx) = std_mpsc::channel();
173+
let (events_tx, events_rx) = std_mpsc::channel();
155174
let tname = format!(
156175
"test({})-support-server",
157176
test_name,
@@ -169,9 +188,11 @@ where
169188
Some(accepted) = endpoint.accept() => {
170189
let conn = accepted.await.expect("accepted");
171190
let mut h3_conn = h3::server::Connection::new(h3_quinn::Connection::new(conn)).await.unwrap();
191+
let events_tx = events_tx.clone();
172192
let func = func.clone();
173193
tokio::spawn(async move {
174194
while let Ok(Some((req, stream))) = h3_conn.accept().await {
195+
let events_tx = events_tx.clone();
175196
let func = func.clone();
176197
tokio::spawn(async move {
177198
let (mut tx, rx) = stream.split();
@@ -198,6 +219,7 @@ where
198219
}
199220
}
200221
tx.finish().await.unwrap();
222+
events_tx.send(Event::ConnectionClosed).unwrap();
201223
});
202224
}
203225
});
@@ -211,6 +233,7 @@ where
211233
Server {
212234
addr,
213235
panic_rx,
236+
events_rx,
214237
shutdown_tx: Some(shutdown_tx),
215238
}
216239
})

0 commit comments

Comments
 (0)