Skip to content

Commit 310e78b

Browse files
committed
Add LastHttpContentHandler to flag if the LastHttpContent has been received
Set KEEP_ALIVE to false if the channel is to be closed
1 parent 4c0e42e commit 310e78b

File tree

9 files changed

+334
-16
lines changed

9 files changed

+334
-16
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "Netty NIO Http Client",
3+
"type": "bugfix",
4+
"description": "Fix a race condition where the channel is closed right after all content is buffered, causing `server failed to complete the response` error by adding a flag when `LastHttpContentHandler` is received."
5+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.http.nio.netty.internal;
1717

1818
import io.netty.channel.Channel;
19+
import io.netty.handler.codec.http.LastHttpContent;
1920
import io.netty.util.AttributeKey;
2021
import java.nio.ByteBuffer;
2122
import java.util.concurrent.CompletableFuture;
@@ -60,6 +61,12 @@ public final class ChannelAttributeKey {
6061
static final AttributeKey<Boolean> RESPONSE_COMPLETE_KEY = AttributeKey.newInstance(
6162
"aws.http.nio.netty.async.responseComplete");
6263

64+
/**
65+
* {@link AttributeKey} to keep track of whether we have received the {@link LastHttpContent}.
66+
*/
67+
static final AttributeKey<Boolean> LAST_HTTP_CONTENT_RECEIVED_KEY = AttributeKey.newInstance(
68+
"aws.http.nio.netty.async.lastHttpContentReceived");
69+
6370
static final AttributeKey<CompletableFuture<Void>> EXECUTE_FUTURE_KEY = AttributeKey.newInstance(
6471
"aws.http.nio.netty.async.executeFuture");
6572

@@ -72,7 +79,6 @@ public final class ChannelAttributeKey {
7279
*/
7380
static final AttributeKey<Boolean> KEEP_ALIVE = AttributeKey.newInstance("aws.http.nio.netty.async.keepAlive");
7481

75-
7682
/**
7783
* Whether the channel is still in use
7884
*/

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPool.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ private void removePerRequestHandlers(Channel channel) {
7878
if (channel.isOpen() || channel.isRegistered()) {
7979
removeIfExists(channel.pipeline(),
8080
HttpStreamsClientHandler.class,
81+
LastHttpContentHandler.class,
8182
ResponseHandler.class,
8283
ReadTimeoutHandler.class,
8384
WriteTimeoutHandler.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
19+
20+
import io.netty.channel.ChannelHandler;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.channel.ChannelInboundHandlerAdapter;
23+
import io.netty.handler.codec.http.LastHttpContent;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.utils.Logger;
26+
27+
/**
28+
* Marks {@code ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY} if {@link LastHttpContent} is received.
29+
*/
30+
@SdkInternalApi
31+
@ChannelHandler.Sharable
32+
public final class LastHttpContentHandler extends ChannelInboundHandlerAdapter {
33+
private static final LastHttpContentHandler INSTANCE = new LastHttpContentHandler();
34+
private static final Logger logger = Logger.loggerFor(LastHttpContent.class);
35+
36+
@Override
37+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
38+
if (msg instanceof LastHttpContent) {
39+
logger.debug(() -> "Received LastHttpContent " + ctx.channel());
40+
ctx.channel().attr(LAST_HTTP_CONTENT_RECEIVED_KEY).set(true);
41+
}
42+
43+
ctx.fireChannelRead(msg);
44+
}
45+
46+
public static LastHttpContentHandler create() {
47+
return INSTANCE;
48+
}
49+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
2121
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTION_ID_KEY;
2222
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.IN_USE;
23+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
24+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
2325
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
2426
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
2527

@@ -145,6 +147,7 @@ private void configureChannel() {
145147
channel.attr(EXECUTE_FUTURE_KEY).set(executeFuture);
146148
channel.attr(REQUEST_CONTEXT_KEY).set(context);
147149
channel.attr(RESPONSE_COMPLETE_KEY).set(false);
150+
channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).set(false);
148151
channel.attr(IN_USE).set(true);
149152
channel.config().setOption(ChannelOption.AUTO_READ, false);
150153
}
@@ -162,6 +165,7 @@ private boolean tryConfigurePipeline() {
162165
return false;
163166
}
164167

168+
pipeline.addLast(LastHttpContentHandler.create());
165169
pipeline.addLast(new HttpStreamsClientHandler());
166170
pipeline.addLast(ResponseHandler.getInstance());
167171

@@ -336,6 +340,7 @@ private String getMessageForClosedChannel() {
336340
*/
337341
private void closeAndRelease(Channel channel) {
338342
log.trace("closing and releasing channel {}", channel.id().asLongText());
343+
channel.attr(KEEP_ALIVE).set(false);
339344
channel.close();
340345
context.channelPool().release(channel);
341346
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static java.util.stream.Collectors.mapping;
2020
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
2121
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
22+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
2223
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
2324
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
2425
import static software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils.tryCatch;
@@ -138,12 +139,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
138139
}
139140

140141
@Override
141-
public void channelInactive(ChannelHandlerContext handlerCtx) throws Exception {
142-
notifyIfResponseNotCompleted(handlerCtx);
143-
}
144-
145-
@Override
146-
public void channelUnregistered(ChannelHandlerContext handlerCtx) throws Exception {
142+
public void channelInactive(ChannelHandlerContext handlerCtx) {
147143
notifyIfResponseNotCompleted(handlerCtx);
148144
}
149145

@@ -158,6 +154,7 @@ public static ResponseHandler getInstance() {
158154
*/
159155
private static void closeAndRelease(ChannelHandlerContext ctx) {
160156
Channel channel = ctx.channel();
157+
channel.attr(KEEP_ALIVE).set(false);
161158
RequestContext requestContext = channel.attr(REQUEST_CONTEXT_KEY).get();
162159
ctx.close();
163160
requestContext.channelPool().release(channel);
@@ -386,7 +383,10 @@ private Throwable wrapException(Throwable originalCause) {
386383
private void notifyIfResponseNotCompleted(ChannelHandlerContext handlerCtx) {
387384
RequestContext requestCtx = handlerCtx.channel().attr(REQUEST_CONTEXT_KEY).get();
388385
boolean responseCompleted = handlerCtx.channel().attr(RESPONSE_COMPLETE_KEY).get();
389-
if (!responseCompleted) {
386+
boolean lastHttpContentReceived = handlerCtx.channel().attr(LAST_HTTP_CONTENT_RECEIVED_KEY).get();
387+
handlerCtx.channel().attr(KEEP_ALIVE).set(false);
388+
389+
if (!responseCompleted && !lastHttpContentReceived) {
390390
IOException err = new IOException("Server failed to send complete response");
391391
runAndLogError("Fail to execute SdkAsyncHttpResponseHandler#onError", () -> requestCtx.handler().onError(err));
392392
executeFuture(handlerCtx).completeExceptionally(err);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.LAST_HTTP_CONTENT_RECEIVED_KEY;
20+
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.codec.http.LastHttpContent;
23+
import org.junit.After;
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
import org.mockito.Mockito;
27+
28+
29+
public class LastHttpContentHandlerTest {
30+
31+
private MockChannel channel;
32+
private ChannelHandlerContext handlerContext;
33+
private LastHttpContentHandler contentHandler = LastHttpContentHandler.create();
34+
35+
@Before
36+
public void setup() throws Exception {
37+
channel = new MockChannel();
38+
channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).set(false);
39+
handlerContext = Mockito.mock(ChannelHandlerContext.class);
40+
Mockito.when(handlerContext.channel()).thenReturn(channel);
41+
}
42+
43+
@After
44+
public void cleanup() {
45+
channel.close();
46+
}
47+
48+
@Test
49+
public void lastHttpContentReceived_shouldSetAttribute() {
50+
LastHttpContent lastHttpContent = LastHttpContent.EMPTY_LAST_CONTENT;
51+
contentHandler.channelRead(handlerContext, lastHttpContent);
52+
53+
assertThat(channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).get()).isTrue();
54+
}
55+
56+
@Test
57+
public void otherContentReceived_shouldNotSetAttribute() {
58+
String content = "some content";
59+
contentHandler.channelRead(handlerContext, content);
60+
61+
assertThat(channel.attr(LAST_HTTP_CONTENT_RECEIVED_KEY).get()).isFalse();
62+
}
63+
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutorTest.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
package software.amazon.awssdk.http.nio.netty.internal;
22

3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.Matchers.any;
5+
import static org.mockito.Mockito.mock;
6+
import static org.mockito.Mockito.verify;
7+
import static org.mockito.Mockito.when;
8+
39
import io.netty.channel.Channel;
410
import io.netty.channel.EventLoop;
511
import io.netty.channel.EventLoopGroup;
612
import io.netty.channel.nio.NioEventLoopGroup;
713
import io.netty.channel.pool.ChannelPool;
814
import io.netty.util.concurrent.Promise;
15+
import java.util.concurrent.CompletableFuture;
916
import org.junit.After;
1017
import org.junit.Before;
1118
import org.junit.Test;
@@ -14,14 +21,6 @@
1421
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
1522
import software.amazon.awssdk.utils.AttributeMap;
1623

17-
import java.util.concurrent.CompletableFuture;
18-
19-
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.mockito.Matchers.any;
21-
import static org.mockito.Mockito.mock;
22-
import static org.mockito.Mockito.verify;
23-
import static org.mockito.Mockito.when;
24-
2524
public class NettyRequestExecutorTest {
2625

2726
private ChannelPool mockChannelPool;

0 commit comments

Comments
 (0)