Skip to content

Commit 5f481ad

Browse files
committed
Delay connection to server until necessary
Not just when builders are created, but just before to-be-created objects are actually instantiated. References #14
1 parent 7828f8d commit 5f481ad

File tree

5 files changed

+15
-15
lines changed

5 files changed

+15
-15
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public Consumer build() {
8787
throw new IllegalArgumentException("A name must be set if a tracking strategy is specified");
8888
}
8989

90+
this.environment.maybeInitializeLocator();
9091
TrackingConfiguration trackingConfiguration;
9192
if (this.autoTrackingStrategy != null) {
9293
trackingConfiguration =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ public ByteBufAllocator byteBufAllocator() {
336336
return byteBufAllocator;
337337
}
338338

339-
private void maybeInitializeLocator() {
339+
void maybeInitializeLocator() {
340340
if (this.locatorInitialized.compareAndSet(false, true)) {
341341
try {
342342
this.locatorInitializationSequence.run();
@@ -349,13 +349,12 @@ private void maybeInitializeLocator() {
349349

350350
@Override
351351
public StreamCreator streamCreator() {
352-
maybeInitializeLocator();
353352
return new StreamStreamCreator(this);
354353
}
355354

356355
@Override
357356
public void deleteStream(String stream) {
358-
maybeInitializeLocator();
357+
this.maybeInitializeLocator();
359358
Client.Response response = this.locator().delete(stream);
360359
if (!response.isOk()) {
361360
throw new StreamException(
@@ -370,7 +369,6 @@ public void deleteStream(String stream) {
370369

371370
@Override
372371
public ProducerBuilder producerBuilder() {
373-
maybeInitializeLocator();
374372
return new StreamProducerBuilder(this);
375373
}
376374

@@ -392,7 +390,6 @@ void removeConsumer(StreamConsumer consumer) {
392390

393391
@Override
394392
public ConsumerBuilder consumerBuilder() {
395-
maybeInitializeLocator();
396393
return new StreamConsumerBuilder(this);
397394
}
398395

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+1
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public Producer build() {
155155
if (subEntrySize > 1 && compression == null) {
156156
compression = Compression.NONE;
157157
}
158+
this.environment.maybeInitializeLocator();
158159
Producer producer;
159160
if (this.routingKeyExtractor == null) {
160161
producer =

src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public void create() {
6767
if (stream == null) {
6868
throw new IllegalArgumentException("Stream cannot be null");
6969
}
70+
this.environment.maybeInitializeLocator();
7071
Client.Response response =
7172
environment.locator().create(stream, streamParametersBuilder.build());
7273
if (!response.isOk()

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,15 @@
2929
import com.rabbitmq.stream.ConfirmationHandler;
3030
import com.rabbitmq.stream.Constants;
3131
import com.rabbitmq.stream.Consumer;
32+
import com.rabbitmq.stream.ConsumerBuilder;
3233
import com.rabbitmq.stream.Environment;
3334
import com.rabbitmq.stream.EnvironmentBuilder;
3435
import com.rabbitmq.stream.Host;
3536
import com.rabbitmq.stream.Message;
3637
import com.rabbitmq.stream.OffsetSpecification;
3738
import com.rabbitmq.stream.Producer;
39+
import com.rabbitmq.stream.ProducerBuilder;
40+
import com.rabbitmq.stream.StreamCreator;
3841
import com.rabbitmq.stream.StreamException;
3942
import com.rabbitmq.stream.impl.Client.StreamMetadata;
4043
import com.rabbitmq.stream.impl.MonitoringTestUtils.EnvironmentInfo;
@@ -456,18 +459,15 @@ void instanciationShouldSucceedWhenLazyInitIsEnabledAndHostIsNotKnown() {
456459
.lazyInitialization(true)
457460
.build()) {
458461

459-
assertThatThrownBy(() -> env.streamCreator().stream("should not have been created").create())
460-
.isInstanceOf(StreamException.class);
462+
StreamCreator streamCreator = env.streamCreator().stream("should not have been created");
463+
assertThatThrownBy(() -> streamCreator.create()).isInstanceOf(StreamException.class);
461464
assertThatThrownBy(() -> env.deleteStream("should not exist"))
462465
.isInstanceOf(StreamException.class);
463-
assertThatThrownBy(() -> env.producerBuilder().stream(stream).build())
464-
.isInstanceOf(StreamException.class);
465-
assertThatThrownBy(
466-
() ->
467-
env.consumerBuilder().stream(stream)
468-
.messageHandler((context, message) -> {})
469-
.build())
470-
.isInstanceOf(StreamException.class);
466+
ProducerBuilder producerBuilder = env.producerBuilder().stream(this.stream);
467+
assertThatThrownBy(() -> producerBuilder.build()).isInstanceOf(StreamException.class);
468+
ConsumerBuilder consumerBuilder =
469+
env.consumerBuilder().stream(this.stream).messageHandler((context, message) -> {});
470+
assertThatThrownBy(() -> consumerBuilder.build()).isInstanceOf(StreamException.class);
471471
}
472472
}
473473

0 commit comments

Comments
 (0)