Skip to content

Fixed a bug in the netty client, where a future may not always be completed. #1217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-0d76926.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "AWS SDK for Java v2",
"type": "bugfix",
"description": "Fixed a bug in asynchronous clients, where a service closing a connection between when a channel is acquired and handlers are attached could lead to response futures never being completed. Fixes [#1207](https://github.com/aws/aws-sdk-java-v2/issues/1207)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool;
import software.amazon.awssdk.http.nio.netty.internal.ChannelPipelineInitializer;
import software.amazon.awssdk.http.nio.netty.internal.HandlerRemovingChannelPool;
import software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool;
import software.amazon.awssdk.http.nio.netty.internal.HonorCloseOnReleaseChannelPool;
import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
import software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor;
Expand Down Expand Up @@ -220,6 +221,10 @@ private ChannelPool createChannelPool(Bootstrap bootstrap, ChannelPipelineInitia
// Wrap the channel pool such that an individual channel can only be released to the underlying pool once.
channelPool = new ReleaseOnceChannelPool(channelPool);

// Wrap the channel pool to guarantee all channels checked out are healthy, and all unhealthy channels checked in are
// closed.
channelPool = new HealthCheckedChannelPool(bootstrap.config().group(), configuration, channelPool);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should switch with CancellableAcquireChannelPool

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


// Wrap the channel pool such that if the Promise given to acquire(Promise) is done when the channel is acquired
// from the underlying pool, the channel is closed and released.
channelPool = new CancellableAcquireChannelPool(bootstrap.config().group().next(), channelPool);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import software.amazon.awssdk.annotations.SdkInternalApi;

/**
* An implementation of {@link ChannelPool} that validates the health of its connections.
*
* This wraps another {@code ChannelPool}, and verifies:
* <ol>
* <li>All connections acquired from the underlying channel pool are in the active state.</li>
* <li>All connections released into the underlying pool that are not active, are closed before they are released.</li>
* </ol>
*
* Acquisitions that fail due to an unhealthy underlying channel are retried until a healthy channel can be returned, or the
* {@link NettyConfiguration#connectionAcquireTimeoutMillis()} timeout is reached.
*/
@SdkInternalApi
public class HealthCheckedChannelPool implements ChannelPool {
private final EventLoopGroup eventLoopGroup;
private final int acquireTimeoutMillis;
private final ChannelPool delegate;

public HealthCheckedChannelPool(EventLoopGroup eventLoopGroup,
NettyConfiguration configuration,
ChannelPool delegate) {
this.eventLoopGroup = eventLoopGroup;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to use the whole group instead of a single loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Load balancing? It didn't seem like it would be terrible to try to better balance things across the whole group instead of an individual event loop.

this.acquireTimeoutMillis = configuration.connectionAcquireTimeoutMillis();
this.delegate = delegate;
}

@Override
public Future<Channel> acquire() {
return acquire(eventLoopGroup.next().newPromise());
}

@Override
public Future<Channel> acquire(Promise<Channel> resultFuture) {
// Schedule a task to time out this acquisition, in case we can't acquire a channel fast enough.
ScheduledFuture<?> timeoutFuture =
eventLoopGroup.schedule(() -> timeoutAcquire(resultFuture), acquireTimeoutMillis, TimeUnit.MILLISECONDS);

tryAcquire(resultFuture, timeoutFuture);
return resultFuture;
}

/**
* Time out the provided acquire future, if it hasn't already been completed.
*/
private void timeoutAcquire(Promise<Channel> resultFuture) {
resultFuture.tryFailure(new TimeoutException("Acquire operation took longer than " + acquireTimeoutMillis +
" milliseconds."));
}

/**
* Try to acquire a channel from the underlying pool. This will keep retrying the acquisition until the provided result
* future is completed.
*
* @param resultFuture The future that should be completed with the acquired channel. If this is completed external to this
* function, this function will stop trying to acquire a channel.
* @param timeoutFuture The future for the timeout task. This future will be cancelled when a channel is acquired.
*/
private void tryAcquire(Promise<Channel> resultFuture, ScheduledFuture<?> timeoutFuture) {
// Something else completed the future (probably a timeout). Stop trying to get a channel.
if (resultFuture.isDone()) {
return;
}

Promise<Channel> delegateFuture = eventLoopGroup.next().newPromise();
delegate.acquire(delegateFuture);
delegateFuture.addListener(f -> ensureAcquiredChannelIsHealthy(delegateFuture, resultFuture, timeoutFuture));
}

/**
* Validate that the channel returned by the underlying channel pool is healthy. If so, complete the result future with the
* channel returned by the underlying pool. If not, close the channel and try to get a different one.
*
* @param delegateFuture A completed promise as a result of invoking delegate.acquire().
* @param resultFuture The future that should be completed with the healthy, acquired channel.
* @param timeoutFuture The future for the timeout task. This future will be cancelled when a channel is acquired.
*/
private void ensureAcquiredChannelIsHealthy(Promise<Channel> delegateFuture,
Promise<Channel> resultFuture,
ScheduledFuture<?> timeoutFuture) {
// If our delegate failed to connect, forward down the failure. Don't try again.
if (!delegateFuture.isSuccess()) {
timeoutFuture.cancel(false);
resultFuture.tryFailure(delegateFuture.cause());
return;
}

// If our delegate gave us an unhealthy connection, close it and try to get a new one.
Channel channel = delegateFuture.getNow();
if (!isHealthy(channel)) {
channel.close();
delegate.release(channel);
tryAcquire(resultFuture, timeoutFuture);
return;
}

// Cancel the timeout (best effort), and return back the healthy channel.
timeoutFuture.cancel(false);
if (!resultFuture.trySuccess(channel)) {
// If we couldn't give the channel to the result future (because it failed for some other reason),
// just return it to the pool.
release(channel);
}
}

@Override
public Future<Void> release(Channel channel) {
closeIfUnhealthy(channel);
return delegate.release(channel);
}

@Override
public Future<Void> release(Channel channel, Promise<Void> promise) {
closeIfUnhealthy(channel);
return delegate.release(channel, promise);
}

@Override
public void close() {
delegate.close();
}

/**
* Close the provided channel, if it's considered unhealthy.
*/
private void closeIfUnhealthy(Channel channel) {
if (!isHealthy(channel)) {
channel.close();
}
}

/**
* Determine whether the provided channel is 'healthy' enough to use.
*/
private boolean isHealthy(Channel channel) {
return channel.isActive();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ private void makeRequestListener(Future<Channel> channelFuture) {
if (channelFuture.isSuccess()) {
channel = channelFuture.getNow();
configureChannel();
configurePipeline();
makeRequest();
if (tryConfigurePipeline()) {
makeRequest();
}
} else {
handleFailure(() -> "Failed to create connection to " + endpoint(), channelFuture.cause());
}
Expand All @@ -146,7 +147,7 @@ private void configureChannel() {
channel.config().setOption(ChannelOption.AUTO_READ, false);
}

private void configurePipeline() {
private boolean tryConfigurePipeline() {
Protocol protocol = ChannelAttributeKey.getProtocolNow(channel);
ChannelPipeline pipeline = channel.pipeline();
if (HTTP2.equals(protocol)) {
Expand All @@ -156,10 +157,23 @@ private void configurePipeline() {
String errorMsg = "Unknown protocol: " + protocol;
closeAndRelease(channel);
handleFailure(() -> errorMsg, new RuntimeException(errorMsg));
return;
return false;
}

pipeline.addLast(new HttpStreamsClientHandler());
pipeline.addLast(ResponseHandler.getInstance());

// It's possible that the channel could become inactive between checking it out from the pool, and adding our response
// handler (which will monitor for it going inactive from now on).
// Make sure it's active here, or the request will never complete: https://github.com/aws/aws-sdk-java-v2/issues/1207
if (!channel.isActive()) {
String errorMessage = "Channel was closed before it could be written to.";
closeAndRelease(channel);
handleFailure(() -> errorMessage, new IOException(errorMessage));
return false;
}

return true;
}

private void makeRequest() {
Expand Down
Loading