Skip to content

Commit c9f7563

Browse files
authored
Add DateTimeOffset test (#307)
Add DateTimeOffset test. The test is not 100% perfect since it depends on the data time starting. In this case, the test returns all the messages. Constant naming refactor. --- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 7b203f1 commit c9f7563

File tree

4 files changed

+64
-28
lines changed

4 files changed

+64
-28
lines changed

RabbitMQ.Stream.Client/RawConsumer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ public void Dispose()
583583
}
584584
catch (Exception e)
585585
{
586-
_logger.LogError(e, "Error during disposing of consumer: {SubscriberId}.", _subscriberId);
586+
_logger.LogError(e, "Error during disposing of consumer: {SubscriberId}", _subscriberId);
587587
}
588588
finally
589589
{

RabbitMQ.Stream.Client/RawProducer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ public void Dispose()
397397
}
398398
catch (Exception e)
399399
{
400-
_logger.LogError(e, "Error during disposing Consumer: {PublisherId}.", _publisherId);
400+
_logger.LogError(e, "Error during disposing Consumer: {PublisherId}", _publisherId);
401401
}
402402

403403
GC.SuppressFinalize(this);

Tests/RawConsumerSystemTests.cs

+60-24
Original file line numberDiff line numberDiff line change
@@ -90,23 +90,23 @@ public async void ConsumerStoreOffset()
9090
var config = new StreamSystemConfig();
9191
var system = await StreamSystem.Create(config);
9292
await system.CreateStream(new StreamSpec(stream));
93-
const int numberOfMessages = 10;
94-
await SystemUtils.PublishMessages(system, stream, numberOfMessages, testOutputHelper);
93+
const int NumberOfMessages = 10;
94+
await SystemUtils.PublishMessages(system, stream, NumberOfMessages, testOutputHelper);
9595
var count = 0;
9696
var consumer = await system.CreateRawConsumer(
9797
new RawConsumerConfig(stream)
9898
{
9999
Reference = "consumer_offset",
100100
OffsetSpec = new OffsetTypeFirst(),
101-
MessageHandler = async (consumer, ctx, message) =>
101+
MessageHandler = async (consumer, ctx, _) =>
102102
{
103103
testOutputHelper.WriteLine($"ConsumerStoreOffset receiving.. {count}");
104104
count++;
105-
if (count == numberOfMessages)
105+
if (count == NumberOfMessages)
106106
{
107107
await consumer.StoreOffset(ctx.Offset);
108108
testOutputHelper.WriteLine($"ConsumerStoreOffset done: {count}");
109-
testPassed.SetResult(numberOfMessages);
109+
testPassed.SetResult(NumberOfMessages);
110110
}
111111

112112
await Task.CompletedTask;
@@ -122,7 +122,7 @@ public async void ConsumerStoreOffset()
122122
var client = await Client.Create(clientParameters);
123123
var offset = await client.QueryOffset("consumer_offset", stream);
124124
// The offset must be numberOfMessages less one
125-
Assert.Equal(offset.Offset, Convert.ToUInt64(numberOfMessages - 1));
125+
Assert.Equal(offset.Offset, Convert.ToUInt64(NumberOfMessages - 1));
126126
await consumer.Close();
127127
await system.DeleteStream(stream);
128128
await system.Close();
@@ -456,33 +456,33 @@ public async void ConsumerQueryOffset()
456456
var config = new StreamSystemConfig();
457457
var system = await StreamSystem.Create(config);
458458
await system.CreateStream(new StreamSpec(stream));
459-
const int numberOfMessages = 10;
460-
const int numberOfMessagesToStore = 4;
461-
await SystemUtils.PublishMessages(system, stream, numberOfMessages, testOutputHelper);
459+
const int NumberOfMessages = 10;
460+
const int NumberOfMessagesToStore = 4;
461+
await SystemUtils.PublishMessages(system, stream, NumberOfMessages, testOutputHelper);
462462
var count = 0;
463-
const string reference = "consumer_offset";
463+
const string Reference = "consumer_offset";
464464
var rawConsumer = await system.CreateRawConsumer(
465465
new RawConsumerConfig(stream)
466466
{
467467
Crc32 = _crc32,
468-
Reference = reference,
468+
Reference = Reference,
469469
OffsetSpec = new OffsetTypeOffset(),
470470
MessageHandler = async (consumer, ctx, message) =>
471471
{
472472
testOutputHelper.WriteLine($"ConsumerStoreOffset receiving.. {count}");
473473
count++;
474-
if (count == numberOfMessagesToStore)
475-
{
476-
// store the the offset after numberOfMessagesToStore messages
477-
// so when we query the offset we should (must) have the same
478-
// values
479-
await consumer.StoreOffset(ctx.Offset);
480-
testOutputHelper.WriteLine($"ConsumerStoreOffset done: {count}");
481-
}
482-
483-
if (count == numberOfMessages)
474+
switch (count)
484475
{
485-
testPassed.SetResult(numberOfMessages);
476+
case NumberOfMessagesToStore:
477+
// store the the offset after numberOfMessagesToStore messages
478+
// so when we query the offset we should (must) have the same
479+
// values
480+
await consumer.StoreOffset(ctx.Offset);
481+
testOutputHelper.WriteLine($"ConsumerStoreOffset done: {count}");
482+
break;
483+
case NumberOfMessages:
484+
testPassed.SetResult(NumberOfMessages);
485+
break;
486486
}
487487

488488
await Task.CompletedTask;
@@ -494,8 +494,8 @@ public async void ConsumerQueryOffset()
494494
// it may need some time to store the offset
495495
SystemUtils.Wait();
496496
// numberOfMessagesToStore index 0
497-
Assert.Equal((ulong)(numberOfMessagesToStore - 1),
498-
await system.QueryOffset(reference, stream));
497+
Assert.Equal((ulong)(NumberOfMessagesToStore - 1),
498+
await system.QueryOffset(Reference, stream));
499499

500500
// this has to raise OffsetNotFoundException in case the offset
501501
// does not exist like in this case.
@@ -696,5 +696,41 @@ public async void ProducerConsumerMixingDifferentSendTypesCompressAndStandard()
696696
await system.DeleteStream(stream);
697697
await system.Close();
698698
}
699+
700+
[Fact]
701+
public async void ShouldConsumeFromDateTimeOffset()
702+
{
703+
// validate the consumer can start from a specific time
704+
// this test is not deterministic because it depends on the
705+
// time the test is executed.
706+
// but at least we can validate the consumer can start from a specific time less 100 ms
707+
// and it has to receive all the messages
708+
// not 100% perfect but it is better than nothing
709+
710+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
711+
var before = DateTimeOffset.Now.AddMilliseconds(-100);
712+
await SystemUtils.PublishMessages(system, stream, 100, testOutputHelper);
713+
var testPassed = new TaskCompletionSource<bool>();
714+
715+
var consumer = await system.CreateRawConsumer(
716+
new RawConsumerConfig(stream)
717+
{
718+
Reference = "consumer",
719+
OffsetSpec = new OffsetTypeTimestamp(before),
720+
MessageHandler = async (_, ctx, _) =>
721+
{
722+
Assert.True(ctx.Timestamp >= before.Offset);
723+
if (ctx.Offset == 99)
724+
{
725+
testPassed.SetResult(true);
726+
}
727+
728+
await Task.CompletedTask;
729+
}
730+
});
731+
new Utils<bool>(testOutputHelper).WaitUntilTaskCompletes(testPassed);
732+
await consumer.Close().ConfigureAwait(false);
733+
await SystemUtils.CleanUpStreamSystem(system, stream);
734+
}
699735
}
700736
}

Tests/ReliableTests.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ public void MessageConfirmationShouldHaveTheSameMessages()
8080
var message = new Message(Encoding.UTF8.GetBytes($"hello"));
8181
confirmationPipe.AddUnConfirmedMessage(1, message);
8282
confirmationPipe.AddUnConfirmedMessage(2, new List<Message>() { message });
83-
confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 1, null);
84-
confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 2, null);
83+
confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 1, null).ConfigureAwait(false);
84+
confirmationPipe.RemoveUnConfirmedMessage(ConfirmationStatus.Confirmed, 2, null).ConfigureAwait(false);
8585
new Utils<List<MessagesConfirmation>>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask);
8686
Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[0].Status);
8787
Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[1].Status);

0 commit comments

Comments
 (0)