Skip to content

Commit 8cb3f39

Browse files
committed
Add CancellationToken to Async members of IChannel
* `BasicAckAsync` * `BasicNackAsync` * `BasicRejectAsync` * `BasicCancelAsync` * Add `CancellationToken` to `BasicPublishAsync` * Add `CancellationToken` to a couple more places. * Remove dead code * Add a few more TODOs for cancellation tokens * Add cancellation token to `BasicConsumeAsync` * Add cancellation token to `BasicGetAsync` * Add cancellation token to `BasicQosAsync` * Add cancellation token to `CloseAsync` * Add cancellation token to `ConfirmSelectAsync` * Add cancellation token to `ExchangeBindAsync` * Add cancellation token to `ExchangeDeclareAsync` and `ExchangeDeclarePassiveAsync` * Add cancellation token to `ExchangeDeleteAsync` * Add cancellation token to `ExchangeUnbindAsync` * Add cancellation token to remainder of `IChannel` * Ensure that token source is disposed correctly in async rpc continuations
1 parent dbbf038 commit 8cb3f39

15 files changed

+488
-390
lines changed

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -476,13 +476,12 @@ RabbitMQ.Client.IBasicProperties.Type.set -> void
476476
RabbitMQ.Client.IBasicProperties.UserId.get -> string
477477
RabbitMQ.Client.IBasicProperties.UserId.set -> void
478478
RabbitMQ.Client.IChannel
479-
RabbitMQ.Client.IChannel.BasicAckAsync(ulong deliveryTag, bool multiple) -> System.Threading.Tasks.ValueTask
479+
RabbitMQ.Client.IChannel.BasicAckAsync(ulong deliveryTag, bool multiple, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
480480
RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler<RabbitMQ.Client.Events.BasicAckEventArgs>
481-
RabbitMQ.Client.IChannel.BasicGetAsync(string queue, bool autoAck) -> System.Threading.Tasks.ValueTask<RabbitMQ.Client.BasicGetResult>
482-
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue) -> System.Threading.Tasks.ValueTask
481+
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
483482
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
484-
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
485-
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
483+
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
484+
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
486485
RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
487486
RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
488487
RabbitMQ.Client.IChannel.ChannelNumber.get -> int
@@ -497,8 +496,6 @@ RabbitMQ.Client.IChannel.FlowControl -> System.EventHandler<RabbitMQ.Client.Even
497496
RabbitMQ.Client.IChannel.IsClosed.get -> bool
498497
RabbitMQ.Client.IChannel.IsOpen.get -> bool
499498
RabbitMQ.Client.IChannel.NextPublishSeqNo.get -> ulong
500-
RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken token = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<bool>
501-
RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken token = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
502499
RabbitMQ.Client.IChannelExtensions
503500
RabbitMQ.Client.IConnection
504501
RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
@@ -895,32 +892,35 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
895892
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<string> hostnames, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
896893
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
897894
~RabbitMQ.Client.IBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task
898-
~RabbitMQ.Client.IChannel.BasicCancelAsync(string consumerTag, bool noWait = false) -> System.Threading.Tasks.Task
899-
~RabbitMQ.Client.IChannel.BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
900-
~RabbitMQ.Client.IChannel.BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global) -> System.Threading.Tasks.Task
901-
~RabbitMQ.Client.IChannel.BasicRejectAsync(ulong deliveryTag, bool requeue) -> System.Threading.Tasks.Task
902-
~RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.ShutdownEventArgs reason, bool abort) -> System.Threading.Tasks.Task
903-
~RabbitMQ.Client.IChannel.CloseAsync(ushort replyCode, string replyText, bool abort) -> System.Threading.Tasks.Task
904-
~RabbitMQ.Client.IChannel.ConfirmSelectAsync() -> System.Threading.Tasks.Task
905-
~RabbitMQ.Client.IChannel.ConsumerCountAsync(string queue) -> System.Threading.Tasks.Task<uint>
906-
~RabbitMQ.Client.IChannel.ExchangeBindAsync(string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task
907-
~RabbitMQ.Client.IChannel.ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary<string, object> arguments = null, bool passive = false, bool noWait = false) -> System.Threading.Tasks.Task
908-
~RabbitMQ.Client.IChannel.ExchangeDeclarePassiveAsync(string exchange) -> System.Threading.Tasks.Task
909-
~RabbitMQ.Client.IChannel.ExchangeDeleteAsync(string exchange, bool ifUnused = false, bool noWait = false) -> System.Threading.Tasks.Task
910-
~RabbitMQ.Client.IChannel.ExchangeUnbindAsync(string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task
911-
~RabbitMQ.Client.IChannel.MessageCountAsync(string queue) -> System.Threading.Tasks.Task<uint>
912-
~RabbitMQ.Client.IChannel.QueueBindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task
913-
~RabbitMQ.Client.IChannel.QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, System.Collections.Generic.IDictionary<string, object> arguments = null, bool passive = false, bool noWait = false) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
914-
~RabbitMQ.Client.IChannel.QueueDeclarePassiveAsync(string queue) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
915-
~RabbitMQ.Client.IChannel.QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait = false) -> System.Threading.Tasks.Task<uint>
916-
~RabbitMQ.Client.IChannel.QueuePurgeAsync(string queue) -> System.Threading.Tasks.Task<uint>
917-
~RabbitMQ.Client.IChannel.QueueUnbindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments) -> System.Threading.Tasks.Task
918-
~RabbitMQ.Client.IChannel.TxCommitAsync() -> System.Threading.Tasks.Task
919-
~RabbitMQ.Client.IChannel.TxRollbackAsync() -> System.Threading.Tasks.Task
920-
~RabbitMQ.Client.IChannel.TxSelectAsync() -> System.Threading.Tasks.Task
895+
~RabbitMQ.Client.IChannel.BasicCancelAsync(string consumerTag, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
896+
~RabbitMQ.Client.IChannel.BasicConsumeAsync(string queue, bool autoAck, string consumerTag, bool noLocal, bool exclusive, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string>
897+
~RabbitMQ.Client.IChannel.BasicGetAsync(string queue, bool autoAck, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<RabbitMQ.Client.BasicGetResult>
898+
~RabbitMQ.Client.IChannel.BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
899+
~RabbitMQ.Client.IChannel.BasicRejectAsync(ulong deliveryTag, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
900+
~RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.ShutdownEventArgs reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
901+
~RabbitMQ.Client.IChannel.CloseAsync(ushort replyCode, string replyText, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
902+
~RabbitMQ.Client.IChannel.ConfirmSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
903+
~RabbitMQ.Client.IChannel.ConsumerCountAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
904+
~RabbitMQ.Client.IChannel.ExchangeBindAsync(string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
905+
~RabbitMQ.Client.IChannel.ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary<string, object> arguments = null, bool passive = false, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
906+
~RabbitMQ.Client.IChannel.ExchangeDeclarePassiveAsync(string exchange, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
907+
~RabbitMQ.Client.IChannel.ExchangeDeleteAsync(string exchange, bool ifUnused = false, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
908+
~RabbitMQ.Client.IChannel.ExchangeUnbindAsync(string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
909+
~RabbitMQ.Client.IChannel.MessageCountAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
910+
~RabbitMQ.Client.IChannel.QueueBindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
911+
~RabbitMQ.Client.IChannel.QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, System.Collections.Generic.IDictionary<string, object> arguments = null, bool passive = false, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
912+
~RabbitMQ.Client.IChannel.QueueDeclarePassiveAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
913+
~RabbitMQ.Client.IChannel.QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
914+
~RabbitMQ.Client.IChannel.QueuePurgeAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
915+
~RabbitMQ.Client.IChannel.QueueUnbindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
916+
~RabbitMQ.Client.IChannel.TxCommitAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
917+
~RabbitMQ.Client.IChannel.TxRollbackAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
918+
~RabbitMQ.Client.IChannel.TxSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
919+
~RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<bool>
920+
~RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
921921
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
922922
~RabbitMQ.Client.IConnection.CreateChannelAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
923-
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason) -> System.Threading.Tasks.Task
923+
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
924924
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
925925
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint> endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
926926
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint> endpoints, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ await fh.ConnectAsync(cancellationToken)
627627

628628
private IFrameHandler ConfigureFrameHandler(IFrameHandler fh)
629629
{
630-
// TODO: add user-provided configurator, like in the Java client
630+
// FUTURE: add user-provided configurator, like in the Java client
631631
fh.ReadTimeout = RequestedHeartbeat;
632632
fh.WriteTimeout = RequestedHeartbeat;
633633

0 commit comments

Comments
 (0)