diff --git a/.changes/next-release/feature-AWSSDKforJavav2-dbf32f0.json b/.changes/next-release/feature-AWSSDKforJavav2-dbf32f0.json new file mode 100644 index 000000000000..78180ec1a407 --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-dbf32f0.json @@ -0,0 +1,5 @@ +{ + "category": "AWS SDK for Java v2", + "type": "feature", + "description": "Add support for `connectionTimeToLive`, `connectionMaxIdleTime` and `useIdleConnectionReaper` to the netty HTTP client." +} diff --git a/.changes/next-release/feature-AWSSDKforJavav2-f81a93c.json b/.changes/next-release/feature-AWSSDKforJavav2-f81a93c.json new file mode 100644 index 000000000000..dc07f4e3ffdf --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-f81a93c.json @@ -0,0 +1,5 @@ +{ + "category": "AWS SDK for Java v2", + "type": "feature", + "description": "Enable `useIdleConnectionReaper` by default for Netty and Apache." +} diff --git a/docs/LaunchChangelog.md b/docs/LaunchChangelog.md index bb3fd57c0fc8..4895d1c9c5a8 100644 --- a/docs/LaunchChangelog.md +++ b/docs/LaunchChangelog.md @@ -219,12 +219,12 @@ DynamoDbAsyncClient client = | Max Connections | `clientConfig.setMaxConnections(...)`
`clientConfig.withMaxConnections(...)` | `httpClientBuilder.maxConnections(...)` | `httpClientBuilder.maxConcurrency(...)` | | Connection Timeout | `clientConfig.setConnectionTimeout(...)`
`clientConfig.withConnectionTimeout(...)` | `httpClientBuilder.connectionTimeout(...)` | `httpClientBuilder.connectionTimeout(...)` | | Socket Timeout | `clientConfig.setSocketTimeout(...)`
`clientConfig.withSocketTimeout(...)` | `httpClientBuilder.socketTimeout(...)` | `httpClientBuilder.writeTimeout(...)`
`httpClientBuilder.readTimeout(...)` | -| Connection TTL | `clientConfig.setConnectionTTL(...)`
`clientConfig.withConnectionTTL(...)` | `httpClientBuilder.connectionTimeToLive(...)` | [Not Supported](https://github.com/aws/aws-sdk-java-v2/issues/856) | -| Connection Max Idle | `clientConfig.setConnectionMaxIdleMillis(...)`
`clientConfig.withConnectionMaxIdleMillis(...)` | `httpClientBuilder.connectionMaxIdleTime(...)` | [Not Supported](https://github.com/aws/aws-sdk-java-v2/issues/856) | +| Connection TTL | `clientConfig.setConnectionTTL(...)`
`clientConfig.withConnectionTTL(...)` | `httpClientBuilder.connectionTimeToLive(...)` | `httpClientBuilder.connectionTimeToLive(...)` | +| Connection Max Idle | `clientConfig.setConnectionMaxIdleMillis(...)`
`clientConfig.withConnectionMaxIdleMillis(...)` | `httpClientBuilder.connectionMaxIdleTime(...)` | `httpClientBuilder.connectionMaxIdleTime(...)` | | Validate After Inactivity | `clientConfig.setValidateAfterInactivityMillis(...)`
`clientConfig.withValidateAfterInactivityMillis(...)` | Not Supported ([Request Feature](https://github.com/aws/aws-sdk-java-v2/issues/new)) | Not Supported ([Request Feature](https://github.com/aws/aws-sdk-java-v2/issues/new)) | | Local Address | `clientConfig.setLocalAddress(...)`
`clientConfig.withLocalAddress(...)` | `httpClientBuilder.localAddress(...)` | [Not Supported](https://github.com/aws/aws-sdk-java-v2/issues/857) | | Expect-Continue Enabled | `clientConfig.setUseExpectContinue(...)`
`clientConfig.withUseExpectContinue(...)` | `httpClientBuilder.expectContinueEnabled(...)` | Not Supported ([Request Feature](https://github.com/aws/aws-sdk-java-v2/issues/new)) | -| Connection Reaper | `clientConfig.setUseReaper(...)`
`clientConfig.withReaper(...)` | `httpClientBuilder.useIdleConnectionReaper(...)` | [Not Supported](https://github.com/aws/aws-sdk-java-v2/issues/856) | +| Connection Reaper | `clientConfig.setUseReaper(...)`
`clientConfig.withReaper(...)` | `httpClientBuilder.useIdleConnectionReaper(...)` | `httpClientBuilder.useIdleConnectionReaper(...)` | | | `AmazonDynamoDBClientBuilder.standard()`
`.withClientConfiguration(clientConfiguration)`
`.build()` | `DynamoDbClient.builder()`
`.httpClientBuilder(httpClientBuilder)`
`.build()` | `DynamoDbAsyncClient.builder()`
`.httpClientBuilder(httpClientBuilder)`
`.build()` | diff --git a/http-client-spi/src/main/java/software/amazon/awssdk/http/SdkHttpConfigurationOption.java b/http-client-spi/src/main/java/software/amazon/awssdk/http/SdkHttpConfigurationOption.java index 2df02ad154a8..76b30a183d3e 100644 --- a/http-client-spi/src/main/java/software/amazon/awssdk/http/SdkHttpConfigurationOption.java +++ b/http-client-spi/src/main/java/software/amazon/awssdk/http/SdkHttpConfigurationOption.java @@ -53,6 +53,19 @@ public final class SdkHttpConfigurationOption extends AttributeMap.Key { public static final SdkHttpConfigurationOption CONNECTION_ACQUIRE_TIMEOUT = new SdkHttpConfigurationOption<>("ConnectionAcquireTimeout", Duration.class); + /** + * Timeout after which an idle connection should be closed. + */ + public static final SdkHttpConfigurationOption CONNECTION_MAX_IDLE_TIMEOUT = + new SdkHttpConfigurationOption<>("ConnectionMaxIdleTimeout", Duration.class); + + /** + * Timeout after which a connection should be closed, regardless of whether it is idle. Zero indicates an infinite amount + * of time. + */ + public static final SdkHttpConfigurationOption CONNECTION_TIME_TO_LIVE = + new SdkHttpConfigurationOption<>("ConnectionTimeToLive", Duration.class); + /** * Maximum number of connections allowed in a connection pool. */ @@ -64,6 +77,7 @@ public final class SdkHttpConfigurationOption extends AttributeMap.Key { */ public static final SdkHttpConfigurationOption PROTOCOL = new SdkHttpConfigurationOption<>("Protocol", Protocol.class); + /** * Maximum number of requests allowed to wait for a connection. */ @@ -77,10 +91,19 @@ public final class SdkHttpConfigurationOption extends AttributeMap.Key { public static final SdkHttpConfigurationOption TRUST_ALL_CERTIFICATES = new SdkHttpConfigurationOption<>("TrustAllCertificates", Boolean.class); + /** + * Whether idle connection should be removed after the {@link #CONNECTION_MAX_IDLE_TIMEOUT} has passed. + */ + public static final SdkHttpConfigurationOption REAP_IDLE_CONNECTIONS = + new SdkHttpConfigurationOption<>("ReapIdleConnections", Boolean.class); + private static final Duration DEFAULT_SOCKET_READ_TIMEOUT = Duration.ofSeconds(30); private static final Duration DEFAULT_SOCKET_WRITE_TIMEOUT = Duration.ofSeconds(30); private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2); private static final Duration DEFAULT_CONNECTION_ACQUIRE_TIMEOUT = Duration.ofSeconds(10); + private static final Duration DEFAULT_CONNECTION_MAX_IDLE_TIMEOUT = Duration.ofSeconds(60); + private static final Duration DEFAULT_CONNECTION_TIME_TO_LIVE = Duration.ZERO; + private static final Boolean DEFAULT_REAP_IDLE_CONNECTIONS = Boolean.TRUE; private static final int DEFAULT_MAX_CONNECTIONS = 50; private static final int DEFAULT_MAX_CONNECTION_ACQUIRES = 10_000; private static final Boolean DEFAULT_TRUST_ALL_CERTIFICATES = Boolean.FALSE; @@ -93,10 +116,13 @@ public final class SdkHttpConfigurationOption extends AttributeMap.Key { .put(WRITE_TIMEOUT, DEFAULT_SOCKET_WRITE_TIMEOUT) .put(CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT) .put(CONNECTION_ACQUIRE_TIMEOUT, DEFAULT_CONNECTION_ACQUIRE_TIMEOUT) + .put(CONNECTION_MAX_IDLE_TIMEOUT, DEFAULT_CONNECTION_MAX_IDLE_TIMEOUT) + .put(CONNECTION_TIME_TO_LIVE, DEFAULT_CONNECTION_TIME_TO_LIVE) .put(MAX_CONNECTIONS, DEFAULT_MAX_CONNECTIONS) .put(MAX_PENDING_CONNECTION_ACQUIRES, DEFAULT_MAX_CONNECTION_ACQUIRES) .put(PROTOCOL, DEFAULT_PROTOCOL) .put(TRUST_ALL_CERTIFICATES, DEFAULT_TRUST_ALL_CERTIFICATES) + .put(REAP_IDLE_CONNECTIONS, DEFAULT_REAP_IDLE_CONNECTIONS) .build(); private final String name; diff --git a/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/ApacheHttpClient.java b/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/ApacheHttpClient.java index 06fe8976a757..a1ad7a8bf0ce 100644 --- a/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/ApacheHttpClient.java +++ b/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/ApacheHttpClient.java @@ -19,7 +19,9 @@ import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.MAX_CONNECTIONS; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT; @@ -134,7 +136,7 @@ private ConnectionManagerAwareHttpClient createClient(ApacheHttpClient.DefaultBu builder.setRequestExecutor(new HttpRequestExecutor()) // SDK handles decompression .disableContentCompression() - .setKeepAliveStrategy(buildKeepAliveStrategy(configuration)) + .setKeepAliveStrategy(buildKeepAliveStrategy(standardOptions)) .disableRedirectHandling() .disableAutomaticRetries() .setUserAgent("") // SDK will set the user agent header in the pipeline. Don't let Apache waste time @@ -144,7 +146,7 @@ private ConnectionManagerAwareHttpClient createClient(ApacheHttpClient.DefaultBu if (useIdleConnectionReaper(configuration)) { IdleConnectionReaper.getInstance().registerConnectionManager( - cm, connectionMaxIdleTime(configuration).toMillis()); + cm, resolvedOptions.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis()); } return new ApacheSdkHttpClient(builder.build(), cm); @@ -167,16 +169,11 @@ private void addProxyConfig(HttpClientBuilder builder, } } - private ConnectionKeepAliveStrategy buildKeepAliveStrategy(ApacheHttpClient.DefaultBuilder configuration) { - long maxIdle = connectionMaxIdleTime(configuration).toMillis(); + private ConnectionKeepAliveStrategy buildKeepAliveStrategy(AttributeMap standardOptions) { + long maxIdle = standardOptions.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis(); return maxIdle > 0 ? new SdkConnectionKeepAliveStrategy(maxIdle) : null; } - private Duration connectionMaxIdleTime(DefaultBuilder configuration) { - return Optional.ofNullable(configuration.connectionMaxIdleTime) - .orElse(DefaultConfiguration.MAX_IDLE_CONNECTION_TIME); - } - private boolean useIdleConnectionReaper(DefaultBuilder configuration) { return Boolean.TRUE.equals(configuration.useIdleConnectionReaper); } @@ -336,7 +333,7 @@ public interface Builder extends SdkHttpClient.Builder * Configure whether the idle connections in the connection pool should be closed asynchronously. *

* When enabled, connections left idling for longer than {@link #connectionMaxIdleTime(Duration)} will be - * closed. If no value is set, the default value of {@link DefaultConfiguration#MAX_IDLE_CONNECTION_TIME} is used. + * closed. This will not close connections currently in use. By default, this is enabled. */ Builder useIdleConnectionReaper(Boolean useConnectionReaper); } @@ -346,8 +343,6 @@ private static final class DefaultBuilder implements Builder { private ProxyConfiguration proxyConfiguration = ProxyConfiguration.builder().build(); private InetAddress localAddress; private Boolean expectContinueEnabled; - private Duration connectionTimeToLive; - private Duration connectionMaxIdleTime; private Boolean useIdleConnectionReaper; private DefaultBuilder() { @@ -431,7 +426,7 @@ public void setExpectContinueEnabled(Boolean useExpectContinue) { @Override public Builder connectionTimeToLive(Duration connectionTimeToLive) { - this.connectionTimeToLive = connectionTimeToLive; + standardOptions.put(CONNECTION_TIME_TO_LIVE, connectionTimeToLive); return this; } @@ -441,7 +436,7 @@ public void setConnectionTimeToLive(Duration connectionTimeToLive) { @Override public Builder connectionMaxIdleTime(Duration maxIdleConnectionTimeout) { - this.connectionMaxIdleTime = maxIdleConnectionTimeout; + standardOptions.put(CONNECTION_MAX_IDLE_TIMEOUT, maxIdleConnectionTimeout); return this; } @@ -478,9 +473,7 @@ public HttpClientConnectionManager create(ApacheHttpClient.DefaultBuilder config null, DefaultSchemePortResolver.INSTANCE, null, - Optional.ofNullable(configuration.connectionTimeToLive) - .orElse(DefaultConfiguration.CONNECTION_POOL_TTL) - .toMillis(), + standardOptions.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE).toMillis(), TimeUnit.MILLISECONDS); cm.setDefaultMaxPerRoute(standardOptions.get(SdkHttpConfigurationOption.MAX_CONNECTIONS)); diff --git a/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/internal/DefaultConfiguration.java b/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/internal/DefaultConfiguration.java index 81bf04c51e29..9f9fb987814c 100644 --- a/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/internal/DefaultConfiguration.java +++ b/http-clients/apache-client/src/main/java/software/amazon/awssdk/http/apache/internal/DefaultConfiguration.java @@ -15,7 +15,6 @@ package software.amazon.awssdk.http.apache.internal; -import java.time.Duration; import software.amazon.awssdk.annotations.SdkInternalApi; /** @@ -23,19 +22,6 @@ */ @SdkInternalApi public final class DefaultConfiguration { - - /** - * The default maximum idle time (in milliseconds) for a connection to be idle in the connection pool and - * still be eligible for reuse. - */ - public static final Duration MAX_IDLE_CONNECTION_TIME = Duration.ofSeconds(60); - - /** - * The default expiration time for a connection in the connection pool. - * A value of -1 means infinite TTL in Apache. - */ - public static final Duration CONNECTION_POOL_TTL = Duration.ofMillis(-1); - public static final Boolean EXPECT_CONTINUE_ENABLED = Boolean.TRUE; private DefaultConfiguration() { diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java index f06a589a4f0c..fe2352ad8f34 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java @@ -16,10 +16,13 @@ package software.amazon.awssdk.http.nio.netty; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.MAX_CONNECTIONS; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.MAX_PENDING_CONNECTION_ACQUIRES; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.REAP_IDLE_CONNECTIONS; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.WRITE_TIMEOUT; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError; @@ -28,6 +31,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.pool.ChannelPool; +import io.netty.channel.pool.SimpleChannelPool; import io.netty.handler.codec.http2.Http2SecurityUtil; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; @@ -39,7 +43,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLException; import javax.net.ssl.TrustManagerFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.annotations.SdkPublicApi; @@ -51,6 +54,7 @@ import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.internal.ChannelPipelineInitializer; import software.amazon.awssdk.http.nio.netty.internal.HandlerRemovingChannelPool; +import software.amazon.awssdk.http.nio.netty.internal.HonorCloseOnReleaseChannelPool; import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration; import software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor; import software.amazon.awssdk.http.nio.netty.internal.NonManagedEventLoopGroup; @@ -166,18 +170,39 @@ protected ChannelPool newPool(URI key) { // TODO run some performance tests with and without this. .remoteAddress(key.getHost(), key.getPort()); sdkChannelOptions.channelOptions().forEach(bootstrap::option); + AtomicReference channelPoolRef = new AtomicReference<>(); ChannelPipelineInitializer handler = - new ChannelPipelineInitializer(protocol, sslContext, maxStreams, channelPoolRef); - channelPoolRef.set(new ReleaseOnceChannelPool( - new HandlerRemovingChannelPool( - new HttpOrHttp2ChannelPool(bootstrap, handler, - configuration.maxConnections(), configuration)))); + new ChannelPipelineInitializer(protocol, sslContext, maxStreams, channelPoolRef, configuration); + channelPoolRef.set(createChannelPool(bootstrap, handler)); return channelPoolRef.get(); } }; } + private ChannelPool createChannelPool(Bootstrap bootstrap, ChannelPipelineInitializer handler) { + // Create a simple channel pool for pooling raw TCP connections to the service. + ChannelPool channelPool = new SimpleChannelPool(bootstrap, handler); + + // Wrap the channel pool such that the ChannelAttributeKey.CLOSE_ON_RELEASE flag is honored. + channelPool = new HonorCloseOnReleaseChannelPool(channelPool); + + // Wrap the channel pool such that HTTP 2 channels won't be released to the underlying pool while they're still in use. + channelPool = new HttpOrHttp2ChannelPool(channelPool, + bootstrap, + configuration.maxConnections(), + configuration); + + + // Wrap the channel pool such that we remove request-specific handlers with each request. + channelPool = new HandlerRemovingChannelPool(channelPool); + + // Wrap the channel pool such that an individual channel can only be released to the underlying pool once. + channelPool = new ReleaseOnceChannelPool(channelPool); + + return channelPool; + } + private SdkEventLoopGroup nonManagedEventLoopGroup(SdkEventLoopGroup eventLoopGroup) { return SdkEventLoopGroup.create(new NonManagedEventLoopGroup(eventLoopGroup.eventLoopGroup()), eventLoopGroup.channelFactory()); @@ -250,6 +275,32 @@ public interface Builder extends SdkAsyncHttpClient.Builder + * When enabled, connections left idling for longer than {@link #connectionMaxIdleTime(Duration)} will be + * closed. This will not close connections currently in use. By default, this is enabled. + */ + Builder useIdleConnectionReaper(Boolean useConnectionReaper); + /** * Sets the {@link SdkEventLoopGroup} to use for the Netty HTTP client. This event loop group may be shared * across multiple HTTP clients for better resource and thread utilization. The preferred way to create @@ -435,6 +486,38 @@ public void setConnectionAcquisitionTimeout(Duration connectionAcquisitionTimeou connectionAcquisitionTimeout(connectionAcquisitionTimeout); } + @Override + public Builder connectionTimeToLive(Duration connectionTimeToLive) { + Validate.isPositive(connectionTimeToLive, "connectionTimeToLive"); + standardOptions.put(CONNECTION_TIME_TO_LIVE, connectionTimeToLive); + return this; + } + + public void setConnectionTimeToLive(Duration connectionTimeToLive) { + connectionTimeToLive(connectionTimeToLive); + } + + @Override + public Builder connectionMaxIdleTime(Duration connectionMaxIdleTime) { + Validate.isPositive(connectionMaxIdleTime, "connectionMaxIdleTime"); + standardOptions.put(CONNECTION_MAX_IDLE_TIMEOUT, connectionMaxIdleTime); + return this; + } + + public void setConnectionMaxIdleTime(Duration connectionMaxIdleTime) { + connectionMaxIdleTime(connectionMaxIdleTime); + } + + @Override + public Builder useIdleConnectionReaper(Boolean useIdleConnectionReaper) { + standardOptions.put(REAP_IDLE_CONNECTIONS, useIdleConnectionReaper); + return this; + } + + public void setUseIdleConnectionReaper(Boolean useIdleConnectionReaper) { + useIdleConnectionReaper(useIdleConnectionReaper); + } + @Override public Builder eventLoopGroup(SdkEventLoopGroup eventLoopGroup) { this.eventLoopGroup = eventLoopGroup; diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java index 757da65c505d..17807e5c2d93 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java @@ -71,6 +71,11 @@ public final class ChannelAttributeKey { */ static final AttributeKey IN_USE = AttributeKey.newInstance("aws.http.nio.netty.async.inUse"); + /** + * Whether the channel should be closed once it is released. + */ + static final AttributeKey CLOSE_ON_RELEASE = AttributeKey.newInstance("aws.http.nio.netty.async.closeOnRelease"); + private ChannelAttributeKey() { } diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java index d17dd84dff6f..6ea92e9f4a6d 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java @@ -51,15 +51,18 @@ public class ChannelPipelineInitializer extends AbstractChannelPoolHandler { private final SslContext sslCtx; private final long clientMaxStreams; private final AtomicReference channelPoolRef; + private final NettyConfiguration configuration; public ChannelPipelineInitializer(Protocol protocol, SslContext sslCtx, long clientMaxStreams, - AtomicReference channelPoolRef) { + AtomicReference channelPoolRef, + NettyConfiguration configuration) { this.protocol = protocol; this.sslCtx = sslCtx; this.clientMaxStreams = clientMaxStreams; this.channelPoolRef = channelPoolRef; + this.configuration = configuration; } @Override @@ -77,6 +80,14 @@ public void channelCreated(Channel ch) { configureHttp11(ch, pipeline); } + if (configuration.reapIdleConnections()) { + pipeline.addLast(new IdleConnectionReaperHandler(configuration.idleTimeoutMillis())); + } + + if (configuration.connectionTtlMillis() > 0) { + pipeline.addLast(new OldConnectionReaperHandler(configuration.connectionTtlMillis())); + } + pipeline.addLast(new FutureCancelHandler()); pipeline.addLast(new UnusedChannelExceptionHandler()); pipeline.addLast(new LoggingHandler(LogLevel.DEBUG)); diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HonorCloseOnReleaseChannelPool.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HonorCloseOnReleaseChannelPool.java new file mode 100644 index 000000000000..c080eb7e2ccc --- /dev/null +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HonorCloseOnReleaseChannelPool.java @@ -0,0 +1,77 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop; + +import io.netty.channel.Channel; +import io.netty.channel.pool.ChannelPool; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.utils.Logger; + +/** + * Wrap a channel pool so that {@link ChannelAttributeKey#CLOSE_ON_RELEASE} is honored when a channel is released to the + * underlying pool. + * + * When a channel is released and {@link ChannelAttributeKey#CLOSE_ON_RELEASE} is true on the channel, the channel will be closed + * before it is released to the underlying pool. + */ +@SdkInternalApi +public class HonorCloseOnReleaseChannelPool implements ChannelPool { + private static final Logger log = Logger.loggerFor(HonorCloseOnReleaseChannelPool.class); + private final ChannelPool delegatePool; + + public HonorCloseOnReleaseChannelPool(ChannelPool delegatePool) { + this.delegatePool = delegatePool; + } + + @Override + public Future acquire() { + return delegatePool.acquire(); + } + + @Override + public Future acquire(Promise promise) { + return delegatePool.acquire(promise); + } + + @Override + public Future release(Channel channel) { + return release(channel, channel.eventLoop().newPromise()); + } + + @Override + public Future release(Channel channel, Promise promise) { + doInEventLoop(channel.eventLoop(), () -> { + boolean shouldCloseOnRelease = Boolean.TRUE.equals(channel.attr(ChannelAttributeKey.CLOSE_ON_RELEASE).get()); + + if (shouldCloseOnRelease && channel.isOpen() && !channel.eventLoop().isShuttingDown()) { + log.debug(() -> "Closing connection (" + channel.id() + "), instead of releasing it."); + channel.close(); + } + + delegatePool.release(channel, promise); + }); + return promise; + } + + @Override + public void close() { + delegatePool.close(); + } +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/IdleConnectionReaperHandler.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/IdleConnectionReaperHandler.java new file mode 100644 index 000000000000..9511566e70b3 --- /dev/null +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/IdleConnectionReaperHandler.java @@ -0,0 +1,50 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import java.util.concurrent.TimeUnit; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.utils.Logger; + +/** + * A handler that closes unused channels that have not had any traffic on them for a configurable amount of time. + */ +@SdkInternalApi +public class IdleConnectionReaperHandler extends IdleStateHandler { + private static final Logger log = Logger.loggerFor(IdleConnectionReaperHandler.class); + private final int maxIdleTimeMillis; + + public IdleConnectionReaperHandler(int maxIdleTimeMillis) { + super(0, 0, maxIdleTimeMillis, TimeUnit.MILLISECONDS); + this.maxIdleTimeMillis = maxIdleTimeMillis; + } + + @Override + protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent event) { + assert ctx.channel().eventLoop().inEventLoop(); + + boolean channelNotInUse = Boolean.FALSE.equals(ctx.channel().attr(ChannelAttributeKey.IN_USE).get()); + + if (channelNotInUse && ctx.channel().isOpen()) { + log.debug(() -> "Closing unused connection (" + ctx.channel().id() + ") because it has been idle for longer than " + + maxIdleTimeMillis + " milliseconds."); + ctx.close(); + } + } +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyConfiguration.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyConfiguration.java index ba9d9b096e81..66f313458e34 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyConfiguration.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyConfiguration.java @@ -68,4 +68,16 @@ public int readTimeoutMillis() { public int writeTimeoutMillis() { return saturatedCast(configuration.get(SdkHttpConfigurationOption.WRITE_TIMEOUT).toMillis()); } + + public int idleTimeoutMillis() { + return saturatedCast(configuration.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis()); + } + + public int connectionTtlMillis() { + return saturatedCast(configuration.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE).toMillis()); + } + + public boolean reapIdleConnections() { + return configuration.get(SdkHttpConfigurationOption.REAP_IDLE_CONNECTIONS); + } } diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/OldConnectionReaperHandler.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/OldConnectionReaperHandler.java new file mode 100644 index 000000000000..da926d11974c --- /dev/null +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/OldConnectionReaperHandler.java @@ -0,0 +1,99 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; + +/** + * A handler that will close channels after they have reached their time-to-live, regardless of usage. + * + * Channels that are not in use will be closed immediately, and channels that are in use will be closed when they are next + * released to the underlying connection pool (via {@link ChannelAttributeKey#CLOSE_ON_RELEASE}). + */ +@SdkInternalApi +public class OldConnectionReaperHandler extends ChannelDuplexHandler { + private static final Logger log = Logger.loggerFor(OldConnectionReaperHandler.class); + private final int connectionTtlMillis; + + private ScheduledFuture channelKiller; + + public OldConnectionReaperHandler(int connectionTtlMillis) { + Validate.isPositive(connectionTtlMillis, "connectionTtlMillis"); + this.connectionTtlMillis = connectionTtlMillis; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + initialize(ctx); + super.handlerAdded(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + initialize(ctx); + super.channelActive(ctx); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + initialize(ctx); + super.channelRegistered(ctx); + } + + private void initialize(ChannelHandlerContext ctx) { + if (channelKiller == null) { + channelKiller = ctx.channel().eventLoop().schedule(() -> closeChannel(ctx), + connectionTtlMillis, + TimeUnit.MILLISECONDS); + } + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + destroy(); + } + + private void destroy() { + if (channelKiller != null) { + channelKiller.cancel(false); + channelKiller = null; + } + } + + private void closeChannel(ChannelHandlerContext ctx) { + assert ctx.channel().eventLoop().inEventLoop(); + + if (ctx.channel().isOpen()) { + if (Boolean.FALSE.equals(ctx.channel().attr(ChannelAttributeKey.IN_USE).get())) { + log.debug(() -> "Closing unused connection (" + ctx.channel().id() + ") because it has reached its maximum " + + "time to live of " + connectionTtlMillis + " milliseconds."); + ctx.close(); + } else { + log.debug(() -> "Connection (" + ctx.channel().id() + ") will be closed during its next release, because it " + + "has reached its maximum time to live of " + connectionTtlMillis + " milliseconds."); + ctx.channel().attr(ChannelAttributeKey.CLOSE_ON_RELEASE).set(true); + } + } + + channelKiller = null; + } +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java index 7a9efe27132e..1b536e7fedc4 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java @@ -23,8 +23,6 @@ import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.pool.ChannelPool; -import io.netty.channel.pool.ChannelPoolHandler; -import io.netty.channel.pool.SimpleChannelPool; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @@ -41,8 +39,7 @@ */ @SdkInternalApi public class HttpOrHttp2ChannelPool implements ChannelPool { - - private final ChannelPool simpleChannelPool; + private final ChannelPool delegatePool; private final int maxConcurrency; private final EventLoop eventLoop; private final NettyConfiguration configuration; @@ -50,11 +47,11 @@ public class HttpOrHttp2ChannelPool implements ChannelPool { private Promise protocolImplPromise; private ChannelPool protocolImpl; - public HttpOrHttp2ChannelPool(Bootstrap bootstrap, - ChannelPoolHandler handler, + public HttpOrHttp2ChannelPool(ChannelPool delegatePool, + Bootstrap bootstrap, int maxConcurrency, NettyConfiguration configuration) { - this.simpleChannelPool = new SimpleChannelPool(bootstrap, handler); + this.delegatePool = delegatePool; this.maxConcurrency = maxConcurrency; this.eventLoop = bootstrap.config().group().next(); this.configuration = configuration; @@ -95,22 +92,20 @@ private void acquire0(Promise promise) { */ private void initializeProtocol() { protocolImplPromise = new DefaultPromise<>(eventLoop); - simpleChannelPool.acquire() - .addListener((GenericFutureListener>) future -> { - if (future.isSuccess()) { - Channel newChannel = future.getNow(); - newChannel.attr(PROTOCOL_FUTURE).get() - .whenComplete((r, e) -> { - if (e != null) { - failProtocolImplPromise(e); - } else { - protocolImplPromise.setSuccess(configureProtocol(newChannel, r)); - } - }); - } else { - failProtocolImplPromise(future.cause()); - } - }); + delegatePool.acquire().addListener((GenericFutureListener>) future -> { + if (future.isSuccess()) { + Channel newChannel = future.getNow(); + newChannel.attr(PROTOCOL_FUTURE).get().whenComplete((r, e) -> { + if (e != null) { + failProtocolImplPromise(e); + } else { + protocolImplPromise.setSuccess(configureProtocol(newChannel, r)); + } + }); + } else { + failProtocolImplPromise(future.cause()); + } + }); } /** @@ -127,7 +122,7 @@ private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) { if (Protocol.HTTP1_1 == protocol) { // For HTTP/1.1 we use a traditional channel pool without multiplexing protocolImpl = BetterFixedChannelPool.builder() - .channelPool(simpleChannelPool) + .channelPool(delegatePool) .executor(eventLoop) .acquireTimeoutAction(BetterFixedChannelPool.AcquireTimeoutAction.FAIL) .acquireTimeoutMillis(configuration.connectionAcquireTimeoutMillis()) @@ -136,7 +131,7 @@ private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) { .build(); } else { ChannelPool h2Pool = new Http2MultiplexedChannelPool( - simpleChannelPool, eventLoop, newChannel.attr(MAX_CONCURRENT_STREAMS).get()); + delegatePool, eventLoop, newChannel.attr(MAX_CONCURRENT_STREAMS).get()); protocolImpl = BetterFixedChannelPool.builder() .channelPool(h2Pool) .executor(eventLoop) @@ -147,7 +142,7 @@ private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) { .build(); } // Give the channel back so it can be acquired again by protocolImpl - simpleChannelPool.release(newChannel); + delegatePool.release(newChannel); return protocolImpl; } @@ -168,7 +163,7 @@ private void release0(Channel channel, Promise promise) { if (protocolImpl == null) { // If protocolImpl is null that means the first connection failed to establish. Release it back to the // underlying connection pool. - simpleChannelPool.release(channel, promise); + delegatePool.release(channel, promise); } else { protocolImpl.release(channel, promise); } diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/EmptyPublisher.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/EmptyPublisher.java new file mode 100644 index 000000000000..78af13d3869b --- /dev/null +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/EmptyPublisher.java @@ -0,0 +1,45 @@ +package software.amazon.awssdk.http.nio.netty; + +import java.nio.ByteBuffer; +import java.util.Optional; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.http.async.SdkHttpContentPublisher; + +public class EmptyPublisher implements SdkHttpContentPublisher { + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(new EmptySubscription(subscriber)); + } + + @Override + public Optional contentLength() { + return Optional.of(0L); + } + + private static class EmptySubscription implements Subscription { + private final Subscriber subscriber; + private volatile boolean done; + + EmptySubscription(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long l) { + if (!done) { + done = true; + if (l <= 0) { + this.subscriber.onError(new IllegalArgumentException("Demand must be positive")); + } else { + this.subscriber.onComplete(); + } + } + } + + @Override + public void cancel() { + done = true; + } + } +} diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClientSpiVerificationTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClientSpiVerificationTest.java index c001e3a93ccc..1c47bc28bc2d 100644 --- a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClientSpiVerificationTest.java +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClientSpiVerificationTest.java @@ -31,7 +31,6 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -50,7 +49,6 @@ import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; -import software.amazon.awssdk.http.async.SdkHttpContentPublisher; import software.amazon.awssdk.utils.AttributeMap; /** @@ -149,20 +147,7 @@ private SdkHttpFullRequest createRequest(URI endpoint, }).build(); } - private static class EmptyPublisher implements SdkHttpContentPublisher { - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(new EmptySubscription(subscriber)); - } - - @Override - public Optional contentLength() { - return Optional.of(0L); - } - } - private static class TestResponseHandler implements SdkAsyncHttpResponseHandler { - @Override public void onHeaders(SdkHttpResponse headers) { } @@ -177,32 +162,6 @@ public void onError(Throwable error) { } } - private static class EmptySubscription implements Subscription { - private final Subscriber subscriber; - private volatile boolean done; - - EmptySubscription(Subscriber subscriber) { - this.subscriber = subscriber; - } - - @Override - public void request(long l) { - if (!done) { - done = true; - if (l <= 0) { - this.subscriber.onError(new IllegalArgumentException("Demand must be positive")); - } else { - this.subscriber.onComplete(); - } - } - } - - @Override - public void cancel() { - done = true; - } - } - private static class DrainingSubscriber implements Subscriber { private Subscription subscription; diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/ConnectionReaperTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/ConnectionReaperTest.java new file mode 100644 index 000000000000..d596f60a9433 --- /dev/null +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/ConnectionReaperTest.java @@ -0,0 +1,130 @@ +package software.amazon.awssdk.http.nio.netty.internal; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; + +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import java.net.URI; +import java.time.Duration; +import java.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.internal.verification.AtLeast; +import org.mockito.internal.verification.Times; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.http.SdkHttpRequest; +import software.amazon.awssdk.http.async.AsyncExecuteRequest; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.EmptyPublisher; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.RecordingResponseHandler; + +@RunWith(MockitoJUnitRunner.class) +public class ConnectionReaperTest { + private static final WiremockNetworkTrafficListener TRAFFIC_LISTENER = Mockito.mock(WiremockNetworkTrafficListener.class); + + @Rule + public final WireMockRule mockServer = new WireMockRule(wireMockConfig().dynamicPort() + .dynamicHttpsPort() + .networkTrafficListener(TRAFFIC_LISTENER)); + + @Before + public void methodSetup() { + reset(TRAFFIC_LISTENER); + } + + @Test + public void idleConnectionReaperDoesNotReapActiveConnections() throws InterruptedException { + Duration maxIdleTime = Duration.ofSeconds(2); + + try(SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder() + .connectionMaxIdleTime(maxIdleTime) + .buildWithDefaults(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS)) { + Instant end = Instant.now().plus(maxIdleTime.plusSeconds(1)); + + // Send requests for longer than the max-idle time, ensuring no connections are closed. + while (Instant.now().isBefore(end)) { + makeRequest(client); + Thread.sleep(100); + verify(TRAFFIC_LISTENER, new Times(0)).closed(any()); + } + + // Do nothing for longer than the max-idle time, ensuring connections are closed. + Thread.sleep(maxIdleTime.plusSeconds(1).toMillis()); + + verify(TRAFFIC_LISTENER, new AtLeast(1)).closed(any()); + } + + } + + @Test + public void oldConnectionReaperReapsActiveConnections() throws InterruptedException { + Duration connectionTtl = Duration.ofMillis(200); + + try (SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder() + .connectionTimeToLive(connectionTtl) + .buildWithDefaults(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS)) { + + Instant end = Instant.now().plus(Duration.ofSeconds(5)); + + verify(TRAFFIC_LISTENER, new Times(0)).closed(any()); + + // Send requests frequently, validating that connections are still being closed. + while (Instant.now().isBefore(end)) { + makeRequest(client); + Thread.sleep(100); + } + + verify(TRAFFIC_LISTENER, new AtLeast(20)).closed(any()); + } + } + + @Test + public void noReapingWorks() throws InterruptedException { + try (SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder() + .connectionMaxIdleTime(Duration.ofMillis(10)) + .useIdleConnectionReaper(false) + .buildWithDefaults(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS)) { + + + verify(TRAFFIC_LISTENER, new Times(0)).closed(any()); + makeRequest(client); + + Thread.sleep(2_000); + + verify(TRAFFIC_LISTENER, new Times(0)).closed(any()); + } + } + + + private void makeRequest(SdkAsyncHttpClient client) { + stubFor(WireMock.any(urlPathEqualTo("/")).willReturn(aResponse().withBody(randomAlphabetic(10)))); + + URI uri = URI.create("http://localhost:" + mockServer.port()); + client.execute(AsyncExecuteRequest.builder() + .request(SdkHttpRequest.builder() + .uri(uri) + .method(SdkHttpMethod.GET) + .encodedPath("/") + .putHeader("Host", uri.getHost()) + .build()) + .requestContentPublisher(new EmptyPublisher()) + .responseHandler(new RecordingResponseHandler()) + .build()) + .join(); + } + +} diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HonorCloseOnReleaseChannelPoolTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HonorCloseOnReleaseChannelPoolTest.java new file mode 100644 index 000000000000..f4978158c21f --- /dev/null +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HonorCloseOnReleaseChannelPoolTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; + +import io.netty.channel.pool.ChannelPool; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.verification.Times; + +public class HonorCloseOnReleaseChannelPoolTest { + @Test + public void releaseDoesntCloseIfNotFlagged() throws Exception { + ChannelPool channelPool = Mockito.mock(ChannelPool.class); + + MockChannel channel = new MockChannel(); + channel.attr(ChannelAttributeKey.CLOSE_ON_RELEASE).set(false); + + new HonorCloseOnReleaseChannelPool(channelPool).release(channel); + channel.runAllPendingTasks(); + + assertThat(channel.isOpen()).isTrue(); + Mockito.verify(channelPool, new Times(0)).release(any()); + Mockito.verify(channelPool, new Times(1)).release(any(), any()); + } + + @Test + public void releaseClosesIfFlagged() throws Exception { + ChannelPool channelPool = Mockito.mock(ChannelPool.class); + + MockChannel channel = new MockChannel(); + channel.attr(ChannelAttributeKey.CLOSE_ON_RELEASE).set(true); + + new HonorCloseOnReleaseChannelPool(channelPool).release(channel); + channel.runAllPendingTasks(); + + assertThat(channel.isOpen()).isFalse(); + Mockito.verify(channelPool, new Times(0)).release(any()); + Mockito.verify(channelPool, new Times(1)).release(any(), any()); + } +} \ No newline at end of file diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/MockChannel.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/MockChannel.java new file mode 100644 index 000000000000..34149b01fd7c --- /dev/null +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/MockChannel.java @@ -0,0 +1,31 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import io.netty.channel.embedded.EmbeddedChannel; + +class MockChannel extends EmbeddedChannel { + public MockChannel() throws Exception { + super.doRegister(); + } + + public void runAllPendingTasks() throws InterruptedException { + super.runPendingTasks(); + while (runScheduledPendingTasks() != -1) { + Thread.sleep(1); + } + } +} diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/OldConnectionReaperHandlerTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/OldConnectionReaperHandlerTest.java new file mode 100644 index 000000000000..bcbbddf7d8ab --- /dev/null +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/OldConnectionReaperHandlerTest.java @@ -0,0 +1,64 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; + +import io.netty.channel.ChannelHandlerContext; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.verification.Times; + +public class OldConnectionReaperHandlerTest { + @Test + @SuppressWarnings("unchecked") + public void inUseChannelsAreFlaggedToBeClosed() throws Exception { + // Given + MockChannel channel = new MockChannel(); + channel.attr(ChannelAttributeKey.IN_USE).set(true); + + ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); + Mockito.when(ctx.channel()).thenReturn(channel); + + // When + new OldConnectionReaperHandler(1).handlerAdded(ctx); + channel.runAllPendingTasks(); + + // Then + Mockito.verify(ctx, new Times(0)).close(); + Mockito.verify(ctx, new Times(0)).close(any()); + assertThat(channel.attr(ChannelAttributeKey.CLOSE_ON_RELEASE).get()).isTrue(); + } + + @Test + public void notInUseChannelsAreClosed() throws Exception { + // Given + MockChannel channel = new MockChannel(); + channel.attr(ChannelAttributeKey.IN_USE).set(false); + + ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); + Mockito.when(ctx.channel()).thenReturn(channel); + + // When + new OldConnectionReaperHandler(1).handlerAdded(ctx); + channel.runAllPendingTasks(); + + // Then + Mockito.verify(ctx, new Times(1)).close(); + Mockito.verify(ctx, new Times(0)).close(any()); + } +} \ No newline at end of file diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/SharedSdkEventLoopGroupTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/SharedSdkEventLoopGroupTest.java index b14502eb581f..86803f5ed4ec 100644 --- a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/SharedSdkEventLoopGroupTest.java +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/SharedSdkEventLoopGroupTest.java @@ -22,7 +22,7 @@ import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup; public class SharedSdkEventLoopGroupTest { - + @Test public void referenceCountIsInitiallyZero() { assertThat(SharedSdkEventLoopGroup.referenceCount()).isEqualTo(0);