Skip to content

Improve socket handling #414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 4 additions & 17 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ public int Write(Span<byte> span)

public class Client : IClient
{
private bool isClosed = true;

private uint correlationId = 0; // allow for some pre-amble

private Connection _connection;
Expand Down Expand Up @@ -160,24 +158,13 @@ private Client(ClientParameters parameters, ILogger logger = null)
SendHeartBeat,
Close,
(int)parameters.Heartbeat.TotalSeconds);
IsClosed = false;
_logger = logger ?? NullLogger.Instance;
ClientId = Guid.NewGuid().ToString();
}

public bool IsClosed
{
get
{
if (_connection.IsClosed)
{
isClosed = true;
}

return isClosed;
}

private set => isClosed = value;
get { return _connection.IsClosed; }
}

private void StartHeartBeat()
Expand Down Expand Up @@ -758,7 +745,7 @@ private async ValueTask<bool> SendHeartBeat()
private void InternalClose()
{
_heartBeatHandler.Close();
IsClosed = true;
_connection.Close();
}

private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffsetType offsetSpecification)
Expand All @@ -773,13 +760,13 @@ private async Task<CloseResponse> Close(string reason, string closedStatus)
return new CloseResponse(0, ResponseCode.Ok);
}

InternalClose();
try
{
_connection.UpdateCloseStatus(closedStatus);
var result =
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
TimeSpan.FromSeconds(10)).ConfigureAwait(false);
InternalClose();
_connection.UpdateCloseStatus(closedStatus);

return result;
}
Expand Down
55 changes: 33 additions & 22 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ internal static class ConnectionClosedReason
public const string Normal = "TCP connection closed normal";
public const string Unexpected = "TCP connection closed unexpected";
public const string TooManyHeartbeatsMissing = "TCP connection closed by too many heartbeats missing";

}

public class Connection : IDisposable
Expand All @@ -32,7 +31,6 @@ public class Connection : IDisposable
private readonly Func<string, Task> closedCallback;
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
private int numFrames;
private bool isClosed = false;
private string _closedReason = ConnectionClosedReason.Unexpected;
private bool _disposedValue;
private readonly ILogger _logger;
Expand All @@ -43,7 +41,8 @@ public class Connection : IDisposable

internal int NumFrames => numFrames;
internal string ClientId { get; set; }
public bool IsClosed => isClosed;
public bool IsClosed => !socket.Connected;

public void UpdateCloseStatus(string reason)
{
_closedReason = reason;
Expand All @@ -59,17 +58,31 @@ private Connection(Socket socket, Func<Memory<byte>, Task> callback,
{
_logger = logger;
this.socket = socket;

commandCallback = callback;
closedCallback = closedCallBack;
var networkStream = new NetworkStream(socket);
var stream = MaybeTcpUpgrade(networkStream, sslOption);
writer = PipeWriter.Create(stream);
reader = PipeReader.Create(stream);
reader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true));
writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true));

// ProcessIncomingFrames is dropped as soon as the connection is closed
// no need to stop it manually when the connection is closed
_incomingFramesTask = Task.Run(ProcessIncomingFrames);
}

internal void Close()
{
if (!_cancelTokenSource.IsCancellationRequested)
{
_cancelTokenSource.Cancel();
}

writer.Complete();
reader.Complete();
socket.Close();
}

public static async Task<Connection> Create(EndPoint endpoint, Func<Memory<byte>, Task> commandCallback,
Func<string, Task> closedCallBack, SslOption sslOption, ILogger logger)
{
Expand Down Expand Up @@ -120,7 +133,7 @@ private async Task WriteCommand<T>(T command) where T : struct, ICommand
throw new OperationCanceledException("Token Cancellation Requested Connection");
}

if (isClosed)
if (!socket.Connected)
{
throw new InvalidOperationException("Connection is closed");
}
Expand Down Expand Up @@ -148,7 +161,7 @@ private async Task ProcessIncomingFrames()
Exception caught = null;
try
{
while (!isClosed)
while (socket.Connected)
{
if (!reader.TryRead(out var result))
{
Expand All @@ -158,14 +171,13 @@ private async Task ProcessIncomingFrames()
var buffer = result.Buffer;
if (buffer.Length == 0)
{
Debug.WriteLine("TCP Connection Closed!");
_logger?.LogDebug("TCP Connection Closed!");
// We're not going to receive any more bytes from the connection.
break;
}

// Let's try to read some frames!

while (TryReadFrame(ref buffer, out var frame) && !isClosed)
while (TryReadFrame(ref buffer, out var frame) && socket.Connected)
{
// Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled.

Expand Down Expand Up @@ -196,20 +208,28 @@ private async Task ProcessIncomingFrames()
// closedCallback event.
// It is useful to trace the error, but at this point
// the socket is closed maybe not in the correct way
if (!isClosed)
if (socket.Connected)
{
_logger?.LogError(e, "Error reading the socket");
}
}
finally
{
isClosed = true;
_logger?.LogDebug(
"TCP Connection Closed ClientId: {ClientId}, Reason {Reason}. IsCancellationRequested {Token} ",
ClientId, _closedReason, Token.IsCancellationRequested);
// Mark the PipeReader as complete
await reader.CompleteAsync(caught).ConfigureAwait(false);
closedCallback?.Invoke(_closedReason)!.ConfigureAwait(false);
if (closedCallback != null)
{
// that's mandatory for the ReliableProducer / ReliableConsumer
// to deal with the connection closed. Null callback won't raise the event
await closedCallback(_closedReason).ConfigureAwait(false);
}
else
{
_logger?.LogWarning("Connection: Closed callback is null. ClientId: {ClientId}", ClientId);
}
}
}

Expand Down Expand Up @@ -242,15 +262,6 @@ public void Dispose()
{
try
{
if (!_cancelTokenSource.IsCancellationRequested)
{
_cancelTokenSource.Cancel();
}

isClosed = true;
writer.Complete();
reader.Complete();
socket.Close();
if (!_incomingFramesTask.Wait(Consts.MidWait))
{
_logger?.LogWarning("ProcessIncomingFrames reader task did not exit in {MidWait}",
Expand Down
5 changes: 5 additions & 0 deletions RabbitMQ.Stream.Client/Consts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,10 @@ internal static int RandomMid()
{
return Random.Shared.Next(1000, 2500);
}

internal static int RandomLarge()
{
return Random.Shared.Next(1500, 3000);
}
}
}
46 changes: 31 additions & 15 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ private async Task<IConsumer> StandardConsumer(bool boot)
// before creating a new consumer, the old one is disposed
// This is just a safety check, the consumer should be already disposed
_consumer?.Dispose();

return await _consumerConfig.StreamSystem.CreateRawConsumer(new RawConsumerConfig(_consumerConfig.Stream)
{
ClientProvidedName = _consumerConfig.ClientProvidedName,
Expand All @@ -72,7 +71,7 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
catch (Exception e)
{
BaseLogger?.LogError(e,
$"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}");
$"Stream consumer.ConnectionClosedHandler error. Auto recovery failed for: {_consumerConfig.Stream}");
}
},
MetadataHandler = async _ =>
Expand All @@ -88,18 +87,27 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
catch (Exception e)
{
BaseLogger?.LogError(e,
$"Stream consumer.MetadataHandler error. Auto recovery failed for: {ToString()}");
$"Stream consumer.MetadataHandler error. Auto recovery failed for stream: {_consumerConfig.Stream}");
}
},
MessageHandler = async (consumer, ctx, message) =>
{
if (_consumerConfig.MessageHandler != null)
try
{
if (_consumerConfig.MessageHandler != null)
{
await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message)
.ConfigureAwait(false);
}

_consumedFirstTime = true;
}
catch (Exception e)
{
await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message)
.ConfigureAwait(false);
BaseLogger?.LogError("MessageHandler {Error} for stream {Stream} ", e.Message,
_consumerConfig.Stream);
}

_consumedFirstTime = true;
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
},
}, BaseLogger).ConfigureAwait(false);
Expand Down Expand Up @@ -144,9 +152,9 @@ private async Task<IConsumer> SuperConsumer(bool boot)
Identifier = _consumerConfig.Identifier,
ConnectionClosedHandler = async (closeReason, partitionStream) =>
{
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally(closeReason))
return;
await RandomWait().ConfigureAwait(false);
try
{
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
Expand All @@ -163,9 +171,9 @@ await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
{
try
{
await RandomWait().ConfigureAwait(false);
if (IsClosedNormally())
return;
await RandomWait().ConfigureAwait(false);

var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
Expand All @@ -180,14 +188,22 @@ await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
},
MessageHandler = async (partitionStream, consumer, ctx, message) =>
{
if (_consumerConfig.MessageHandler != null)
try
{
await _consumerConfig.MessageHandler(partitionStream, consumer, ctx,
message).ConfigureAwait(false);
if (_consumerConfig.MessageHandler != null)
{
await _consumerConfig.MessageHandler(partitionStream, consumer, ctx,
message).ConfigureAwait(false);
}

_consumedFirstTime = true;
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
}
catch (Exception e)
{
BaseLogger?.LogError("MessageHandler {Error} for stream {Stream} ", e.Message,
_consumerConfig.Stream);
}

_consumedFirstTime = true;
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
},
}, BaseLogger).ConfigureAwait(false);
}
Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ internal async Task OnEntityClosed(StreamSystem system, string stream,
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
UpdateStatus(ReliableEntityStatus.Reconnection, reason,
[stream]);
await Task.Delay(Consts.RandomLarge()).ConfigureAwait(false);
try
{
var (localStreamExists, streamInfo) = await CheckIfStreamIsAvailable(stream, system)
Expand Down Expand Up @@ -446,6 +447,7 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, ChangeSta
var streamExists = false;
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
UpdateStatus(ReliableEntityStatus.Reconnection, reason, [stream]);
await Task.Delay(Consts.RandomLarge()).ConfigureAwait(false);
try
{
(streamExists, _) = await CheckIfStreamIsAvailable(stream, system)
Expand Down
1 change: 0 additions & 1 deletion Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ public async Task ConsumerQueryOffset()
await Assert.ThrowsAsync<OffsetNotFoundException>(() =>
system.QueryOffset("reference_does_not_exist", stream));

Assert.Null(await system.TryQueryOffset("reference_does_not_exist", stream));
await Assert.ThrowsAsync<GenericProtocolException>(() =>
(system.TryQueryOffset(Reference, "stream_does_not_exist")));

Expand Down
Loading