diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 28ffd9e1..7402c3f1 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -112,8 +112,6 @@ public int Write(Span span) public class Client : IClient { - private bool isClosed = true; - private uint correlationId = 0; // allow for some pre-amble private Connection _connection; @@ -160,24 +158,13 @@ private Client(ClientParameters parameters, ILogger logger = null) SendHeartBeat, Close, (int)parameters.Heartbeat.TotalSeconds); - IsClosed = false; _logger = logger ?? NullLogger.Instance; ClientId = Guid.NewGuid().ToString(); } public bool IsClosed { - get - { - if (_connection.IsClosed) - { - isClosed = true; - } - - return isClosed; - } - - private set => isClosed = value; + get { return _connection.IsClosed; } } private void StartHeartBeat() @@ -758,7 +745,7 @@ private async ValueTask SendHeartBeat() private void InternalClose() { _heartBeatHandler.Close(); - IsClosed = true; + _connection.Close(); } private async ValueTask ConsumerUpdateResponse(uint rCorrelationId, IOffsetType offsetSpecification) @@ -773,13 +760,13 @@ private async Task Close(string reason, string closedStatus) return new CloseResponse(0, ResponseCode.Ok); } - InternalClose(); try { - _connection.UpdateCloseStatus(closedStatus); var result = await Request(corr => new CloseRequest(corr, reason), TimeSpan.FromSeconds(10)).ConfigureAwait(false); + InternalClose(); + _connection.UpdateCloseStatus(closedStatus); return result; } diff --git a/RabbitMQ.Stream.Client/Connection.cs b/RabbitMQ.Stream.Client/Connection.cs index a05cd58c..a1a5cb06 100644 --- a/RabbitMQ.Stream.Client/Connection.cs +++ b/RabbitMQ.Stream.Client/Connection.cs @@ -19,7 +19,6 @@ internal static class ConnectionClosedReason public const string Normal = "TCP connection closed normal"; public const string Unexpected = "TCP connection closed unexpected"; public const string TooManyHeartbeatsMissing = "TCP connection closed by too many heartbeats missing"; - } public class Connection : IDisposable @@ -32,7 +31,6 @@ public class Connection : IDisposable private readonly Func closedCallback; private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1); private int numFrames; - private bool isClosed = false; private string _closedReason = ConnectionClosedReason.Unexpected; private bool _disposedValue; private readonly ILogger _logger; @@ -43,7 +41,8 @@ public class Connection : IDisposable internal int NumFrames => numFrames; internal string ClientId { get; set; } - public bool IsClosed => isClosed; + public bool IsClosed => !socket.Connected; + public void UpdateCloseStatus(string reason) { _closedReason = reason; @@ -59,17 +58,31 @@ private Connection(Socket socket, Func, Task> callback, { _logger = logger; this.socket = socket; + commandCallback = callback; closedCallback = closedCallBack; var networkStream = new NetworkStream(socket); var stream = MaybeTcpUpgrade(networkStream, sslOption); - writer = PipeWriter.Create(stream); - reader = PipeReader.Create(stream); + reader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true)); + writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true)); + // ProcessIncomingFrames is dropped as soon as the connection is closed // no need to stop it manually when the connection is closed _incomingFramesTask = Task.Run(ProcessIncomingFrames); } + internal void Close() + { + if (!_cancelTokenSource.IsCancellationRequested) + { + _cancelTokenSource.Cancel(); + } + + writer.Complete(); + reader.Complete(); + socket.Close(); + } + public static async Task Create(EndPoint endpoint, Func, Task> commandCallback, Func closedCallBack, SslOption sslOption, ILogger logger) { @@ -120,7 +133,7 @@ private async Task WriteCommand(T command) where T : struct, ICommand throw new OperationCanceledException("Token Cancellation Requested Connection"); } - if (isClosed) + if (!socket.Connected) { throw new InvalidOperationException("Connection is closed"); } @@ -148,7 +161,7 @@ private async Task ProcessIncomingFrames() Exception caught = null; try { - while (!isClosed) + while (socket.Connected) { if (!reader.TryRead(out var result)) { @@ -158,14 +171,13 @@ private async Task ProcessIncomingFrames() var buffer = result.Buffer; if (buffer.Length == 0) { - Debug.WriteLine("TCP Connection Closed!"); + _logger?.LogDebug("TCP Connection Closed!"); // We're not going to receive any more bytes from the connection. break; } // Let's try to read some frames! - - while (TryReadFrame(ref buffer, out var frame) && !isClosed) + while (TryReadFrame(ref buffer, out var frame) && socket.Connected) { // Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled. @@ -196,20 +208,28 @@ private async Task ProcessIncomingFrames() // closedCallback event. // It is useful to trace the error, but at this point // the socket is closed maybe not in the correct way - if (!isClosed) + if (socket.Connected) { _logger?.LogError(e, "Error reading the socket"); } } finally { - isClosed = true; _logger?.LogDebug( "TCP Connection Closed ClientId: {ClientId}, Reason {Reason}. IsCancellationRequested {Token} ", ClientId, _closedReason, Token.IsCancellationRequested); // Mark the PipeReader as complete await reader.CompleteAsync(caught).ConfigureAwait(false); - closedCallback?.Invoke(_closedReason)!.ConfigureAwait(false); + if (closedCallback != null) + { + // that's mandatory for the ReliableProducer / ReliableConsumer + // to deal with the connection closed. Null callback won't raise the event + await closedCallback(_closedReason).ConfigureAwait(false); + } + else + { + _logger?.LogWarning("Connection: Closed callback is null. ClientId: {ClientId}", ClientId); + } } } @@ -242,15 +262,6 @@ public void Dispose() { try { - if (!_cancelTokenSource.IsCancellationRequested) - { - _cancelTokenSource.Cancel(); - } - - isClosed = true; - writer.Complete(); - reader.Complete(); - socket.Close(); if (!_incomingFramesTask.Wait(Consts.MidWait)) { _logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}", diff --git a/RabbitMQ.Stream.Client/Consts.cs b/RabbitMQ.Stream.Client/Consts.cs index c00eabc4..8113770b 100644 --- a/RabbitMQ.Stream.Client/Consts.cs +++ b/RabbitMQ.Stream.Client/Consts.cs @@ -35,5 +35,10 @@ internal static int RandomMid() { return Random.Shared.Next(1000, 2500); } + + internal static int RandomLarge() + { + return Random.Shared.Next(1500, 3000); + } } } diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index 21b60560..bd9950d1 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -47,7 +47,6 @@ private async Task StandardConsumer(bool boot) // before creating a new consumer, the old one is disposed // This is just a safety check, the consumer should be already disposed _consumer?.Dispose(); - return await _consumerConfig.StreamSystem.CreateRawConsumer(new RawConsumerConfig(_consumerConfig.Stream) { ClientProvidedName = _consumerConfig.ClientProvidedName, @@ -72,7 +71,7 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, catch (Exception e) { BaseLogger?.LogError(e, - $"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}"); + $"Stream consumer.ConnectionClosedHandler error. Auto recovery failed for: {_consumerConfig.Stream}"); } }, MetadataHandler = async _ => @@ -88,18 +87,27 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream, catch (Exception e) { BaseLogger?.LogError(e, - $"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}"); + $"Stream consumer.MetadataHandler error. Auto recovery failed for stream: {_consumerConfig.Stream}"); } }, MessageHandler = async (consumer, ctx, message) => { - if (_consumerConfig.MessageHandler != null) + try + { + if (_consumerConfig.MessageHandler != null) + { + await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message) + .ConfigureAwait(false); + } + + _consumedFirstTime = true; + } + catch (Exception e) { - await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message) - .ConfigureAwait(false); + BaseLogger?.LogError("MessageHandler {Error} for stream {Stream} ", e.Message, + _consumerConfig.Stream); } - _consumedFirstTime = true; _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; }, }, BaseLogger).ConfigureAwait(false); @@ -144,9 +152,9 @@ private async Task SuperConsumer(bool boot) Identifier = _consumerConfig.Identifier, ConnectionClosedHandler = async (closeReason, partitionStream) => { - await RandomWait().ConfigureAwait(false); if (IsClosedNormally(closeReason)) return; + await RandomWait().ConfigureAwait(false); try { var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; @@ -163,9 +171,9 @@ await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r, { try { - await RandomWait().ConfigureAwait(false); if (IsClosedNormally()) return; + await RandomWait().ConfigureAwait(false); var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r, @@ -180,14 +188,22 @@ await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r, }, MessageHandler = async (partitionStream, consumer, ctx, message) => { - if (_consumerConfig.MessageHandler != null) + try { - await _consumerConfig.MessageHandler(partitionStream, consumer, ctx, - message).ConfigureAwait(false); + if (_consumerConfig.MessageHandler != null) + { + await _consumerConfig.MessageHandler(partitionStream, consumer, ctx, + message).ConfigureAwait(false); + } + + _consumedFirstTime = true; + _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; + } + catch (Exception e) + { + BaseLogger?.LogError("MessageHandler {Error} for stream {Stream} ", e.Message, + _consumerConfig.Stream); } - - _consumedFirstTime = true; - _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; }, }, BaseLogger).ConfigureAwait(false); } diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs index c4cf46e5..8f6931d8 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs @@ -414,6 +414,7 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, await SemaphoreSlim.WaitAsync().ConfigureAwait(false); UpdateStatus(ReliableEntityStatus.Reconnection, reason, [stream]); + await Task.Delay(Consts.RandomLarge()).ConfigureAwait(false); try { var (localStreamExists, streamInfo) = await CheckIfStreamIsAvailable(stream, system) @@ -446,6 +447,7 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, ChangeSta var streamExists = false; await SemaphoreSlim.WaitAsync().ConfigureAwait(false); UpdateStatus(ReliableEntityStatus.Reconnection, reason, [stream]); + await Task.Delay(Consts.RandomLarge()).ConfigureAwait(false); try { (streamExists, _) = await CheckIfStreamIsAvailable(stream, system) diff --git a/Tests/RawConsumerSystemTests.cs b/Tests/RawConsumerSystemTests.cs index b50b9499..7328ea6e 100644 --- a/Tests/RawConsumerSystemTests.cs +++ b/Tests/RawConsumerSystemTests.cs @@ -526,7 +526,6 @@ public async Task ConsumerQueryOffset() await Assert.ThrowsAsync(() => system.QueryOffset("reference_does_not_exist", stream)); - Assert.Null(await system.TryQueryOffset("reference_does_not_exist", stream)); await Assert.ThrowsAsync(() => (system.TryQueryOffset(Reference, "stream_does_not_exist"))); diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs index bce3b47c..ac45fdbc 100644 --- a/Tests/SuperStreamConsumerTests.cs +++ b/Tests/SuperStreamConsumerTests.cs @@ -46,13 +46,19 @@ await SystemUtils.OffsetsForSuperStreamConsumer(system, SystemUtils.InvoicesExch Assert.NotNull(consumer); await SystemUtils.WaitAsync(); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); Assert.Equal(ResponseCode.Ok, await consumer.Close()); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); Assert.Equal("super_stream_consumer_88888", consumer.Info.Identifier); await system.Close(); @@ -125,26 +131,41 @@ public async Task RemoveOneConnectionIfaStreamIsDeleted() var system = await StreamSystem.Create(new StreamSystemConfig()); var clientProvidedName = Guid.NewGuid().ToString(); var consumer = await system.CreateSuperStreamConsumer( - new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { ClientProvidedName = clientProvidedName, }); + new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) + { + ClientProvidedName = clientProvidedName, + }); Assert.NotNull(consumer); await SystemUtils.WaitAsync(); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); SystemUtils.HttpDeleteQueue(SystemUtils.InvoicesStream0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); SystemUtils.HttpDeleteQueue(SystemUtils.InvoicesStream1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); await consumer.Close(); // in this case we don't have any connection anymore since the super stream consumer is closed - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); Assert.Equal(ResponseCode.Ok, await consumer.Close()); await system.Close(); } @@ -181,21 +202,30 @@ await system.StreamInfo(stream).ConfigureAwait(false) Assert.NotNull(consumer); await SystemUtils.WaitAsync(); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); await SystemUtils.WaitUntilAsync(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_0").Result == 1); await completed.Task; - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); Assert.Equal(ResponseCode.Ok, await consumer.Close()); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + await SystemUtils.WaitUntilAsync( + () => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); await system.Close(); } @@ -233,9 +263,9 @@ public IEnumerator GetEnumerator() IsSingleActiveConsumer = false, MessagesPerStream = new Dictionary() { - {SystemUtils.InvoicesStream0, 9 * 2}, - {SystemUtils.InvoicesStream1, 7 * 2}, - {SystemUtils.InvoicesStream2, 4 * 2} + { SystemUtils.InvoicesStream0, 9 * 2 }, + { SystemUtils.InvoicesStream1, 7 * 2 }, + { SystemUtils.InvoicesStream2, 4 * 2 } }, Consumers = 2, ClosedConsumers = 0, @@ -249,9 +279,9 @@ public IEnumerator GetEnumerator() IsSingleActiveConsumer = false, MessagesPerStream = new Dictionary() { - {SystemUtils.InvoicesStream0, 9 * 3}, - {SystemUtils.InvoicesStream1, 7 * 3}, - {SystemUtils.InvoicesStream2, 4 * 3} + { SystemUtils.InvoicesStream0, 9 * 3 }, + { SystemUtils.InvoicesStream1, 7 * 3 }, + { SystemUtils.InvoicesStream2, 4 * 3 } }, Consumers = 3, ClosedConsumers = 0, @@ -401,102 +431,6 @@ public async Task ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis await system.Close(); } - /// - /// The most complex test where we test the consumer with the following configuration: - /// Single Active Consumer = true - /// Super Stream = true - /// There are 3 consumers and 1 is closed by killing the connection ( so it should be recreated) - /// When the consumer is killed a second consumer becomes active and consumes the messages - /// But given the event: consumerUpdateListener we restart from the last stored offset - /// So the number of messages consumed should be the same as the number of messages published - /// - [Fact] - public async Task ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublishedInSaC() - { - await SystemUtils.ResetSuperStreams(); - var system = await StreamSystem.Create(new StreamSystemConfig()); - await SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, 20, "", _testOutputHelper); - var listConsumed = new ConcurrentBag(); - var reference = Guid.NewGuid().ToString(); - var consumers = new List(); - var consumerMessageReceived = new TaskCompletionSource(); - - async Task NewReliableConsumer(string refConsumer, string clientProvidedName, - Func> consumerUpdateListener - ) - { - return await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange) - { - OffsetSpec = new OffsetTypeFirst(), - Reference = refConsumer, - ClientProvidedName = clientProvidedName, - IsSuperStream = true, - IsSingleActiveConsumer = true, - ConsumerUpdateListener = consumerUpdateListener, - MessageHandler = async (stream, consumer1, context, _) => - { - await consumer1.StoreOffset(context.Offset); - - listConsumed.Add(stream); - if (listConsumed.Count == 20) - { - consumerMessageReceived.SetResult(true); - } - }, - ReconnectStrategy = new TestBackOffReconnectStrategy() - }); - } - - var clientProvidedName = $"first_{Guid.NewGuid().ToString()}"; - // this is the first consumer that is active and consumes the messages - var consumerSingle = await NewReliableConsumer(reference, clientProvidedName, null); - consumers.Add(consumerSingle); - await SystemUtils.WaitAsync(TimeSpan.FromSeconds(1)); - // these are two consumers will start consume as soon as they start - for (var i = 0; i < 2; i++) - { - var consumer = await NewReliableConsumer(reference, Guid.NewGuid().ToString(), - (_, _, _) => { return new Task(() => new OffsetTypeFirst()); }); - consumers.Add(consumer); - } - - new Utils(_testOutputHelper).WaitUntilTaskCompletes(consumerMessageReceived); - // The sum og the messages must be 20 as the publisher published 20 messages - Assert.Equal(9, - listConsumed.Sum(x => x == SystemUtils.InvoicesStream0 ? 1 : 0)); - Assert.Equal(7, - listConsumed.Sum(x => x == SystemUtils.InvoicesStream1 ? 1 : 0)); - Assert.Equal(4, - listConsumed.Sum(x => x == SystemUtils.InvoicesStream2 ? 1 : 0)); - - await SystemUtils.WaitAsync(TimeSpan.FromSeconds(2)); - // we kill the connections of the first super stream consumer ( so 3 connections one per stream) - await SystemUtils.WaitUntilAsync(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_0").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_1").Result == 1); - await SystemUtils.WaitUntilAsync(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_2").Result == 1); - await SystemUtils.WaitAsync(TimeSpan.FromSeconds(3)); - // at this point the second consumer should be active and consume the messages - // and the consumerUpdateListener should be called and the offset should be restored - // so the sum of the messages must be 20 as the publisher published 20 messages - Assert.Equal(9, - listConsumed.Sum(x => x == SystemUtils.InvoicesStream0 ? 1 : 0)); - Assert.Equal(7, - listConsumed.Sum(x => x == SystemUtils.InvoicesStream1 ? 1 : 0)); - Assert.Equal(4, - listConsumed.Sum(x => x == SystemUtils.InvoicesStream2 ? 1 : 0)); - foreach (var reliableConsumer in consumers) - { - await reliableConsumer.Close(); - } - - // just to be sure that the connections are killed - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); - await SystemUtils.WaitUntilAsync(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); - await consumerSingle.Close(); - await system.Close(); - } - /// /// Test when a super stream consumer with the same name joins the group /// so we start with one consumer and then we start another consumer with the same name @@ -652,10 +586,8 @@ public async Task ReliableConsumerSuperStreamInfoShouldBeTheSame() await SystemUtils.ResetSuperStreams(); var system = await StreamSystem.Create(new StreamSystemConfig()); - var consumer = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange) - { - IsSuperStream = true - }); + var consumer = + await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange) { IsSuperStream = true }); Assert.Equal(SystemUtils.InvoicesExchange, consumer.Info.Stream); Assert.Contains(SystemUtils.InvoicesStream0, consumer.Info.Partitions); diff --git a/docs/ReliableClient/BestPracticesClient.cs b/docs/ReliableClient/BestPracticesClient.cs index 5c98a89c..ca97ec7a 100644 --- a/docs/ReliableClient/BestPracticesClient.cs +++ b/docs/ReliableClient/BestPracticesClient.cs @@ -34,6 +34,8 @@ public record Config public byte ConsumersPerConnection { get; set; } = 8; public int DelayDuringSendMs { get; set; } = 0; + + public bool EnableResending { get; set; } = false; } public static async Task Start(Config config) @@ -190,16 +192,17 @@ await system.CreateStream(new StreamSpec(stream) { MaxLengthBytes = 30_000_000_0 // can help to identify the consumer on the logs and RabbitMQ Management Identifier = $"my_consumer_{z}", InitialCredits = 10, - MessageHandler = async (source, consumer, ctx, _) => + MessageHandler = (source, consumer, ctx, _) => { - if (totalConsumed % 10_000 == 0) - { - // don't store the offset every time, it could be a performance issue - // store the offset every 1_000/5_000/10_000 messages - await consumer.StoreOffset(ctx.Offset).ConfigureAwait(false); - } + // if (totalConsumed % 10_000 == 0) + // { + // // don't store the offset every time, it could be a performance issue + // // store the offset every 1_000/5_000/10_000 messages + // // await consumer.StoreOffset(ctx.Offset).ConfigureAwait(false); + // } Interlocked.Increment(ref totalConsumed); + return Task.CompletedTask; }, }; @@ -207,12 +210,16 @@ await system.CreateStream(new StreamSpec(stream) { MaxLengthBytes = 30_000_000_0 // DON'T PUT ANY BLOCKING CODE HERE conf.StatusChanged += (status) => { - var partitions = "["; - status.Partitions.ForEach(s => partitions += s + ","); - partitions = partitions.Remove(partitions.Length - 1) + "]"; - var streamInfo = status.Partitions is not null - ? $" Partitions: {partitions} of super stream: {status.Stream}" - : $"Stream: {status.Stream}"; + var streamInfo = $"Stream: {status.Stream}"; + if (status.Partitions is { Count: > 0 }) + { + // the partitions are not null and not empty + // it is a super stream + var partitions = "["; + status.Partitions.ForEach(s => partitions += s + ","); + partitions = partitions.Remove(partitions.Length - 1) + "]"; + streamInfo = $" Partitions: {partitions} of super stream: {status.Stream}"; + } lc.LogInformation( "Consumer: {Id} - status changed from: {From} to: {To} reason: {Reason} {Info}", @@ -254,7 +261,11 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis // Add the unconfirmed messages to the list in case of error if (confirmation.Status != ConfirmationStatus.Confirmed) { - confirmation.Messages.ForEach(m => { unconfirmedMessages.Add(m); }); + if (config.EnableResending) + { + confirmation.Messages.ForEach(m => { unconfirmedMessages.Add(m); }); + } + Interlocked.Add(ref totalError, confirmation.Messages.Count); return Task.CompletedTask; } @@ -267,12 +278,17 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis // Like the consumer don't put any blocking code here producerConfig.StatusChanged += (status) => { - var partitions = "["; - status.Partitions?.ForEach(s => partitions += s + ","); - // partitions = partitions.Remove(partitions.Length - 1) + "]"; - var streamInfo = status.Partitions is not null - ? $" Partitions: {partitions} of super stream: {status.Stream}" - : $"Stream: {status.Stream}"; + var streamInfo = $"Stream: {status.Stream}"; + if (status.Partitions is { Count: > 0 }) + { + // the partitions are not null and not empty + // it is a super stream + var partitions = "["; + status.Partitions.ForEach(s => partitions += s + ","); + partitions = partitions.Remove(partitions.Length - 1) + "]"; + streamInfo = $" Partitions: {partitions} of super stream: {status.Stream}"; + } + // just log the status change lp.LogInformation( diff --git a/docs/ReliableClient/Program.cs b/docs/ReliableClient/Program.cs index e8bb40c1..8e48c215 100644 --- a/docs/ReliableClient/Program.cs +++ b/docs/ReliableClient/Program.cs @@ -29,17 +29,16 @@ ConsumersPerConnection = section.GetSection("ConsumersPerConnection").Get(), Host = section.GetSection("Host").Get(), Port = section.GetSection("Port").Get(), - LoadBalancer = section.GetSection("LoadBalancer").Get(), - + // Enable the SuperStream stream feature. SuperStream = section.GetSection("SuperStream").Get(), - + // The number of streams that will be created. in case of super stream, this is the number of the partitions. Streams = section.GetSection("Streams").Get(), // The number of producers that will be created for each stream. Producers = section.GetSection("Producers").Get(), - + // The number of messages that will be sent by each producer. MessagesPerProducer = section.GetSection("MessagesPerProducer").Get(), Consumers = section.GetSection("Consumers").Get(), @@ -48,6 +47,7 @@ // The delay between each message sent by the producer. DelayDuringSendMs = section.GetSection("DelayDuringSendMs").Get(), StreamName = section.GetSection("StreamName").Get(), + EnableResending = section.GetSection("EnableResending").Get(), }); await rClient.ConfigureAwait(false); diff --git a/docs/ReliableClient/appsettings.json b/docs/ReliableClient/appsettings.json index 1dbccd17..82dc1147 100644 --- a/docs/ReliableClient/appsettings.json +++ b/docs/ReliableClient/appsettings.json @@ -11,14 +11,15 @@ "Port": 5552, "Virtualhost": "/", "LoadBalancer": true, - "SuperStream": true, + "SuperStream": false, "Streams": 4, "ProducersPerConnection": 2, "ConsumersPerConnection": 2, - "Producers": 3, + "Producers": 1, "Consumers": 2, - "DelayDuringSendMs":1, - "MessagesPerProducer": 100000, + "DelayDuringSendMs":0, + "MessagesPerProducer": 10000000, + "EnableResending": false, "StreamName": "DotNetClientTest" } } diff --git a/docs/asciidoc/query-stream.adoc b/docs/asciidoc/query-stream.adoc index 18aac099..82b32308 100644 --- a/docs/asciidoc/query-stream.adoc +++ b/docs/asciidoc/query-stream.adoc @@ -21,7 +21,7 @@ The following methods are available: | Stream |`TryQueryOffset(string reference, string stream)` -|Like `QueryOffset` but returns `null` if the offset was not found. +|Like `QueryOffset` but returns `null` if the offset was not found without throwing `OffsetNotFoundException` exception . | Stream |`QueryPartition(string superStream)`