Skip to content

Commit 5ad0201

Browse files
Merge pull request #420 from rabbitmq/force-localhost-for-local-development
Force localhost for local development
2 parents e47a79e + 42fb8c7 commit 5ad0201

19 files changed

+81
-53
lines changed

src/docs/asciidoc/api.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ Creating publishers and consumers can cause problems as the client uses hints fr
7777
These connection hints can be accurate or less appropriate depending on the infrastructure.
7878
If you hit some connection problems at some point – like hostnames impossible to resolve for client applications - this https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/[blog post] should help you understand what is going on and fix the issues.
7979

80+
To make the local development experience simple, the client library can choose to always use `localhost` for producers and consumers.
81+
This happens if the following conditions are met: the initial host to connect to is `localhost`, the user is `guest`, and no custom address resolver has been provided.
82+
Provide a pass-through `AddressResolver` to `EnvironmentBuilder#addressResolver(AddressResolver)` to avoid this behavior.
83+
It is unlikely this behavior applies for any real-world deployment, where `localhost` and/or the default `guest` user should not be used.
84+
8085
===== Enabling TLS
8186

8287
TLS can be enabled by using the `rabbitmq-stream+tls` scheme in the URI.

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,26 @@ public interface EnvironmentBuilder {
6363
* <p>Applications can use this abstraction to make sure connection attempts ignore metadata hints
6464
* and always go to a single point like a load balancer.
6565
*
66+
* <p>The default implementation does not perform any logic, it just returns the passed-in
67+
* address.
68+
*
69+
* <p><i>The default implementation is overridden automatically if the following conditions are
70+
* met: the host to connect to is <code>localhost</code>, the user is <code>guest</code>, and no
71+
* address resolver has been provided. The client will then always tries to connect to <code>
72+
* localhost</code> to facilitate the so-called "beginner experience", that is a developer working
73+
* against a local RabbitMQ instance. Just provide a pass-through address resolver to avoid this
74+
* behavior, e.g.:</i>
75+
*
76+
* <pre>
77+
* Environment.builder()
78+
* .addressResolver(address -> address)
79+
* .build();
80+
* </pre>
81+
*
6682
* @param addressResolver
6783
* @return this builder instance
84+
* @see <a href="https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/">"Connecting to
85+
* Streams" blog post</a>
6886
*/
6987
EnvironmentBuilder addressResolver(AddressResolver addressResolver);
7088

@@ -172,7 +190,7 @@ public interface EnvironmentBuilder {
172190
*
173191
* @param requestedHeartbeat
174192
* @return this builder instance
175-
* @see <a href="https://rabbitmq.com/stream.html#protocol">See stream plugin documentation</a>
193+
* @see <a href="https://rabbitmq.com/stream.html#protocol">Stream plugin documentation</a>
176194
*/
177195
EnvironmentBuilder requestedHeartbeat(Duration requestedHeartbeat);
178196

@@ -183,7 +201,7 @@ public interface EnvironmentBuilder {
183201
*
184202
* @param requestedMaxFrameSize
185203
* @return this builder instance
186-
* @see <a href="https://rabbitmq.com/stream.html#protocol">See stream plugin documentation</a>
204+
* @see <a href="https://rabbitmq.com/stream.html#protocol">Stream plugin documentation</a>
187205
*/
188206
EnvironmentBuilder requestedMaxFrameSize(int requestedMaxFrameSize);
189207

src/main/java/com/rabbitmq/stream/impl/Client.java

+20-9
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.Constants.*;
17-
import static com.rabbitmq.stream.impl.Utils.encodeRequestCode;
18-
import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
19-
import static com.rabbitmq.stream.impl.Utils.extractResponseCode;
20-
import static com.rabbitmq.stream.impl.Utils.formatConstant;
21-
import static com.rabbitmq.stream.impl.Utils.noOpConsumer;
17+
import static com.rabbitmq.stream.impl.Utils.*;
2218
import static java.lang.String.format;
2319
import static java.lang.String.join;
2420
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -495,10 +491,21 @@ private void authenticate() {
495491
} else if (saslAuthenticateResponse.isChallenge()) {
496492
challenge = saslAuthenticateResponse.challenge;
497493
} else if (saslAuthenticateResponse.isAuthenticationFailure()) {
498-
throw new AuthenticationFailureException(
494+
String message =
499495
"Unexpected response code during authentication: "
500-
+ formatConstant(saslAuthenticateResponse.getResponseCode()),
501-
saslAuthenticateResponse.getResponseCode());
496+
+ formatConstant(saslAuthenticateResponse.getResponseCode());
497+
if (saslAuthenticateResponse.getResponseCode()
498+
== RESPONSE_CODE_AUTHENTICATION_FAILURE_LOOPBACK) {
499+
message +=
500+
". The user is not authorized to connect from a remote host. "
501+
+ "If the broker is running locally, make sure the '"
502+
+ this.host
503+
+ "' hostname is resolved to "
504+
+ "the loopback interface (localhost, 127.0.0.1, ::1). "
505+
+ "See https://www.rabbitmq.com/access-control.html#loopback-users.";
506+
}
507+
throw new AuthenticationFailureException(
508+
message, saslAuthenticateResponse.getResponseCode());
502509
} else {
503510
throw new StreamException(
504511
"Unexpected response code during authentication: "
@@ -2234,7 +2241,7 @@ public static class ClientParameters {
22342241
private ShutdownListener shutdownListener = shutdownContext -> {};
22352242
private SaslConfiguration saslConfiguration = DefaultSaslConfiguration.PLAIN;
22362243
private CredentialsProvider credentialsProvider =
2237-
new DefaultUsernamePasswordCredentialsProvider("guest", "guest");
2244+
new DefaultUsernamePasswordCredentialsProvider(DEFAULT_USERNAME, "guest");
22382245
private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON;
22392246
private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON;
22402247
private SslContext sslContext;
@@ -2438,6 +2445,10 @@ Codec codec() {
24382445
return this.codec;
24392446
}
24402447

2448+
CredentialsProvider credentialsProvider() {
2449+
return this.credentialsProvider;
2450+
}
2451+
24412452
public ClientParameters channelCustomizer(Consumer<Channel> channelCustomizer) {
24422453
this.channelCustomizer = channelCustomizer;
24432454
return this;

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+23-5
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry;
17-
import static com.rabbitmq.stream.impl.Utils.convertCodeToException;
18-
import static com.rabbitmq.stream.impl.Utils.exceptionMessage;
19-
import static com.rabbitmq.stream.impl.Utils.formatConstant;
20-
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
17+
import static com.rabbitmq.stream.impl.Utils.*;
2118
import static java.lang.String.format;
2219
import static java.util.concurrent.TimeUnit.SECONDS;
2320

@@ -31,6 +28,8 @@
3128
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
3229
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration;
3330
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
31+
import com.rabbitmq.stream.sasl.CredentialsProvider;
32+
import com.rabbitmq.stream.sasl.UsernamePasswordCredentialsProvider;
3433
import io.netty.buffer.ByteBufAllocator;
3534
import io.netty.channel.EventLoopGroup;
3635
import io.netty.channel.nio.NioEventLoopGroup;
@@ -109,7 +108,6 @@ class StreamEnvironment implements Environment {
109108
clientParametersPrototype = clientParametersPrototype.byteBufAllocator(byteBufAllocator);
110109
clientParametersPrototype = maybeSetUpClientParametersFromUris(uris, clientParametersPrototype);
111110

112-
this.addressResolver = addressResolver;
113111
this.observationCollector = observationCollector;
114112

115113
boolean tls;
@@ -148,6 +146,26 @@ class StreamEnvironment implements Environment {
148146
.collect(Collectors.toList());
149147
}
150148

149+
AddressResolver addressResolverToUse = addressResolver;
150+
if (this.addresses.size() == 1
151+
&& "localhost".equals(this.addresses.get(0).host())
152+
&& addressResolver == DEFAULT_ADDRESS_RESOLVER) {
153+
CredentialsProvider credentialsProvider = clientParametersPrototype.credentialsProvider();
154+
if (credentialsProvider instanceof UsernamePasswordCredentialsProvider) {
155+
String username = ((UsernamePasswordCredentialsProvider) credentialsProvider).getUsername();
156+
if (DEFAULT_USERNAME.equals(username)) {
157+
Address address = new Address("localhost", clientParametersPrototype.port());
158+
addressResolverToUse = ignored -> address;
159+
LOGGER.info(
160+
"Connecting to localhost with {} user, assuming development environment",
161+
DEFAULT_USERNAME);
162+
LOGGER.info("Using address resolver to always connect to localhost");
163+
}
164+
}
165+
}
166+
167+
this.addressResolver = addressResolverToUse;
168+
151169
this.addresses.forEach(address -> this.locators.add(new Locator(address)));
152170
this.executorServiceFactory =
153171
new DefaultExecutorServiceFactory(

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.Utils.DEFAULT_ADDRESS_RESOLVER;
1617
import static com.rabbitmq.stream.impl.Utils.noOpConsumer;
1718

1819
import com.rabbitmq.stream.*;
@@ -55,7 +56,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
5556
BackOffDelayPolicy.fixed(Duration.ofSeconds(5));
5657
private BackOffDelayPolicy topologyBackOffDelayPolicy =
5758
BackOffDelayPolicy.fixedWithInitialDelay(Duration.ofSeconds(5), Duration.ofSeconds(1));
58-
private AddressResolver addressResolver = address -> address;
59+
private AddressResolver addressResolver = DEFAULT_ADDRESS_RESOLVER;
5960
private int maxProducersByConnection = ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT;
6061
private int maxTrackingConsumersByConnection =
6162
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT;

src/main/java/com/rabbitmq/stream/impl/Utils.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,7 @@
1515

1616
import static java.lang.String.format;
1717

18-
import com.rabbitmq.stream.Address;
19-
import com.rabbitmq.stream.BackOffDelayPolicy;
20-
import com.rabbitmq.stream.Constants;
21-
import com.rabbitmq.stream.ConsumerUpdateListener;
22-
import com.rabbitmq.stream.OffsetSpecification;
23-
import com.rabbitmq.stream.StreamDoesNotExistException;
24-
import com.rabbitmq.stream.StreamException;
25-
import com.rabbitmq.stream.StreamNotAvailableException;
18+
import com.rabbitmq.stream.*;
2619
import com.rabbitmq.stream.impl.Client.ClientParameters;
2720
import java.net.UnknownHostException;
2821
import java.security.cert.X509Certificate;
@@ -87,6 +80,9 @@ final class Utils {
8780
CONSTANT_LABELS = Collections.unmodifiableMap(labels);
8881
}
8982

83+
static final AddressResolver DEFAULT_ADDRESS_RESOLVER = address -> address;
84+
static final String DEFAULT_USERNAME = "guest";
85+
9086
private Utils() {}
9187

9288
@SuppressWarnings("unchecked")

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,7 @@ public Integer call() throws Exception {
693693
closeStep("Closing environment executor", () -> envExecutor.shutdownNow()));
694694

695695
boolean tls = isTls(this.uris);
696-
AddressResolver addrResolver;
696+
AddressResolver addrResolver = null;
697697
if (loadBalancer) {
698698
int defaultPort = tls ? Client.DEFAULT_TLS_PORT : Client.DEFAULT_PORT;
699699
List<Address> addresses =
@@ -717,9 +717,7 @@ public Integer call() throws Exception {
717717
addrResolver =
718718
address -> addresses.get(connectionAttemptCount.getAndIncrement() % addresses.size());
719719
} else {
720-
if (this.addressResolver == null) {
721-
addrResolver = address -> address;
722-
} else {
720+
if (this.addressResolver != null) {
723721
addrResolver = this.addressResolver; // should happen only in tests
724722
}
725723
}
@@ -737,7 +735,6 @@ public Integer call() throws Exception {
737735
Environment.builder()
738736
.id("stream-perf-test")
739737
.uris(this.uris)
740-
.addressResolver(addrResolver)
741738
.scheduledExecutorService(envExecutor)
742739
.metricsCollector(metricsCollector)
743740
.netty()
@@ -752,6 +749,10 @@ public Integer call() throws Exception {
752749
.rpcTimeout(Duration.ofSeconds(this.rpcTimeout))
753750
.requestedMaxFrameSize((int) this.requestedMaxFrameSize.toBytes());
754751

752+
if (addrResolver != null) {
753+
environmentBuilder = environmentBuilder.addressResolver(addrResolver);
754+
}
755+
755756
java.util.function.Consumer<io.netty.channel.Channel> channelCustomizer = channel -> {};
756757

757758
if (tls) {

src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import static com.rabbitmq.stream.Host.memoryAlarm;
1818
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
1919
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
20-
import static com.rabbitmq.stream.impl.TestUtils.localhost;
2120
import static java.util.concurrent.TimeUnit.SECONDS;
2221
import static java.util.stream.IntStream.range;
2322
import static org.assertj.core.api.Assertions.assertThat;
@@ -67,7 +66,6 @@ static void afterAll() throws Exception {
6766
@BeforeEach
6867
void init() {
6968
environmentBuilder = Environment.builder();
70-
environmentBuilder.addressResolver(add -> localhost());
7169
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
7270
}
7371

src/test/java/com/rabbitmq/stream/impl/FilteringTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ public class FilteringTest {
6161
void init() throws Exception {
6262
EnvironmentBuilder environmentBuilder =
6363
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
64-
environmentBuilder.addressResolver(add -> localhost());
6564
environment = environmentBuilder.build();
6665
}
6766

src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ static void afterAll() throws Exception {
6767
@BeforeEach
6868
void init() {
6969
environmentBuilder = Environment.builder();
70-
environmentBuilder.addressResolver(add -> localhost());
7170
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
7271
}
7372

src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16-
import static com.rabbitmq.stream.impl.TestUtils.localhost;
1716
import static com.rabbitmq.stream.impl.TestUtils.publishAndWaitForConfirms;
1817
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
1918
import static org.assertj.core.api.Assertions.assertThat;
@@ -53,7 +52,6 @@ void init() {
5352
.eventLoopGroup(eventLoopGroup)
5453
.environmentBuilder()
5554
.maxConsumersByConnection(1);
56-
environmentBuilder.addressResolver(add -> localhost());
5755
environment = environmentBuilder.build();
5856
}
5957

src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
1717
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
18-
import static com.rabbitmq.stream.impl.TestUtils.localhost;
1918
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2019
import static java.util.stream.Collectors.toList;
2120
import static org.assertj.core.api.Assertions.assertThat;
@@ -66,7 +65,6 @@ public class SacSuperStreamConsumerTest {
6665
void init(TestInfo info) throws Exception {
6766
EnvironmentBuilder environmentBuilder =
6867
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
69-
environmentBuilder.addressResolver(add -> localhost());
7068
environment = environmentBuilder.build();
7169
superStream = TestUtils.streamName(info);
7270
connection = new ConnectionFactory().newConnection();

src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ static long offset(String line) {
107107
@BeforeEach
108108
void init() throws Exception {
109109
environmentBuilder = Environment.builder();
110-
environmentBuilder.addressResolver(add -> localhost());
111110
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
112111
socket = new Socket("localhost", 61613);
113112
out = socket.getOutputStream();

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ void init() {
9696
recoveryInitialDelay = RECOVERY_DELAY;
9797
}
9898
EnvironmentBuilder environmentBuilder = environmentBuilder();
99-
environmentBuilder.addressResolver(add -> localhost());
10099
environment = environmentBuilder.build();
101100
}
102101

src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public class StreamProducerTest {
7070
@BeforeEach
7171
void init() {
7272
EnvironmentBuilder environmentBuilder = environmentBuilder();
73-
environmentBuilder.addressResolver(add -> localhost());
7473
environment = environmentBuilder.build();
7574
}
7675

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
1919
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
2020
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
21-
import static com.rabbitmq.stream.impl.TestUtils.localhost;
2221
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2322
import static com.rabbitmq.stream.impl.TestUtils.wrap;
2423
import static org.assertj.core.api.Assertions.assertThat;
@@ -63,7 +62,6 @@ public class SuperStreamConsumerTest {
6362
void init(TestInfo info) throws Exception {
6463
EnvironmentBuilder environmentBuilder =
6564
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
66-
environmentBuilder.addressResolver(add -> localhost());
6765
environment = environmentBuilder.build();
6866
superStream = TestUtils.streamName(info);
6967
connection = new ConnectionFactory().newConnection();

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
1717
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
1818
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
19-
import static com.rabbitmq.stream.impl.TestUtils.localhost;
2019
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2120
import static org.assertj.core.api.Assertions.assertThat;
2221
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -61,7 +60,6 @@ public class SuperStreamProducerTest {
6160
void init(TestInfo info) throws Exception {
6261
EnvironmentBuilder environmentBuilder =
6362
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
64-
environmentBuilder.addressResolver(add -> localhost());
6563
environment = environmentBuilder.build();
6664
connection = new ConnectionFactory().newConnection();
6765
superStream = TestUtils.streamName(info);

src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
1717
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
1818
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
19-
import static com.rabbitmq.stream.impl.TestUtils.localhost;
2019
import static org.assertj.core.api.Assertions.assertThat;
2120

2221
import com.rabbitmq.client.Connection;
@@ -63,7 +62,6 @@ public class SuperStreamTest {
6362
void init(TestInfo info) throws Exception {
6463
EnvironmentBuilder environmentBuilder =
6564
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
66-
environmentBuilder.addressResolver(add -> localhost());
6765
environment = environmentBuilder.build();
6866
connection = new ConnectionFactory().newConnection();
6967
superStream = TestUtils.streamName(info);

0 commit comments

Comments
 (0)