Skip to content

Commit e5b8aa0

Browse files
normanmaureryawkat
andauthored
HTTP2: Allow to manually manage window update frames when using Http2… (#14980)
…… (#14974) …StreamChannel Motivation: In some advanced use-cases it might be useful for the end-user to manually manage the window update frames and so have more control over the backpressure. Modifications: - Add a new Http2StreamChannelOption that allows to configure that window update frames will be send manually and adjust channel implementation to allow it - Add test case Result: Fixes #14945 Co-authored-by: Jonas Konrad <[email protected]>
1 parent 7880714 commit e5b8aa0

File tree

3 files changed

+245
-22
lines changed

3 files changed

+245
-22
lines changed

codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java

+113-17
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.netty.channel.ChannelHandlerContext;
2525
import io.netty.channel.ChannelId;
2626
import io.netty.channel.ChannelMetadata;
27+
import io.netty.channel.ChannelOption;
2728
import io.netty.channel.ChannelOutboundBuffer;
2829
import io.netty.channel.ChannelPipeline;
2930
import io.netty.channel.ChannelProgressivePromise;
@@ -41,6 +42,7 @@
4142
import io.netty.handler.ssl.SslCloseCompletionEvent;
4243
import io.netty.util.DefaultAttributeMap;
4344
import io.netty.util.ReferenceCountUtil;
45+
import io.netty.util.internal.ObjectUtil;
4446
import io.netty.util.internal.StringUtil;
4547
import io.netty.util.internal.logging.InternalLogger;
4648
import io.netty.util.internal.logging.InternalLoggerFactory;
@@ -49,6 +51,7 @@
4951
import java.net.SocketAddress;
5052
import java.nio.channels.ClosedChannelException;
5153
import java.util.ArrayDeque;
54+
import java.util.Map;
5255
import java.util.Queue;
5356
import java.util.concurrent.RejectedExecutionException;
5457
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -906,24 +909,19 @@ void readEOS() {
906909
readEOS = true;
907910
}
908911

909-
private void updateLocalWindowIfNeeded() {
910-
if (flowControlledBytes != 0 && !parentContext().isRemoved()) {
912+
private boolean updateLocalWindowIfNeeded() {
913+
if (flowControlledBytes != 0 && !parentContext().isRemoved() && config.autoStreamFlowControl) {
911914
int bytes = flowControlledBytes;
912915
flowControlledBytes = 0;
913-
ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
914-
// window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
915-
// completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
916-
// to assume there was a write done that needs to be flushed or we risk flow control starvation.
917-
writeDoneAndNoFlush = true;
918-
// Add a listener which will notify and teardown the stream
919-
// when a window update fails if needed or check the result of the future directly if it was completed
920-
// already.
921-
// See https://github.com/netty/netty/issues/9663
922-
if (future.isDone()) {
923-
windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
924-
} else {
925-
future.addListener(windowUpdateFrameWriteListener);
926-
}
916+
writeWindowUpdateFrame(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
917+
return true;
918+
}
919+
return false;
920+
}
921+
922+
void updateLocalWindowIfNeededAndFlush() {
923+
if (updateLocalWindowIfNeeded()) {
924+
flush();
927925
}
928926
}
929927

@@ -982,6 +980,24 @@ void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
982980
pipeline().fireChannelRead(frame);
983981
}
984982

983+
private ChannelFuture writeWindowUpdateFrame(Http2WindowUpdateFrame windowUpdateFrame) {
984+
ChannelFuture future = write0(parentContext(), windowUpdateFrame);
985+
// window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
986+
// completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
987+
// to assume there was a write done that needs to be flushed or we risk flow control starvation.
988+
writeDoneAndNoFlush = true;
989+
// Add a listener which will notify and teardown the stream
990+
// when a window update fails if needed or check the result of the future directly if it was completed
991+
// already.
992+
// See https://github.com/netty/netty/issues/9663
993+
if (future.isDone()) {
994+
windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
995+
} else {
996+
future.addListener(windowUpdateFrameWriteListener);
997+
}
998+
return future;
999+
}
1000+
9851001
@Override
9861002
public void write(Object msg, final ChannelPromise promise) {
9871003
// After this point its not possible to cancel a write anymore.
@@ -1001,7 +1017,42 @@ public void write(Object msg, final ChannelPromise promise) {
10011017
try {
10021018
if (msg instanceof Http2StreamFrame) {
10031019
Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1004-
writeHttp2StreamFrame(frame, promise);
1020+
if (msg instanceof Http2WindowUpdateFrame) {
1021+
Http2WindowUpdateFrame updateFrame = (Http2WindowUpdateFrame) msg;
1022+
if (config.autoStreamFlowControl) {
1023+
ReferenceCountUtil.release(msg);
1024+
promise.setFailure(new UnsupportedOperationException(
1025+
Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL + " is set to false"));
1026+
return;
1027+
}
1028+
try {
1029+
ObjectUtil.checkInRange(updateFrame.windowSizeIncrement(), 0,
1030+
flowControlledBytes, "windowSizeIncrement");
1031+
} catch (RuntimeException e) {
1032+
ReferenceCountUtil.release(updateFrame);
1033+
promise.setFailure(e);
1034+
return;
1035+
}
1036+
flowControlledBytes -= updateFrame.windowSizeIncrement();
1037+
if (parentContext().isRemoved()) {
1038+
ReferenceCountUtil.release(msg);
1039+
promise.setFailure(new ClosedChannelException());
1040+
return;
1041+
}
1042+
ChannelFuture f = writeWindowUpdateFrame(updateFrame);
1043+
if (f.isDone()) {
1044+
writeComplete(f, promise);
1045+
} else {
1046+
f.addListener(new ChannelFutureListener() {
1047+
@Override
1048+
public void operationComplete(ChannelFuture future) {
1049+
writeComplete(future, promise);
1050+
}
1051+
});
1052+
}
1053+
} else {
1054+
writeHttp2StreamFrame(frame, promise);
1055+
}
10051056
} else {
10061057
String msgStr = msg.toString();
10071058
ReferenceCountUtil.release(msg);
@@ -1152,6 +1203,8 @@ public ChannelOutboundBuffer outboundBuffer() {
11521203
* changes.
11531204
*/
11541205
private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1206+
1207+
volatile boolean autoStreamFlowControl = true;
11551208
Http2StreamChannelConfig(Channel channel) {
11561209
super(channel);
11571210
}
@@ -1175,6 +1228,49 @@ public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
11751228
super.setRecvByteBufAllocator(allocator);
11761229
return this;
11771230
}
1231+
1232+
@Override
1233+
public Map<ChannelOption<?>, Object> getOptions() {
1234+
return getOptions(
1235+
super.getOptions(),
1236+
Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL);
1237+
}
1238+
1239+
@SuppressWarnings("unchecked")
1240+
@Override
1241+
public <T> T getOption(ChannelOption<T> option) {
1242+
if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1243+
return (T) Boolean.valueOf(autoStreamFlowControl);
1244+
}
1245+
return super.getOption(option);
1246+
}
1247+
1248+
@Override
1249+
public <T> boolean setOption(ChannelOption<T> option, T value) {
1250+
validate(option, value);
1251+
if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1252+
boolean newValue = (Boolean) value;
1253+
boolean changed = newValue && !autoStreamFlowControl;
1254+
autoStreamFlowControl = (Boolean) value;
1255+
if (changed) {
1256+
if (channel.isRegistered()) {
1257+
final Http2ChannelUnsafe unsafe = (Http2ChannelUnsafe) channel.unsafe();
1258+
if (channel.eventLoop().inEventLoop()) {
1259+
unsafe.updateLocalWindowIfNeededAndFlush();
1260+
} else {
1261+
channel.eventLoop().execute(new Runnable() {
1262+
@Override
1263+
public void run() {
1264+
unsafe.updateLocalWindowIfNeededAndFlush();
1265+
}
1266+
});
1267+
}
1268+
}
1269+
}
1270+
return true;
1271+
}
1272+
return super.setOption(option, value);
1273+
}
11781274
}
11791275

11801276
private void maybeAddChannelToReadCompletePendingQueue() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2025 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package io.netty.handler.codec.http2;
17+
18+
import io.netty.channel.ChannelOption;
19+
20+
/**
21+
* {@link ChannelOption}s that are specific to {@link Http2StreamChannel}s.
22+
*
23+
* @param <T> the type of the value which is valid for the {@link ChannelOption}
24+
*/
25+
public final class Http2StreamChannelOption<T> extends ChannelOption<T> {
26+
private Http2StreamChannelOption(String name) {
27+
super(name);
28+
}
29+
30+
/**
31+
* When set to {@code true} {@link Http2WindowUpdateFrame}s will be automatically be generated and written for
32+
* {@link Http2StreamChannel}s as soon as frames are passed to the user via
33+
* {@link io.netty.channel.ChannelPipeline#fireChannelRead(Object)}. If the user wants more control on when a
34+
* window update is send its possible to set it to {@code false}. In this case the user is responsible to
35+
* generate the correct {@link Http2WindowUpdateFrame}s and eventually write these to the channel.
36+
* <p>
37+
* See <a href="https://datatracker.ietf.org/doc/html/rfc9113#section-5.2">RFC9113 5.2. Flow Control</a> for more
38+
* details.
39+
*/
40+
public static final ChannelOption<Boolean> AUTO_STREAM_FLOW_CONTROL =
41+
valueOf("AUTO_STREAM_FLOW_CONTROL");
42+
}

codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java

+90-5
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.junit.jupiter.params.ParameterizedTest;
4646
import org.junit.jupiter.params.provider.EnumSource;
4747
import org.junit.jupiter.params.provider.MethodSource;
48-
import org.junit.jupiter.params.provider.ValueSource;
4948
import org.mockito.ArgumentMatcher;
5049
import org.mockito.Mockito;
5150
import org.mockito.invocation.InvocationOnMock;
@@ -1563,11 +1562,23 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
15631562

15641563
@Test
15651564
public void windowUpdatesAreFlushed() {
1565+
windowUpdatesAreFlushed(true);
1566+
}
1567+
1568+
@Test
1569+
public void windowUpdatesNotDoneAutomatically() {
1570+
windowUpdatesAreFlushed(false);
1571+
}
1572+
1573+
private void windowUpdatesAreFlushed(boolean autoWriteWindowUpdateFrames) {
15661574
LastInboundHandler inboundHandler = new LastInboundHandler();
15671575
FlushSniffer flushSniffer = new FlushSniffer();
15681576
parentChannel.pipeline().addFirst(flushSniffer);
15691577

15701578
Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
1579+
childChannel.config().setOption(
1580+
Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, autoWriteWindowUpdateFrames);
1581+
15711582
assertTrue(childChannel.config().isAutoRead());
15721583
childChannel.config().setAutoRead(false);
15731584
assertFalse(childChannel.config().isAutoRead());
@@ -1591,16 +1602,90 @@ public void windowUpdatesAreFlushed() {
15911602
// Trigger a read of the second frame.
15921603
childChannel.read();
15931604
verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 1);
1594-
// We expect a flush here because the StreamChannel will flush the smaller increment but the
1595-
// connection will collect the bytes and decide not to send a wire level frame until more are consumed.
1596-
assertTrue(flushSniffer.checkFlush());
1605+
if (autoWriteWindowUpdateFrames) {
1606+
// We expect a flush here because the StreamChannel will flush the smaller increment but the
1607+
// connection will collect the bytes and decide not to send a wire level frame until more are consumed.
1608+
assertTrue(flushSniffer.checkFlush());
1609+
} else {
1610+
assertFalse(flushSniffer.checkFlush());
1611+
}
1612+
15971613
verify(frameWriter, never()).writeWindowUpdate(eqCodecCtx(), anyInt(), anyInt(), anyChannelPromise());
15981614

15991615
// Call read one more time which should trigger the writing of the flow control update.
16001616
childChannel.read();
1617+
if (autoWriteWindowUpdateFrames) {
1618+
verify(frameWriter).writeWindowUpdate(eqCodecCtx(), eq(0), eq(32 * 1024), anyChannelPromise());
1619+
verify(frameWriter).writeWindowUpdate(
1620+
eqCodecCtx(), eq(childChannel.stream().id()), eq(32 * 1024), anyChannelPromise());
1621+
assertTrue(flushSniffer.checkFlush());
1622+
} else {
1623+
verify(frameWriter, never()).writeWindowUpdate(eqCodecCtx(), anyInt(), anyInt(), anyChannelPromise());
1624+
assertFalse(flushSniffer.checkFlush());
1625+
1626+
// Let's manually send a window update frame now.
1627+
ChannelFuture f = childChannel.writeAndFlush(new DefaultHttp2WindowUpdateFrame(32 * 1024)
1628+
.stream(childChannel.stream()));
1629+
assertTrue(f.isSuccess());
1630+
verify(frameWriter).writeWindowUpdate(eqCodecCtx(), eq(0), eq(32 * 1024), anyChannelPromise());
1631+
verify(frameWriter).writeWindowUpdate(
1632+
eqCodecCtx(), eq(childChannel.stream().id()), eq(32 * 1024), anyChannelPromise());
1633+
assertTrue(flushSniffer.checkFlush());
1634+
1635+
// Let's try to send one more even though there are no more pending bytes
1636+
f = childChannel.writeAndFlush(new DefaultHttp2WindowUpdateFrame(32 * 1024)
1637+
.stream(childChannel.stream()));
1638+
assertNotNull(f.cause());
1639+
}
1640+
}
1641+
1642+
@Test
1643+
public void windowUpdatesSendWhenAutoReadEnabled() {
1644+
LastInboundHandler inboundHandler = new LastInboundHandler();
1645+
FlushSniffer flushSniffer = new FlushSniffer();
1646+
parentChannel.pipeline().addFirst(flushSniffer);
1647+
1648+
Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
1649+
childChannel.config().setOption(
1650+
Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, false);
1651+
1652+
assertTrue(childChannel.config().isAutoRead());
1653+
childChannel.config().setAutoRead(false);
1654+
assertFalse(childChannel.config().isAutoRead());
1655+
1656+
Http2HeadersFrame headersFrame = inboundHandler.readInbound();
1657+
assertNotNull(headersFrame);
1658+
1659+
assertTrue(flushSniffer.checkFlush());
1660+
1661+
// Write some bytes to get the channel into the idle state with buffered data and also verify we
1662+
// do not dispatch it until we receive a read() call.
1663+
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb(16 * 1024), 0, false);
1664+
frameInboundWriter.writeInboundData(childChannel.stream().id(), bb(16 * 1024), 0, false);
1665+
assertTrue(flushSniffer.checkFlush());
1666+
1667+
verify(frameWriter, never()).writeWindowUpdate(eqCodecCtx(), anyInt(), anyInt(), anyChannelPromise());
1668+
// only the first one was read because it was legacy auto-read behavior.
1669+
verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 1);
1670+
assertFalse(flushSniffer.checkFlush());
1671+
1672+
// Trigger a read of the second frame.
1673+
childChannel.read();
1674+
verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 1);
1675+
assertFalse(flushSniffer.checkFlush());
1676+
1677+
verify(frameWriter, never()).writeWindowUpdate(eqCodecCtx(), anyInt(), anyInt(), anyChannelPromise());
1678+
1679+
childChannel.read();
1680+
1681+
verify(frameWriter, never()).writeWindowUpdate(eqCodecCtx(), anyInt(), anyInt(), anyChannelPromise());
1682+
assertFalse(flushSniffer.checkFlush());
1683+
1684+
childChannel.config().setOption(
1685+
Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, true);
16011686
verify(frameWriter).writeWindowUpdate(eqCodecCtx(), eq(0), eq(32 * 1024), anyChannelPromise());
16021687
verify(frameWriter).writeWindowUpdate(
1603-
eqCodecCtx(), eq(childChannel.stream().id()), eq(32 * 1024), anyChannelPromise());
1688+
eqCodecCtx(), eq(childChannel.stream().id()), eq(32 * 1024), anyChannelPromise());
16041689
assertTrue(flushSniffer.checkFlush());
16051690
}
16061691

0 commit comments

Comments
 (0)