diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-0d76926.json b/.changes/next-release/bugfix-AWSSDKforJavav2-0d76926.json new file mode 100644 index 000000000000..f1da6967d318 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-0d76926.json @@ -0,0 +1,5 @@ +{ + "category": "AWS SDK for Java v2", + "type": "bugfix", + "description": "Fixed a bug in asynchronous clients, where a service closing a connection between when a channel is acquired and handlers are attached could lead to response futures never being completed. Fixes [#1207](https://github.com/aws/aws-sdk-java-v2/issues/1207)." +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java index dd2e77334f54..a60071b9ac9a 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java @@ -62,6 +62,7 @@ import software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool; import software.amazon.awssdk.http.nio.netty.internal.ChannelPipelineInitializer; import software.amazon.awssdk.http.nio.netty.internal.HandlerRemovingChannelPool; +import software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool; import software.amazon.awssdk.http.nio.netty.internal.HonorCloseOnReleaseChannelPool; import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration; import software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor; @@ -220,6 +221,10 @@ private ChannelPool createChannelPool(Bootstrap bootstrap, ChannelPipelineInitia // Wrap the channel pool such that an individual channel can only be released to the underlying pool once. 
channelPool = new ReleaseOnceChannelPool(channelPool); + // Wrap the channel pool to guarantee all channels checked out are healthy, and all unhealthy channels checked in are + // closed. + channelPool = new HealthCheckedChannelPool(bootstrap.config().group(), configuration, channelPool); + // Wrap the channel pool such that if the Promise given to acquire(Promise) is done when the channel is acquired // from the underlying pool, the channel is closed and released. channelPool = new CancellableAcquireChannelPool(bootstrap.config().group().next(), channelPool); diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java new file mode 100644 index 000000000000..58945edef8b7 --- /dev/null +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java @@ -0,0 +1,164 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.pool.ChannelPool; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import software.amazon.awssdk.annotations.SdkInternalApi; + +/** + * An implementation of {@link ChannelPool} that validates the health of its connections. + * + * This wraps another {@code ChannelPool}, and verifies: + *
<ol>
+ *     <li>All connections acquired from the underlying channel pool are in the active state.</li>
+ *     <li>All connections released into the underlying pool that are not active, are closed before they are released.</li>
+ * </ol>
+ * + * Acquisitions that fail due to an unhealthy underlying channel are retried until a healthy channel can be returned, or the + * {@link NettyConfiguration#connectionAcquireTimeoutMillis()} timeout is reached. + */ +@SdkInternalApi +public class HealthCheckedChannelPool implements ChannelPool { + private final EventLoopGroup eventLoopGroup; + private final int acquireTimeoutMillis; + private final ChannelPool delegate; + + public HealthCheckedChannelPool(EventLoopGroup eventLoopGroup, + NettyConfiguration configuration, + ChannelPool delegate) { + this.eventLoopGroup = eventLoopGroup; + this.acquireTimeoutMillis = configuration.connectionAcquireTimeoutMillis(); + this.delegate = delegate; + } + + @Override + public Future acquire() { + return acquire(eventLoopGroup.next().newPromise()); + } + + @Override + public Future acquire(Promise resultFuture) { + // Schedule a task to time out this acquisition, in case we can't acquire a channel fast enough. + ScheduledFuture timeoutFuture = + eventLoopGroup.schedule(() -> timeoutAcquire(resultFuture), acquireTimeoutMillis, TimeUnit.MILLISECONDS); + + tryAcquire(resultFuture, timeoutFuture); + return resultFuture; + } + + /** + * Time out the provided acquire future, if it hasn't already been completed. + */ + private void timeoutAcquire(Promise resultFuture) { + resultFuture.tryFailure(new TimeoutException("Acquire operation took longer than " + acquireTimeoutMillis + + " milliseconds.")); + } + + /** + * Try to acquire a channel from the underlying pool. This will keep retrying the acquisition until the provided result + * future is completed. + * + * @param resultFuture The future that should be completed with the acquired channel. If this is completed external to this + * function, this function will stop trying to acquire a channel. + * @param timeoutFuture The future for the timeout task. This future will be cancelled when a channel is acquired. 
+ */ + private void tryAcquire(Promise resultFuture, ScheduledFuture timeoutFuture) { + // Something else completed the future (probably a timeout). Stop trying to get a channel. + if (resultFuture.isDone()) { + return; + } + + Promise delegateFuture = eventLoopGroup.next().newPromise(); + delegate.acquire(delegateFuture); + delegateFuture.addListener(f -> ensureAcquiredChannelIsHealthy(delegateFuture, resultFuture, timeoutFuture)); + } + + /** + * Validate that the channel returned by the underlying channel pool is healthy. If so, complete the result future with the + * channel returned by the underlying pool. If not, close the channel and try to get a different one. + * + * @param delegateFuture A completed promise as a result of invoking delegate.acquire(). + * @param resultFuture The future that should be completed with the healthy, acquired channel. + * @param timeoutFuture The future for the timeout task. This future will be cancelled when a channel is acquired. + */ + private void ensureAcquiredChannelIsHealthy(Promise delegateFuture, + Promise resultFuture, + ScheduledFuture timeoutFuture) { + // If our delegate failed to connect, forward down the failure. Don't try again. + if (!delegateFuture.isSuccess()) { + timeoutFuture.cancel(false); + resultFuture.tryFailure(delegateFuture.cause()); + return; + } + + // If our delegate gave us an unhealthy connection, close it and try to get a new one. + Channel channel = delegateFuture.getNow(); + if (!isHealthy(channel)) { + channel.close(); + delegate.release(channel); + tryAcquire(resultFuture, timeoutFuture); + return; + } + + // Cancel the timeout (best effort), and return back the healthy channel. + timeoutFuture.cancel(false); + if (!resultFuture.trySuccess(channel)) { + // If we couldn't give the channel to the result future (because it failed for some other reason), + // just return it to the pool. 
+ release(channel); + } + } + + @Override + public Future release(Channel channel) { + closeIfUnhealthy(channel); + return delegate.release(channel); + } + + @Override + public Future release(Channel channel, Promise promise) { + closeIfUnhealthy(channel); + return delegate.release(channel, promise); + } + + @Override + public void close() { + delegate.close(); + } + + /** + * Close the provided channel, if it's considered unhealthy. + */ + private void closeIfUnhealthy(Channel channel) { + if (!isHealthy(channel)) { + channel.close(); + } + } + + /** + * Determine whether the provided channel is 'healthy' enough to use. + */ + private boolean isHealthy(Channel channel) { + return channel.isActive(); + } +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java index 3a751ee48cdb..8428f5f52428 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java @@ -130,8 +130,9 @@ private void makeRequestListener(Future channelFuture) { if (channelFuture.isSuccess()) { channel = channelFuture.getNow(); configureChannel(); - configurePipeline(); - makeRequest(); + if (tryConfigurePipeline()) { + makeRequest(); + } } else { handleFailure(() -> "Failed to create connection to " + endpoint(), channelFuture.cause()); } @@ -146,7 +147,7 @@ private void configureChannel() { channel.config().setOption(ChannelOption.AUTO_READ, false); } - private void configurePipeline() { + private boolean tryConfigurePipeline() { Protocol protocol = ChannelAttributeKey.getProtocolNow(channel); ChannelPipeline pipeline = channel.pipeline(); if (HTTP2.equals(protocol)) { @@ -156,10 +157,23 @@ private void 
configurePipeline() { String errorMsg = "Unknown protocol: " + protocol; closeAndRelease(channel); handleFailure(() -> errorMsg, new RuntimeException(errorMsg)); - return; + return false; } + pipeline.addLast(new HttpStreamsClientHandler()); pipeline.addLast(ResponseHandler.getInstance()); + + // It's possible that the channel could become inactive between checking it out from the pool, and adding our response + // handler (which will monitor for it going inactive from now on). + // Make sure it's active here, or the request will never complete: https://github.com/aws/aws-sdk-java-v2/issues/1207 + if (!channel.isActive()) { + String errorMessage = "Channel was closed before it could be written to."; + closeAndRelease(channel); + handleFailure(() -> errorMessage, new IOException(errorMessage)); + return false; + } + + return true; } private void makeRequest() { diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java new file mode 100644 index 000000000000..9a0ee5b3ad99 --- /dev/null +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java @@ -0,0 +1,205 @@ +/* + * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.never; +import static org.mockito.internal.verification.VerificationModeFactory.times; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.pool.ChannelPool; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; +import software.amazon.awssdk.utils.AttributeMap; + +public class HealthCheckedChannelPoolTest { + private EventLoopGroup eventLoopGroup = Mockito.mock(EventLoopGroup.class); + private EventLoop eventLoop = Mockito.mock(EventLoop.class); + private ChannelPool downstreamChannelPool = Mockito.mock(ChannelPool.class); + private List channels = new ArrayList<>(); + private ScheduledFuture scheduledFuture = Mockito.mock(ScheduledFuture.class); + + private static final NettyConfiguration NETTY_CONFIGURATION = + new NettyConfiguration(AttributeMap.builder() + .put(CONNECTION_ACQUIRE_TIMEOUT, Duration.ofMillis(10)) + .build()); + + private HealthCheckedChannelPool channelPool = new HealthCheckedChannelPool(eventLoopGroup, + 
NETTY_CONFIGURATION, + downstreamChannelPool); + + @Before + public void reset() { + Mockito.reset(eventLoopGroup, eventLoop, downstreamChannelPool, scheduledFuture); + channels.clear(); + + Mockito.when(eventLoopGroup.next()).thenReturn(eventLoop); + Mockito.when(eventLoop.newPromise()) + .thenAnswer((Answer>) i -> new DefaultPromise<>(GlobalEventExecutor.INSTANCE)); + } + + @Test + public void acquireCanMakeJustOneCall() throws Exception { + stubForIgnoredTimeout(); + stubAcquireHealthySequence(true); + + Future acquire = channelPool.acquire(); + + acquire.get(5, TimeUnit.SECONDS); + + assertThat(acquire.isDone()).isTrue(); + assertThat(acquire.isSuccess()).isTrue(); + assertThat(acquire.getNow()).isEqualTo(channels.get(0)); + + Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any()); + } + + @Test + public void acquireCanMakeManyCalls() throws Exception { + stubForIgnoredTimeout(); + stubAcquireHealthySequence(false, false, false, false, true); + + Future acquire = channelPool.acquire(); + + acquire.get(5, TimeUnit.SECONDS); + + assertThat(acquire.isDone()).isTrue(); + assertThat(acquire.isSuccess()).isTrue(); + assertThat(acquire.getNow()).isEqualTo(channels.get(4)); + + Mockito.verify(downstreamChannelPool, Mockito.times(5)).acquire(any()); + } + + @Test + public void badDownstreamAcquiresCausesException() throws Exception { + stubForIgnoredTimeout(); + stubBadDownstreamAcquire(); + + Future acquire = channelPool.acquire(); + + try { + acquire.get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + // Expected + } + + assertThat(acquire.isDone()).isTrue(); + assertThat(acquire.isSuccess()).isFalse(); + assertThat(acquire.cause()).isInstanceOf(IOException.class); + + Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any()); + } + + @Test + public void slowAcquireTimesOut() throws Exception { + stubIncompleteDownstreamAcquire(); + + Mockito.when(eventLoopGroup.schedule(Mockito.any(Runnable.class), Mockito.eq(10), 
Mockito.eq(TimeUnit.MILLISECONDS))) + .thenAnswer(i -> scheduledFuture); + + Future acquire = channelPool.acquire(); + + ArgumentCaptor timeoutTask = ArgumentCaptor.forClass(Runnable.class); + Mockito.verify(eventLoopGroup).schedule(timeoutTask.capture(), anyLong(), any()); + timeoutTask.getValue().run(); + + try { + acquire.get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + // Expected + } + + assertThat(acquire.isDone()).isTrue(); + assertThat(acquire.isSuccess()).isFalse(); + assertThat(acquire.cause()).isInstanceOf(TimeoutException.class); + + Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any()); + } + + @Test + public void releaseHealthyDoesNotClose() { + Channel channel = Mockito.mock(Channel.class); + Mockito.when(channel.isActive()).thenReturn(true); + + channelPool.release(channel); + + Mockito.verify(channel, never()).close(); + Mockito.verify(downstreamChannelPool, times(1)).release(channel); + } + + @Test + public void releaseHealthyCloses() { + Channel channel = Mockito.mock(Channel.class); + Mockito.when(channel.isActive()).thenReturn(false); + + channelPool.release(channel); + + Mockito.verify(channel, times(1)).close(); + Mockito.verify(downstreamChannelPool, times(1)).release(channel); + } + + public void stubAcquireHealthySequence(Boolean... 
acquireHealthySequence) { + OngoingStubbing> stubbing = Mockito.when(downstreamChannelPool.acquire(any())); + for (boolean shouldAcquireBeHealthy : acquireHealthySequence) { + stubbing = stubbing.thenAnswer(invocation -> { + Promise promise = invocation.getArgumentAt(0, Promise.class); + Channel channel = Mockito.mock(Channel.class); + Mockito.when(channel.isActive()).thenReturn(shouldAcquireBeHealthy); + channels.add(channel); + promise.setSuccess(channel); + return promise; + }); + } + } + + public void stubBadDownstreamAcquire() { + Mockito.when(downstreamChannelPool.acquire(any())).thenAnswer(invocation -> { + Promise promise = invocation.getArgumentAt(0, Promise.class); + promise.setFailure(new IOException()); + return promise; + }); + } + + public void stubIncompleteDownstreamAcquire() { + Mockito.when(downstreamChannelPool.acquire(any())).thenAnswer(invocation -> invocation.getArgumentAt(0, Promise.class)); + } + + public void stubForIgnoredTimeout() { + Mockito.when(eventLoopGroup.schedule(any(Runnable.class), anyLong(), any())) + .thenAnswer(i -> scheduledFuture); + } +} \ No newline at end of file