@@ -49,9 +49,11 @@ namespace RabbitMQ.Client.Impl
49
49
internal abstract class ChannelBase : IChannel , IRecoverable
50
50
{
51
51
///<summary>Only used to kick-start a connection open
52
- ///sequence. See <see cref="Connection.Open "/> </summary>
53
- internal BlockingCell < ConnectionStartDetails > m_connectionStartCell ;
52
+ ///sequence. See <see cref="Connection.OpenAsync "/> </summary>
53
+ internal TaskCompletionSource < ConnectionStartDetails > m_connectionStartCell ;
54
54
55
+ // AMQP only allows one RPC operation to be active at a time.
56
+ private readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim ( 1 , 1 ) ;
55
57
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue ( ) ;
56
58
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim ( true ) ;
57
59
@@ -239,32 +241,18 @@ private async Task CloseAsync(ShutdownEventArgs reason, bool abort)
239
241
}
240
242
}
241
243
242
- internal void ConnectionOpen ( string virtualHost )
244
+ internal async ValueTask ConnectionOpenAsync ( string virtualHost )
243
245
{
244
- var k = new SimpleBlockingRpcContinuation ( ) ;
245
- lock ( _rpcLock )
246
- {
247
- Enqueue ( k ) ;
248
- try
249
- {
250
- _Private_ConnectionOpen ( virtualHost ) ;
251
- }
252
- catch ( AlreadyClosedException )
253
- {
254
- // let continuation throw OperationInterruptedException,
255
- // which is a much more suitable exception before connection
256
- // negotiation finishes
257
- }
258
- k . GetReply ( HandshakeContinuationTimeout ) ;
259
- }
246
+ await _Private_ConnectionOpenAsync ( virtualHost ) . TimeoutAfter ( HandshakeContinuationTimeout ) ;
260
247
}
261
248
262
- internal ConnectionSecureOrTune ConnectionSecureOk ( byte [ ] response )
249
+ internal async ValueTask < ConnectionSecureOrTune > ConnectionSecureOkAsync ( byte [ ] response )
263
250
{
264
- var k = new ConnectionStartRpcContinuation ( ) ;
265
- lock ( _rpcLock )
251
+ var k = new ConnectionSecureOrTuneContinuation ( ) ;
252
+ await _rpcSemaphore . WaitAsync ( ) . ConfigureAwait ( false ) ;
253
+ Enqueue ( k ) ;
254
+ try
266
255
{
267
- Enqueue ( k ) ;
268
256
try
269
257
{
270
258
_Private_ConnectionSecureOk ( response ) ;
@@ -275,31 +263,40 @@ internal ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
275
263
// which is a much more suitable exception before connection
276
264
// negotiation finishes
277
265
}
278
- k . GetReply ( HandshakeContinuationTimeout ) ;
266
+
267
+ return await k ;
268
+ }
269
+ finally
270
+ {
271
+ _rpcSemaphore . Release ( ) ;
279
272
}
280
- return k . m_result ;
281
273
}
282
274
283
- internal ConnectionSecureOrTune ConnectionStartOk ( IDictionary < string , object > clientProperties , string mechanism , byte [ ] response , string locale )
275
+ internal async ValueTask < ConnectionSecureOrTune > ConnectionStartOkAsync ( IDictionary < string , object > clientProperties , string mechanism , byte [ ] response ,
276
+ string locale )
284
277
{
285
- var k = new ConnectionStartRpcContinuation ( ) ;
286
- lock ( _rpcLock )
278
+ var k = new ConnectionSecureOrTuneContinuation ( ) ;
279
+ await _rpcSemaphore . WaitAsync ( ) . ConfigureAwait ( false ) ;
280
+ Enqueue ( k ) ;
281
+ try
287
282
{
288
- Enqueue ( k ) ;
289
283
try
290
284
{
291
- _Private_ConnectionStartOk ( clientProperties , mechanism ,
292
- response , locale ) ;
285
+ _Private_ConnectionStartOk ( clientProperties , mechanism , response , locale ) ;
293
286
}
294
287
catch ( AlreadyClosedException )
295
288
{
296
289
// let continuation throw OperationInterruptedException,
297
290
// which is a much more suitable exception before connection
298
291
// negotiation finishes
299
292
}
300
- k . GetReply ( HandshakeContinuationTimeout ) ;
293
+
294
+ return await k ;
295
+ }
296
+ finally
297
+ {
298
+ _rpcSemaphore . Release ( ) ;
301
299
}
302
- return k . m_result ;
303
300
}
304
301
305
302
protected abstract bool DispatchAsynchronous ( in IncomingCommand cmd ) ;
@@ -324,7 +321,7 @@ internal void FinishClose()
324
321
Session . Close ( reason ) ;
325
322
}
326
323
327
- m_connectionStartCell ? . ContinueWithValue ( null ) ;
324
+ m_connectionStartCell ? . TrySetResult ( null ) ;
328
325
}
329
326
330
327
private void HandleCommand ( in IncomingCommand cmd )
@@ -385,6 +382,12 @@ protected void ChannelSend<T>(in T method) where T : struct, IOutgoingAmqpMethod
385
382
Session . Transmit ( in method ) ;
386
383
}
387
384
385
+ [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
386
+ protected ValueTask ModelSendAsync < T > ( in T method ) where T : struct , IOutgoingAmqpMethod
387
+ {
388
+ return Session . TransmitAsync ( in method ) ;
389
+ }
390
+
388
391
[ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
389
392
protected void ChannelSend < TMethod , THeader > ( in TMethod method , in THeader header , ReadOnlyMemory < byte > body )
390
393
where TMethod : struct , IOutgoingAmqpMethod
@@ -397,6 +400,19 @@ protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader heade
397
400
Session . Transmit ( in method , in header , body ) ;
398
401
}
399
402
403
+ [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
404
+ protected ValueTask ModelSendAsync < TMethod , THeader > ( in TMethod method , in THeader header , ReadOnlyMemory < byte > body )
405
+ where TMethod : struct , IOutgoingAmqpMethod
406
+ where THeader : IAmqpHeader
407
+ {
408
+ if ( ! _flowControlBlock . IsSet )
409
+ {
410
+ _flowControlBlock . Wait ( ) ;
411
+ }
412
+
413
+ return Session . TransmitAsync ( in method , in header , body ) ;
414
+ }
415
+
400
416
internal void OnCallbackException ( CallbackExceptionEventArgs args )
401
417
{
402
418
_callbackExceptionWrapper . Invoke ( this , args ) ;
@@ -730,13 +746,7 @@ protected void HandleConnectionClose(in IncomingCommand cmd)
730
746
731
747
protected void HandleConnectionSecure ( in IncomingCommand cmd )
732
748
{
733
- var challenge = new ConnectionSecure ( cmd . MethodBytes . Span ) . _challenge ;
734
- cmd . ReturnMethodBuffer ( ) ;
735
- var k = ( ConnectionStartRpcContinuation ) _continuationQueue . Next ( ) ;
736
- k . m_result = new ConnectionSecureOrTune
737
- {
738
- m_challenge = challenge
739
- } ;
749
+ var k = ( ConnectionSecureOrTuneContinuation ) _continuationQueue . Next ( ) ;
740
750
k . HandleCommand ( IncomingCommand . Empty ) ; // release the continuation.
741
751
}
742
752
@@ -758,25 +768,14 @@ protected void HandleConnectionStart(in IncomingCommand cmd)
758
768
m_mechanisms = method . _mechanisms ,
759
769
m_locales = method . _locales
760
770
} ;
761
- m_connectionStartCell . ContinueWithValue ( details ) ;
771
+ m_connectionStartCell ? . SetResult ( details ) ;
762
772
m_connectionStartCell = null ;
763
773
}
764
774
765
775
protected void HandleConnectionTune ( in IncomingCommand cmd )
766
776
{
767
- var connectionTune = new ConnectionTune ( cmd . MethodBytes . Span ) ;
768
- cmd . ReturnMethodBuffer ( ) ;
769
- var k = ( ConnectionStartRpcContinuation ) _continuationQueue . Next ( ) ;
770
- k . m_result = new ConnectionSecureOrTune
771
- {
772
- m_tuneDetails =
773
- {
774
- m_channelMax = connectionTune . _channelMax ,
775
- m_frameMax = connectionTune . _frameMax ,
776
- m_heartbeatInSeconds = connectionTune . _heartbeat
777
- }
778
- } ;
779
- k . HandleCommand ( IncomingCommand . Empty ) ; // release the continuation.
777
+ var k = ( ConnectionSecureOrTuneContinuation ) _continuationQueue . Next ( ) ;
778
+ k . HandleCommand ( cmd ) ; // release the continuation.
780
779
}
781
780
782
781
protected void HandleConnectionUnblocked ( )
@@ -815,6 +814,8 @@ protected void HandleQueueDeclareOk(in IncomingCommand cmd)
815
814
816
815
public abstract void _Private_ConnectionOpen ( string virtualHost ) ;
817
816
817
+ public abstract ValueTask _Private_ConnectionOpenAsync ( string virtualHost ) ;
818
+
818
819
public abstract void _Private_ConnectionSecureOk ( byte [ ] response ) ;
819
820
820
821
public abstract void _Private_ConnectionStartOk ( IDictionary < string , object > clientProperties , string mechanism , byte [ ] response , string locale ) ;
@@ -930,6 +931,36 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
930
931
ChannelSend ( in cmd , in basicProperties , body ) ;
931
932
}
932
933
934
+ public ValueTask BasicPublishAsync < TProperties > ( string exchange , string routingKey , in TProperties basicProperties , ReadOnlyMemory < byte > body , bool mandatory )
935
+ where TProperties : IReadOnlyBasicProperties , IAmqpHeader
936
+ {
937
+ if ( NextPublishSeqNo > 0 )
938
+ {
939
+ lock ( _confirmLock )
940
+ {
941
+ _pendingDeliveryTags . AddLast ( NextPublishSeqNo ++ ) ;
942
+ }
943
+ }
944
+
945
+ var cmd = new BasicPublish ( exchange , routingKey , mandatory , default ) ;
946
+ return ModelSendAsync ( in cmd , in basicProperties , body ) ;
947
+ }
948
+
949
+ public ValueTask BasicPublishAsync < TProperties > ( CachedString exchange , CachedString routingKey , in TProperties basicProperties , ReadOnlyMemory < byte > body , bool mandatory )
950
+ where TProperties : IReadOnlyBasicProperties , IAmqpHeader
951
+ {
952
+ if ( NextPublishSeqNo > 0 )
953
+ {
954
+ lock ( _confirmLock )
955
+ {
956
+ _pendingDeliveryTags . AddLast ( NextPublishSeqNo ++ ) ;
957
+ }
958
+ }
959
+
960
+ var cmd = new BasicPublishMemory ( exchange . Bytes , routingKey . Bytes , mandatory , default ) ;
961
+ return ModelSendAsync ( in cmd , in basicProperties , body ) ;
962
+ }
963
+
933
964
public void UpdateSecret ( string newSecret , string reason )
934
965
{
935
966
if ( newSecret is null )
0 commit comments