Skip to content

Commit 4006d3c

Browse files
committed
Add extension point to configure Netty's bootstrap
Useful to configure TCP settings and native transports. Fixes #269
1 parent 827476f commit 4006d3c

File tree

9 files changed

+189
-37
lines changed

9 files changed

+189
-37
lines changed

src/main/java/com/rabbitmq/stream/ChannelCustomizer.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -13,13 +13,17 @@
1313
1414
package com.rabbitmq.stream;
1515

16+
import com.rabbitmq.stream.EnvironmentBuilder.NettyConfiguration;
1617
import io.netty.channel.Channel;
1718
import java.util.Objects;
19+
import java.util.function.Consumer;
1820

1921
/**
2022
* An extension point to customize Netty's {@link io.netty.channel.Channel}s used for connection.
2123
*
22-
* @see EnvironmentBuilder#channelCustomizer(ChannelCustomizer)
24+
* @deprecated use {@link NettyConfiguration#channelCustomizer(Consumer)} from {@link
25+
* EnvironmentBuilder#netty()} instead
26+
* @see NettyConfiguration#netty()
2327
*/
2428
public interface ChannelCustomizer {
2529

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

+59-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -18,14 +18,17 @@
1818
import com.rabbitmq.stream.metrics.MetricsCollector;
1919
import com.rabbitmq.stream.sasl.CredentialsProvider;
2020
import com.rabbitmq.stream.sasl.SaslConfiguration;
21+
import io.netty.bootstrap.Bootstrap;
2122
import io.netty.buffer.ByteBufAllocator;
23+
import io.netty.channel.Channel;
2224
import io.netty.channel.EventLoopGroup;
2325
import io.netty.handler.ssl.SslContext;
2426
import io.netty.handler.ssl.SslContextBuilder;
2527
import java.time.Duration;
2628
import java.util.List;
2729
import java.util.Map;
2830
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.function.Consumer;
2932

3033
/**
3134
* API to configure and create an {@link Environment}.
@@ -111,8 +114,6 @@ public interface EnvironmentBuilder {
111114
*/
112115
EnvironmentBuilder id(String id);
113116

114-
EnvironmentBuilder byteBufAllocator(ByteBufAllocator byteBufAllocator);
115-
116117
/**
117118
* Compression codec factory to use for compression in sub-entry batching.
118119
*
@@ -197,11 +198,23 @@ public interface EnvironmentBuilder {
197198
*/
198199
EnvironmentBuilder requestedMaxFrameSize(int requestedMaxFrameSize);
199200

201+
/**
202+
* Netty's {@link io.netty.buffer.ByteBuf} allocator.
203+
*
204+
* @param byteBufAllocator
205+
* @return this builder instance
206+
* @deprecated use {@link NettyConfiguration#byteBufAllocator(ByteBufAllocator)} from {@link
207+
* #netty()} instead
208+
*/
209+
EnvironmentBuilder byteBufAllocator(ByteBufAllocator byteBufAllocator);
210+
200211
/**
201212
* An extension point to customize Netty's {@link io.netty.channel.Channel}s used for connection.
202213
*
203214
* @param channelCustomizer
204215
* @return this builder instance
216+
* @deprecated use {@link NettyConfiguration#channelCustomizer(Consumer)} from {@link #netty()}
217+
* instead
205218
*/
206219
EnvironmentBuilder channelCustomizer(ChannelCustomizer channelCustomizer);
207220

@@ -384,4 +397,47 @@ interface TlsConfiguration {
384397
*/
385398
EnvironmentBuilder environmentBuilder();
386399
}
400+
401+
/**
402+
* Helper to configure netty.
403+
*
404+
* @return Netty configuration helper
405+
*/
406+
NettyConfiguration netty();
407+
408+
/** Helper to configure Netty */
409+
interface NettyConfiguration {
410+
411+
/**
412+
* Netty's {@link io.netty.buffer.ByteBuf} allocator.
413+
*
414+
* @param byteBufAllocator
415+
* @return the Netty configuration helper
416+
*/
417+
NettyConfiguration byteBufAllocator(ByteBufAllocator byteBufAllocator);
418+
419+
/**
420+
* An extension point to customize Netty's {@link io.netty.channel.Channel}s used for
421+
* connection.
422+
*
423+
* @param channelCustomizer
424+
* @return the Netty configuration helper
425+
*/
426+
NettyConfiguration channelCustomizer(Consumer<Channel> channelCustomizer);
427+
428+
/**
429+
* An extension point to customize Netty's {@link Bootstrap}s used to configure connections.
430+
*
431+
* @param bootstrapCustomizer
432+
* @return the Netty configuration helper
433+
*/
434+
NettyConfiguration bootstrapCustomizer(Consumer<Bootstrap> bootstrapCustomizer);
435+
436+
/**
437+
* Go back to the environment builder
438+
*
439+
* @return the environment builder
440+
*/
441+
EnvironmentBuilder environmentBuilder();
442+
}
387443
}

src/main/java/com/rabbitmq/stream/impl/Client.java

+20-11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -45,13 +45,13 @@
4545
import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
4646
import static com.rabbitmq.stream.impl.Utils.extractResponseCode;
4747
import static com.rabbitmq.stream.impl.Utils.formatConstant;
48+
import static com.rabbitmq.stream.impl.Utils.noOpConsumer;
4849
import static java.lang.String.format;
4950
import static java.lang.String.join;
5051
import static java.util.concurrent.TimeUnit.SECONDS;
5152

5253
import com.rabbitmq.stream.AuthenticationFailureException;
5354
import com.rabbitmq.stream.ByteCapacity;
54-
import com.rabbitmq.stream.ChannelCustomizer;
5555
import com.rabbitmq.stream.ChunkChecksum;
5656
import com.rabbitmq.stream.Codec;
5757
import com.rabbitmq.stream.Codec.EncodedMessage;
@@ -266,8 +266,8 @@ public Client(ClientParameters parameters) {
266266
parameters.byteBufAllocator == null
267267
? ByteBufAllocator.DEFAULT
268268
: parameters.byteBufAllocator);
269-
ChannelCustomizer channelCustomizer =
270-
parameters.channelCustomizer == null ? ch -> {} : parameters.channelCustomizer;
269+
Consumer<Channel> channelCustomizer =
270+
parameters.channelCustomizer == null ? noOpConsumer() : parameters.channelCustomizer;
271271
b.handler(
272272
new ChannelInitializer<SocketChannel>() {
273273
@Override
@@ -307,9 +307,12 @@ public void write(
307307

308308
ch.pipeline().addFirst("ssl", sslHandler);
309309
}
310-
channelCustomizer.customize(ch);
310+
channelCustomizer.accept(ch);
311311
}
312312
});
313+
Consumer<Bootstrap> bootstrapCustomizer =
314+
parameters.bootstrapCustomizer == null ? noOpConsumer() : parameters.bootstrapCustomizer;
315+
bootstrapCustomizer.accept(b);
313316

314317
ChannelFuture f;
315318
String clientConnectionName =
@@ -2144,13 +2147,14 @@ public static class ClientParameters {
21442147
private SaslConfiguration saslConfiguration = DefaultSaslConfiguration.PLAIN;
21452148
private CredentialsProvider credentialsProvider =
21462149
new DefaultUsernamePasswordCredentialsProvider("guest", "guest");
2147-
private ChannelCustomizer channelCustomizer = ch -> {};
21482150
private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON;
21492151
private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON;
21502152
private SslContext sslContext;
21512153
private boolean tlsHostnameVerification = true;
21522154
private ByteBufAllocator byteBufAllocator;
21532155
private Duration rpcTimeout;
2156+
private Consumer<Channel> channelCustomizer = noOpConsumer();
2157+
private Consumer<Bootstrap> bootstrapCustomizer = noOpConsumer();
21542158

21552159
public ClientParameters host(String host) {
21562160
this.host = host;
@@ -2256,11 +2260,6 @@ public ClientParameters requestedMaxFrameSize(int requestedMaxFrameSize) {
22562260
return this;
22572261
}
22582262

2259-
public ClientParameters channelCustomizer(ChannelCustomizer channelCustomizer) {
2260-
this.channelCustomizer = channelCustomizer;
2261-
return this;
2262-
}
2263-
22642263
public ClientParameters chunkChecksum(ChunkChecksum chunkChecksum) {
22652264
this.chunkChecksum = chunkChecksum;
22662265
return this;
@@ -2331,6 +2330,16 @@ Codec codec() {
23312330
return this.codec;
23322331
}
23332332

2333+
public ClientParameters channelCustomizer(Consumer<Channel> channelCustomizer) {
2334+
this.channelCustomizer = channelCustomizer;
2335+
return this;
2336+
}
2337+
2338+
public ClientParameters bootstrapCustomizer(Consumer<Bootstrap> bootstrapCustomizer) {
2339+
this.bootstrapCustomizer = bootstrapCustomizer;
2340+
return this;
2341+
}
2342+
23342343
ClientParameters duplicate() {
23352344
ClientParameters duplicate = new ClientParameters();
23362345
for (Field field : ClientParameters.class.getDeclaredFields()) {

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

+58-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -13,9 +13,10 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.Utils.noOpConsumer;
17+
1618
import com.rabbitmq.stream.AddressResolver;
1719
import com.rabbitmq.stream.BackOffDelayPolicy;
18-
import com.rabbitmq.stream.ChannelCustomizer;
1920
import com.rabbitmq.stream.ChunkChecksum;
2021
import com.rabbitmq.stream.Codec;
2122
import com.rabbitmq.stream.Environment;
@@ -26,7 +27,9 @@
2627
import com.rabbitmq.stream.metrics.MetricsCollector;
2728
import com.rabbitmq.stream.sasl.CredentialsProvider;
2829
import com.rabbitmq.stream.sasl.SaslConfiguration;
30+
import io.netty.bootstrap.Bootstrap;
2931
import io.netty.buffer.ByteBufAllocator;
32+
import io.netty.channel.Channel;
3033
import io.netty.channel.EventLoopGroup;
3134
import io.netty.handler.ssl.SslContext;
3235
import io.netty.handler.ssl.SslContextBuilder;
@@ -37,6 +40,7 @@
3740
import java.util.List;
3841
import java.util.Map;
3942
import java.util.concurrent.ScheduledExecutorService;
43+
import java.util.function.Consumer;
4044
import java.util.function.Function;
4145
import java.util.stream.Collectors;
4246
import javax.net.ssl.SSLException;
@@ -50,6 +54,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
5054
private String id = "rabbitmq-stream";
5155
private final Client.ClientParameters clientParameters = new Client.ClientParameters();
5256
private final DefaultTlsConfiguration tls = new DefaultTlsConfiguration(this);
57+
private final DefaultNettyConfiguration netty = new DefaultNettyConfiguration(this);
5358
private ScheduledExecutorService scheduledExecutorService;
5459
private List<URI> uris = Collections.emptyList();
5560
private BackOffDelayPolicy recoveryBackOffDelayPolicy =
@@ -62,7 +67,6 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
6267
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT;
6368
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
6469
private CompressionCodecFactory compressionCodecFactory;
65-
private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
6670
private boolean lazyInit = false;
6771
private Function<ClientConnectionType, String> connectionNamingStrategy;
6872

@@ -141,10 +145,10 @@ public EnvironmentBuilder id(String id) {
141145
return this;
142146
}
143147

148+
@SuppressWarnings("deprecation")
144149
@Override
145150
public EnvironmentBuilder byteBufAllocator(ByteBufAllocator byteBufAllocator) {
146-
this.byteBufAllocator = byteBufAllocator;
147-
this.clientParameters.byteBufAllocator(byteBufAllocator);
151+
this.netty().byteBufAllocator(byteBufAllocator);
148152
return this;
149153
}
150154

@@ -189,8 +193,10 @@ public StreamEnvironmentBuilder requestedMaxFrameSize(int requestedMaxFrameSize)
189193
return this;
190194
}
191195

192-
public StreamEnvironmentBuilder channelCustomizer(ChannelCustomizer channelCustomizer) {
193-
this.clientParameters.channelCustomizer(channelCustomizer);
196+
@SuppressWarnings("deprecation")
197+
public StreamEnvironmentBuilder channelCustomizer(
198+
com.rabbitmq.stream.ChannelCustomizer channelCustomizer) {
199+
this.netty().channelCustomizer(ch -> channelCustomizer.customize(ch));
194200
return this;
195201
}
196202

@@ -289,6 +295,11 @@ public TlsConfiguration tls() {
289295
return this.tls;
290296
}
291297

298+
@Override
299+
public NettyConfiguration netty() {
300+
return this.netty;
301+
}
302+
292303
@Override
293304
public Environment build() {
294305
if (this.compressionCodecFactory == null) {
@@ -298,6 +309,9 @@ public Environment build() {
298309
}
299310
this.id = this.id == null ? "rabbitmq-stream" : this.id;
300311
this.connectionNamingStrategy = Utils.defaultConnectionNamingStrategy(this.id + "-");
312+
this.clientParameters.byteBufAllocator(this.netty.byteBufAllocator);
313+
this.clientParameters.channelCustomizer(this.netty.channelCustomizer);
314+
this.clientParameters.bootstrapCustomizer(this.netty.bootstrapCustomizer);
301315
return new StreamEnvironment(
302316
scheduledExecutorService,
303317
clientParameters,
@@ -309,7 +323,7 @@ public Environment build() {
309323
maxTrackingConsumersByConnection,
310324
maxConsumersByConnection,
311325
tls,
312-
byteBufAllocator,
326+
netty.byteBufAllocator,
313327
lazyInit,
314328
connectionNamingStrategy);
315329
}
@@ -382,4 +396,40 @@ public SslContext sslContext() {
382396
return sslContext;
383397
}
384398
}
399+
400+
static class DefaultNettyConfiguration implements NettyConfiguration {
401+
402+
private final EnvironmentBuilder environmentBuilder;
403+
404+
private DefaultNettyConfiguration(EnvironmentBuilder environmentBuilder) {
405+
this.environmentBuilder = environmentBuilder;
406+
}
407+
408+
private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
409+
private Consumer<Channel> channelCustomizer = noOpConsumer();
410+
private Consumer<Bootstrap> bootstrapCustomizer = noOpConsumer();
411+
412+
@Override
413+
public NettyConfiguration byteBufAllocator(ByteBufAllocator byteBufAllocator) {
414+
this.byteBufAllocator = byteBufAllocator;
415+
return this;
416+
}
417+
418+
@Override
419+
public NettyConfiguration channelCustomizer(Consumer<Channel> channelCustomizer) {
420+
this.channelCustomizer = channelCustomizer;
421+
return this;
422+
}
423+
424+
@Override
425+
public NettyConfiguration bootstrapCustomizer(Consumer<Bootstrap> bootstrapCustomizer) {
426+
this.bootstrapCustomizer = bootstrapCustomizer;
427+
return this;
428+
}
429+
430+
@Override
431+
public EnvironmentBuilder environmentBuilder() {
432+
return this.environmentBuilder;
433+
}
434+
}
385435
}

src/main/java/com/rabbitmq/stream/impl/Utils.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -50,6 +50,9 @@
5050

5151
final class Utils {
5252

53+
@SuppressWarnings("rawtypes")
54+
private static final Consumer NO_OP_CONSUMER = o -> {};
55+
5356
static final LongConsumer NO_OP_LONG_CONSUMER = someLong -> {};
5457
static final LongSupplier NO_OP_LONG_SUPPLIER = () -> 0;
5558
static final X509TrustManager TRUST_EVERYTHING_TRUST_MANAGER = new TrustEverythingTrustManager();
@@ -75,6 +78,11 @@ final class Utils {
7578

7679
private Utils() {}
7780

81+
@SuppressWarnings("unchecked")
82+
static <T> Consumer<T> noOpConsumer() {
83+
return (Consumer<T>) NO_OP_CONSUMER;
84+
}
85+
7886
static Runnable makeIdempotent(Runnable action) {
7987
AtomicBoolean executed = new AtomicBoolean(false);
8088
return () -> {

0 commit comments

Comments
 (0)