Skip to content

Commit b4b9083

Browse files
committed
Various code clean up including fanout Http2SettingsFrameHandler, adding unit tests, updating javadoc
1 parent 8a19cce commit b4b9083

File tree

7 files changed

+262
-104
lines changed

7 files changed

+262
-104
lines changed

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java

Lines changed: 13 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
@SdkPublicApi
7777
public final class NettyNioAsyncHttpClient implements SdkAsyncHttpClient {
7878
private static final Logger log = LoggerFactory.getLogger(NettyNioAsyncHttpClient.class);
79+
private static final long MAX_STREAMS_ALLOWED = 4294967295L; // unsigned 32-bit, 2^32 -1
7980

8081
private final SdkEventLoopGroup sdkEventLoopGroup;
8182
private final SdkChannelPoolMap<URI, ChannelPool> pools;
@@ -87,7 +88,7 @@ public final class NettyNioAsyncHttpClient implements SdkAsyncHttpClient {
8788
NettyNioAsyncHttpClient(DefaultBuilder builder, AttributeMap serviceDefaultsMap) {
8889
this.configuration = new NettyConfiguration(serviceDefaultsMap);
8990
this.protocol = serviceDefaultsMap.get(SdkHttpConfigurationOption.PROTOCOL);
90-
this.maxStreams = builder.maxHttp2Streams == null ? Integer.MAX_VALUE : builder.maxHttp2Streams;
91+
this.maxStreams = builder.maxHttp2Streams == null ? MAX_STREAMS_ALLOWED : builder.maxHttp2Streams;
9192
this.sdkEventLoopGroup = eventLoopGroup(builder);
9293
this.pools = createChannelPoolMap();
9394
this.sdkChannelOptions = channelOptions(builder);
@@ -106,10 +107,6 @@ public final class NettyNioAsyncHttpClient implements SdkAsyncHttpClient {
106107
this.maxStreams = maxStreams;
107108
}
108109

109-
private SdkChannelOptions channelOptions(DefaultBuilder builder) {
110-
return builder.sdkChannelOptions;
111-
}
112-
113110
@Override
114111
public CompletableFuture<Void> execute(AsyncExecuteRequest request) {
115112
RequestContext ctx = createRequestContext(request);
@@ -120,6 +117,10 @@ public static Builder builder() {
120117
return new DefaultBuilder();
121118
}
122119

120+
private SdkChannelOptions channelOptions(DefaultBuilder builder) {
121+
return builder.sdkChannelOptions;
122+
}
123+
123124
private RequestContext createRequestContext(AsyncExecuteRequest request) {
124125
ChannelPool pool = pools.get(poolKey(request.request()));
125126
return new RequestContext(pool, request, configuration);
@@ -304,7 +305,7 @@ public interface Builder extends SdkAsyncHttpClient.Builder<NettyNioAsyncHttpCli
304305
/**
305306
* Sets the {@link SdkEventLoopGroup} to use for the Netty HTTP client. This event loop group may be shared
306307
* across multiple HTTP clients for better resource and thread utilization. The preferred way to create
307-
* an {@link EventLoopGroup} is by using the {@link SdkEventLoopGroup#builder()})} method which will choose the
308+
* an {@link EventLoopGroup} is by using the {@link SdkEventLoopGroup#builder()} method which will choose the
308309
* optimal implementation per the platform.
309310
*
310311
* <p>The {@link EventLoopGroup} <b>MUST</b> be closed by the caller when it is ready to
@@ -348,12 +349,15 @@ public interface Builder extends SdkAsyncHttpClient.Builder<NettyNioAsyncHttpCli
348349
Builder protocol(Protocol protocol);
349350

350351
/**
351-
* Add new socket channel option which will be used to create Netty Http client. This allows custom configuration
352-
* for Netty.
352+
* Configures additional {@link ChannelOption} which will be used to create Netty Http client. This allows custom
353+
* configuration for Netty.
354+
*
355+
* <p>
356+
* If a {@link ChannelOption} was previously configured, the old value is replaced.
357+
*
353358
* @param channelOption {@link ChannelOption} to set
354359
* @param value See {@link ChannelOption} to find the type of value for each option
355360
* @return This builder for method chaining.
356-
* @see SdkEventLoopGroup.Builder
357361
*/
358362
Builder putChannelOption(ChannelOption channelOption, Object value);
359363

@@ -387,12 +391,6 @@ private static final class DefaultBuilder implements Builder {
387391
private DefaultBuilder() {
388392
}
389393

390-
/**
391-
* Max allowed connections per endpoint allowed in the connection pool.
392-
*
393-
* @param maxConcurrency New value for max connections per endpoint.
394-
* @return This builder for method chaining.
395-
*/
396394
@Override
397395
public Builder maxConcurrency(Integer maxConcurrency) {
398396
standardOptions.put(MAX_CONNECTIONS, maxConcurrency);
@@ -403,12 +401,6 @@ public void setMaxConcurrency(Integer maxConnectionsPerEndpoint) {
403401
maxConcurrency(maxConnectionsPerEndpoint);
404402
}
405403

406-
/**
407-
* The maximum number of pending acquires allowed. Once this exceeds, acquire tries will be failed.
408-
*
409-
* @param maxPendingAcquires Max number of pending acquires
410-
* @return This builder for method chaining.
411-
*/
412404
@Override
413405
public Builder maxPendingConnectionAcquires(Integer maxPendingAcquires) {
414406
standardOptions.put(MAX_PENDING_CONNECTION_ACQUIRES, maxPendingAcquires);
@@ -419,12 +411,6 @@ public void setMaxPendingConnectionAcquires(Integer maxPendingAcquires) {
419411
maxPendingConnectionAcquires(maxPendingAcquires);
420412
}
421413

422-
/**
423-
* The amount of time to wait for a read on a socket before an exception is thrown.
424-
*
425-
* @param readTimeout timeout duration
426-
* @return this builder for method chaining.
427-
*/
428414
@Override
429415
public Builder readTimeout(Duration readTimeout) {
430416
Validate.isPositive(readTimeout, "readTimeout");
@@ -436,12 +422,6 @@ public void setReadTimeout(Duration readTimeout) {
436422
readTimeout(readTimeout);
437423
}
438424

439-
/**
440-
* The amount of time to wait for a write on a socket before an exception is thrown.
441-
*
442-
* @param writeTimeout timeout duration
443-
* @return this builder for method chaining.
444-
*/
445425
@Override
446426
public Builder writeTimeout(Duration writeTimeout) {
447427
Validate.isPositive(writeTimeout, "writeTimeout");
@@ -453,12 +433,6 @@ public void setWriteTimeout(Duration writeTimeout) {
453433
writeTimeout(writeTimeout);
454434
}
455435

456-
/**
457-
* The amount of time to wait when initially establishing a connection before giving up and timing out.
458-
*
459-
* @param timeout the timeout duration
460-
* @return this builder for method chaining.
461-
*/
462436
@Override
463437
public Builder connectionTimeout(Duration timeout) {
464438
Validate.isPositive(timeout, "connectionTimeout");
@@ -470,11 +444,6 @@ public void setConnectionTimeout(Duration connectionTimeout) {
470444
connectionTimeout(connectionTimeout);
471445
}
472446

473-
/**
474-
* The amount of time to wait when acquiring a connection from the pool before giving up and timing out.
475-
* @param connectionAcquisitionTimeout the timeout duration
476-
* @return this builder for method chaining.
477-
*/
478447
@Override
479448
public Builder connectionAcquisitionTimeout(Duration connectionAcquisitionTimeout) {
480449
Validate.isPositive(connectionAcquisitionTimeout, "connectionAcquisitionTimeout");

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/SdkEventLoopGroup.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,33 @@
3030
/**
3131
* Provides {@link EventLoopGroup} and {@link ChannelFactory} for {@link NettyNioAsyncHttpClient}.
3232
* <p>
33-
* There are two ways to create a new instance.
33+
* There are three ways to create a new instance.
3434
*
3535
* <ul>
36+
* <li>using {@link #builder()} to provide custom configuration of {@link EventLoopGroup}.
37+
* This is the preferred configuration method when you just want to customize the {@link EventLoopGroup}</li>
38+
*
39+
*
40+
* <li>Using {@link #create(EventLoopGroup)} to provide a custom {@link EventLoopGroup}. {@link ChannelFactory} will
41+
* be resolved based on the type of {@link EventLoopGroup} provided via
42+
* {@link SocketChannelResolver#resolveSocketChannelFactory(EventLoopGroup)}
43+
* </li>
44+
*
3645
* <li>Using {@link #create(EventLoopGroup, ChannelFactory)} to provide a custom {@link EventLoopGroup} and
3746
* {@link ChannelFactory}
47+
* </ul>
48+
*
3849
* <p>
39-
* The {@link EventLoopGroup} <b>MUST</b> be closed by the caller when it is ready to
40-
* be disposed. The SDK will not close the {@link EventLoopGroup} when the HTTP client is closed. See
41-
* {@link EventLoopGroup#shutdownGracefully()} to properly close the event loop group.
42-
* </li>
50+
* When configuring the {@link EventLoopGroup} of {@link NettyNioAsyncHttpClient}, if {@link SdkEventLoopGroup.Builder} is
51+
* passed to {@link NettyNioAsyncHttpClient.Builder#eventLoopGroupBuilder},
52+
* the {@link EventLoopGroup} is managed by the SDK and will be shutdown when the HTTP client is closed. Otherwise,
53+
* if an instance of {@link SdkEventLoopGroup} is passed to {@link NettyNioAsyncHttpClient.Builder#eventLoopGroup},
54+
* the {@link EventLoopGroup} <b>MUST</b> be closed by the caller when it is ready to be disposed. The SDK will not
55+
* close the {@link EventLoopGroup} when the HTTP client is closed. See {@link EventLoopGroup#shutdownGracefully()} to
56+
* properly close the event loop group.
4357
*
44-
* <li>using {@link #builder()} to provide custom configuration of {@link EventLoopGroup}. The {@link EventLoopGroup} created by
45-
* the builder is managed by the SDK and will be shutdown when the HTTP client is closed.
46-
* </li>
47-
* </ul>
58+
* @see NettyNioAsyncHttpClient.Builder#eventLoopGroupBuilder(Builder)
59+
* @see NettyNioAsyncHttpClient.Builder#eventLoopGroup(SdkEventLoopGroup)
4860
*/
4961
@SdkPublicApi
5062
public final class SdkEventLoopGroup {

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java

Lines changed: 4 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,38 +15,32 @@
1515

1616
package software.amazon.awssdk.http.nio.netty.internal;
1717

18-
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.MAX_CONCURRENT_STREAMS;
1918
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
2019
import static software.amazon.awssdk.utils.StringUtils.lowerCase;
2120

2221
import io.netty.channel.Channel;
23-
import io.netty.channel.ChannelHandlerContext;
2422
import io.netty.channel.ChannelInitializer;
2523
import io.netty.channel.ChannelPipeline;
26-
import io.netty.channel.SimpleChannelInboundHandler;
2724
import io.netty.channel.pool.AbstractChannelPoolHandler;
2825
import io.netty.channel.pool.ChannelPool;
2926
import io.netty.handler.codec.http.HttpClientCodec;
3027
import io.netty.handler.codec.http2.ForkedHttp2MultiplexCodecBuilder;
3128
import io.netty.handler.codec.http2.Http2FrameLogger;
3229
import io.netty.handler.codec.http2.Http2Settings;
33-
import io.netty.handler.codec.http2.Http2SettingsFrame;
3430
import io.netty.handler.logging.LogLevel;
3531
import io.netty.handler.logging.LoggingHandler;
3632
import io.netty.handler.ssl.SslContext;
37-
import java.io.IOException;
38-
import java.util.Optional;
3933
import java.util.concurrent.CompletableFuture;
4034
import java.util.concurrent.atomic.AtomicReference;
4135
import software.amazon.awssdk.annotations.SdkInternalApi;
4236
import software.amazon.awssdk.http.Protocol;
43-
import software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord;
37+
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2SettingsFrameHandler;
4438

4539
/**
46-
* Configures the client pipeline to support HTTP/2 frames with multiplexed streams.
40+
* ChannelPoolHandler to configure the client pipeline.
4741
*/
4842
@SdkInternalApi
49-
public class ChannelPipelineInitializer extends AbstractChannelPoolHandler {
43+
public final class ChannelPipelineInitializer extends AbstractChannelPoolHandler {
5044
private final Protocol protocol;
5145
private final SslContext sslCtx;
5246
private final long clientMaxStreams;
@@ -103,46 +97,7 @@ private void configureHttp2(Channel ch, ChannelPipeline pipeline) {
10397

10498
pipeline.addLast(codecBuilder.build());
10599

106-
pipeline.addLast(new SimpleChannelInboundHandler<Http2SettingsFrame>() {
107-
@Override
108-
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame msg) {
109-
Long serverMaxStreams = Optional.ofNullable(msg.settings().maxConcurrentStreams()).orElse(Long.MAX_VALUE);
110-
ch.attr(MAX_CONCURRENT_STREAMS).set(Math.min(clientMaxStreams, serverMaxStreams));
111-
ch.attr(PROTOCOL_FUTURE).get().complete(Protocol.HTTP2);
112-
}
113-
114-
@Override
115-
public void channelUnregistered(ChannelHandlerContext ctx) {
116-
if (!ch.attr(PROTOCOL_FUTURE).get().isDone()) {
117-
channelError(new IOException("The channel was closed before the protocol could be determined."), ch);
118-
}
119-
}
120-
121-
@Override
122-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
123-
channelError(cause, ch);
124-
}
125-
});
126-
}
127-
128-
private void channelError(Throwable cause, Channel ch) {
129-
ch.attr(PROTOCOL_FUTURE).get().completeExceptionally(cause);
130-
MultiplexedChannelRecord record = ch.attr(ChannelAttributeKey.CHANNEL_POOL_RECORD).get();
131-
// Deliver the exception to any child channels registered to this connection.
132-
if (record != null) {
133-
record.shutdownChildChannels(cause);
134-
}
135-
// Channel status may still be active at this point even if it's not so queue up the close so that status is
136-
// accurately updated
137-
ch.eventLoop().submit(() -> {
138-
try {
139-
if (ch.isActive()) {
140-
ch.close();
141-
}
142-
} finally {
143-
channelPoolRef.get().release(ch);
144-
}
145-
});
100+
pipeline.addLast(new Http2SettingsFrameHandler(ch, clientMaxStreams, channelPoolRef));
146101
}
147102

148103
private void configureHttp11(Channel ch, ChannelPipeline pipeline) {

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
@SdkInternalApi
3131
public class FutureCancelHandler extends SimpleChannelInboundHandler {
3232
@Override
33-
protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
33+
protected void channelRead0(ChannelHandlerContext ctx, Object o) {
3434
ReferenceCountUtil.retain(o);
3535
ctx.fireChannelRead(o);
3636
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal.http2;
17+
18+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.MAX_CONCURRENT_STREAMS;
19+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
20+
21+
import io.netty.channel.Channel;
22+
import io.netty.channel.ChannelHandlerContext;
23+
import io.netty.channel.SimpleChannelInboundHandler;
24+
import io.netty.channel.pool.ChannelPool;
25+
import io.netty.handler.codec.http2.Http2SettingsFrame;
26+
import java.io.IOException;
27+
import java.util.Optional;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
import software.amazon.awssdk.annotations.SdkInternalApi;
30+
import software.amazon.awssdk.http.Protocol;
31+
import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey;
32+
33+
/**
34+
* Configure channel based on the {@link Http2SettingsFrame} received from server
35+
*/
36+
@SdkInternalApi
37+
public final class Http2SettingsFrameHandler extends SimpleChannelInboundHandler<Http2SettingsFrame> {
38+
39+
private Channel channel;
40+
private final long clientMaxStreams;
41+
private AtomicReference<ChannelPool> channelPoolRef;
42+
43+
public Http2SettingsFrameHandler(Channel channel, long clientMaxStreams, AtomicReference<ChannelPool> channelPoolRef) {
44+
this.channel = channel;
45+
this.clientMaxStreams = clientMaxStreams;
46+
this.channelPoolRef = channelPoolRef;
47+
}
48+
49+
@Override
50+
protected void channelRead0(ChannelHandlerContext ctx, Http2SettingsFrame msg) {
51+
Long serverMaxStreams = Optional.ofNullable(msg.settings().maxConcurrentStreams()).orElse(Long.MAX_VALUE);
52+
channel.attr(MAX_CONCURRENT_STREAMS).set(Math.min(clientMaxStreams, serverMaxStreams));
53+
channel.attr(PROTOCOL_FUTURE).get().complete(Protocol.HTTP2);
54+
}
55+
56+
@Override
57+
public void channelUnregistered(ChannelHandlerContext ctx) {
58+
if (!channel.attr(PROTOCOL_FUTURE).get().isDone()) {
59+
channelError(new IOException("The channel was closed before the protocol could be determined."), channel);
60+
}
61+
}
62+
63+
@Override
64+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
65+
channelError(cause, channel);
66+
}
67+
68+
private void channelError(Throwable cause, Channel ch) {
69+
ch.attr(PROTOCOL_FUTURE).get().completeExceptionally(cause);
70+
MultiplexedChannelRecord record = ch.attr(ChannelAttributeKey.CHANNEL_POOL_RECORD).get();
71+
// Deliver the exception to any child channels registered to this connection.
72+
if (record != null) {
73+
record.shutdownChildChannels(cause);
74+
}
75+
// Channel status may still be active at this point even if it's not so queue up the close so that status is
76+
// accurately updated
77+
ch.eventLoop().submit(() -> {
78+
try {
79+
if (ch.isActive()) {
80+
ch.close();
81+
}
82+
} finally {
83+
channelPoolRef.get().release(ch);
84+
}
85+
});
86+
}
87+
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/MockChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.netty.channel.embedded.EmbeddedChannel;
1919

20-
class MockChannel extends EmbeddedChannel {
20+
public class MockChannel extends EmbeddedChannel {
2121
public MockChannel() throws Exception {
2222
super.doRegister();
2323
}

0 commit comments

Comments
 (0)