diff --git a/Makefile b/Makefile index 9b1c1241..0189ca14 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,6 @@ test: build rabbitmq-server: docker run -it --rm --name rabbitmq-stream-docker \ -p 5552:5552 -p 5672:5672 -p 15672:15672 \ - -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \ --pull always \ pivotalrabbitmq/rabbitmq-stream diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 822512d3..daa5c419 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -166,8 +166,7 @@ public async Task CreateRawSuperStreamProducer( IDictionary streamInfos = new Dictionary(); foreach (var partitionsStream in partitions.Streams) { - var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false); - streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream]; + streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false); } var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig, @@ -217,8 +216,7 @@ public async Task CreateSuperStreamConsumer( IDictionary streamInfos = new Dictionary(); foreach (var partitionsStream in partitions.Streams) { - var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false); - streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream]; + streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false); } var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig, @@ -239,10 +237,7 @@ public async Task CreateRawProducer(RawProducerConfig rawProducerConf throw new CreateProducerException("Batch Size must be bigger than 0"); } - await MayBeReconnectLocator().ConfigureAwait(false); - var meta = await _client.QueryMetadata(new[] { rawProducerConfig.Stream }).ConfigureAwait(false); - - var metaStreamInfo = meta.StreamInfos[rawProducerConfig.Stream]; + var metaStreamInfo = await StreamInfo(rawProducerConfig.Stream).ConfigureAwait(false); if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}"); @@ -268,6 +263,47 @@ public async Task CreateRawProducer(RawProducerConfig rawProducerConf } } + private async Task StreamInfo(string streamName) + { + // force localhost connection for single node clusters and when address resolver is not provided + // when theres 1 endpoint and an address resolver, there could be a cluster behind a load balancer + var forceLocalHost = false; + var localPort = 0; + if (_clientParameters.Endpoints.Count == 1 && + _clientParameters.AddressResolver is null) + { + var clientParametersEndpoint = _clientParameters.Endpoints[0]; + switch (clientParametersEndpoint) + { + case DnsEndPoint { Host: "localhost" } dnsEndPoint: + forceLocalHost = true; + localPort = dnsEndPoint.Port; + break; + case IPEndPoint ipEndPoint when Equals(ipEndPoint.Address, IPAddress.Loopback): + forceLocalHost = true; + localPort = ipEndPoint.Port; + break; + } + } + + StreamInfo metaStreamInfo; + if (forceLocalHost) + { + // craft the metadata response to force using localhost + var leader = new Broker("localhost", (uint)localPort); + metaStreamInfo = new StreamInfo(streamName, ResponseCode.Ok, leader, + new List(1) { leader }); + } + else + { + await MayBeReconnectLocator().ConfigureAwait(false); + var meta = await _client.QueryMetadata(new[] { streamName }).ConfigureAwait(false); + metaStreamInfo = meta.StreamInfos[streamName]; + } + + return metaStreamInfo; + } + public async Task CreateStream(StreamSpec spec) { var response = await _client.CreateStream(spec.Name, spec.Args).ConfigureAwait(false); @@ -350,9 +386,7 @@ public async Task StreamStats(string stream) public async Task CreateRawConsumer(RawConsumerConfig rawConsumerConfig, ILogger logger = null) { - await MayBeReconnectLocator().ConfigureAwait(false); - var meta = await _client.QueryMetadata(new[] { rawConsumerConfig.Stream }).ConfigureAwait(false); - var metaStreamInfo = meta.StreamInfos[rawConsumerConfig.Stream]; + var metaStreamInfo = await StreamInfo(rawConsumerConfig.Stream).ConfigureAwait(false); if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateConsumerException($"consumer could not be created code: {metaStreamInfo.ResponseCode}"); diff --git a/Tests/FilterTest.cs b/Tests/FilterTest.cs index 7b41351b..e1c6924d 100644 --- a/Tests/FilterTest.cs +++ b/Tests/FilterTest.cs @@ -18,10 +18,14 @@ public class FilterTest { // When the Filter is set also Values must be set and PostFilter must be set // Values must be a list of string and must contain at least one element - [Fact] + [SkippableFact] public async void ValidateFilter() { SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + if (!AvailableFeaturesSingleton.Instance.PublishFilter) + { + throw new SkipException("broker does not support filter"); + } await Assert.ThrowsAsync(() => Consumer.Create( new ConsumerConfig(system, stream) { Filter = new ConsumerFilter() } @@ -54,10 +58,14 @@ await Assert.ThrowsAsync(() => Consumer.Create( // We send 100 messages with two different states (Alabama and New York) // By using the filter we should be able to consume only the messages from Alabama // and the server has to send only one chunk with all the messages from Alabama - [Fact] + [SkippableFact] public async void FilterShouldReturnOnlyOneChunk() { SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + if (!AvailableFeaturesSingleton.Instance.PublishFilter) + { + throw new SkipException("broker does not support filter"); + } var producer = await Producer.Create( new ProducerConfig(system, stream) @@ -171,10 +179,15 @@ async Task SendTo(string state) // For the producer side we have the ConfirmationHandler the messages with errors // will be reported as not confirmed and the user can handle them. // for the consumer the messages will be skipped and logged with the standard logger - [Fact] + [SkippableFact] public async void ErrorFiltersFunctionWontDeliverTheMessage() { SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + if (!AvailableFeaturesSingleton.Instance.PublishFilter) + { + throw new SkipException("broker does not support filter"); + } + var messagesConfirmed = 0; var messagesError = 0; var testPassed = new TaskCompletionSource(); diff --git a/Tests/RawProducerSystemTests.cs b/Tests/RawProducerSystemTests.cs index 37325642..1d508c1f 100644 --- a/Tests/RawProducerSystemTests.cs +++ b/Tests/RawProducerSystemTests.cs @@ -33,7 +33,8 @@ public async void CreateRawProducer() var config = new StreamSystemConfig(); var system = await StreamSystem.Create(config); await system.CreateStream(new StreamSpec(stream)); - var rawProducer = await system.CreateRawProducer( + + var createProducerTask = system.CreateRawProducer( new RawProducerConfig(stream) { Reference = "producer", @@ -44,6 +45,15 @@ public async void CreateRawProducer() } }); + if (await Task.WhenAny(createProducerTask, Task.Delay(5000)) != createProducerTask) + { + // timeout to avoid infinite await + Assert.Fail("timeout awaiting for CreateRawProducer"); + return; + } + + var rawProducer = await createProducerTask; + var readonlySequence = "apple".AsReadonlySequence(); var message = new Message(new Data(readonlySequence)); await rawProducer.Send(1, message); @@ -378,10 +388,7 @@ public async Task ProducerSendsArrays255Bytes(ReadOnlySequence @event) RawProducerConfig(stream) { Reference = "producer", - ConfirmHandler = _ => - { - testPassed.SetResult(true); - } + ConfirmHandler = _ => { testPassed.SetResult(true); } } ); diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs index ce9f0856..7b465798 100644 --- a/Tests/SuperStreamConsumerTests.cs +++ b/Tests/SuperStreamConsumerTests.cs @@ -59,7 +59,16 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished() var consumedMessages = 0; const int NumberOfMessages = 20; var system = await StreamSystem.Create(new StreamSystemConfig()); - await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages, "", _testOutputHelper); + // Publish to super stream hands sometimes, for unknow reason + var publishTask = SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages, + "", _testOutputHelper); + if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask) + { + Assert.Fail("timed out awaiting to publish messages to super stream"); + } + + await publishTask; + var clientProvidedName = Guid.NewGuid().ToString(); var consumer = await system.CreateSuperStreamConsumer( @@ -200,7 +209,15 @@ public async void MoreConsumersNumberOfMessagesConsumedShouldBeEqualsToPublished var listConsumed = new ConcurrentBag(); const int NumberOfMessages = 20; var system = await StreamSystem.Create(new StreamSystemConfig()); - await SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper); + var publishToSuperStreamTask = + SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper); + if (await Task.WhenAny(publishToSuperStreamTask, Task.Delay(10000)) != publishToSuperStreamTask) + { + Assert.Fail("timeout waiting to publish messages"); + } + + // We re-await the task so that any exceptions/cancellation is rethrown. + await publishToSuperStreamTask; var clientProvidedName = Guid.NewGuid().ToString(); var consumers = new Dictionary(); @@ -253,7 +270,17 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis { SystemUtils.ResetSuperStreams(); var system = await StreamSystem.Create(new StreamSystemConfig()); - await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper); + _testOutputHelper.WriteLine("awaiting publish to super stream"); + var publishTask = + SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper); + if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask) + { + Assert.Fail("timed out awaiting to publish messages to super stream"); + } + + // re-await in case any cancellation or exception happen, it can throw + await publishTask; + var listConsumed = new ConcurrentBag(); var testPassed = new TaskCompletionSource(); var consumer = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange)