Skip to content

Commit 8777eb0

Browse files
committed
Add workaround to await channel pools to be closed before shutting down event loop group to avoid the race condition between channelPool.close and eventLoopGroup.shutdown
1 parent 19cb6d4 commit 8777eb0

File tree

8 files changed

+409
-120
lines changed

8 files changed

+409
-120
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "Netty NIO Async Http Client",
3+
"type": "bugfix",
4+
"description": "Add workaround to await channel pools to be closed before shutting down EventLoopGroup to avoid the race condition between `channelPool.close` and `eventLoopGroup.shutdown`. See [#1109](https://github.com/aws/aws-sdk-java-v2/issues/1109)."
5+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java

Lines changed: 20 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,17 @@
3030
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
3131
import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError;
3232

33-
import io.netty.bootstrap.Bootstrap;
3433
import io.netty.channel.ChannelOption;
3534
import io.netty.channel.EventLoopGroup;
3635
import io.netty.channel.pool.ChannelPool;
37-
import io.netty.channel.pool.SimpleChannelPool;
38-
import io.netty.handler.codec.http2.Http2SecurityUtil;
3936
import io.netty.handler.ssl.SslContext;
40-
import io.netty.handler.ssl.SslContextBuilder;
4137
import io.netty.handler.ssl.SslProvider;
42-
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
43-
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
4438
import java.net.URI;
4539
import java.time.Duration;
4640
import java.util.concurrent.CompletableFuture;
4741
import java.util.concurrent.ExecutionException;
4842
import java.util.concurrent.TimeUnit;
4943
import java.util.concurrent.TimeoutException;
50-
import java.util.concurrent.atomic.AtomicReference;
51-
import javax.net.ssl.SSLException;
52-
import javax.net.ssl.TrustManagerFactory;
5344
import org.slf4j.Logger;
5445
import org.slf4j.LoggerFactory;
5546
import software.amazon.awssdk.annotations.SdkPublicApi;
@@ -59,20 +50,14 @@
5950
import software.amazon.awssdk.http.SdkHttpRequest;
6051
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
6152
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
62-
import software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool;
63-
import software.amazon.awssdk.http.nio.netty.internal.ChannelPipelineInitializer;
64-
import software.amazon.awssdk.http.nio.netty.internal.HandlerRemovingChannelPool;
65-
import software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool;
66-
import software.amazon.awssdk.http.nio.netty.internal.HonorCloseOnReleaseChannelPool;
53+
import software.amazon.awssdk.http.nio.netty.internal.AwaitCloseChannelPoolMap;
6754
import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
6855
import software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor;
6956
import software.amazon.awssdk.http.nio.netty.internal.NonManagedEventLoopGroup;
70-
import software.amazon.awssdk.http.nio.netty.internal.ReleaseOnceChannelPool;
7157
import software.amazon.awssdk.http.nio.netty.internal.RequestContext;
7258
import software.amazon.awssdk.http.nio.netty.internal.SdkChannelOptions;
7359
import software.amazon.awssdk.http.nio.netty.internal.SdkChannelPoolMap;
7460
import software.amazon.awssdk.http.nio.netty.internal.SharedSdkEventLoopGroup;
75-
import software.amazon.awssdk.http.nio.netty.internal.http2.HttpOrHttp2ChannelPool;
7661
import software.amazon.awssdk.utils.AttributeMap;
7762
import software.amazon.awssdk.utils.Either;
7863
import software.amazon.awssdk.utils.Validate;
@@ -88,35 +73,31 @@ public final class NettyNioAsyncHttpClient implements SdkAsyncHttpClient {
8873
private static final long MAX_STREAMS_ALLOWED = 4294967295L; // unsigned 32-bit, 2^32 -1
8974

9075
private final SdkEventLoopGroup sdkEventLoopGroup;
91-
private final SdkChannelPoolMap<URI, ChannelPool> pools;
92-
private final SdkChannelOptions sdkChannelOptions;
76+
private final SdkChannelPoolMap<URI, ? extends ChannelPool> pools;
9377
private final NettyConfiguration configuration;
94-
private final long maxStreams;
95-
private SslProvider sslProvider;
96-
private Protocol protocol;
9778

98-
NettyNioAsyncHttpClient(DefaultBuilder builder, AttributeMap serviceDefaultsMap) {
79+
private NettyNioAsyncHttpClient(DefaultBuilder builder, AttributeMap serviceDefaultsMap) {
9980
this.configuration = new NettyConfiguration(serviceDefaultsMap);
100-
this.protocol = serviceDefaultsMap.get(SdkHttpConfigurationOption.PROTOCOL);
101-
this.maxStreams = builder.maxHttp2Streams == null ? MAX_STREAMS_ALLOWED : builder.maxHttp2Streams;
81+
Protocol protocol = serviceDefaultsMap.get(SdkHttpConfigurationOption.PROTOCOL);
82+
long maxStreams = builder.maxHttp2Streams == null ? MAX_STREAMS_ALLOWED : builder.maxHttp2Streams;
10283
this.sdkEventLoopGroup = eventLoopGroup(builder);
103-
this.pools = createChannelPoolMap();
104-
this.sdkChannelOptions = channelOptions(builder);
105-
this.sslProvider = resolveSslProvider(builder);
84+
this.pools = AwaitCloseChannelPoolMap.builder()
85+
.sdkChannelOptions(builder.sdkChannelOptions)
86+
.configuration(configuration)
87+
.protocol(protocol)
88+
.maxStreams(maxStreams)
89+
.sdkEventLoopGroup(sdkEventLoopGroup)
90+
.sslProvider(resolveSslProvider(builder))
91+
.build();
10692
}
10793

10894
@SdkTestInternalApi
10995
NettyNioAsyncHttpClient(SdkEventLoopGroup sdkEventLoopGroup,
110-
SdkChannelPoolMap<URI, ChannelPool> pools,
111-
SdkChannelOptions sdkChannelOptions,
112-
NettyConfiguration configuration,
113-
long maxStreams) {
96+
SdkChannelPoolMap<URI, ? extends ChannelPool> pools,
97+
NettyConfiguration configuration) {
11498
this.sdkEventLoopGroup = sdkEventLoopGroup;
11599
this.pools = pools;
116-
this.sdkChannelOptions = sdkChannelOptions;
117100
this.configuration = configuration;
118-
this.maxStreams = maxStreams;
119-
this.sslProvider = SslContext.defaultClientProvider();
120101
}
121102

122103
@Override
@@ -129,43 +110,24 @@ public static Builder builder() {
129110
return new DefaultBuilder();
130111
}
131112

132-
private SdkChannelOptions channelOptions(DefaultBuilder builder) {
133-
return builder.sdkChannelOptions;
134-
}
135-
136113
private RequestContext createRequestContext(AsyncExecuteRequest request) {
137114
ChannelPool pool = pools.get(poolKey(request.request()));
138115
return new RequestContext(pool, sdkEventLoopGroup.eventLoopGroup(), request, configuration);
139116
}
140117

141118
private SdkEventLoopGroup eventLoopGroup(DefaultBuilder builder) {
142119
Validate.isTrue(builder.eventLoopGroup == null || builder.eventLoopGroupBuilder == null,
143-
"The eventLoopGroup and the eventLoopGroupFactory can't both be configured.");
120+
"The eventLoopGroup and the eventLoopGroupFactory can't both be configured.");
144121
return Either.fromNullable(builder.eventLoopGroup, builder.eventLoopGroupBuilder)
145-
.map(e -> e.map(this::nonManagedEventLoopGroup, SdkEventLoopGroup.Builder::build))
146-
.orElseGet(SharedSdkEventLoopGroup::get);
122+
.map(e -> e.map(this::nonManagedEventLoopGroup, SdkEventLoopGroup.Builder::build))
123+
.orElseGet(SharedSdkEventLoopGroup::get);
147124
}
148125

149126
private static URI poolKey(SdkHttpRequest sdkRequest) {
150127
return invokeSafely(() -> new URI(sdkRequest.protocol(), null, sdkRequest.host(),
151128
sdkRequest.port(), null, null, null));
152129
}
153130

154-
private SslContext sslContext(String protocol) {
155-
if (!protocol.equalsIgnoreCase("https")) {
156-
return null;
157-
}
158-
try {
159-
return SslContextBuilder.forClient()
160-
.sslProvider(sslProvider)
161-
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
162-
.trustManager(getTrustManager())
163-
.build();
164-
} catch (SSLException e) {
165-
throw new RuntimeException(e);
166-
}
167-
}
168-
169131
private SslProvider resolveSslProvider(DefaultBuilder builder) {
170132
if (builder.sslProvider != null) {
171133
return builder.sslProvider;
@@ -174,64 +136,6 @@ private SslProvider resolveSslProvider(DefaultBuilder builder) {
174136
return SslContext.defaultClientProvider();
175137
}
176138

177-
private TrustManagerFactory getTrustManager() {
178-
return configuration.trustAllCertificates() ? InsecureTrustManagerFactory.INSTANCE : null;
179-
}
180-
181-
private SdkChannelPoolMap<URI, ChannelPool> createChannelPoolMap() {
182-
return new SdkChannelPoolMap<URI, ChannelPool>() {
183-
@Override
184-
protected ChannelPool newPool(URI key) {
185-
SslContext sslContext = sslContext(key.getScheme());
186-
Bootstrap bootstrap =
187-
new Bootstrap()
188-
.group(sdkEventLoopGroup.eventLoopGroup())
189-
.channelFactory(sdkEventLoopGroup.channelFactory())
190-
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeoutMillis())
191-
// TODO run some performance tests with and without this.
192-
.remoteAddress(key.getHost(), key.getPort());
193-
sdkChannelOptions.channelOptions().forEach(bootstrap::option);
194-
195-
AtomicReference<ChannelPool> channelPoolRef = new AtomicReference<>();
196-
ChannelPipelineInitializer handler =
197-
new ChannelPipelineInitializer(protocol, sslContext, maxStreams, channelPoolRef, configuration, key);
198-
channelPoolRef.set(createChannelPool(bootstrap, handler));
199-
return channelPoolRef.get();
200-
}
201-
};
202-
}
203-
204-
private ChannelPool createChannelPool(Bootstrap bootstrap, ChannelPipelineInitializer handler) {
205-
// Create a simple channel pool for pooling raw TCP connections to the service.
206-
ChannelPool channelPool = new SimpleChannelPool(bootstrap, handler);
207-
208-
// Wrap the channel pool such that the ChannelAttributeKey.CLOSE_ON_RELEASE flag is honored.
209-
channelPool = new HonorCloseOnReleaseChannelPool(channelPool);
210-
211-
// Wrap the channel pool such that HTTP 2 channels won't be released to the underlying pool while they're still in use.
212-
channelPool = new HttpOrHttp2ChannelPool(channelPool,
213-
bootstrap.config().group(),
214-
configuration.maxConnections(),
215-
configuration);
216-
217-
218-
// Wrap the channel pool such that we remove request-specific handlers with each request.
219-
channelPool = new HandlerRemovingChannelPool(channelPool);
220-
221-
// Wrap the channel pool such that an individual channel can only be released to the underlying pool once.
222-
channelPool = new ReleaseOnceChannelPool(channelPool);
223-
224-
// Wrap the channel pool to guarantee all channels checked out are healthy, and all unhealthy channels checked in are
225-
// closed.
226-
channelPool = new HealthCheckedChannelPool(bootstrap.config().group(), configuration, channelPool);
227-
228-
// Wrap the channel pool such that if the Promise given to acquire(Promise) is done when the channel is acquired
229-
// from the underlying pool, the channel is closed and released.
230-
channelPool = new CancellableAcquireChannelPool(bootstrap.config().group().next(), channelPool);
231-
232-
return channelPool;
233-
}
234-
235139
private SdkEventLoopGroup nonManagedEventLoopGroup(SdkEventLoopGroup eventLoopGroup) {
236140
return SdkEventLoopGroup.create(new NonManagedEventLoopGroup(eventLoopGroup.eventLoopGroup()),
237141
eventLoopGroup.channelFactory());
@@ -254,8 +158,8 @@ private void closeEventLoopUninterruptibly(EventLoopGroup eventLoopGroup) throws
254158
Thread.currentThread().interrupt();
255159
throw new RuntimeException(e);
256160
} catch (TimeoutException e) {
257-
throw new RuntimeException(String.format("Shutting down Netty EventLoopGroup did not complete within %s seconds",
258-
EVENTLOOP_SHUTDOWN_FUTURE_TIMEOUT_SECONDS));
161+
log.error(String.format("Shutting down Netty EventLoopGroup did not complete within %s seconds",
162+
EVENTLOOP_SHUTDOWN_FUTURE_TIMEOUT_SECONDS));
259163
}
260164
}
261165

0 commit comments

Comments
 (0)