Skip to content

Commit cd88ecc

Browse files
authored
Merge pull request #15 from rabbitmq/environment-lazy-init
Add a lazy initialization option in the environment builder
2 parents 8f04f42 + eec8823 commit cd88ecc

File tree

6 files changed

+183
-29
lines changed

6 files changed

+183
-29
lines changed

src/docs/asciidoc/api.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ can maintain before a new connection is open. The value must be between 1 and 25
186186
a new connection is open. The value must be between 1 and 255.
187187
|255
188188

189+
|`lazyInitialization`
190+
|To delay the connection opening until necessary.
191+
|false
192+
189193
|`tls`
190194
|Configuration helper for TLS.
191195
|TLS is enabled if a `rabbitmq-stream+tls` URI is provided.

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

+13
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,19 @@ public interface EnvironmentBuilder {
286286
EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
287287
BackOffDelayPolicy topologyUpdateBackOffDelayPolicy);
288288

289+
/**
290+
* To delay the connection opening until necessary.
291+
*
292+
* <p>No connection will be open before it is necessary (for stream management or
293+
* producer/consumer creation).
294+
*
295+
* <p>Default is false.
296+
*
297+
* @param lazy
298+
* @return this builder instance
299+
*/
300+
EnvironmentBuilder lazyInitialization(boolean lazy);
301+
289302
/**
290303
* Create the {@link Environment} instance.
291304
*

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+56-23
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.rabbitmq.stream.StreamCreator;
3030
import com.rabbitmq.stream.StreamException;
3131
import com.rabbitmq.stream.compression.CompressionCodecFactory;
32+
import com.rabbitmq.stream.impl.Client.ClientParameters;
3233
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
3334
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
3435
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration;
@@ -83,6 +84,8 @@ class StreamEnvironment implements Environment {
8384
private final Clock clock = new Clock();
8485
private final ScheduledFuture<?> clockRefreshFuture;
8586
private final ByteBufAllocator byteBufAllocator;
87+
private final AtomicBoolean locatorInitialized = new AtomicBoolean(false);
88+
private final Runnable locatorInitializationSequence;
8689
private volatile Client locator;
8790

8891
StreamEnvironment(
@@ -96,7 +99,8 @@ class StreamEnvironment implements Environment {
9699
int maxTrackingConsumersByConnection,
97100
int maxConsumersByConnection,
98101
DefaultTlsConfiguration tlsConfiguration,
99-
ByteBufAllocator byteBufAllocator) {
102+
ByteBufAllocator byteBufAllocator,
103+
boolean lazyInit) {
100104
this(
101105
scheduledExecutorService,
102106
clientParametersPrototype,
@@ -109,6 +113,7 @@ class StreamEnvironment implements Environment {
109113
maxConsumersByConnection,
110114
tlsConfiguration,
111115
byteBufAllocator,
116+
lazyInit,
112117
cp -> new Client(cp));
113118
}
114119

@@ -124,6 +129,7 @@ class StreamEnvironment implements Environment {
124129
int maxConsumersByConnection,
125130
DefaultTlsConfiguration tlsConfiguration,
126131
ByteBufAllocator byteBufAllocator,
132+
boolean lazyInit,
127133
Function<Client.ClientParameters, Client> clientFactory) {
128134
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
129135
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
@@ -235,29 +241,41 @@ class StreamEnvironment implements Environment {
235241
}
236242
};
237243
shutdownListenerReference.set(shutdownListener);
238-
RuntimeException lastException = null;
239-
for (Address address : addresses) {
240-
address = addressResolver.resolve(address);
241-
Client.ClientParameters locatorParameters =
242-
clientParametersPrototype
243-
.duplicate()
244-
.host(address.host())
245-
.port(address.port())
246-
.clientProperty("connection_name", "rabbitmq-stream-locator")
247-
.shutdownListener(shutdownListenerReference.get());
248-
try {
249-
this.locator = clientFactory.apply(locatorParameters);
250-
LOGGER.debug("Locator connected to {}", address);
251-
break;
252-
} catch (RuntimeException e) {
253-
LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
254-
lastException = e;
255-
}
256-
}
257-
if (this.locator == null) {
258-
throw lastException;
244+
ClientParameters clientParametersForInit = clientParametersPrototype.duplicate();
245+
Runnable locatorInitSequence =
246+
() -> {
247+
RuntimeException lastException = null;
248+
for (Address address : addresses) {
249+
address = addressResolver.resolve(address);
250+
Client.ClientParameters locatorParameters =
251+
clientParametersForInit
252+
.duplicate()
253+
.host(address.host())
254+
.port(address.port())
255+
.clientProperty("connection_name", "rabbitmq-stream-locator")
256+
.shutdownListener(shutdownListenerReference.get());
257+
try {
258+
this.locator = clientFactory.apply(locatorParameters);
259+
LOGGER.debug("Locator connected to {}", address);
260+
break;
261+
} catch (RuntimeException e) {
262+
LOGGER.debug("Error while try to connect to {}: {}", address, e.getMessage());
263+
lastException = e;
264+
}
265+
}
266+
if (this.locator == null) {
267+
throw lastException;
268+
}
269+
};
270+
if (lazyInit) {
271+
this.locatorInitializationSequence = locatorInitSequence;
272+
} else {
273+
locatorInitSequence.run();
274+
locatorInitialized.set(true);
275+
this.locatorInitializationSequence = () -> {};
259276
}
260-
this.codec = locator.codec();
277+
this.codec =
278+
clientParametersPrototype.codec == null ? Codecs.DEFAULT : clientParametersPrototype.codec;
261279
this.clockRefreshFuture =
262280
this.scheduledExecutorService.scheduleAtFixedRate(
263281
() -> this.clock.refresh(), 1, 1, SECONDS);
@@ -318,13 +336,26 @@ public ByteBufAllocator byteBufAllocator() {
318336
return byteBufAllocator;
319337
}
320338

339+
private void maybeInitializeLocator() {
340+
if (this.locatorInitialized.compareAndSet(false, true)) {
341+
try {
342+
this.locatorInitializationSequence.run();
343+
} catch (RuntimeException e) {
344+
this.locatorInitialized.set(false);
345+
throw e;
346+
}
347+
}
348+
}
349+
321350
@Override
322351
public StreamCreator streamCreator() {
352+
maybeInitializeLocator();
323353
return new StreamStreamCreator(this);
324354
}
325355

326356
@Override
327357
public void deleteStream(String stream) {
358+
maybeInitializeLocator();
328359
Client.Response response = this.locator().delete(stream);
329360
if (!response.isOk()) {
330361
throw new StreamException(
@@ -339,6 +370,7 @@ public void deleteStream(String stream) {
339370

340371
@Override
341372
public ProducerBuilder producerBuilder() {
373+
maybeInitializeLocator();
342374
return new StreamProducerBuilder(this);
343375
}
344376

@@ -360,6 +392,7 @@ void removeConsumer(StreamConsumer consumer) {
360392

361393
@Override
362394
public ConsumerBuilder consumerBuilder() {
395+
maybeInitializeLocator();
363396
return new StreamConsumerBuilder(this);
364397
}
365398

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
6060
private int maxConsumersByConnection = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
6161
private CompressionCodecFactory compressionCodecFactory;
6262
private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
63+
private boolean lazyInit = false;
6364

6465
public StreamEnvironmentBuilder() {}
6566

@@ -266,6 +267,12 @@ public EnvironmentBuilder maxConsumersByConnection(int maxConsumersByConnection)
266267
return this;
267268
}
268269

270+
@Override
271+
public EnvironmentBuilder lazyInitialization(boolean lazy) {
272+
this.lazyInit = lazy;
273+
return this;
274+
}
275+
269276
@Override
270277
public TlsConfiguration tls() {
271278
this.tls.enable();
@@ -290,7 +297,8 @@ public Environment build() {
290297
maxTrackingConsumersByConnection,
291298
maxConsumersByConnection,
292299
tls,
293-
byteBufAllocator);
300+
byteBufAllocator,
301+
lazyInit);
294302
}
295303

296304
static final class DefaultTlsConfiguration implements TlsConfiguration {

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+72-5
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@
2626
import com.rabbitmq.stream.AuthenticationFailureException;
2727
import com.rabbitmq.stream.BackOffDelayPolicy;
2828
import com.rabbitmq.stream.ChannelCustomizer;
29+
import com.rabbitmq.stream.ConfirmationHandler;
2930
import com.rabbitmq.stream.Constants;
3031
import com.rabbitmq.stream.Consumer;
3132
import com.rabbitmq.stream.Environment;
3233
import com.rabbitmq.stream.EnvironmentBuilder;
3334
import com.rabbitmq.stream.Host;
35+
import com.rabbitmq.stream.Message;
36+
import com.rabbitmq.stream.OffsetSpecification;
3437
import com.rabbitmq.stream.Producer;
3538
import com.rabbitmq.stream.StreamException;
3639
import com.rabbitmq.stream.impl.Client.StreamMetadata;
@@ -60,6 +63,8 @@
6063
import org.junit.jupiter.api.Test;
6164
import org.junit.jupiter.api.TestInfo;
6265
import org.junit.jupiter.api.extension.ExtendWith;
66+
import org.junit.jupiter.params.ParameterizedTest;
67+
import org.junit.jupiter.params.provider.ValueSource;
6368

6469
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
6570
public class StreamEnvironmentTest {
@@ -158,9 +163,10 @@ void environmentCreationShouldSucceedWhenUsingTls() {
158163
.close();
159164
}
160165

161-
@Test
162-
void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed() {
163-
Environment environment = environmentBuilder.build();
166+
@ParameterizedTest
167+
@ValueSource(booleans = {false, true})
168+
void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed(boolean lazyInit) {
169+
Environment environment = environmentBuilder.lazyInitialization(lazyInit).build();
164170
Collection<Producer> producers =
165171
IntStream.range(0, 2)
166172
.mapToObj(i -> environment.producerBuilder().stream(stream).build())
@@ -317,8 +323,7 @@ void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info)
317323

318324
@Test
319325
@TestUtils.DisabledIfRabbitMqCtlNotSet
320-
void environmentPublishersConsumersShouldCloseSuccessfullyWhenBrokerIsDown(TestInfo info)
321-
throws Exception {
326+
void environmentPublishersConsumersShouldCloseSuccessfullyWhenBrokerIsDown() throws Exception {
322327
Environment environment =
323328
environmentBuilder
324329
.recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(10)))
@@ -439,4 +444,66 @@ void streamCreationShouldBeIdempotent(TestInfo info) {
439444
assertThat(client.delete(s).isOk()).isTrue();
440445
}
441446
}
447+
448+
@Test
449+
void instanciationShouldSucceedWhenLazyInitIsEnabledAndHostIsNotKnown() {
450+
String dummyHost = UUID.randomUUID().toString();
451+
Address dummyAddress = new Address(dummyHost, Client.DEFAULT_PORT);
452+
try (Environment env =
453+
environmentBuilder
454+
.host(dummyHost)
455+
.addressResolver(a -> dummyAddress)
456+
.lazyInitialization(true)
457+
.build()) {
458+
459+
assertThatThrownBy(() -> env.streamCreator().stream("should not have been created").create())
460+
.isInstanceOf(StreamException.class);
461+
assertThatThrownBy(() -> env.deleteStream("should not exist"))
462+
.isInstanceOf(StreamException.class);
463+
assertThatThrownBy(() -> env.producerBuilder().stream(stream).build())
464+
.isInstanceOf(StreamException.class);
465+
assertThatThrownBy(
466+
() ->
467+
env.consumerBuilder().stream(stream)
468+
.messageHandler((context, message) -> {})
469+
.build())
470+
.isInstanceOf(StreamException.class);
471+
}
472+
}
473+
474+
@ParameterizedTest
475+
@ValueSource(booleans = {false, true})
476+
void createPublishConsumeDelete(boolean lazyInit, TestInfo info) {
477+
try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) {
478+
String s = streamName(info);
479+
env.streamCreator().stream(s).create();
480+
int messageCount = 50_000;
481+
CountDownLatch confirmLatch = new CountDownLatch(messageCount);
482+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
483+
484+
Producer producer = env.producerBuilder().stream(s).build();
485+
ConfirmationHandler confirmationHandler = confirmationStatus -> confirmLatch.countDown();
486+
IntStream.range(0, messageCount)
487+
.forEach(
488+
i -> {
489+
Message message =
490+
producer.messageBuilder().addData("".getBytes(StandardCharsets.UTF_8)).build();
491+
producer.send(message, confirmationHandler);
492+
});
493+
494+
latchAssert(confirmLatch).completes();
495+
496+
Consumer consumer =
497+
env.consumerBuilder().stream(s)
498+
.offset(OffsetSpecification.first())
499+
.messageHandler((context, message) -> consumeLatch.countDown())
500+
.build();
501+
502+
latchAssert(consumeLatch).completes();
503+
504+
producer.close();
505+
consumer.close();
506+
env.deleteStream(s);
507+
}
508+
}
442509
}

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static org.mockito.Mockito.*;
1818

1919
import com.rabbitmq.stream.BackOffDelayPolicy;
20+
import com.rabbitmq.stream.impl.Client.ClientParameters;
2021
import io.netty.buffer.ByteBufAllocator;
2122
import java.net.URI;
2223
import java.time.Duration;
@@ -29,6 +30,8 @@
2930
import org.junit.jupiter.api.AfterEach;
3031
import org.junit.jupiter.api.BeforeEach;
3132
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.CsvSource;
3235
import org.mockito.Mock;
3336
import org.mockito.MockitoAnnotations;
3437

@@ -81,6 +84,7 @@ Client.ClientParameters duplicate() {
8184
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
8285
null,
8386
ByteBufAllocator.DEFAULT,
87+
false,
8488
cf);
8589
}
8690

@@ -140,7 +144,32 @@ void shouldTryUrisOnInitializationFailure() throws Exception {
140144
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
141145
null,
142146
ByteBufAllocator.DEFAULT,
147+
false,
143148
cf);
144149
verify(cf, times(3)).apply(any(Client.ClientParameters.class));
145150
}
151+
152+
@ParameterizedTest
153+
@CsvSource({"false,1", "true,0"})
154+
void shouldNotOpenConnectionWhenLazyInitIsEnabled(
155+
boolean lazyInit, int expectedConnectionCreation) throws Exception {
156+
reset(cf);
157+
when(cf.apply(any(Client.ClientParameters.class))).thenReturn(client);
158+
environment =
159+
new StreamEnvironment(
160+
scheduledExecutorService,
161+
new ClientParameters(),
162+
Collections.emptyList(),
163+
recoveryBackOffDelayPolicy,
164+
topologyUpdateBackOffDelayPolicy,
165+
host -> host,
166+
ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT,
167+
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
168+
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
169+
null,
170+
ByteBufAllocator.DEFAULT,
171+
lazyInit,
172+
cf);
173+
verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class));
174+
}
146175
}

0 commit comments

Comments
 (0)