Skip to content

Commit de8e1e4

Browse files
committed
Continue simplifying code
Part of #1472 * Move some overrides to their base class * Simplify method, add comments * Try to figure out CI error here - https://github.com/rabbitmq/rabbitmq-dotnet-client/actions/runs/8072267202/job/22053764839 * Ensure `_closeReason` is not null * Remove or address TODOs * Revert to calling `AbortAsync` in `Dispose` for connections and channels * Add test showing that `CloseAsync` is not required before `Dispose`
1 parent 57a5251 commit de8e1e4

17 files changed

+104
-111
lines changed

projects/Benchmarks/Networking/Networking_BasicDeliver_LongLivedConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public void GlobalSetup()
2121
_container = RabbitMQBroker.Start();
2222

2323
var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
24-
// TODO / NOTE: https://github.com/dotnet/BenchmarkDotNet/issues/1738
24+
// NOTE: https://github.com/dotnet/BenchmarkDotNet/issues/1738
2525
_connection = EnsureCompleted(cf.CreateConnectionAsync());
2626
}
2727

projects/RabbitMQ.Client/client/exceptions/OperationInterruptedException.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ namespace RabbitMQ.Client.Exceptions
4242
/// </summary>
4343
[Serializable]
4444
public class OperationInterruptedException
45-
// TODO: inherit from OperationCanceledException
46-
// The above is an old TODO that I don't think applies here.
4745
: RabbitMQClientException
4846
{
4947
///<summary>Construct an OperationInterruptedException with

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 23 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -42,55 +42,45 @@ public Channel(ConnectionConfig config, ISession session) : base(config, session
4242
{
4343
}
4444

45-
public override Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
46-
{
47-
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
48-
return ModelSendAsync(method, cancellationToken).AsTask();
49-
}
50-
51-
public override Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken)
52-
{
53-
var method = new ChannelCloseOk();
54-
return ModelSendAsync(method, cancellationToken).AsTask();
55-
}
56-
57-
public override Task _Private_ChannelFlowOkAsync(bool active, CancellationToken cancellationToken)
58-
{
59-
var method = new ChannelFlowOk(active);
60-
return ModelSendAsync(method, cancellationToken).AsTask();
61-
}
62-
63-
public override Task _Private_ConnectionCloseOkAsync(CancellationToken cancellationToken)
64-
{
65-
var method = new ConnectionCloseOk();
66-
return ModelSendAsync(method, cancellationToken).AsTask();
67-
}
68-
6945
public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple)
7046
{
7147
var method = new BasicAck(deliveryTag, multiple);
72-
// TODO use cancellation token
48+
// TODO cancellation token
7349
return ModelSendAsync(method, CancellationToken.None);
7450
}
7551

7652
public override ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue)
7753
{
7854
var method = new BasicNack(deliveryTag, multiple, requeue);
79-
// TODO use cancellation token
55+
// TODO cancellation token
8056
return ModelSendAsync(method, CancellationToken.None);
8157
}
8258

8359
public override Task BasicRejectAsync(ulong deliveryTag, bool requeue)
8460
{
8561
var method = new BasicReject(deliveryTag, requeue);
86-
// TODO cancellation token?
62+
// TODO cancellation token
8763
return ModelSendAsync(method, CancellationToken.None).AsTask();
8864
}
8965

66+
/// <summary>
67+
/// Returning <c>true</c> from this method means that the command was server-originated,
68+
/// and handled already.
69+
/// Returning <c>false</c> (the default) means that the incoming command is the response to
70+
/// a client-initiated RPC call, and must be handled.
71+
/// </summary>
72+
/// <param name="cmd">The incoming command from the AMQP server</param>
73+
/// <param name="cancellationToken">The cancellation token</param>
74+
/// <returns></returns>
9075
protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
9176
{
9277
switch (cmd.CommandId)
9378
{
79+
case ProtocolCommandId.BasicCancel:
80+
{
81+
HandleBasicCancel(in cmd);
82+
return Task.FromResult(true);
83+
}
9484
case ProtocolCommandId.BasicDeliver:
9585
{
9686
HandleBasicDeliver(in cmd);
@@ -101,27 +91,6 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
10191
HandleBasicAck(in cmd);
10292
return Task.FromResult(true);
10393
}
104-
case ProtocolCommandId.BasicCancel:
105-
{
106-
HandleBasicCancel(in cmd);
107-
return Task.FromResult(true);
108-
}
109-
case ProtocolCommandId.BasicCancelOk:
110-
{
111-
return Task.FromResult(false);
112-
}
113-
case ProtocolCommandId.BasicConsumeOk:
114-
{
115-
return Task.FromResult(false);
116-
}
117-
case ProtocolCommandId.BasicGetEmpty:
118-
{
119-
return Task.FromResult(false);
120-
}
121-
case ProtocolCommandId.BasicGetOk:
122-
{
123-
return Task.FromResult(false);
124-
}
12594
case ProtocolCommandId.BasicNack:
12695
{
12796
HandleBasicNack(in cmd);
@@ -134,6 +103,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
134103
}
135104
case ProtocolCommandId.ChannelClose:
136105
{
106+
// Note: always returns true
137107
return HandleChannelCloseAsync(cmd, cancellationToken);
138108
}
139109
case ProtocolCommandId.ChannelCloseOk:
@@ -143,6 +113,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
143113
}
144114
case ProtocolCommandId.ChannelFlow:
145115
{
116+
// Note: always returns true
146117
return HandleChannelFlowAsync(cmd, cancellationToken);
147118
}
148119
case ProtocolCommandId.ConnectionBlocked:
@@ -152,6 +123,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
152123
}
153124
case ProtocolCommandId.ConnectionClose:
154125
{
126+
// Note: always returns true
155127
return HandleConnectionCloseAsync(cmd, cancellationToken);
156128
}
157129
case ProtocolCommandId.ConnectionSecure:
@@ -161,6 +133,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
161133
}
162134
case ProtocolCommandId.ConnectionStart:
163135
{
136+
// Note: always returns true
164137
return HandleConnectionStartAsync(cmd, cancellationToken);
165138
}
166139
case ProtocolCommandId.ConnectionTune:
@@ -173,12 +146,10 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
173146
HandleConnectionUnblocked(in cmd);
174147
return Task.FromResult(true);
175148
}
176-
case ProtocolCommandId.QueueDeclareOk:
149+
default:
177150
{
178-
bool result = HandleQueueDeclareOk(in cmd);
179-
return Task.FromResult(result);
151+
return Task.FromResult(false);
180152
}
181-
default: return Task.FromResult(false);
182153
}
183154
}
184155
}

projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,8 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout)
6565
var tcs = (TaskCompletionSource<T>)state;
6666
if (tcs.TrySetCanceled())
6767
{
68-
// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
69-
// Cancellation was successful, does this mean we should set a TimeoutException
70-
// in the same manner as BlockingCell?
68+
// Cancellation was successful, does this mean we set a TimeoutException
69+
// in the same manner as BlockingCell used to
7170
tcs.SetException(new TimeoutException("TODO LRB rabbitmq/rabbitmq-dotnet-client#1347"));
7271
}
7372
}, _tcs);
@@ -77,9 +76,9 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout)
7776
var tcs = (TaskCompletionSource<T>)state;
7877
if (tcs.TrySetCanceled())
7978
{
80-
// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
81-
// Cancellation was successful, does this mean we should set a TimeoutException
82-
// in the same manner as BlockingCell?
79+
// Cancellation was successful, does this mean we set a TimeoutException
80+
// in the same manner as BlockingCell used to
81+
tcs.SetException(new TimeoutException("TODO LRB rabbitmq/rabbitmq-dotnet-client#1347"));
8382
}
8483
}, state: _tcs, useSynchronizationContext: false);
8584
#endif
@@ -148,8 +147,6 @@ public override void HandleCommand(in IncomingCommand cmd)
148147
else if (cmd.CommandId == ProtocolCommandId.ConnectionTune)
149148
{
150149
var tune = new ConnectionTune(cmd.MethodSpan);
151-
// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
152-
// What to do if setting a result fails?
153150
_tcs.TrySetResult(new ConnectionSecureOrTune
154151
{
155152
m_tuneDetails = new ConnectionTuneDetails { m_channelMax = tune._channelMax, m_frameMax = tune._frameMax, m_heartbeatInSeconds = tune._heartbeat }

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,9 @@ public void Dispose()
234234
return;
235235
}
236236

237-
// TODO rabbitmq-dotnet-client-1472
238-
// this.Abort();
239237
if (IsOpen)
240238
{
241-
throw new InvalidOperationException("AutorecoveringChannel must be closed before calling Dispose!");
239+
this.AbortAsync().GetAwaiter().GetResult();
242240
}
243241

244242
_recordedConsumerTags.Clear();

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ private static void HandleTopologyRecoveryException(TopologyRecoveryException e)
153153
ESLog.Info($"Will not retry recovery because of {e.InnerException?.GetType().FullName}: it's not a known problem with connectivity, ignoring it", e);
154154
}
155155

156-
// TODO propagate token
156+
// TODO propagate cancellation token
157157
private async ValueTask<bool> TryPerformAutomaticRecoveryAsync(CancellationToken cancellationToken)
158158
{
159159
ESLog.Info("Performing automatic recovery");

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -261,12 +261,7 @@ public void Dispose()
261261

262262
try
263263
{
264-
// TODO rabbitmq-dotnet-client-1472
265-
// this.Abort(InternalConstants.DefaultConnectionAbortTimeout);
266-
if (IsOpen)
267-
{
268-
throw new InvalidOperationException("Connection must be closed before calling Dispose!");
269-
}
264+
_innerConnection.Dispose();
270265
}
271266
catch (Exception)
272267
{

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,11 @@ private void OnSessionShutdown(object sender, ShutdownEventArgs reason)
471471

472472
internal bool SetCloseReason(ShutdownEventArgs reason)
473473
{
474+
if (reason is null)
475+
{
476+
throw new ArgumentNullException(nameof(reason));
477+
}
478+
474479
// NB: this ensures that Close is only called once on a channel
475480
return Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
476481
}
@@ -487,20 +492,21 @@ protected virtual void Dispose(bool disposing)
487492
{
488493
if (disposing)
489494
{
490-
// dispose managed resources
491-
// TODO exception?
492495
if (IsOpen)
493496
{
494-
throw new InvalidOperationException("Channel must be closed before calling Dispose!");
497+
this.AbortAsync().GetAwaiter().GetResult();
495498
}
499+
496500
ConsumerDispatcher.Dispose();
497501
_rpcSemaphore.Dispose();
498502
}
499-
500-
// dispose unmanaged resources
501503
}
502504

503-
public abstract Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken);
505+
public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
506+
{
507+
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
508+
return ModelSendAsync(method, cancellationToken).AsTask();
509+
}
504510

505511
protected void HandleBasicAck(in IncomingCommand cmd)
506512
{
@@ -663,8 +669,10 @@ protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, Cancella
663669

664670
Session.Close(CloseReason, false);
665671

666-
await _Private_ChannelCloseOkAsync(cancellationToken)
672+
var method = new ChannelCloseOk();
673+
await ModelSendAsync(method, cancellationToken)
667674
.ConfigureAwait(false);
675+
668676
return true;
669677
}
670678
finally
@@ -710,8 +718,9 @@ protected async Task<bool> HandleChannelFlowAsync(IncomingCommand cmd, Cancellat
710718
_flowControlBlock.Reset();
711719
}
712720

713-
await _Private_ChannelFlowOkAsync(active, cancellationToken)
714-
.ConfigureAwait(false);
721+
var method = new ChannelFlowOk(active);
722+
await ModelSendAsync(method, cancellationToken).
723+
ConfigureAwait(false);
715724

716725
if (!_flowControlWrapper.IsEmpty)
717726
{
@@ -748,8 +757,11 @@ protected async Task<bool> HandleConnectionCloseAsync(IncomingCommand cmd, Cance
748757
try
749758
{
750759
Session.Connection.ClosedViaPeer(reason);
751-
await _Private_ConnectionCloseOkAsync(cancellationToken)
760+
761+
var replyMethod = new ConnectionCloseOk();
762+
await ModelSendAsync(replyMethod, cancellationToken)
752763
.ConfigureAwait(false);
764+
753765
SetCloseReason(Session.Connection.CloseReason);
754766
}
755767
catch (IOException)
@@ -832,19 +844,11 @@ protected void HandleConnectionUnblocked(in IncomingCommand cmd)
832844
}
833845
}
834846

835-
// TODO rabbitmq-dotnet-client-1472 remove this method
836-
protected bool HandleQueueDeclareOk(in IncomingCommand cmd)
837-
{
838-
return false;
839-
}
840-
841-
public abstract Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken);
842-
843-
public abstract Task _Private_ChannelFlowOkAsync(bool active, CancellationToken cancellationToken);
847+
public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple);
844848

845-
public abstract Task _Private_ConnectionCloseOkAsync(CancellationToken cancellationToken);
849+
public abstract ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue);
846850

847-
public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple);
851+
public abstract Task BasicRejectAsync(ulong deliveryTag, bool requeue);
848852

849853
public async Task BasicCancelAsync(string consumerTag, bool noWait)
850854
{
@@ -946,8 +950,6 @@ await ModelSendAsync(method, k.CancellationToken)
946950
}
947951
}
948952

949-
public abstract ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue);
950-
951953
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
952954
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
953955
{
@@ -1162,8 +1164,6 @@ await ModelSendAsync(method, k.CancellationToken)
11621164
}
11631165
}
11641166

1165-
public abstract Task BasicRejectAsync(ulong deliveryTag, bool requeue);
1166-
11671167
public async Task ConfirmSelectAsync()
11681168
{
11691169
using var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout);

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
8686
HandleMainLoopException(ea);
8787
}
8888

89-
// TODO is this the best way?
9089
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionCloseTimeout);
9190
await FinishCloseAsync(cts.Token);
9291
}
@@ -175,7 +174,7 @@ private void TerminateMainloop()
175174
private void HandleMainLoopException(ShutdownEventArgs reason)
176175
{
177176
string message = reason.GetLogMessage();
178-
if (!SetCloseReason(reason))
177+
if (false == SetCloseReason(reason))
179178
{
180179
LogCloseError($"Unexpected Main Loop Exception while closing: {message}", reason.Exception);
181180
return;

0 commit comments

Comments
 (0)