diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-0d76926.json b/.changes/next-release/bugfix-AWSSDKforJavav2-0d76926.json
new file mode 100644
index 000000000000..f1da6967d318
--- /dev/null
+++ b/.changes/next-release/bugfix-AWSSDKforJavav2-0d76926.json
@@ -0,0 +1,5 @@
+{
+ "category": "AWS SDK for Java v2",
+ "type": "bugfix",
+ "description": "Fixed a bug in asynchronous clients, where a service closing a connection between when a channel is acquired and handlers are attached could lead to response futures never being completed. Fixes [#1207](https://github.com/aws/aws-sdk-java-v2/issues/1207)."
+}
diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java
index dd2e77334f54..a60071b9ac9a 100644
--- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java
+++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java
@@ -62,6 +62,7 @@
import software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool;
import software.amazon.awssdk.http.nio.netty.internal.ChannelPipelineInitializer;
import software.amazon.awssdk.http.nio.netty.internal.HandlerRemovingChannelPool;
+import software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool;
import software.amazon.awssdk.http.nio.netty.internal.HonorCloseOnReleaseChannelPool;
import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
import software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor;
@@ -220,6 +221,10 @@ private ChannelPool createChannelPool(Bootstrap bootstrap, ChannelPipelineInitia
// Wrap the channel pool such that an individual channel can only be released to the underlying pool once.
channelPool = new ReleaseOnceChannelPool(channelPool);
+ // Wrap the channel pool to guarantee all channels checked out are healthy, and all unhealthy channels checked in are
+ // closed.
+ channelPool = new HealthCheckedChannelPool(bootstrap.config().group(), configuration, channelPool);
+
// Wrap the channel pool such that if the Promise given to acquire(Promise) is done when the channel is acquired
// from the underlying pool, the channel is closed and released.
channelPool = new CancellableAcquireChannelPool(bootstrap.config().group().next(), channelPool);
diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java
new file mode 100644
index 000000000000..58945edef8b7
--- /dev/null
+++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.nio.netty.internal;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+
+/**
+ * An implementation of {@link ChannelPool} that validates the health of its connections.
+ *
+ * <p>This wraps another {@code ChannelPool}, and verifies:
+ * <ol>
+ *     <li>All connections acquired from the underlying channel pool are in the active state.</li>
+ *     <li>All connections released into the underlying pool that are not active, are closed before they are released.</li>
+ * </ol>
+ *
+ * <p>Acquisitions that fail due to an unhealthy underlying channel are retried until a healthy channel can be returned, or the
+ * {@link NettyConfiguration#connectionAcquireTimeoutMillis()} timeout is reached.
+ */
+@SdkInternalApi
+public class HealthCheckedChannelPool implements ChannelPool {
+    private final EventLoopGroup eventLoopGroup;
+    private final int acquireTimeoutMillis;
+    private final ChannelPool delegate;
+
+    /**
+     * Create a pool that health-checks the channels flowing to and from the provided delegate.
+     *
+     * @param eventLoopGroup Used to create the promises for acquisitions and to schedule the acquire-timeout task.
+     * @param configuration Source of the acquire timeout ({@link NettyConfiguration#connectionAcquireTimeoutMillis()}).
+     * @param delegate The underlying pool that channels are acquired from and released to.
+     */
+    public HealthCheckedChannelPool(EventLoopGroup eventLoopGroup,
+                                    NettyConfiguration configuration,
+                                    ChannelPool delegate) {
+        this.eventLoopGroup = eventLoopGroup;
+        this.acquireTimeoutMillis = configuration.connectionAcquireTimeoutMillis();
+        this.delegate = delegate;
+    }
+
+    /**
+     * Acquire a healthy channel, completing a new promise created from the event loop group.
+     *
+     * @return The future for the acquisition; see {@link #acquire(Promise)}.
+     */
+    @Override
+    public Future<Channel> acquire() {
+        return acquire(eventLoopGroup.next().newPromise());
+    }
+
+    /**
+     * Acquire a healthy channel from the underlying pool, retrying while the underlying pool returns inactive channels.
+     *
+     * @param resultFuture The promise to complete with an active channel. It is failed with the underlying pool's failure
+     * cause if the underlying acquire fails, or with a {@link TimeoutException} if the acquire timeout elapses first.
+     * @return The provided {@code resultFuture}.
+     */
+    @Override
+    public Future<Channel> acquire(Promise<Channel> resultFuture) {
+        // Schedule a task to time out this acquisition, in case we can't acquire a channel fast enough.
+        ScheduledFuture<?> timeoutFuture =
+            eventLoopGroup.schedule(() -> timeoutAcquire(resultFuture), acquireTimeoutMillis, TimeUnit.MILLISECONDS);
+
+        tryAcquire(resultFuture, timeoutFuture);
+        return resultFuture;
+    }
+
+    /**
+     * Time out the provided acquire future, if it hasn't already been completed.
+     */
+    private void timeoutAcquire(Promise<Channel> resultFuture) {
+        resultFuture.tryFailure(new TimeoutException("Acquire operation took longer than " + acquireTimeoutMillis +
+                                                     " milliseconds."));
+    }
+
+    /**
+     * Try to acquire a channel from the underlying pool. This will keep retrying the acquisition until the provided result
+     * future is completed.
+     *
+     * @param resultFuture The future that should be completed with the acquired channel. If this is completed external to this
+     * function, this function will stop trying to acquire a channel.
+     * @param timeoutFuture The future for the timeout task. This future will be cancelled when a channel is acquired.
+     */
+    private void tryAcquire(Promise<Channel> resultFuture, ScheduledFuture<?> timeoutFuture) {
+        // Something else completed the future (probably a timeout). Stop trying to get a channel.
+        if (resultFuture.isDone()) {
+            return;
+        }
+
+        Promise<Channel> delegateFuture = eventLoopGroup.next().newPromise();
+        delegate.acquire(delegateFuture);
+        delegateFuture.addListener(f -> ensureAcquiredChannelIsHealthy(delegateFuture, resultFuture, timeoutFuture));
+    }
+
+    /**
+     * Validate that the channel returned by the underlying channel pool is healthy. If so, complete the result future with the
+     * channel returned by the underlying pool. If not, close the channel and try to get a different one.
+     *
+     * @param delegateFuture A completed promise as a result of invoking delegate.acquire().
+     * @param resultFuture The future that should be completed with the healthy, acquired channel.
+     * @param timeoutFuture The future for the timeout task. This future will be cancelled when a channel is acquired.
+     */
+    private void ensureAcquiredChannelIsHealthy(Promise<Channel> delegateFuture,
+                                                Promise<Channel> resultFuture,
+                                                ScheduledFuture<?> timeoutFuture) {
+        // If our delegate failed to connect, forward down the failure. Don't try again.
+        if (!delegateFuture.isSuccess()) {
+            timeoutFuture.cancel(false);
+            resultFuture.tryFailure(delegateFuture.cause());
+            return;
+        }
+
+        // If our delegate gave us an unhealthy connection, close it and try to get a new one.
+        Channel channel = delegateFuture.getNow();
+        if (!isHealthy(channel)) {
+            channel.close();
+            delegate.release(channel);
+            tryAcquire(resultFuture, timeoutFuture);
+            return;
+        }
+
+        // Cancel the timeout (best effort), and return back the healthy channel.
+        timeoutFuture.cancel(false);
+        if (!resultFuture.trySuccess(channel)) {
+            // If we couldn't give the channel to the result future (because it failed for some other reason),
+            // just return it to the pool.
+            release(channel);
+        }
+    }
+
+    /**
+     * Release the channel to the underlying pool, closing it first if it is no longer active.
+     */
+    @Override
+    public Future<Void> release(Channel channel) {
+        closeIfUnhealthy(channel);
+        return delegate.release(channel);
+    }
+
+    /**
+     * Release the channel to the underlying pool with the provided promise, closing it first if it is no longer active.
+     */
+    @Override
+    public Future<Void> release(Channel channel, Promise<Void> promise) {
+        closeIfUnhealthy(channel);
+        return delegate.release(channel, promise);
+    }
+
+    /**
+     * Close the underlying pool.
+     */
+    @Override
+    public void close() {
+        delegate.close();
+    }
+
+    /**
+     * Close the provided channel, if it's considered unhealthy.
+     */
+    private void closeIfUnhealthy(Channel channel) {
+        if (!isHealthy(channel)) {
+            channel.close();
+        }
+    }
+
+    /**
+     * Determine whether the provided channel is 'healthy' enough to use.
+     */
+    private boolean isHealthy(Channel channel) {
+        return channel.isActive();
+    }
+}
diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java
index 3a751ee48cdb..8428f5f52428 100644
--- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java
+++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java
@@ -130,8 +130,9 @@ private void makeRequestListener(Future channelFuture) {
if (channelFuture.isSuccess()) {
channel = channelFuture.getNow();
configureChannel();
- configurePipeline();
- makeRequest();
+ if (tryConfigurePipeline()) {
+ makeRequest();
+ }
} else {
handleFailure(() -> "Failed to create connection to " + endpoint(), channelFuture.cause());
}
@@ -146,7 +147,7 @@ private void configureChannel() {
channel.config().setOption(ChannelOption.AUTO_READ, false);
}
- private void configurePipeline() {
+ private boolean tryConfigurePipeline() {
Protocol protocol = ChannelAttributeKey.getProtocolNow(channel);
ChannelPipeline pipeline = channel.pipeline();
if (HTTP2.equals(protocol)) {
@@ -156,10 +157,23 @@ private void configurePipeline() {
String errorMsg = "Unknown protocol: " + protocol;
closeAndRelease(channel);
handleFailure(() -> errorMsg, new RuntimeException(errorMsg));
- return;
+ return false;
}
+
pipeline.addLast(new HttpStreamsClientHandler());
pipeline.addLast(ResponseHandler.getInstance());
+
+ // It's possible that the channel could become inactive between checking it out from the pool, and adding our response
+ // handler (which will monitor for it going inactive from now on).
+ // Make sure it's active here, or the request will never complete: https://github.com/aws/aws-sdk-java-v2/issues/1207
+ if (!channel.isActive()) {
+ String errorMessage = "Channel was closed before it could be written to.";
+ closeAndRelease(channel);
+ handleFailure(() -> errorMessage, new IOException(errorMessage));
+ return false;
+ }
+
+ return true;
}
private void makeRequest() {
diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java
new file mode 100644
index 000000000000..9a0ee5b3ad99
--- /dev/null
+++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.nio.netty.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.never;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+import software.amazon.awssdk.utils.AttributeMap;
+
+/**
+ * Unit tests for {@link HealthCheckedChannelPool}, run against a mocked event loop group and downstream channel pool.
+ */
+public class HealthCheckedChannelPoolTest {
+    private EventLoopGroup eventLoopGroup = Mockito.mock(EventLoopGroup.class);
+    private EventLoop eventLoop = Mockito.mock(EventLoop.class);
+    private ChannelPool downstreamChannelPool = Mockito.mock(ChannelPool.class);
+    private List<Channel> channels = new ArrayList<>();
+    private ScheduledFuture<?> scheduledFuture = Mockito.mock(ScheduledFuture.class);
+
+    private static final NettyConfiguration NETTY_CONFIGURATION =
+            new NettyConfiguration(AttributeMap.builder()
+                                               .put(CONNECTION_ACQUIRE_TIMEOUT, Duration.ofMillis(10))
+                                               .build());
+
+    private HealthCheckedChannelPool channelPool = new HealthCheckedChannelPool(eventLoopGroup,
+                                                                                NETTY_CONFIGURATION,
+                                                                                downstreamChannelPool);
+
+    @Before
+    public void reset() {
+        Mockito.reset(eventLoopGroup, eventLoop, downstreamChannelPool, scheduledFuture);
+        channels.clear();
+
+        Mockito.when(eventLoopGroup.next()).thenReturn(eventLoop);
+        Mockito.when(eventLoop.newPromise())
+               .thenAnswer((Answer<Promise<Object>>) i -> new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
+    }
+
+    @Test
+    public void acquireCanMakeJustOneCall() throws Exception {
+        stubForIgnoredTimeout();
+        stubAcquireHealthySequence(true);
+
+        Future<Channel> acquire = channelPool.acquire();
+
+        acquire.get(5, TimeUnit.SECONDS);
+
+        assertThat(acquire.isDone()).isTrue();
+        assertThat(acquire.isSuccess()).isTrue();
+        assertThat(acquire.getNow()).isEqualTo(channels.get(0));
+
+        Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any());
+    }
+
+    @Test
+    public void acquireCanMakeManyCalls() throws Exception {
+        stubForIgnoredTimeout();
+        stubAcquireHealthySequence(false, false, false, false, true);
+
+        Future<Channel> acquire = channelPool.acquire();
+
+        acquire.get(5, TimeUnit.SECONDS);
+
+        assertThat(acquire.isDone()).isTrue();
+        assertThat(acquire.isSuccess()).isTrue();
+        assertThat(acquire.getNow()).isEqualTo(channels.get(4));
+
+        Mockito.verify(downstreamChannelPool, Mockito.times(5)).acquire(any());
+    }
+
+    @Test
+    public void badDownstreamAcquiresCausesException() throws Exception {
+        stubForIgnoredTimeout();
+        stubBadDownstreamAcquire();
+
+        Future<Channel> acquire = channelPool.acquire();
+
+        try {
+            acquire.get(5, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            // Expected
+        }
+
+        assertThat(acquire.isDone()).isTrue();
+        assertThat(acquire.isSuccess()).isFalse();
+        assertThat(acquire.cause()).isInstanceOf(IOException.class);
+
+        Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any());
+    }
+
+    @Test
+    public void slowAcquireTimesOut() throws Exception {
+        stubIncompleteDownstreamAcquire();
+
+        // The delay is passed as a long, so match it with a long literal; eq(10) would box to Integer and never match.
+        Mockito.when(eventLoopGroup.schedule(Mockito.any(Runnable.class), Mockito.eq(10L), Mockito.eq(TimeUnit.MILLISECONDS)))
+               .thenAnswer(i -> scheduledFuture);
+
+        Future<Channel> acquire = channelPool.acquire();
+
+        ArgumentCaptor<Runnable> timeoutTask = ArgumentCaptor.forClass(Runnable.class);
+        Mockito.verify(eventLoopGroup).schedule(timeoutTask.capture(), anyLong(), any());
+        timeoutTask.getValue().run();
+
+        try {
+            acquire.get(5, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            // Expected
+        }
+
+        assertThat(acquire.isDone()).isTrue();
+        assertThat(acquire.isSuccess()).isFalse();
+        assertThat(acquire.cause()).isInstanceOf(TimeoutException.class);
+
+        Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any());
+    }
+
+    @Test
+    public void releaseHealthyDoesNotClose() {
+        Channel channel = Mockito.mock(Channel.class);
+        Mockito.when(channel.isActive()).thenReturn(true);
+
+        channelPool.release(channel);
+
+        Mockito.verify(channel, never()).close();
+        Mockito.verify(downstreamChannelPool, times(1)).release(channel);
+    }
+
+    @Test
+    public void releaseUnhealthyCloses() {
+        Channel channel = Mockito.mock(Channel.class);
+        Mockito.when(channel.isActive()).thenReturn(false);
+
+        channelPool.release(channel);
+
+        Mockito.verify(channel, times(1)).close();
+        Mockito.verify(downstreamChannelPool, times(1)).release(channel);
+    }
+
+    /**
+     * Stub successive downstream acquires to succeed with mock channels whose active state follows the given sequence.
+     */
+    public void stubAcquireHealthySequence(Boolean... acquireHealthySequence) {
+        OngoingStubbing<Future<Channel>> stubbing = Mockito.when(downstreamChannelPool.acquire(any()));
+        for (boolean shouldAcquireBeHealthy : acquireHealthySequence) {
+            stubbing = stubbing.thenAnswer(invocation -> {
+                Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
+                Channel channel = Mockito.mock(Channel.class);
+                Mockito.when(channel.isActive()).thenReturn(shouldAcquireBeHealthy);
+                channels.add(channel);
+                promise.setSuccess(channel);
+                return promise;
+            });
+        }
+    }
+
+    /**
+     * Stub the downstream acquire to fail the promise it is given with an {@link IOException}.
+     */
+    public void stubBadDownstreamAcquire() {
+        Mockito.when(downstreamChannelPool.acquire(any())).thenAnswer(invocation -> {
+            Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
+            promise.setFailure(new IOException());
+            return promise;
+        });
+    }
+
+    /**
+     * Stub the downstream acquire to hand back the promise it is given without ever completing it.
+     */
+    public void stubIncompleteDownstreamAcquire() {
+        Mockito.when(downstreamChannelPool.acquire(any())).thenAnswer(invocation -> invocation.getArgumentAt(0, Promise.class));
+    }
+
+    /**
+     * Stub timeout scheduling to return the mock scheduled future; the scheduled task itself is never run.
+     */
+    public void stubForIgnoredTimeout() {
+        Mockito.when(eventLoopGroup.schedule(any(Runnable.class), anyLong(), any()))
+               .thenAnswer(i -> scheduledFuture);
+    }
+}
\ No newline at end of file