Skip to content

Force localhost for local development #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ Creating publishers and consumers can cause problems as the client uses hints fr
These connection hints can be accurate or less appropriate depending on the infrastructure.
If you hit some connection problems at some point – like hostnames impossible to resolve for client applications - this https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/[blog post] should help you understand what is going on and fix the issues.

To make the local development experience simple, the client library can choose to always use `localhost` for producers and consumers.
This happens if the following conditions are met: the initial host to connect to is `localhost`, the user is `guest`, and no custom address resolver has been provided.
Provide a pass-through `AddressResolver` to `EnvironmentBuilder#addressResolver(AddressResolver)` to avoid this behavior.
It is unlikely this behavior applies for any real-world deployment, where `localhost` and/or the default `guest` user should not be used.

===== Enabling TLS

TLS can be enabled by using the `rabbitmq-stream+tls` scheme in the URI.
Expand Down
22 changes: 20 additions & 2 deletions src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,26 @@ public interface EnvironmentBuilder {
* <p>Applications can use this abstraction to make sure connection attempts ignore metadata hints
* and always go to a single point like a load balancer.
*
* <p>The default implementation does not perform any logic, it just returns the passed-in
* address.
*
* <p><i>The default implementation is overridden automatically if the following conditions are
* met: the host to connect to is <code>localhost</code>, the user is <code>guest</code>, and no
* address resolver has been provided. The client will then always tries to connect to <code>
* localhost</code> to facilitate the so-called "beginner experience", that is a developer working
* against a local RabbitMQ instance. Just provide a pass-through address resolver to avoid this
* behavior, e.g.:</i>
*
* <pre>
* Environment.builder()
* .addressResolver(address -> address)
* .build();
* </pre>
*
* @param addressResolver
* @return this builder instance
* @see <a href="https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/">"Connecting to
* Streams" blog post</a>
*/
EnvironmentBuilder addressResolver(AddressResolver addressResolver);

Expand Down Expand Up @@ -172,7 +190,7 @@ public interface EnvironmentBuilder {
*
* @param requestedHeartbeat
* @return this builder instance
* @see <a href="https://rabbitmq.com/stream.html#protocol">See stream plugin documentation</a>
* @see <a href="https://rabbitmq.com/stream.html#protocol">Stream plugin documentation</a>
*/
EnvironmentBuilder requestedHeartbeat(Duration requestedHeartbeat);

Expand All @@ -183,7 +201,7 @@ public interface EnvironmentBuilder {
*
* @param requestedMaxFrameSize
* @return this builder instance
* @see <a href="https://rabbitmq.com/stream.html#protocol">See stream plugin documentation</a>
* @see <a href="https://rabbitmq.com/stream.html#protocol">Stream plugin documentation</a>
*/
EnvironmentBuilder requestedMaxFrameSize(int requestedMaxFrameSize);

Expand Down
29 changes: 20 additions & 9 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.Constants.*;
import static com.rabbitmq.stream.impl.Utils.encodeRequestCode;
import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
import static com.rabbitmq.stream.impl.Utils.extractResponseCode;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
import static com.rabbitmq.stream.impl.Utils.noOpConsumer;
import static com.rabbitmq.stream.impl.Utils.*;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -495,10 +491,21 @@ private void authenticate() {
} else if (saslAuthenticateResponse.isChallenge()) {
challenge = saslAuthenticateResponse.challenge;
} else if (saslAuthenticateResponse.isAuthenticationFailure()) {
throw new AuthenticationFailureException(
String message =
"Unexpected response code during authentication: "
+ formatConstant(saslAuthenticateResponse.getResponseCode()),
saslAuthenticateResponse.getResponseCode());
+ formatConstant(saslAuthenticateResponse.getResponseCode());
if (saslAuthenticateResponse.getResponseCode()
== RESPONSE_CODE_AUTHENTICATION_FAILURE_LOOPBACK) {
message +=
". The user is not authorized to connect from a remote host. "
+ "If the broker is running locally, make sure the '"
+ this.host
+ "' hostname is resolved to "
+ "the loopback interface (localhost, 127.0.0.1, ::1). "
+ "See https://www.rabbitmq.com/access-control.html#loopback-users.";
}
throw new AuthenticationFailureException(
message, saslAuthenticateResponse.getResponseCode());
} else {
throw new StreamException(
"Unexpected response code during authentication: "
Expand Down Expand Up @@ -2234,7 +2241,7 @@ public static class ClientParameters {
private ShutdownListener shutdownListener = shutdownContext -> {};
private SaslConfiguration saslConfiguration = DefaultSaslConfiguration.PLAIN;
private CredentialsProvider credentialsProvider =
new DefaultUsernamePasswordCredentialsProvider("guest", "guest");
new DefaultUsernamePasswordCredentialsProvider(DEFAULT_USERNAME, "guest");
private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON;
private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON;
private SslContext sslContext;
Expand Down Expand Up @@ -2438,6 +2445,10 @@ Codec codec() {
return this.codec;
}

CredentialsProvider credentialsProvider() {
return this.credentialsProvider;
}

public ClientParameters channelCustomizer(Consumer<Channel> channelCustomizer) {
this.channelCustomizer = channelCustomizer;
return this;
Expand Down
28 changes: 23 additions & 5 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.AsyncRetry.asyncRetry;
import static com.rabbitmq.stream.impl.Utils.convertCodeToException;
import static com.rabbitmq.stream.impl.Utils.exceptionMessage;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
import static com.rabbitmq.stream.impl.Utils.*;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand All @@ -31,6 +28,8 @@
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder.DefaultTlsConfiguration;
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
import com.rabbitmq.stream.sasl.CredentialsProvider;
import com.rabbitmq.stream.sasl.UsernamePasswordCredentialsProvider;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
Expand Down Expand Up @@ -109,7 +108,6 @@ class StreamEnvironment implements Environment {
clientParametersPrototype = clientParametersPrototype.byteBufAllocator(byteBufAllocator);
clientParametersPrototype = maybeSetUpClientParametersFromUris(uris, clientParametersPrototype);

this.addressResolver = addressResolver;
this.observationCollector = observationCollector;

boolean tls;
Expand Down Expand Up @@ -148,6 +146,26 @@ class StreamEnvironment implements Environment {
.collect(Collectors.toList());
}

AddressResolver addressResolverToUse = addressResolver;
if (this.addresses.size() == 1
&& "localhost".equals(this.addresses.get(0).host())
&& addressResolver == DEFAULT_ADDRESS_RESOLVER) {
CredentialsProvider credentialsProvider = clientParametersPrototype.credentialsProvider();
if (credentialsProvider instanceof UsernamePasswordCredentialsProvider) {
String username = ((UsernamePasswordCredentialsProvider) credentialsProvider).getUsername();
if (DEFAULT_USERNAME.equals(username)) {
Address address = new Address("localhost", clientParametersPrototype.port());
addressResolverToUse = ignored -> address;
LOGGER.info(
"Connecting to localhost with {} user, assuming development environment",
DEFAULT_USERNAME);
LOGGER.info("Using address resolver to always connect to localhost");
}
}
}

this.addressResolver = addressResolverToUse;

this.addresses.forEach(address -> this.locators.add(new Locator(address)));
this.executorServiceFactory =
new DefaultExecutorServiceFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// [email protected].
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.Utils.DEFAULT_ADDRESS_RESOLVER;
import static com.rabbitmq.stream.impl.Utils.noOpConsumer;

import com.rabbitmq.stream.*;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
BackOffDelayPolicy.fixed(Duration.ofSeconds(5));
private BackOffDelayPolicy topologyBackOffDelayPolicy =
BackOffDelayPolicy.fixedWithInitialDelay(Duration.ofSeconds(5), Duration.ofSeconds(1));
private AddressResolver addressResolver = address -> address;
private AddressResolver addressResolver = DEFAULT_ADDRESS_RESOLVER;
private int maxProducersByConnection = ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT;
private int maxTrackingConsumersByConnection =
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT;
Expand Down
12 changes: 4 additions & 8 deletions src/main/java/com/rabbitmq/stream/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,7 @@

import static java.lang.String.format;

import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.ConsumerUpdateListener;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamDoesNotExistException;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.StreamNotAvailableException;
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import java.net.UnknownHostException;
import java.security.cert.X509Certificate;
Expand Down Expand Up @@ -87,6 +80,9 @@ final class Utils {
CONSTANT_LABELS = Collections.unmodifiableMap(labels);
}

static final AddressResolver DEFAULT_ADDRESS_RESOLVER = address -> address;
static final String DEFAULT_USERNAME = "guest";

private Utils() {}

@SuppressWarnings("unchecked")
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ public Integer call() throws Exception {
closeStep("Closing environment executor", () -> envExecutor.shutdownNow()));

boolean tls = isTls(this.uris);
AddressResolver addrResolver;
AddressResolver addrResolver = null;
if (loadBalancer) {
int defaultPort = tls ? Client.DEFAULT_TLS_PORT : Client.DEFAULT_PORT;
List<Address> addresses =
Expand All @@ -717,9 +717,7 @@ public Integer call() throws Exception {
addrResolver =
address -> addresses.get(connectionAttemptCount.getAndIncrement() % addresses.size());
} else {
if (this.addressResolver == null) {
addrResolver = address -> address;
} else {
if (this.addressResolver != null) {
addrResolver = this.addressResolver; // should happen only in tests
}
}
Expand All @@ -737,7 +735,6 @@ public Integer call() throws Exception {
Environment.builder()
.id("stream-perf-test")
.uris(this.uris)
.addressResolver(addrResolver)
.scheduledExecutorService(envExecutor)
.metricsCollector(metricsCollector)
.netty()
Expand All @@ -752,6 +749,10 @@ public Integer call() throws Exception {
.rpcTimeout(Duration.ofSeconds(this.rpcTimeout))
.requestedMaxFrameSize((int) this.requestedMaxFrameSize.toBytes());

if (addrResolver != null) {
environmentBuilder = environmentBuilder.addressResolver(addrResolver);
}

java.util.function.Consumer<io.netty.channel.Channel> channelCustomizer = channel -> {};

if (tls) {
Expand Down
2 changes: 0 additions & 2 deletions src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static com.rabbitmq.stream.Host.memoryAlarm;
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
import static com.rabbitmq.stream.impl.TestUtils.localhost;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -67,7 +66,6 @@ static void afterAll() throws Exception {
@BeforeEach
void init() {
environmentBuilder = Environment.builder();
environmentBuilder.addressResolver(add -> localhost());
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
}

Expand Down
1 change: 0 additions & 1 deletion src/test/java/com/rabbitmq/stream/impl/FilteringTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class FilteringTest {
void init() throws Exception {
EnvironmentBuilder environmentBuilder =
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ static void afterAll() throws Exception {
@BeforeEach
void init() {
environmentBuilder = Environment.builder();
environmentBuilder.addressResolver(add -> localhost());
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// [email protected].
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.TestUtils.localhost;
import static com.rabbitmq.stream.impl.TestUtils.publishAndWaitForConfirms;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -53,7 +52,6 @@ void init() {
.eventLoopGroup(eventLoopGroup)
.environmentBuilder()
.maxConsumersByConnection(1);
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
import static com.rabbitmq.stream.impl.TestUtils.localhost;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -66,7 +65,6 @@ public class SacSuperStreamConsumerTest {
void init(TestInfo info) throws Exception {
EnvironmentBuilder environmentBuilder =
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
superStream = TestUtils.streamName(info);
connection = new ConnectionFactory().newConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ static long offset(String line) {
@BeforeEach
void init() throws Exception {
environmentBuilder = Environment.builder();
environmentBuilder.addressResolver(add -> localhost());
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
socket = new Socket("localhost", 61613);
out = socket.getOutputStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ void init() {
recoveryInitialDelay = RECOVERY_DELAY;
}
EnvironmentBuilder environmentBuilder = environmentBuilder();
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class StreamProducerTest {
@BeforeEach
void init() {
EnvironmentBuilder environmentBuilder = environmentBuilder();
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
import static com.rabbitmq.stream.impl.TestUtils.localhost;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static com.rabbitmq.stream.impl.TestUtils.wrap;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -63,7 +62,6 @@ public class SuperStreamConsumerTest {
void init(TestInfo info) throws Exception {
EnvironmentBuilder environmentBuilder =
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
superStream = TestUtils.streamName(info);
connection = new ConnectionFactory().newConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
import static com.rabbitmq.stream.impl.TestUtils.localhost;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -61,7 +60,6 @@ public class SuperStreamProducerTest {
void init(TestInfo info) throws Exception {
EnvironmentBuilder environmentBuilder =
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
connection = new ConnectionFactory().newConnection();
superStream = TestUtils.streamName(info);
Expand Down
2 changes: 0 additions & 2 deletions src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
import static com.rabbitmq.stream.impl.TestUtils.localhost;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.client.Connection;
Expand Down Expand Up @@ -63,7 +62,6 @@ public class SuperStreamTest {
void init(TestInfo info) throws Exception {
EnvironmentBuilder environmentBuilder =
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
connection = new ConnectionFactory().newConnection();
superStream = TestUtils.streamName(info);
Expand Down
Loading