diff --git a/.changes/next-release/bugfix-NettyNIOAsyncHTTPClient-b5e47f7.json b/.changes/next-release/bugfix-NettyNIOAsyncHTTPClient-b5e47f7.json new file mode 100644 index 000000000000..3c7f4b8ed0f4 --- /dev/null +++ b/.changes/next-release/bugfix-NettyNIOAsyncHTTPClient-b5e47f7.json @@ -0,0 +1,6 @@ +{ + "category": "Netty NIO Async HTTP Client", + "contributor": "", + "type": "bugfix", + "description": "Fixed an issue in Netty async http client where NPE was thrown when the execution got cancelled before executionId was attached to the channel." +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandler.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandler.java index 66e94f0fc759..a3d434664a4b 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandler.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandler.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.Attribute; import java.io.IOException; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.http.nio.netty.internal.utils.NettyClientLogger; @@ -46,7 +47,17 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) { FutureCancelledException cancelledException = (FutureCancelledException) e; - if (currentRequestCancelled(ctx, cancelledException)) { + Long channelExecutionId = executionId(ctx); + + if (channelExecutionId == null) { + RequestContext requestContext = ctx.channel().attr(REQUEST_CONTEXT_KEY).get(); + LOG.warn(ctx.channel(), () -> String.format("Received a cancellation exception on a channel that doesn't have an " + + "execution Id attached. Exception's execution ID is %d. " + + "Exception is being ignored. Closing the channel", + executionId(ctx))); + ctx.close(); + requestContext.channelPool().release(ctx.channel()); + } else if (currentRequestCancelled(channelExecutionId, cancelledException)) { RequestContext requestContext = ctx.channel().attr(REQUEST_CONTEXT_KEY).get(); requestContext.handler().onError(e); ctx.fireExceptionCaught(new IOException("Request cancelled")); @@ -65,11 +76,16 @@ public static FutureCancelHandler getInstance() { return INSTANCE; } - private boolean currentRequestCancelled(ChannelHandlerContext ctx, FutureCancelledException e) { - return e.getExecutionId() == executionId(ctx); + private static boolean currentRequestCancelled(long executionId, FutureCancelledException e) { + return e.getExecutionId() == executionId; } - private Long executionId(ChannelHandlerContext ctx) { - return ctx.channel().attr(EXECUTION_ID_KEY).get(); + private static Long executionId(ChannelHandlerContext ctx) { + Attribute attr = ctx.channel().attr(EXECUTION_ID_KEY); + if (attr == null) { + return null; + } + + return attr.get(); } } diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java index 94ae238f91e7..5d5f9a794677 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java @@ -42,6 +42,7 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; +import io.netty.util.Attribute; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; @@ -130,8 +131,9 @@ private CompletableFuture createExecutionFuture(Promise channelPr Channel ch = channelPromise.getNow(); try { ch.eventLoop().submit(() -> { - if (ch.attr(IN_USE).get()) { - ch.pipeline().fireExceptionCaught(new FutureCancelledException(executionId, t)); + Attribute executionIdKey = ch.attr(EXECUTION_ID_KEY); + if (ch.attr(IN_USE) != null && ch.attr(IN_USE).get() && executionIdKey != null) { + ch.pipeline().fireExceptionCaught(new FutureCancelledException(this.executionId, t)); } else { ch.close().addListener(closeFuture -> context.channelPool().release(ch)); } diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandlerTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandlerTest.java index b80cd8e809d0..885eb8ac6644 100644 --- a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandlerTest.java +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandlerTest.java @@ -16,6 +16,8 @@ package software.amazon.awssdk.http.nio.netty.internal; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTION_ID_KEY; @@ -23,6 +25,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultChannelId; import io.netty.channel.EventLoopGroup; import io.netty.util.DefaultAttributeMap; import java.io.IOException; @@ -75,6 +78,7 @@ public void methodSetup() { when(ctx.channel()).thenReturn(channel); when(channel.attr(EXECUTION_ID_KEY)).thenReturn(attrMap.attr(EXECUTION_ID_KEY)); when(channel.attr(REQUEST_CONTEXT_KEY)).thenReturn(attrMap.attr(REQUEST_CONTEXT_KEY)); + when(channel.id()).thenReturn(DefaultChannelId.newInstance()); } @Test @@ -98,4 +102,15 @@ public void forwardsExceptionIfNotCancelledException() { verify(ctx).fireExceptionCaught(exceptionCaptor.capture()); assertThat(exceptionCaptor.getValue()).isEqualTo(err); } + + @Test + public void cancelledException_executionIdNull_shouldIgnoreExceptionAndCloseChannel() { + when(channel.attr(EXECUTION_ID_KEY)).thenReturn(null); + + FutureCancelledException cancelledException = new FutureCancelledException(1L, new CancellationException()); + handler.exceptionCaught(ctx, cancelledException); + + verify(ctx, never()).fireExceptionCaught(any(Throwable.class)); + verify(ctx).close(); + } }