Skip to content

Commit 8d9da34

Browse files
committed
Move event loop group setup to Netty configuration
References #269
1 parent 4006d3c commit 8d9da34

15 files changed

+76
-37
lines changed

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

+24-11
Original file line numberDiff line numberDiff line change
@@ -92,17 +92,6 @@ public interface EnvironmentBuilder {
9292
*/
9393
EnvironmentBuilder codec(Codec codec);
9494

95-
/**
96-
* The Netty {@link EventLoopGroup} instance to use.
97-
*
98-
* <p>The environment uses its own instance by default. It is the developer's responsibility to
99-
* close the {@link EventLoopGroup} they provide.
100-
*
101-
* @param eventLoopGroup
102-
* @return this builder instance
103-
*/
104-
EnvironmentBuilder eventLoopGroup(EventLoopGroup eventLoopGroup);
105-
10695
/**
10796
* Informational ID for this environment instance.
10897
*
@@ -198,6 +187,19 @@ public interface EnvironmentBuilder {
198187
*/
199188
EnvironmentBuilder requestedMaxFrameSize(int requestedMaxFrameSize);
200189

190+
/**
191+
* The Netty {@link EventLoopGroup} instance to use.
192+
*
193+
* <p>The environment uses its own instance by default. It is the developer's responsibility to
194+
* close the {@link EventLoopGroup} they provide.
195+
*
196+
* @param eventLoopGroup
197+
* @return this builder instance
198+
* @deprecated use {@link NettyConfiguration#eventLoopGroup(EventLoopGroup)} from {@link #netty()}
199+
* instead
200+
*/
201+
EnvironmentBuilder eventLoopGroup(EventLoopGroup eventLoopGroup);
202+
201203
/**
202204
* Netty's {@link io.netty.buffer.ByteBuf} allocator.
203205
*
@@ -408,6 +410,17 @@ interface TlsConfiguration {
408410
/** Helper to configure Netty */
409411
interface NettyConfiguration {
410412

413+
/**
414+
* The {@link EventLoopGroup} instance to use.
415+
*
416+
* <p>The environment uses its own instance by default. It is the developer's responsibility to
417+
* close the {@link EventLoopGroup} they provide.
418+
*
419+
* @param eventLoopGroup
420+
* @return the Netty configuration helper
421+
*/
422+
NettyConfiguration eventLoopGroup(EventLoopGroup eventLoopGroup);
423+
411424
/**
412425
* Netty's {@link io.netty.buffer.ByteBuf} allocator.
413426
*

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,10 @@ public EnvironmentBuilder compressionCodecFactory(
134134
return this;
135135
}
136136

137+
@SuppressWarnings("deprecation")
138+
@Override
137139
public EnvironmentBuilder eventLoopGroup(EventLoopGroup eventLoopGroup) {
138-
this.clientParameters.eventLoopGroup(eventLoopGroup);
140+
this.netty().eventLoopGroup(eventLoopGroup);
139141
return this;
140142
}
141143

@@ -309,6 +311,7 @@ public Environment build() {
309311
}
310312
this.id = this.id == null ? "rabbitmq-stream" : this.id;
311313
this.connectionNamingStrategy = Utils.defaultConnectionNamingStrategy(this.id + "-");
314+
this.clientParameters.eventLoopGroup(this.netty.eventLoopGroup);
312315
this.clientParameters.byteBufAllocator(this.netty.byteBufAllocator);
313316
this.clientParameters.channelCustomizer(this.netty.channelCustomizer);
314317
this.clientParameters.bootstrapCustomizer(this.netty.bootstrapCustomizer);
@@ -400,14 +403,20 @@ public SslContext sslContext() {
400403
static class DefaultNettyConfiguration implements NettyConfiguration {
401404

402405
private final EnvironmentBuilder environmentBuilder;
406+
private EventLoopGroup eventLoopGroup;
407+
private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
408+
private Consumer<Channel> channelCustomizer = noOpConsumer();
409+
private Consumer<Bootstrap> bootstrapCustomizer = noOpConsumer();
403410

404411
private DefaultNettyConfiguration(EnvironmentBuilder environmentBuilder) {
405412
this.environmentBuilder = environmentBuilder;
406413
}
407414

408-
private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
409-
private Consumer<Channel> channelCustomizer = noOpConsumer();
410-
private Consumer<Bootstrap> bootstrapCustomizer = noOpConsumer();
415+
@Override
416+
public NettyConfiguration eventLoopGroup(EventLoopGroup eventLoopGroup) {
417+
this.eventLoopGroup = eventLoopGroup;
418+
return this;
419+
}
411420

412421
@Override
413422
public NettyConfiguration byteBufAllocator(ByteBufAllocator byteBufAllocator) {

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -673,9 +673,9 @@ public Integer call() throws Exception {
673673
.addressResolver(addrResolver)
674674
.scheduledExecutorService(envExecutor)
675675
.metricsCollector(metricsCollector)
676-
.eventLoopGroup(eventLoopGroup)
677676
.netty()
678677
.byteBufAllocator(byteBufAllocator)
678+
.eventLoopGroup(eventLoopGroup)
679679
.environmentBuilder()
680680
.codec(codec)
681681
.maxProducersByConnection(this.producersByConnection)

src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -44,7 +44,12 @@ void defaultEnvironmentCanBeInstantiatedAndCanConnect() {
4444
try (Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup))) {
4545
Client.Response response = client.create(stream);
4646
assertThat(response.isOk()).isTrue();
47-
try (Environment environment = Environment.builder().eventLoopGroup(eventLoopGroup).build()) {
47+
try (Environment environment =
48+
Environment.builder()
49+
.netty()
50+
.eventLoopGroup(eventLoopGroup)
51+
.environmentBuilder()
52+
.build()) {
4853
environment.producerBuilder().stream(stream);
4954
} finally {
5055
response = client.delete(stream);

src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -68,7 +68,7 @@ static void afterAll() throws Exception {
6868
void init() {
6969
environmentBuilder = Environment.builder();
7070
environmentBuilder.addressResolver(add -> localhost());
71-
env = environmentBuilder.eventLoopGroup(eventLoopGroup).build();
71+
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
7272
}
7373

7474
@AfterEach

src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -69,7 +69,7 @@ static void afterAll() throws Exception {
6969
void init() {
7070
environmentBuilder = Environment.builder();
7171
environmentBuilder.addressResolver(add -> localhost());
72-
env = environmentBuilder.eventLoopGroup(eventLoopGroup).build();
72+
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
7373
}
7474

7575
@AfterEach

src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -48,7 +48,11 @@ public class SacStreamConsumerTest {
4848
@BeforeEach
4949
void init() {
5050
EnvironmentBuilder environmentBuilder =
51-
Environment.builder().eventLoopGroup(eventLoopGroup).maxConsumersByConnection(1);
51+
Environment.builder()
52+
.netty()
53+
.eventLoopGroup(eventLoopGroup)
54+
.environmentBuilder()
55+
.maxConsumersByConnection(1);
5256
environmentBuilder.addressResolver(add -> localhost());
5357
environment = environmentBuilder.build();
5458
}

src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -64,7 +64,8 @@ public class SacSuperStreamConsumerTest {
6464

6565
@BeforeEach
6666
void init(TestInfo info) throws Exception {
67-
EnvironmentBuilder environmentBuilder = Environment.builder().eventLoopGroup(eventLoopGroup);
67+
EnvironmentBuilder environmentBuilder =
68+
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
6869
environmentBuilder.addressResolver(add -> localhost());
6970
environment = environmentBuilder.build();
7071
superStream = TestUtils.streamName(info);

src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -110,7 +110,7 @@ static long offset(String line) {
110110
void init() throws Exception {
111111
environmentBuilder = Environment.builder();
112112
environmentBuilder.addressResolver(add -> localhost());
113-
env = environmentBuilder.eventLoopGroup(eventLoopGroup).build();
113+
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
114114
socket = new Socket("localhost", 61613);
115115
out = socket.getOutputStream();
116116
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -113,7 +113,9 @@ void init() {
113113
}
114114
EnvironmentBuilder environmentBuilder =
115115
Environment.builder()
116+
.netty()
116117
.eventLoopGroup(eventLoopGroup)
118+
.environmentBuilder()
117119
.recoveryBackOffDelayPolicy(
118120
BackOffDelayPolicy.fixedWithInitialDelay(recoveryInitialDelay, RECOVERY_DELAY))
119121
.topologyUpdateBackOffDelayPolicy(

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ void init() {
108108
environmentBuilder = Environment.builder();
109109
environmentBuilder.addressResolver(
110110
add -> add.port() == Client.DEFAULT_PORT ? localhost() : localhostTls());
111-
environmentBuilder.eventLoopGroup(eventLoopGroup);
111+
environmentBuilder.netty().eventLoopGroup(eventLoopGroup);
112112
}
113113

114114
@Test

src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -79,7 +79,9 @@ public class StreamProducerTest {
7979
void init() {
8080
EnvironmentBuilder environmentBuilder =
8181
Environment.builder()
82+
.netty()
8283
.eventLoopGroup(eventLoopGroup)
84+
.environmentBuilder()
8385
.recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)))
8486
.topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)));
8587
environmentBuilder.addressResolver(add -> localhost());

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -60,7 +60,8 @@ public class SuperStreamConsumerTest {
6060

6161
@BeforeEach
6262
void init(TestInfo info) throws Exception {
63-
EnvironmentBuilder environmentBuilder = Environment.builder().eventLoopGroup(eventLoopGroup);
63+
EnvironmentBuilder environmentBuilder =
64+
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
6465
environmentBuilder.addressResolver(add -> localhost());
6566
environment = environmentBuilder.build();
6667
superStream = TestUtils.streamName(info);

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -59,7 +59,8 @@ public class SuperStreamProducerTest {
5959

6060
@BeforeEach
6161
void init(TestInfo info) throws Exception {
62-
EnvironmentBuilder environmentBuilder = Environment.builder().eventLoopGroup(eventLoopGroup);
62+
EnvironmentBuilder environmentBuilder =
63+
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
6364
environmentBuilder.addressResolver(add -> localhost());
6465
environment = environmentBuilder.build();
6566
connection = new ConnectionFactory().newConnection();

src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -61,7 +61,8 @@ public class SuperStreamTest {
6161

6262
@BeforeEach
6363
void init(TestInfo info) throws Exception {
64-
EnvironmentBuilder environmentBuilder = Environment.builder().eventLoopGroup(eventLoopGroup);
64+
EnvironmentBuilder environmentBuilder =
65+
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
6566
environmentBuilder.addressResolver(add -> localhost());
6667
environment = environmentBuilder.build();
6768
connection = new ConnectionFactory().newConnection();

0 commit comments

Comments
 (0)