Skip to content

Commit 96edf94

Browse files
authored
Merge pull request #665 from rabbitmq/improve-recovery
Add log messages to recovery test
2 parents fdd59ff + 3a0056a commit 96edf94

11 files changed

+149
-45
lines changed

Diff for: src/docs/asciidoc/api.adoc

+9-1
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,10 @@ Used as a prefix for connection names.
234234
|Contract to change resolved node address to connect to.
235235
|Pass-through (no-op)
236236

237+
|`locatorConnectionCount`
238+
|Number of locator connections to maintain (for metadata search)
239+
|The smaller of the number of URIs and 3.
240+
237241
|`tls`
238242
|Configuration helper for TLS.
239243
|TLS is enabled if a `rabbitmq-stream+tls` URI is provided.
@@ -293,8 +297,12 @@ include::{test-examples}/EnvironmentUsage.java[tag=address-resolver]
293297
<1> Set the load balancer address
294298
<2> Use load balancer address for initial connection
295299
<3> Ignore metadata hints, always use load balancer
300+
<4> Set the number of locator connections to maintain
296301

297-
The blog post covers the https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams/#client-workaround-with-a-load-balancer[underlying details of this workaround].
302+
Note the example above sets the number of locator connections the environment maintains.
303+
Locator connections are used to perform infrastructure-related operations (e.g. looking up the topology of a stream to find an appropriate node to connect to).
304+
The environment uses the number of passed-in URIs to choose an appropriate default number and will pick 1 in this case, which may be too low for a cluster deployment.
305+
This is why it is recommended to set the value explicitly, 3 being a good default.
298306

299307
==== Managing Streams
300308

Diff for: src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

+27-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import com.rabbitmq.stream.compression.Compression;
1818
import com.rabbitmq.stream.compression.CompressionCodecFactory;
19+
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder;
1920
import com.rabbitmq.stream.metrics.MetricsCollector;
2021
import com.rabbitmq.stream.sasl.CredentialsProvider;
2122
import com.rabbitmq.stream.sasl.SaslConfiguration;
@@ -62,14 +63,15 @@ public interface EnvironmentBuilder {
6263
* An {@link AddressResolver} to potentially change resolved node address to connect to.
6364
*
6465
* <p>Applications can use this abstraction to make sure connection attempts ignore metadata hints
65-
* and always go to a single point like a load balancer.
66+
* and always go to a single point like a load balancer. Consider setting {@link
67+
* #locatorConnectionCount(int)} when using a load balancer.
6668
*
6769
* <p>The default implementation does not perform any logic, it just returns the passed-in
6870
* address.
6971
*
7072
* <p><i>The default implementation is overridden automatically if the following conditions are
7173
* met: the host to connect to is <code>localhost</code>, the user is <code>guest</code>, and no
72-
* address resolver has been provided. The client will then always tries to connect to <code>
74+
* address resolver has been provided. The client will then always try to connect to <code>
7375
* localhost</code> to facilitate local development. Just provide a pass-through address resolver
7476
* to avoid this behavior, e.g.:</i>
7577
*
@@ -79,10 +81,11 @@ public interface EnvironmentBuilder {
7981
* .build();
8082
* </pre>
8183
*
82-
* @param addressResolver
84+
* @param addressResolver the address resolver
8385
* @return this builder instance
8486
* @see <a href="https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/">"Connecting to
8587
* Streams" blog post</a>
88+
* @see #locatorConnectionCount(int)
8689
*/
8790
EnvironmentBuilder addressResolver(AddressResolver addressResolver);
8891

@@ -395,6 +398,27 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
395398
*/
396399
EnvironmentBuilder forceLeaderForProducers(boolean forceLeader);
397400

401+
/**
402+
* Set the expected number of "locator" connections to maintain.
403+
*
404+
* <p>Locator connections are used to perform infrastructure-related operations (e.g. looking up
405+
* the topology of a stream to find an appropriate node to connect to).
406+
*
407+
* <p>It is recommended to maintain 2 to 3 locator connections. The environment uses the smaller
408+
* of the number of passed-in URIs and 3 by default (see {@link #uris(List)}).
409+
*
410+
* <p>The number of locator connections should be explicitly set when a load balancer is used, as
411+
* the environment cannot know the number of cluster nodes in this case (the only URI set is the
412+
* one of the load balancer).
413+
*
414+
* @param locatorConnectionCount number of expected locator connections
415+
* @return this builder instance
416+
* @see #uris(List)
417+
* @see #addressResolver(AddressResolver)
418+
* @since 0.21.0
419+
*/
420+
StreamEnvironmentBuilder locatorConnectionCount(int locatorConnectionCount);
421+
398422
/**
399423
* Create the {@link Environment} instance.
400424
*

Diff for: src/main/java/com/rabbitmq/stream/Message.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -32,18 +32,26 @@ public interface Message {
3232
/**
3333
* Does this message has a publishing ID?
3434
*
35-
* <p>Publishing IDs are used for de-duplication of outbound messages. They are not persisted.
35+
* <p>Publishing IDs are used for deduplication of outbound messages. They are not persisted.
3636
*
3737
* @return true if the message has a publishing ID, false otherwise
38+
* @see ProducerBuilder#name(String)
39+
* @see <a
40+
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">Deduplication
41+
* documentation</a>
3842
*/
3943
boolean hasPublishingId();
4044

4145
/**
4246
* Get the publishing ID for the message.
4347
*
44-
* <p>Publishing IDs are used for de-duplication of outbound messages. They are not persisted.
48+
* <p>Publishing IDs are used for deduplication of outbound messages. They are not persisted.
4549
*
4650
* @return the publishing ID of the message
51+
* @see ProducerBuilder#name(String)
52+
* @see <a
53+
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">Deduplication
54+
* documentation</a>
4755
*/
4856
long getPublishingId();
4957

Diff for: src/main/java/com/rabbitmq/stream/MessageBuilder.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -38,12 +38,16 @@ public interface MessageBuilder {
3838
Message build();
3939

4040
/**
41-
* Set the publishing ID (for de-duplication).
41+
* Set the publishing ID (for deduplication).
4242
*
4343
* <p>This is value is used only for outbound messages and is not persisted.
4444
*
4545
* @param publishingId
4646
* @return this builder instance
47+
* @see ProducerBuilder#name(String)
48+
* @see <a
49+
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">Deduplication
50+
* documentation</a>
4751
*/
4852
MessageBuilder publishingId(long publishingId);
4953

Diff for: src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,18 @@
2323
public interface ProducerBuilder {
2424

2525
/**
26-
* The logical name of the producer.
26+
* The producer name for deduplication (<b>read the <a
27+
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">documentation</a>
28+
* before use</b>).
2729
*
28-
* <p>Set a value to enable de-duplication.
30+
* <p>There must be only one producer instance at the same time using a given name.
2931
*
3032
* @param name
3133
* @return this builder instance
34+
* @see MessageBuilder#publishingId(long)
35+
* @see <a
36+
* href="https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#outbound-message-deduplication">Deduplication
37+
* documentation</a>
3238
*/
3339
ProducerBuilder name(String name);
3440

Diff for: src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+32-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.rabbitmq.stream.impl.Utils.*;
1919
import static java.lang.String.format;
2020
import static java.util.concurrent.TimeUnit.SECONDS;
21+
import static java.util.stream.Collectors.toList;
2122

2223
import com.rabbitmq.stream.*;
2324
import com.rabbitmq.stream.MessageHandler.Context;
@@ -53,6 +54,7 @@
5354
import java.util.function.LongSupplier;
5455
import java.util.function.Supplier;
5556
import java.util.stream.Collectors;
57+
import java.util.stream.IntStream;
5658
import javax.net.ssl.SSLException;
5759
import org.slf4j.Logger;
5860
import org.slf4j.LoggerFactory;
@@ -81,7 +83,7 @@ class StreamEnvironment implements Environment {
8183
private final ByteBufAllocator byteBufAllocator;
8284
private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false);
8385
private final Runnable locatorInitializationSequence;
84-
private final List<Locator> locators = new CopyOnWriteArrayList<>();
86+
private final List<Locator> locators;
8587
private final ExecutorServiceFactory executorServiceFactory;
8688
private final ObservationCollector<?> observationCollector;
8789

@@ -105,7 +107,8 @@ class StreamEnvironment implements Environment {
105107
boolean forceReplicaForConsumers,
106108
boolean forceLeaderForProducers,
107109
Duration producerNodeRetryDelay,
108-
Duration consumerNodeRetryDelay) {
110+
Duration consumerNodeRetryDelay,
111+
int expectedLocatorCount) {
109112
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
110113
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
111114
this.byteBufAllocator = byteBufAllocator;
@@ -147,7 +150,7 @@ class StreamEnvironment implements Environment {
147150
new Address(
148151
uriItem.getHost() == null ? "localhost" : uriItem.getHost(),
149152
uriItem.getPort() == -1 ? defaultPort : uriItem.getPort()))
150-
.collect(Collectors.toList());
153+
.collect(toList());
151154
}
152155

153156
AddressResolver addressResolverToUse = addressResolver;
@@ -179,7 +182,24 @@ class StreamEnvironment implements Environment {
179182

180183
this.addressResolver = addressResolverToUse;
181184

182-
this.addresses.forEach(address -> this.locators.add(new Locator(address)));
185+
int locatorCount;
186+
if (expectedLocatorCount > 0) {
187+
locatorCount = expectedLocatorCount;
188+
} else {
189+
locatorCount = Math.min(this.addresses.size(), 3);
190+
}
191+
LOGGER.debug("Using {} locator connection(s)", locatorCount);
192+
193+
List<Locator> lctrs =
194+
IntStream.range(0, locatorCount)
195+
.mapToObj(
196+
i -> {
197+
Address addr = this.addresses.get(i % this.addresses.size());
198+
return new Locator(addr);
199+
})
200+
.collect(toList());
201+
this.locators = List.copyOf(lctrs);
202+
183203
this.executorServiceFactory =
184204
new DefaultExecutorServiceFactory(
185205
this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
@@ -230,8 +250,8 @@ class StreamEnvironment implements Environment {
230250
Runnable locatorInitSequence =
231251
() -> {
232252
RuntimeException lastException = null;
233-
for (int i = 0; i < addresses.size(); i++) {
234-
Address address = addresses.get(i);
253+
for (int i = 0; i < locators.size(); i++) {
254+
Address address = addresses.get(i % addresses.size());
235255
Locator locator = locator(i);
236256
address = addressResolver.resolve(address);
237257
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
@@ -290,10 +310,10 @@ private ShutdownListener shutdownListener(
290310
Client.ShutdownListener shutdownListener =
291311
shutdownContext -> {
292312
if (shutdownContext.isShutdownUnexpected()) {
313+
String label = locator.label();
293314
locator.client(null);
294315
LOGGER.debug(
295-
"Unexpected locator disconnection for locator on '{}', trying to reconnect",
296-
locator.label());
316+
"Unexpected locator disconnection for locator on '{}', trying to reconnect", label);
297317
try {
298318
Client.ClientParameters newLocatorParameters =
299319
this.locatorParametersCopy().shutdownListener(shutdownListenerReference.get());
@@ -742,7 +762,7 @@ static <T> T locatorOperation(
742762
Function<Client, T> operation,
743763
Supplier<Client> clientSupplier,
744764
BackOffDelayPolicy backOffDelayPolicy) {
745-
int maxAttempt = 5;
765+
int maxAttempt = 3;
746766
int attempt = 0;
747767
boolean executed = false;
748768
Exception lastException = null;
@@ -991,7 +1011,9 @@ private String label() {
9911011
if (c == null) {
9921012
return address.host() + ":" + address.port();
9931013
} else {
994-
return c.getHost() + ":" + c.getPort();
1014+
return String.format(
1015+
"%s:%d [advertised %s:%d]",
1016+
c.getHost(), c.getPort(), c.serverAdvertisedHost(), c.serverAdvertisedPort());
9951017
}
9961018
}
9971019

Diff for: src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
7070
private ObservationCollector<?> observationCollector = ObservationCollector.NO_OP;
7171
private Duration producerNodeRetryDelay = Duration.ofMillis(500);
7272
private Duration consumerNodeRetryDelay = Duration.ofMillis(1000);
73+
private int locatorConnectionCount = -1;
7374

7475
public StreamEnvironmentBuilder() {}
7576

@@ -315,6 +316,12 @@ StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay)
315316
return this;
316317
}
317318

319+
@Override
320+
public StreamEnvironmentBuilder locatorConnectionCount(int locatorCount) {
321+
this.locatorConnectionCount = locatorCount;
322+
return this;
323+
}
324+
318325
@Override
319326
public Environment build() {
320327
if (this.compressionCodecFactory == null) {
@@ -349,7 +356,8 @@ public Environment build() {
349356
this.forceReplicaForConsumers,
350357
this.forceLeaderForProducers,
351358
this.producerNodeRetryDelay,
352-
this.consumerNodeRetryDelay);
359+
this.consumerNodeRetryDelay,
360+
this.locatorConnectionCount);
353361
}
354362

355363
static final class DefaultTlsConfiguration implements TlsConfiguration {

Diff for: src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java

+1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ void addressResolver() throws Exception {
9797
.host(entryPoint.host()) // <2>
9898
.port(entryPoint.port()) // <2>
9999
.addressResolver(address -> entryPoint) // <3>
100+
.locatorConnectionCount(3) // <4>
100101
.build();
101102
// end::address-resolver[]
102103
}

0 commit comments

Comments
 (0)