Skip to content

Awaits EventLoopGroup Shutdown when closing Netty client #1069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "Netty Nio HTTP Client",
"type": "bugfix",
"description": "Awaits `EventLoopGroup#shutdownGracefully` to complete when closing Netty client."
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ private boolean hasPublicStaticField(DetailAST ast) {
DetailAST classBody = ast.findFirstToken(TokenTypes.OBJBLOCK);
DetailAST maybeVariableDefinition = classBody.getFirstChild();

String className = ast.findFirstToken(TokenTypes.IDENT).getText();

// Filter out util classes
if (className.endsWith("Utils")) {
return false;
}

while (maybeVariableDefinition != null) {
if (maybeVariableDefinition.getType() == TokenTypes.VARIABLE_DEF) {
DetailAST modifiers = maybeVariableDefinition.findFirstToken(TokenTypes.MODIFIERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.REAP_IDLE_CONNECTIONS;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.WRITE_TIMEOUT;
import static software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration.EVENTLOOP_SHUTDOWN_FUTURE_TIMEOUT_SECONDS;
import static software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration.EVENTLOOP_SHUTDOWN_QUIET_PERIOD_SECONDS;
import static software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration.EVENTLOOP_SHUTDOWN_TIMEOUT_SECONDS;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError;

Expand All @@ -40,6 +43,9 @@
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
Expand Down Expand Up @@ -212,7 +218,23 @@ private SdkEventLoopGroup nonManagedEventLoopGroup(SdkEventLoopGroup eventLoopGr
@Override
public void close() {
runAndLogError(log, "Unable to close channel pools", pools::close);
runAndLogError(log, "Unable to shutdown event loop", sdkEventLoopGroup.eventLoopGroup()::shutdownGracefully);
runAndLogError(log, "Unable to shutdown event loop", () ->
closeEventLoopUninterruptibly(sdkEventLoopGroup.eventLoopGroup()));
}

private void closeEventLoopUninterruptibly(EventLoopGroup eventLoopGroup) throws ExecutionException {
try {
eventLoopGroup.shutdownGracefully(EVENTLOOP_SHUTDOWN_QUIET_PERIOD_SECONDS,
EVENTLOOP_SHUTDOWN_TIMEOUT_SECONDS,
TimeUnit.SECONDS)
.get(EVENTLOOP_SHUTDOWN_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(String.format("Shutting down Netty EventLoopGroup did not complete within %s seconds",
EVENTLOOP_SHUTDOWN_FUTURE_TIMEOUT_SECONDS));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration.EVENTLOOP_SHUTDOWN_QUIET_PERIOD_SECONDS;
import static software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration.EVENTLOOP_SHUTDOWN_TIMEOUT_SECONDS;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -58,7 +61,7 @@ public boolean isShuttingDown() {

@Override
public Future<?> shutdownGracefully() {
return delegate.shutdownGracefully();
return shutdownGracefully(EVENTLOOP_SHUTDOWN_QUIET_PERIOD_SECONDS, EVENTLOOP_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
*/
@SdkInternalApi
public final class NettyConfiguration {

public static final int EVENTLOOP_SHUTDOWN_QUIET_PERIOD_SECONDS = 2;
public static final int EVENTLOOP_SHUTDOWN_TIMEOUT_SECONDS = 15;
public static final int EVENTLOOP_SHUTDOWN_FUTURE_TIMEOUT_SECONDS = 16;

private final AttributeMap configuration;

public NettyConfiguration(AttributeMap configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.SUCCEEDED_FUTURE;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.annotations.SdkInternalApi;

/**
Expand All @@ -31,8 +34,7 @@ public NonManagedEventLoopGroup(EventLoopGroup delegate) {
}

@Override
public Future<?> shutdownGracefully() {
return null;
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return SUCCEEDED_FUTURE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.SUCCEEDED_FUTURE;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
Expand Down Expand Up @@ -46,26 +49,38 @@ private SharedSdkEventLoopGroup() {
/**
* @return The default {@link SdkEventLoopGroup} that will be shared across all service clients.
* This is used when the customer does not specify a custom {@link SdkEventLoopGroup} or {@link SdkEventLoopGroup.Builder}.
* Each SdkEventLoopGroup returned is wrapped with a new {@link ReferenceCountingEventLoopGroup}.
*/
@SdkInternalApi
public static synchronized SdkEventLoopGroup get() {
if (sharedSdkEventLoopGroup == null) {
sharedSdkEventLoopGroup = SdkEventLoopGroup.builder().build();
}

referenceCount++;
return SdkEventLoopGroup.create(new ReferenceCountingEventLoopGroup(sharedSdkEventLoopGroup.eventLoopGroup()),
sharedSdkEventLoopGroup.channelFactory());
}

/**
* Decrement the reference count and close if necessary.
*
* @param quietPeriod the quite period to use
* @param timeout the timeout to use
* @param unit the time unit
*
* @return the close future. If the shared event loop group is still being used, return a completed close future,
* otherwise return the future from {@link EventLoopGroup#shutdownGracefully(long, long, TimeUnit)};
*/
private static synchronized void decrementReference() {
private static synchronized Future<?> decrementReference(long quietPeriod, long timeout, TimeUnit unit) {
referenceCount--;
if (referenceCount == 0) {
sharedSdkEventLoopGroup.eventLoopGroup().shutdownGracefully();
Future<?> shutdownGracefully =
sharedSdkEventLoopGroup.eventLoopGroup().shutdownGracefully(quietPeriod, timeout, unit);
sharedSdkEventLoopGroup = null;
return shutdownGracefully;
}
return SUCCEEDED_FUTURE;
}

@SdkTestInternalApi
Expand All @@ -86,13 +101,13 @@ private ReferenceCountingEventLoopGroup(EventLoopGroup delegate) {
}

@Override
public Future<?> shutdownGracefully() {
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
// Only want to decrement the reference the first time it's closed. Shutdown is idempotent and may be
// called multiple times.
if (hasBeenClosed.compareAndSet(false, true)) {
decrementReference();
return decrementReference(quietPeriod, timeout, unit);
}
return null;
return SUCCEEDED_FUTURE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.SucceededFuture;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand All @@ -27,6 +28,11 @@
@SdkInternalApi
public final class NettyUtils {

/**
* Completed succeed future.
*/
public static final SucceededFuture<?> SUCCEEDED_FUTURE = new SucceededFuture<>(null, null);

private NettyUtils() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,19 @@ public void customChannelFactoryIsUsed() throws Exception {
ChannelFactory channelFactory = mock(ChannelFactory.class);

when(channelFactory.newChannel()).thenAnswer((Answer<NioSocketChannel>) invocationOnMock -> new NioSocketChannel());
EventLoopGroup customEventLoopGroup = new NioEventLoopGroup();

SdkAsyncHttpClient customClient =
NettyNioAsyncHttpClient.builder()
.eventLoopGroup(SdkEventLoopGroup.create(new NioEventLoopGroup(), channelFactory))
.eventLoopGroup(SdkEventLoopGroup.create(customEventLoopGroup, channelFactory))
.build();

makeSimpleRequest(customClient);
customClient.close();

Mockito.verify(channelFactory, atLeastOnce()).newChannel();
assertThat(customEventLoopGroup.isShuttingDown()).isFalse();
customEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
}

@Test
Expand All @@ -212,10 +215,12 @@ protected ChannelPool newPool(URI key) {
SdkChannelOptions channelOptions = new SdkChannelOptions();
NettyConfiguration nettyConfiguration = new NettyConfiguration(AttributeMap.empty());

SdkAsyncHttpClient client = new NettyNioAsyncHttpClient(eventLoopGroup, sdkChannelPoolMap, channelOptions, nettyConfiguration, 1);
SdkAsyncHttpClient customerClient =
new NettyNioAsyncHttpClient(eventLoopGroup, sdkChannelPoolMap, channelOptions, nettyConfiguration, 1);

client.close();
customerClient.close();
assertThat(eventLoopGroup.eventLoopGroup().isShuttingDown()).isTrue();
assertThat(eventLoopGroup.eventLoopGroup().isTerminated()).isTrue();
assertThat(sdkChannelPoolMap).isEmpty();
Mockito.verify(channelPool).close();
}
Expand Down