From d67bf9240068d297d12235084899f335e1bfa386 Mon Sep 17 00:00:00 2001 From: Aitor Perez Cedres Date: Tue, 19 Sep 2023 17:34:16 +0100 Subject: [PATCH 1/2] Improve local development experience Related to #296 It is common to have a local instance of RabbitMQ running locally, either via a local rabbit (e.g. installed from source), or inside a container (e.g. Docker or local Kubernetes). In both cases, our client may not work without providing an address resolver, or without changing the advertised host/port in rabbit via an env variable. Specifically, our clients won't work if the client can't resolve the hostname of the local machine. For exampe, a laptop may have a hostname mylaptop.some-company.com. Rabbit won't be impacted as long as it works in a single-node configuration. However, the client smart features to locate the stream leaders and replicas will get "confused" because it can't resolve the hostname. This commit changes the smart layer, so that it won't try to locate the stream replicas if the client is connected to localhost. Instead, it will force the producer/consumer connections to be localhost, and it will assume that stream replicas/leader are in localhost. This is true for single-node rabbits in local development environments. The smart feature to locate leader/replicas remains unchanged for non-local connections. Signed-off-by: Aitor Perez Cedres --- Makefile | 1 - RabbitMQ.Stream.Client/StreamSystem.cs | 56 +++++++++++++++++++++----- Tests/FilterTest.cs | 19 +++++++-- Tests/RawProducerSystemTests.cs | 30 ++++++++------ Tests/SuperStreamConsumerTests.cs | 33 +++++++++++++-- 5 files changed, 109 insertions(+), 30 deletions(-) diff --git a/Makefile b/Makefile index 9b1c1241..0189ca14 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,6 @@ test: build rabbitmq-server: docker run -it --rm --name rabbitmq-stream-docker \ -p 5552:5552 -p 5672:5672 -p 15672:15672 \ - -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \ --pull always \ pivotalrabbitmq/rabbitmq-stream diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 822512d3..daa5c419 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -166,8 +166,7 @@ public async Task CreateRawSuperStreamProducer( IDictionary streamInfos = new Dictionary(); foreach (var partitionsStream in partitions.Streams) { - var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false); - streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream]; + streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false); } var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig, @@ -217,8 +216,7 @@ public async Task CreateSuperStreamConsumer( IDictionary streamInfos = new Dictionary(); foreach (var partitionsStream in partitions.Streams) { - var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false); - streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream]; + streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false); } var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig, @@ -239,10 +237,7 @@ public async Task CreateRawProducer(RawProducerConfig rawProducerConf throw new CreateProducerException("Batch Size must be bigger than 0"); } - await MayBeReconnectLocator().ConfigureAwait(false); - var meta = await _client.QueryMetadata(new[] { rawProducerConfig.Stream }).ConfigureAwait(false); - - var metaStreamInfo = meta.StreamInfos[rawProducerConfig.Stream]; + var metaStreamInfo = await StreamInfo(rawProducerConfig.Stream).ConfigureAwait(false); if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}"); @@ -268,6 +263,47 @@ public async Task CreateRawProducer(RawProducerConfig rawProducerConf } } + private async Task StreamInfo(string streamName) + { + // force localhost connection for single node clusters and when address resolver is not provided + // when theres 1 endpoint and an address resolver, there could be a cluster behind a load balancer + var forceLocalHost = false; + var localPort = 0; + if (_clientParameters.Endpoints.Count == 1 && + _clientParameters.AddressResolver is null) + { + var clientParametersEndpoint = _clientParameters.Endpoints[0]; + switch (clientParametersEndpoint) + { + case DnsEndPoint { Host: "localhost" } dnsEndPoint: + forceLocalHost = true; + localPort = dnsEndPoint.Port; + break; + case IPEndPoint ipEndPoint when Equals(ipEndPoint.Address, IPAddress.Loopback): + forceLocalHost = true; + localPort = ipEndPoint.Port; + break; + } + } + + StreamInfo metaStreamInfo; + if (forceLocalHost) + { + // craft the metadata response to force using localhost + var leader = new Broker("localhost", (uint)localPort); + metaStreamInfo = new StreamInfo(streamName, ResponseCode.Ok, leader, + new List(1) { leader }); + } + else + { + await MayBeReconnectLocator().ConfigureAwait(false); + var meta = await _client.QueryMetadata(new[] { streamName }).ConfigureAwait(false); + metaStreamInfo = meta.StreamInfos[streamName]; + } + + return metaStreamInfo; + } + public async Task CreateStream(StreamSpec spec) { var response = await _client.CreateStream(spec.Name, spec.Args).ConfigureAwait(false); @@ -350,9 +386,7 @@ public async Task StreamStats(string stream) public async Task CreateRawConsumer(RawConsumerConfig rawConsumerConfig, ILogger logger = null) { - await MayBeReconnectLocator().ConfigureAwait(false); - var meta = await _client.QueryMetadata(new[] { rawConsumerConfig.Stream }).ConfigureAwait(false); - var metaStreamInfo = meta.StreamInfos[rawConsumerConfig.Stream]; + var metaStreamInfo = await StreamInfo(rawConsumerConfig.Stream).ConfigureAwait(false); if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateConsumerException($"consumer could not be created code: {metaStreamInfo.ResponseCode}"); diff --git a/Tests/FilterTest.cs b/Tests/FilterTest.cs index 7b41351b..e1c6924d 100644 --- a/Tests/FilterTest.cs +++ b/Tests/FilterTest.cs @@ -18,10 +18,14 @@ public class FilterTest { // When the Filter is set also Values must be set and PostFilter must be set // Values must be a list of string and must contain at least one element - [Fact] + [SkippableFact] public async void ValidateFilter() { SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + if (!AvailableFeaturesSingleton.Instance.PublishFilter) + { + throw new SkipException("broker does not support filter"); + } await Assert.ThrowsAsync(() => Consumer.Create( new ConsumerConfig(system, stream) { Filter = new ConsumerFilter() } @@ -54,10 +58,14 @@ await Assert.ThrowsAsync(() => Consumer.Create( // We send 100 messages with two different states (Alabama and New York) // By using the filter we should be able to consume only the messages from Alabama // and the server has to send only one chunk with all the messages from Alabama - [Fact] + [SkippableFact] public async void FilterShouldReturnOnlyOneChunk() { SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + if (!AvailableFeaturesSingleton.Instance.PublishFilter) + { + throw new SkipException("broker does not support filter"); + } var producer = await Producer.Create( new ProducerConfig(system, stream) @@ -171,10 +179,15 @@ async Task SendTo(string state) // For the producer side we have the ConfirmationHandler the messages with errors // will be reported as not confirmed and the user can handle them. // for the consumer the messages will be skipped and logged with the standard logger - [Fact] + [SkippableFact] public async void ErrorFiltersFunctionWontDeliverTheMessage() { SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + if (!AvailableFeaturesSingleton.Instance.PublishFilter) + { + throw new SkipException("broker does not support filter"); + } + var messagesConfirmed = 0; var messagesError = 0; var testPassed = new TaskCompletionSource(); diff --git a/Tests/RawProducerSystemTests.cs b/Tests/RawProducerSystemTests.cs index 37325642..3e7cb81d 100644 --- a/Tests/RawProducerSystemTests.cs +++ b/Tests/RawProducerSystemTests.cs @@ -33,7 +33,8 @@ public async void CreateRawProducer() var config = new StreamSystemConfig(); var system = await StreamSystem.Create(config); await system.CreateStream(new StreamSpec(stream)); - var rawProducer = await system.CreateRawProducer( + + var createProducerTask = system.CreateRawProducer( new RawProducerConfig(stream) { Reference = "producer", @@ -44,6 +45,15 @@ public async void CreateRawProducer() } }); + if (await Task.WhenAny(createProducerTask, Task.Delay(5000)) != createProducerTask) + { + // timeout to avoid infinite await + Assert.Fail("timeout awaiting for CreateRawProducer"); + return; + } + + var rawProducer = await createProducerTask; + var readonlySequence = "apple".AsReadonlySequence(); var message = new Message(new Data(readonlySequence)); await rawProducer.Send(1, message); @@ -315,16 +325,16 @@ public async void ProducerBatchConfirmNumberOfMessages() const int NumberOfMessages = 100; var rawProducer = await system.CreateRawProducer(new RawProducerConfig(stream) - { - Reference = "producer", - ConfirmHandler = confirmation => { - if (confirmation.PublishingId == NumberOfMessages) + Reference = "producer", + ConfirmHandler = confirmation => { - testPassed.SetResult(true); + if (confirmation.PublishingId == NumberOfMessages) + { + testPassed.SetResult(true); + } } } - } ); var messages = new List<(ulong, Message)>(); for (var i = 1; i <= NumberOfMessages; i++) @@ -376,13 +386,9 @@ public async Task ProducerSendsArrays255Bytes(ReadOnlySequence @event) var testPassed = new TaskCompletionSource(); var rawProducer = await system.CreateRawProducer(new RawProducerConfig(stream) - { - Reference = "producer", - ConfirmHandler = _ => { - testPassed.SetResult(true); + Reference = "producer", ConfirmHandler = _ => { testPassed.SetResult(true); } } - } ); const ulong PublishingId = 0; diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs index ce9f0856..7b465798 100644 --- a/Tests/SuperStreamConsumerTests.cs +++ b/Tests/SuperStreamConsumerTests.cs @@ -59,7 +59,16 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished() var consumedMessages = 0; const int NumberOfMessages = 20; var system = await StreamSystem.Create(new StreamSystemConfig()); - await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages, "", _testOutputHelper); + // Publish to super stream hands sometimes, for unknow reason + var publishTask = SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages, + "", _testOutputHelper); + if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask) + { + Assert.Fail("timed out awaiting to publish messages to super stream"); + } + + await publishTask; + var clientProvidedName = Guid.NewGuid().ToString(); var consumer = await system.CreateSuperStreamConsumer( @@ -200,7 +209,15 @@ public async void MoreConsumersNumberOfMessagesConsumedShouldBeEqualsToPublished var listConsumed = new ConcurrentBag(); const int NumberOfMessages = 20; var system = await StreamSystem.Create(new StreamSystemConfig()); - await SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper); + var publishToSuperStreamTask = + SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper); + if (await Task.WhenAny(publishToSuperStreamTask, Task.Delay(10000)) != publishToSuperStreamTask) + { + Assert.Fail("timeout waiting to publish messages"); + } + + // We re-await the task so that any exceptions/cancellation is rethrown. + await publishToSuperStreamTask; var clientProvidedName = Guid.NewGuid().ToString(); var consumers = new Dictionary(); @@ -253,7 +270,17 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis { SystemUtils.ResetSuperStreams(); var system = await StreamSystem.Create(new StreamSystemConfig()); - await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper); + _testOutputHelper.WriteLine("awaiting publish to super stream"); + var publishTask = + SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper); + if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask) + { + Assert.Fail("timed out awaiting to publish messages to super stream"); + } + + // re-await in case any cancellation or exception happen, it can throw + await publishTask; + var listConsumed = new ConcurrentBag(); var testPassed = new TaskCompletionSource(); var consumer = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange) From 049343fa917f42ff78603abcac61ad17d6101289 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 20 Sep 2023 10:32:14 +0200 Subject: [PATCH 2/2] formatting Signed-off-by: Gabriele Santomaggio --- Tests/RawProducerSystemTests.cs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/Tests/RawProducerSystemTests.cs b/Tests/RawProducerSystemTests.cs index 3e7cb81d..1d508c1f 100644 --- a/Tests/RawProducerSystemTests.cs +++ b/Tests/RawProducerSystemTests.cs @@ -325,16 +325,16 @@ public async void ProducerBatchConfirmNumberOfMessages() const int NumberOfMessages = 100; var rawProducer = await system.CreateRawProducer(new RawProducerConfig(stream) + { + Reference = "producer", + ConfirmHandler = confirmation => { - Reference = "producer", - ConfirmHandler = confirmation => + if (confirmation.PublishingId == NumberOfMessages) { - if (confirmation.PublishingId == NumberOfMessages) - { - testPassed.SetResult(true); - } + testPassed.SetResult(true); } } + } ); var messages = new List<(ulong, Message)>(); for (var i = 1; i <= NumberOfMessages; i++) @@ -386,9 +386,10 @@ public async Task ProducerSendsArrays255Bytes(ReadOnlySequence @event) var testPassed = new TaskCompletionSource(); var rawProducer = await system.CreateRawProducer(new RawProducerConfig(stream) - { - Reference = "producer", ConfirmHandler = _ => { testPassed.SetResult(true); } - } + { + Reference = "producer", + ConfirmHandler = _ => { testPassed.SetResult(true); } + } ); const ulong PublishingId = 0;