Skip to content

Commit 1fee753

Browse files
committed
Fix the bug where channel is not being released back to the pool when there is an exception thrown
1 parent d72f134 commit 1fee753

File tree

4 files changed

+121
-2
lines changed

4 files changed

+121
-2
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "Netty Nio Http Client",
3+
"type": "bugfix",
4+
"description": "Fix a bug where the channel fails to be released if there is an exception thrown."
5+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,8 @@ private String getMessageForTooManyAcquireOperationsError() {
312312
*/
313313
private void closeAndRelease(Channel channel) {
314314
log.trace("closing and releasing channel {}", channel.id().asLongText());
315-
channel.close().addListener(ignored -> context.channelPool().release(channel));
315+
channel.close();
316+
context.channelPool().release(channel);
316317
}
317318

318319
/**

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ public static ResponseHandler getInstance() {
165165
private static void closeAndRelease(ChannelHandlerContext ctx) {
166166
Channel channel = ctx.channel();
167167
RequestContext requestContext = channel.attr(REQUEST_CONTEXT_KEY).get();
168-
channel.close().addListener(i -> requestContext.channelPool().release(channel));
168+
ctx.close();
169+
requestContext.channelPool().release(channel);
169170
}
170171

171172
/**

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClientWireMockTest.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,17 @@
4141
import static org.mockito.Mockito.times;
4242
import static org.mockito.Mockito.when;
4343

44+
import com.github.tomakehurst.wiremock.http.Fault;
4445
import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener;
4546
import com.github.tomakehurst.wiremock.junit.WireMockRule;
47+
import io.netty.channel.Channel;
4648
import io.netty.channel.ChannelFactory;
4749
import io.netty.channel.ChannelFuture;
4850
import io.netty.channel.EventLoopGroup;
4951
import io.netty.channel.nio.NioEventLoopGroup;
5052
import io.netty.channel.pool.ChannelPool;
5153
import io.netty.channel.socket.nio.NioSocketChannel;
54+
import io.netty.util.AttributeKey;
5255
import java.io.IOException;
5356
import java.net.Socket;
5457
import java.net.URI;
@@ -225,6 +228,110 @@ protected ChannelPool newPool(URI key) {
225228
Mockito.verify(channelPool).close();
226229
}
227230

231+
@Test
232+
public void responseConnectionReused_shouldReleaseChannel() throws Exception {
233+
234+
ChannelFactory channelFactory = mock(ChannelFactory.class);
235+
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup(1);
236+
NioSocketChannel channel = new NioSocketChannel();
237+
238+
when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> channel);
239+
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.create(customEventLoopGroup, channelFactory);
240+
241+
NettyNioAsyncHttpClient customClient =
242+
(NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
243+
.eventLoopGroup(eventLoopGroup)
244+
.maxConcurrency(1)
245+
.build();
246+
247+
makeSimpleRequest(customClient);
248+
verifyChannelRelease(channel);
249+
assertThat(channel.isShutdown()).isFalse();
250+
251+
customClient.close();
252+
eventLoopGroup.eventLoopGroup().shutdownGracefully().awaitUninterruptibly();
253+
}
254+
255+
@Test
256+
public void connectionInactive_shouldReleaseChannel() throws Exception {
257+
258+
ChannelFactory channelFactory = mock(ChannelFactory.class);
259+
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup(1);
260+
NioSocketChannel channel = new NioSocketChannel();
261+
262+
when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> channel);
263+
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.create(customEventLoopGroup, channelFactory);
264+
265+
NettyNioAsyncHttpClient customClient =
266+
(NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
267+
.eventLoopGroup(eventLoopGroup)
268+
.maxConcurrency(1)
269+
.build();
270+
271+
272+
String body = randomAlphabetic(10);
273+
URI uri = URI.create("http://localhost:" + mockServer.port());
274+
SdkHttpRequest request = createRequest(uri);
275+
RecordingResponseHandler recorder = new RecordingResponseHandler();
276+
277+
278+
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withBody(body)
279+
.withStatus(500)
280+
.withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
281+
282+
customClient.execute(AsyncExecuteRequest.builder()
283+
.request(request)
284+
.requestContentPublisher(createProvider(""))
285+
.responseHandler(recorder).build());
286+
287+
verifyChannelRelease(channel);
288+
assertThat(channel.isShutdown()).isTrue();
289+
290+
customClient.close();
291+
eventLoopGroup.eventLoopGroup().shutdownGracefully().awaitUninterruptibly();
292+
}
293+
294+
@Test
295+
public void responseConnectionClosed_shouldCloseAndReleaseChannel() throws Exception {
296+
297+
ChannelFactory channelFactory = mock(ChannelFactory.class);
298+
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup(1);
299+
NioSocketChannel channel = new NioSocketChannel();
300+
301+
when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> channel);
302+
303+
URI uri = URI.create("http://localhost:" + mockServer.port());
304+
SdkHttpRequest request = createRequest(uri);
305+
RecordingResponseHandler recorder = new RecordingResponseHandler();
306+
307+
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.create(customEventLoopGroup, channelFactory);
308+
309+
NettyNioAsyncHttpClient customClient =
310+
(NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
311+
.eventLoopGroup(eventLoopGroup)
312+
.maxConcurrency(1)
313+
.build();
314+
315+
String body = randomAlphabetic(10);
316+
317+
stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withBody(body)
318+
.withStatus(500)
319+
.withHeader("Connection", "close")
320+
));
321+
322+
customClient.execute(AsyncExecuteRequest.builder()
323+
.request(request)
324+
.requestContentPublisher(createProvider(""))
325+
.responseHandler(recorder).build());
326+
recorder.completeFuture.get(5, TimeUnit.SECONDS);
327+
328+
verifyChannelRelease(channel);
329+
assertThat(channel.isShutdown()).isTrue();
330+
331+
customClient.close();
332+
eventLoopGroup.eventLoopGroup().shutdownGracefully().awaitUninterruptibly();
333+
}
334+
228335
/**
229336
* Make a simple async request and wait for it to fiish.
230337
*
@@ -491,6 +598,11 @@ public void testExceptionMessageChanged_WhenConnectionTimeoutErrorEncountered()
491598
customClient.close();
492599
}
493600

601+
private void verifyChannelRelease(Channel channel) throws InterruptedException {
602+
Thread.sleep(1000);
603+
assertThat(channel.attr(AttributeKey.valueOf("channelPool")).get()).isNull();
604+
}
605+
494606
private RecordingResponseHandler makeSimpleRequestAndReturnResponseHandler(SdkAsyncHttpClient client) throws Exception {
495607
String body = randomAlphabetic(10);
496608
URI uri = URI.create("http://localhost:" + mockServer.port());

0 commit comments

Comments
 (0)