Skip to content

Commit 3624282

Browse files
authored
Merge 049343f into c9f7563
2 parents c9f7563 + 049343f commit 3624282

File tree

5 files changed

+103
-23
lines changed

5 files changed

+103
-23
lines changed

Makefile

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ test: build
1212
rabbitmq-server:
1313
docker run -it --rm --name rabbitmq-stream-docker \
1414
-p 5552:5552 -p 5672:5672 -p 15672:15672 \
15-
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \
1615
--pull always \
1716
pivotalrabbitmq/rabbitmq-stream
1817

RabbitMQ.Stream.Client/StreamSystem.cs

+45-11
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,7 @@ public async Task<IProducer> CreateRawSuperStreamProducer(
166166
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
167167
foreach (var partitionsStream in partitions.Streams)
168168
{
169-
var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false);
170-
streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream];
169+
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
171170
}
172171

173172
var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig,
@@ -217,8 +216,7 @@ public async Task<IConsumer> CreateSuperStreamConsumer(
217216
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
218217
foreach (var partitionsStream in partitions.Streams)
219218
{
220-
var metaDataResponse = await _client.QueryMetadata(new[] { partitionsStream }).ConfigureAwait(false);
221-
streamInfos[partitionsStream] = metaDataResponse.StreamInfos[partitionsStream];
219+
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
222220
}
223221

224222
var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig,
@@ -239,10 +237,7 @@ public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConf
239237
throw new CreateProducerException("Batch Size must be bigger than 0");
240238
}
241239

242-
await MayBeReconnectLocator().ConfigureAwait(false);
243-
var meta = await _client.QueryMetadata(new[] { rawProducerConfig.Stream }).ConfigureAwait(false);
244-
245-
var metaStreamInfo = meta.StreamInfos[rawProducerConfig.Stream];
240+
var metaStreamInfo = await StreamInfo(rawProducerConfig.Stream).ConfigureAwait(false);
246241
if (metaStreamInfo.ResponseCode != ResponseCode.Ok)
247242
{
248243
throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}");
@@ -268,6 +263,47 @@ public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConf
268263
}
269264
}
270265

266+
private async Task<StreamInfo> StreamInfo(string streamName)
267+
{
268+
// force localhost connection for single node clusters and when address resolver is not provided
269+
// when theres 1 endpoint and an address resolver, there could be a cluster behind a load balancer
270+
var forceLocalHost = false;
271+
var localPort = 0;
272+
if (_clientParameters.Endpoints.Count == 1 &&
273+
_clientParameters.AddressResolver is null)
274+
{
275+
var clientParametersEndpoint = _clientParameters.Endpoints[0];
276+
switch (clientParametersEndpoint)
277+
{
278+
case DnsEndPoint { Host: "localhost" } dnsEndPoint:
279+
forceLocalHost = true;
280+
localPort = dnsEndPoint.Port;
281+
break;
282+
case IPEndPoint ipEndPoint when Equals(ipEndPoint.Address, IPAddress.Loopback):
283+
forceLocalHost = true;
284+
localPort = ipEndPoint.Port;
285+
break;
286+
}
287+
}
288+
289+
StreamInfo metaStreamInfo;
290+
if (forceLocalHost)
291+
{
292+
// craft the metadata response to force using localhost
293+
var leader = new Broker("localhost", (uint)localPort);
294+
metaStreamInfo = new StreamInfo(streamName, ResponseCode.Ok, leader,
295+
new List<Broker>(1) { leader });
296+
}
297+
else
298+
{
299+
await MayBeReconnectLocator().ConfigureAwait(false);
300+
var meta = await _client.QueryMetadata(new[] { streamName }).ConfigureAwait(false);
301+
metaStreamInfo = meta.StreamInfos[streamName];
302+
}
303+
304+
return metaStreamInfo;
305+
}
306+
271307
public async Task CreateStream(StreamSpec spec)
272308
{
273309
var response = await _client.CreateStream(spec.Name, spec.Args).ConfigureAwait(false);
@@ -350,9 +386,7 @@ public async Task<StreamStats> StreamStats(string stream)
350386
public async Task<IConsumer> CreateRawConsumer(RawConsumerConfig rawConsumerConfig,
351387
ILogger logger = null)
352388
{
353-
await MayBeReconnectLocator().ConfigureAwait(false);
354-
var meta = await _client.QueryMetadata(new[] { rawConsumerConfig.Stream }).ConfigureAwait(false);
355-
var metaStreamInfo = meta.StreamInfos[rawConsumerConfig.Stream];
389+
var metaStreamInfo = await StreamInfo(rawConsumerConfig.Stream).ConfigureAwait(false);
356390
if (metaStreamInfo.ResponseCode != ResponseCode.Ok)
357391
{
358392
throw new CreateConsumerException($"consumer could not be created code: {metaStreamInfo.ResponseCode}");

Tests/FilterTest.cs

+16-3
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@ public class FilterTest
1818
{
1919
// When the Filter is set also Values must be set and PostFilter must be set
2020
// Values must be a list of string and must contain at least one element
21-
[Fact]
21+
[SkippableFact]
2222
public async void ValidateFilter()
2323
{
2424
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
25+
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
26+
{
27+
throw new SkipException("broker does not support filter");
28+
}
2529

2630
await Assert.ThrowsAsync<ArgumentException>(() => Consumer.Create(
2731
new ConsumerConfig(system, stream) { Filter = new ConsumerFilter() }
@@ -54,10 +58,14 @@ await Assert.ThrowsAsync<ArgumentException>(() => Consumer.Create(
5458
// We send 100 messages with two different states (Alabama and New York)
5559
// By using the filter we should be able to consume only the messages from Alabama
5660
// and the server has to send only one chunk with all the messages from Alabama
57-
[Fact]
61+
[SkippableFact]
5862
public async void FilterShouldReturnOnlyOneChunk()
5963
{
6064
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
65+
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
66+
{
67+
throw new SkipException("broker does not support filter");
68+
}
6169

6270
var producer = await Producer.Create(
6371
new ProducerConfig(system, stream)
@@ -171,10 +179,15 @@ async Task SendTo(string state)
171179
// For the producer side we have the ConfirmationHandler the messages with errors
172180
// will be reported as not confirmed and the user can handle them.
173181
// for the consumer the messages will be skipped and logged with the standard logger
174-
[Fact]
182+
[SkippableFact]
175183
public async void ErrorFiltersFunctionWontDeliverTheMessage()
176184
{
177185
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
186+
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
187+
{
188+
throw new SkipException("broker does not support filter");
189+
}
190+
178191
var messagesConfirmed = 0;
179192
var messagesError = 0;
180193
var testPassed = new TaskCompletionSource<int>();

Tests/RawProducerSystemTests.cs

+12-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public async void CreateRawProducer()
3333
var config = new StreamSystemConfig();
3434
var system = await StreamSystem.Create(config);
3535
await system.CreateStream(new StreamSpec(stream));
36-
var rawProducer = await system.CreateRawProducer(
36+
37+
var createProducerTask = system.CreateRawProducer(
3738
new RawProducerConfig(stream)
3839
{
3940
Reference = "producer",
@@ -44,6 +45,15 @@ public async void CreateRawProducer()
4445
}
4546
});
4647

48+
if (await Task.WhenAny(createProducerTask, Task.Delay(5000)) != createProducerTask)
49+
{
50+
// timeout to avoid infinite await
51+
Assert.Fail("timeout awaiting for CreateRawProducer");
52+
return;
53+
}
54+
55+
var rawProducer = await createProducerTask;
56+
4757
var readonlySequence = "apple".AsReadonlySequence();
4858
var message = new Message(new Data(readonlySequence));
4959
await rawProducer.Send(1, message);
@@ -378,10 +388,7 @@ public async Task ProducerSendsArrays255Bytes(ReadOnlySequence<byte> @event)
378388
RawProducerConfig(stream)
379389
{
380390
Reference = "producer",
381-
ConfirmHandler = _ =>
382-
{
383-
testPassed.SetResult(true);
384-
}
391+
ConfirmHandler = _ => { testPassed.SetResult(true); }
385392
}
386393
);
387394

Tests/SuperStreamConsumerTests.cs

+30-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,16 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished()
5959
var consumedMessages = 0;
6060
const int NumberOfMessages = 20;
6161
var system = await StreamSystem.Create(new StreamSystemConfig());
62-
await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages, "", _testOutputHelper);
62+
// Publish to super stream hands sometimes, for unknow reason
63+
var publishTask = SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages,
64+
"", _testOutputHelper);
65+
if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask)
66+
{
67+
Assert.Fail("timed out awaiting to publish messages to super stream");
68+
}
69+
70+
await publishTask;
71+
6372
var clientProvidedName = Guid.NewGuid().ToString();
6473

6574
var consumer = await system.CreateSuperStreamConsumer(
@@ -200,7 +209,15 @@ public async void MoreConsumersNumberOfMessagesConsumedShouldBeEqualsToPublished
200209
var listConsumed = new ConcurrentBag<string>();
201210
const int NumberOfMessages = 20;
202211
var system = await StreamSystem.Create(new StreamSystemConfig());
203-
await SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper);
212+
var publishToSuperStreamTask =
213+
SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper);
214+
if (await Task.WhenAny(publishToSuperStreamTask, Task.Delay(10000)) != publishToSuperStreamTask)
215+
{
216+
Assert.Fail("timeout waiting to publish messages");
217+
}
218+
219+
// We re-await the task so that any exceptions/cancellation is rethrown.
220+
await publishToSuperStreamTask;
204221
var clientProvidedName = Guid.NewGuid().ToString();
205222
var consumers = new Dictionary<string, IConsumer>();
206223

@@ -253,7 +270,17 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis
253270
{
254271
SystemUtils.ResetSuperStreams();
255272
var system = await StreamSystem.Create(new StreamSystemConfig());
256-
await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper);
273+
_testOutputHelper.WriteLine("awaiting publish to super stream");
274+
var publishTask =
275+
SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper);
276+
if (await Task.WhenAny(publishTask, Task.Delay(10000)) != publishTask)
277+
{
278+
Assert.Fail("timed out awaiting to publish messages to super stream");
279+
}
280+
281+
// re-await in case any cancellation or exception happen, it can throw
282+
await publishTask;
283+
257284
var listConsumed = new ConcurrentBag<string>();
258285
var testPassed = new TaskCompletionSource<bool>();
259286
var consumer = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange)

0 commit comments

Comments
 (0)