Skip to content

Commit 5bc8e72

Browse files
committed
fix: limit the amount of pending-accept reset streams
Streams that have been received by the peer, but not accepted by the user, can also receive a RST_STREAM. This is a legitimate pattern: one could send a request and then shortly after, realize it is not needed, sending a CANCEL. However, since those streams are now "closed", they don't count towards the max concurrent streams. So, they will sit in the accept queue, using memory. In most cases, the user is calling `accept` in a loop, and they can accept requests that have been reset fast enough that this isn't an issue in practice. But if the peer is able to flood the network faster than the server accept loop can run (simply accepting, not processing requests; that tends to happen in a separate task), the memory could grow. So, this introduces a maximum count for streams in the pending-accept but remotely-reset state. If the maximum is reached, a GOAWAY frame with the error code of ENHANCE_YOUR_CALM is sent, and the connection marks itself as errored. ref CVE-2023-26964 ref GHSA-f8vr-r385-rh5r Closes hyperium/hyper#2877
1 parent 8088ca6 commit 5bc8e72

File tree

9 files changed

+148
-12
lines changed

9 files changed

+148
-12
lines changed

src/proto/connection.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use std::task::{Context, Poll};
1414
use std::time::Duration;
1515
use tokio::io::{AsyncRead, AsyncWrite};
1616

17+
const DEFAULT_MAX_REMOTE_RESET_STREAMS: usize = 20;
18+
1719
/// An H2 connection
1820
#[derive(Debug)]
1921
pub(crate) struct Connection<T, P, B: Buf = Bytes>
@@ -118,6 +120,7 @@ where
118120
.unwrap_or(false),
119121
local_reset_duration: config.reset_stream_duration,
120122
local_reset_max: config.reset_stream_max,
123+
remote_reset_max: DEFAULT_MAX_REMOTE_RESET_STREAMS,
121124
remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
122125
remote_max_initiated: config
123126
.settings
@@ -172,6 +175,11 @@ where
172175
self.inner.streams.max_recv_streams()
173176
}
174177

178+
#[cfg(feature = "unstable")]
179+
pub fn num_wired_streams(&self) -> usize {
180+
self.inner.streams.num_wired_streams()
181+
}
182+
175183
/// Returns `Ready` when the connection is ready to receive a frame.
176184
///
177185
/// Returns `Error` as this may raise errors that are caused by delayed

src/proto/streams/counts.rs

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,16 @@ pub(super) struct Counts {
2121
num_recv_streams: usize,
2222

2323
/// Maximum number of pending locally reset streams
24-
max_reset_streams: usize,
24+
max_local_reset_streams: usize,
2525

2626
/// Current number of pending locally reset streams
27-
num_reset_streams: usize,
27+
num_local_reset_streams: usize,
28+
29+
/// Max number of "pending accept" streams that were remotely reset
30+
max_remote_reset_streams: usize,
31+
32+
/// Current number of "pending accept" streams that were remotely reset
33+
num_remote_reset_streams: usize,
2834
}
2935

3036
impl Counts {
@@ -36,8 +42,10 @@ impl Counts {
3642
num_send_streams: 0,
3743
max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
3844
num_recv_streams: 0,
39-
max_reset_streams: config.local_reset_max,
40-
num_reset_streams: 0,
45+
max_local_reset_streams: config.local_reset_max,
46+
num_local_reset_streams: 0,
47+
max_remote_reset_streams: config.remote_reset_max,
48+
num_remote_reset_streams: 0,
4149
}
4250
}
4351

@@ -90,7 +98,7 @@ impl Counts {
9098

9199
/// Returns true if the number of pending reset streams can be incremented.
92100
pub fn can_inc_num_reset_streams(&self) -> bool {
93-
self.max_reset_streams > self.num_reset_streams
101+
self.max_local_reset_streams > self.num_local_reset_streams
94102
}
95103

96104
/// Increments the number of pending reset streams.
@@ -101,7 +109,34 @@ impl Counts {
101109
pub fn inc_num_reset_streams(&mut self) {
102110
assert!(self.can_inc_num_reset_streams());
103111

104-
self.num_reset_streams += 1;
112+
self.num_local_reset_streams += 1;
113+
}
114+
115+
pub(crate) fn max_remote_reset_streams(&self) -> usize {
116+
self.max_remote_reset_streams
117+
}
118+
119+
/// Returns true if the number of pending REMOTE reset streams can be
120+
/// incremented.
121+
pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
122+
self.max_remote_reset_streams > self.num_remote_reset_streams
123+
}
124+
125+
/// Increments the number of pending REMOTE reset streams.
126+
///
127+
/// # Panics
128+
///
129+
/// Panics on failure as this should have been validated before hand.
130+
pub(crate) fn inc_num_remote_reset_streams(&mut self) {
131+
assert!(self.can_inc_num_remote_reset_streams());
132+
133+
self.num_remote_reset_streams += 1;
134+
}
135+
136+
pub(crate) fn dec_num_remote_reset_streams(&mut self) {
137+
assert!(self.num_remote_reset_streams > 0);
138+
139+
self.num_remote_reset_streams -= 1;
105140
}
106141

107142
pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
@@ -194,8 +229,8 @@ impl Counts {
194229
}
195230

196231
fn dec_num_reset_streams(&mut self) {
197-
assert!(self.num_reset_streams > 0);
198-
self.num_reset_streams -= 1;
232+
assert!(self.num_local_reset_streams > 0);
233+
self.num_local_reset_streams -= 1;
199234
}
200235
}
201236

src/proto/streams/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ pub struct Config {
6060
/// Maximum number of locally reset streams to keep at a time
6161
pub local_reset_max: usize,
6262

63+
/// Maximum number of remotely reset "pending accept" streams to keep at a
64+
/// time. Going over this number results in a connection error.
65+
pub remote_reset_max: usize,
66+
6367
/// Initial window size of remote initiated streams
6468
pub remote_init_window_sz: WindowSize,
6569

src/proto/streams/recv.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -741,12 +741,39 @@ impl Recv {
741741
}
742742

743743
/// Handle remote sending an explicit RST_STREAM.
744-
pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) {
744+
pub fn recv_reset(
745+
&mut self,
746+
frame: frame::Reset,
747+
stream: &mut Stream,
748+
counts: &mut Counts,
749+
) -> Result<(), Error> {
750+
// Reseting a stream that the user hasn't accepted is possible,
751+
// but should be done with care. These streams will continue
752+
// to take up memory in the accept queue, but will no longer be
753+
// counted as "concurrent" streams.
754+
//
755+
// So, we have a separate limit for these.
756+
//
757+
// See https://github.com/hyperium/hyper/issues/2877
758+
if stream.is_pending_accept {
759+
if counts.can_inc_num_remote_reset_streams() {
760+
counts.inc_num_remote_reset_streams();
761+
} else {
762+
tracing::warn!(
763+
"recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
764+
counts.max_remote_reset_streams(),
765+
);
766+
return Err(Error::library_go_away(Reason::ENHANCE_YOUR_CALM));
767+
}
768+
}
769+
745770
// Notify the stream
746771
stream.state.recv_reset(frame, stream.is_pending_send);
747772

748773
stream.notify_send();
749774
stream.notify_recv();
775+
776+
Ok(())
750777
}
751778

752779
/// Handle a connection-level error
@@ -1033,7 +1060,6 @@ impl Recv {
10331060
cx: &Context,
10341061
stream: &mut Stream,
10351062
) -> Poll<Option<Result<Bytes, proto::Error>>> {
1036-
// TODO: Return error when the stream is reset
10371063
match stream.pending_recv.pop_front(&mut self.buffer) {
10381064
Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
10391065
Some(event) => {

src/proto/streams/state.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,13 @@ impl State {
360360
}
361361
}
362362

363+
pub fn is_remote_reset(&self) -> bool {
364+
match self.inner {
365+
Closed(Cause::Error(ref e)) => e.is_local(),
366+
_ => false,
367+
}
368+
}
369+
363370
/// Returns true if the stream is already reset.
364371
pub fn is_reset(&self) -> bool {
365372
match self.inner {

src/proto/streams/streams.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,12 @@ where
140140
// TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
141141
// the lock, so it can't.
142142
me.refs += 1;
143+
144+
// Pending-accepted remotely-reset streams are counted.
145+
if stream.state.is_remote_reset() {
146+
me.counts.dec_num_remote_reset_streams();
147+
}
148+
143149
StreamRef {
144150
opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
145151
send_buffer: self.send_buffer.clone(),
@@ -601,7 +607,7 @@ impl Inner {
601607
let actions = &mut self.actions;
602608

603609
self.counts.transition(stream, |counts, stream| {
604-
actions.recv.recv_reset(frame, stream);
610+
actions.recv.recv_reset(frame, stream, counts)?;
605611
actions.send.handle_error(send_buffer, stream, counts);
606612
assert!(stream.state.is_closed());
607613
Ok(())

src/server.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,13 @@ where
576576
pub fn max_concurrent_recv_streams(&self) -> usize {
577577
self.connection.max_recv_streams()
578578
}
579+
580+
// Could disappear at anytime.
581+
#[doc(hidden)]
582+
#[cfg(feature = "unstable")]
583+
pub fn num_wired_streams(&self) -> usize {
584+
self.connection.num_wired_streams()
585+
}
579586
}
580587

581588
#[cfg(feature = "stream")]

tests/h2-support/src/frames.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,10 @@ impl Mock<frame::GoAway> {
297297
self.reason(frame::Reason::FRAME_SIZE_ERROR)
298298
}
299299

300+
pub fn calm(self) -> Self {
301+
self.reason(frame::Reason::ENHANCE_YOUR_CALM)
302+
}
303+
300304
pub fn no_error(self) -> Self {
301305
self.reason(frame::Reason::NO_ERROR)
302306
}

tests/h2-tests/tests/stream_states.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![deny(warnings)]
22

3-
use futures::future::{join, join3, lazy, try_join};
3+
use futures::future::{join, join3, lazy, poll_fn, try_join};
44
use futures::{FutureExt, StreamExt, TryStreamExt};
55
use h2_support::prelude::*;
66
use h2_support::util::yield_once;
@@ -194,6 +194,45 @@ async fn closed_streams_are_released() {
194194
join(srv, h2).await;
195195
}
196196

197+
#[tokio::test]
198+
async fn reset_streams_dont_grow_memory_continuously() {
199+
//h2_support::trace_init!();
200+
let (io, mut client) = mock::new();
201+
202+
const N: u32 = 50;
203+
204+
let client = async move {
205+
let settings = client.assert_server_handshake().await;
206+
assert_default_settings!(settings);
207+
for n in (1..(N * 2)).step_by(2) {
208+
client
209+
.send_frame(frames::headers(n).request("GET", "https://a.b/").eos())
210+
.await;
211+
client.send_frame(frames::reset(n).protocol_error()).await;
212+
}
213+
tokio::time::timeout(
214+
std::time::Duration::from_secs(1),
215+
client.recv_frame(frames::go_away(41).calm()),
216+
)
217+
.await
218+
.expect("client goaway");
219+
};
220+
221+
let srv = async move {
222+
let mut srv = server::Builder::new()
223+
.handshake::<_, Bytes>(io)
224+
.await
225+
.expect("handshake");
226+
227+
poll_fn(|cx| srv.poll_closed(cx))
228+
.await
229+
.expect_err("server should error");
230+
// specifically, not 50;
231+
assert_eq!(21, srv.num_wired_streams());
232+
};
233+
join(srv, client).await;
234+
}
235+
197236
#[tokio::test]
198237
async fn errors_if_recv_frame_exceeds_max_frame_size() {
199238
h2_support::trace_init!();

0 commit comments

Comments
 (0)