Skip to content

Commit 20a43f3

Browse files
authored
Force localhost connections when client connects to localhost (#305)
* Improve local development experience Related to #296 It is common to have a local instance of RabbitMQ running locally, either via a local rabbit (e.g. installed from source), or inside a container (e.g. Docker or local Kubernetes). In both cases, our client may not work without providing an address resolver, or without changing the advertised host/port in rabbit via an env variable. Specifically, our clients won't work if the client can't resolve the hostname of the local machine. For exampe, a laptop may have a hostname mylaptop.some-company.com. Rabbit won't be impacted as long as it works in a single-node configuration. However, the client smart features to locate the stream leaders and replicas will get "confused" because it can't resolve the hostname. This commit changes the smart layer, so that it won't try to locate the stream replicas if the client is connected to localhost. Instead, it will force the producer/consumer connections to be localhost, and it will assume that stream replicas/leader are in localhost. This is true for single-node rabbits in local development environments. The smart feature to locate leader/replicas remains unchanged for non-local connections. --- Signed-off-by: Aitor Perez Cedres <[email protected]>
1 parent c9f7563 commit 20a43f3

File tree

5 files changed

+103
-23
lines changed

5 files changed

+103
-23
lines changed

Makefile

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ test: build
1212
rabbitmq-server:
1313
docker run -it --rm --name rabbitmq-stream-docker \
1414
-p 5552:5552 -p 5672:5672 -p 15672:15672 \
15-
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \
1615
--pull always \
1716
pivotalrabbitmq/rabbitmq-stream
1817

RabbitMQ.Stream.Client/StreamSystem.cs

+45-11
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,7 @@ public async Task<IProducer> CreateRawSuperStreamProducer(
166166
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
167167
foreach (var partitionsStream in partitions.Streams)
168168
{
169-
var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false);
170-
streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream];
169+
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
171170
}
172171

173172
var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig,
@@ -217,8 +216,7 @@ public async Task<IConsumer> CreateSuperStreamConsumer(
217216
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
218217
foreach (var partitionsStream in partitions.Streams)
219218
{
220-
var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false);
221-
streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream];
219+
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
222220
}
223221

224222
var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig,
@@ -239,10 +237,7 @@ public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConf
239237
throw new CreateProducerException("Batch Size must be bigger than 0");
240238
}
241239

242-
await MayBeReconnectLocator().ConfigureAwait(false);
243-
var meta = await _client.QueryMetadata(new[] { rawProducerConfig.Stream }).ConfigureAwait(false);
244-
245-
var metaStreamInfo = meta.StreamInfos[rawProducerConfig.Stream];
240+
var metaStreamInfo = await StreamInfo(rawProducerConfig.Stream).ConfigureAwait(false);
246241
if (metaStreamInfo.ResponseCode != ResponseCode.Ok)
247242
{
248243
throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}");
@@ -268,6 +263,47 @@ public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConf
268263
}
269264
}
270265

266+
private async Task<StreamInfo> StreamInfo(string streamName)
267+
{
268+
// force localhost connection for single node clusters and when address resolver is not provided
269+
// when theres 1 endpoint and an address resolver, there could be a cluster behind a load balancer
270+
var forceLocalHost = false;
271+
var localPort = 0;
272+
if (_clientParameters.Endpoints.Count == 1 &&
273+
_clientParameters.AddressResolver is null)
274+
{
275+
var clientParametersEndpoint = _clientParameters.Endpoints[0];
276+
switch (clientParametersEndpoint)
277+
{
278+
case DnsEndPoint { Host: "localhost" } dnsEndPoint:
279+
forceLocalHost = true;
280+
localPort = dnsEndPoint.Port;
281+
break;
282+
case IPEndPoint ipEndPoint when Equals(ipEndPoint.Address, IPAddress.Loopback):
283+
forceLocalHost = true;
284+
localPort = ipEndPoint.Port;
285+
break;
286+
}
287+
}
288+
289+
StreamInfo metaStreamInfo;
290+
if (forceLocalHost)
291+
{
292+
// craft the metadata response to force using localhost
293+
var leader = new Broker("localhost", (uint)localPort);
294+
metaStreamInfo = new StreamInfo(streamName, ResponseCode.Ok, leader,
295+
new List<Broker>(1) { leader });
296+
}
297+
else
298+
{
299+
await MayBeReconnectLocator().ConfigureAwait(false);
300+
var meta = await _client.QueryMetadata(new[] { streamName }).ConfigureAwait(false);
301+
metaStreamInfo = meta.StreamInfos[streamName];
302+
}
303+
304+
return metaStreamInfo;
305+
}
306+
271307
public async Task CreateStream(StreamSpec spec)
272308
{
273309
var response = await _client.CreateStream(spec.Name, spec.Args).ConfigureAwait(false);
@@ -350,9 +386,7 @@ public async Task<StreamStats> StreamStats(string stream)
350386
public async Task<IConsumer> CreateRawConsumer(RawConsumerConfig rawConsumerConfig,
351387
ILogger logger = null)
352388
{
353-
await MayBeReconnectLocator().ConfigureAwait(false);
354-
var meta = await _client.QueryMetadata(new[] { rawConsumerConfig.Stream }).ConfigureAwait(false);
355-
var metaStreamInfo = meta.StreamInfos[rawConsumerConfig.Stream];
389+
var metaStreamInfo = await StreamInfo(rawConsumerConfig.Stream).ConfigureAwait(false);
356390
if (metaStreamInfo.ResponseCode != ResponseCode.Ok)
357391
{
358392
throw new CreateConsumerException($"consumer could not be created code: {metaStreamInfo.ResponseCode}");

Tests/FilterTest.cs

+16-3
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@ public class FilterTest
1818
{
1919
// When the Filter is set also Values must be set and PostFilter must be set
2020
// Values must be a list of string and must contain at least one element
21-
[Fact]
21+
[SkippableFact]
2222
public async void ValidateFilter()
2323
{
2424
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
25+
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
26+
{
27+
throw new SkipException("broker does not support filter");
28+
}
2529

2630
await Assert.ThrowsAsync<ArgumentException>(() => Consumer.Create(
2731
new ConsumerConfig(system, stream) { Filter = new ConsumerFilter() }
@@ -54,10 +58,14 @@ await Assert.ThrowsAsync<ArgumentException>(() => Consumer.Create(
5458
// We send 100 messages with two different states (Alabama and New York)
5559
// By using the filter we should be able to consume only the messages from Alabama
5660
// and the server has to send only one chunk with all the messages from Alabama
57-
[Fact]
61+
[SkippableFact]
5862
public async void FilterShouldReturnOnlyOneChunk()
5963
{
6064
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
65+
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
66+
{
67+
throw new SkipException("broker does not support filter");
68+
}
6169

6270
var producer = await Producer.Create(
6371
new ProducerConfig(system, stream)
@@ -171,10 +179,15 @@ async Task SendTo(string state)
171179
// For the producer side we have the ConfirmationHandler the messages with errors
172180
// will be reported as not confirmed and the user can handle them.
173181
// for the consumer the messages will be skipped and logged with the standard logger
174-
[Fact]
182+
[SkippableFact]
175183
public async void ErrorFiltersFunctionWontDeliverTheMessage()
176184
{
177185
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
186+
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
187+
{
188+
throw new SkipException("broker does not support filter");
189+
}
190+
178191
var messagesConfirmed = 0;
179192
var messagesError = 0;
180193
var testPassed = new TaskCompletionSource<int>();

Tests/RawProducerSystemTests.cs

+12-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public async void CreateRawProducer()
3333
var config = new StreamSystemConfig();
3434
var system = await StreamSystem.Create(config);
3535
await system.CreateStream(new StreamSpec(stream));
36-
var rawProducer = await system.CreateRawProducer(
36+
37+
var createProducerTask = system.CreateRawProducer(
3738
new RawProducerConfig(stream)
3839
{
3940
Reference = "producer",
@@ -44,6 +45,15 @@ public async void CreateRawProducer()
4445
}
4546
});
4647

48+
if (await Task.WhenAny(createProducerTask, Task.Delay(5000)) != createProducerTask)
49+
{
50+
// timeout to avoid infinite await
51+
Assert.Fail("timeout awaiting for CreateRawProducer");
52+
return;
53+
}
54+
55+
var rawProducer = await createProducerTask;
56+
4757
var readonlySequence = "apple".AsReadonlySequence();
4858
var message = new Message(new Data(readonlySequence));
4959
await rawProducer.Send(1, message);
@@ -378,10 +388,7 @@ public async Task ProducerSendsArrays255Bytes(ReadOnlySequence<byte> @event)
378388
RawProducerConfig(stream)
379389
{
380390
Reference = "producer",
381-
ConfirmHandler = _ =>
382-
{
383-
testPassed.SetResult(true);
384-
}
391+
ConfirmHandler = _ => { testPassed.SetResult(true); }
385392
}
386393
);
387394

Tests/SuperStreamConsumerTests.cs

+30-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,16 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished()
5959
var consumedMessages = 0;
6060
const int NumberOfMessages = 20;
6161
var system = await StreamSystem.Create(new StreamSystemConfig());
62-
await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages, "", _testOutputHelper);
62+
// Publish to super stream hands sometimes, for unknow reason
63+
var publishTask = SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages,
64+
"", _testOutputHelper);
65+
if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask)
66+
{
67+
Assert.Fail("timed out awaiting to publish messages to super stream");
68+
}
69+
70+
await publishTask;
71+
6372
var clientProvidedName = Guid.NewGuid().ToString();
6473

6574
var consumer = await system.CreateSuperStreamConsumer(
@@ -200,7 +209,15 @@ public async void MoreConsumersNumberOfMessagesConsumedShouldBeEqualsToPublished
200209
var listConsumed = new ConcurrentBag<string>();
201210
const int NumberOfMessages = 20;
202211
var system = await StreamSystem.Create(new StreamSystemConfig());
203-
await SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper);
212+
var publishToSuperStreamTask =
213+
SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper);
214+
if (await Task.WhenAny(publishToSuperStreamTask, Task.Delay(10000)) != publishToSuperStreamTask)
215+
{
216+
Assert.Fail("timeout waiting to publish messages");
217+
}
218+
219+
// We re-await the task so that any exceptions/cancellation is rethrown.
220+
await publishToSuperStreamTask;
204221
var clientProvidedName = Guid.NewGuid().ToString();
205222
var consumers = new Dictionary<string, IConsumer>();
206223

@@ -253,7 +270,17 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis
253270
{
254271
SystemUtils.ResetSuperStreams();
255272
var system = await StreamSystem.Create(new StreamSystemConfig());
256-
await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper);
273+
_testOutputHelper.WriteLine("awaiting publish to super stream");
274+
var publishTask =
275+
SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper);
276+
if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask)
277+
{
278+
Assert.Fail("timed out awaiting to publish messages to super stream");
279+
}
280+
281+
// re-await in case any cancellation or exception happen, it can throw
282+
await publishTask;
283+
257284
var listConsumed = new ConcurrentBag<string>();
258285
var testPassed = new TaskCompletionSource<bool>();
259286
var consumer = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange)

0 commit comments

Comments
 (0)