@@ -523,9 +523,13 @@ public boolean processControlCommand(Command c)
523
523
return false ;
524
524
} else {
525
525
// Quiescing.
526
- if (method instanceof AMQP .Connection .CloseOk ) {
527
- // It's our final "RPC".
528
- return false ;
526
+ if (method instanceof AMQP .Connection .CloseOk ) {
527
+ // It's our final "RPC". Time to shut down.
528
+ _running = false ;
529
+ // If this was called from within the MainLoop we
530
+ // may not have a continuation to return to, so we
531
+ // treat this as processed in that case.
532
+ return _channel0 ._activeRpc == null ;
529
533
} else {
530
534
// Ignore all others.
531
535
return true ;
@@ -680,14 +684,21 @@ public void close(int closeCode,
680
684
boolean abort )
681
685
throws IOException
682
686
{
687
+ final boolean sync = !(Thread .currentThread () instanceof MainLoop );
688
+
683
689
try {
684
690
AMQImpl .Connection .Close reason =
685
691
new AMQImpl .Connection .Close (closeCode , closeMessage , 0 , 0 );
692
+
686
693
shutdown (reason , initiatedByApplication , cause , true );
687
- AMQChannel .SimpleBlockingRpcContinuation k =
688
- new AMQChannel .SimpleBlockingRpcContinuation ();
689
- _channel0 .quiescingRpc (reason , k );
690
- k .getReply (timeout );
694
+ if (sync ){
695
+ AMQChannel .SimpleBlockingRpcContinuation k =
696
+ new AMQChannel .SimpleBlockingRpcContinuation ();
697
+ _channel0 .quiescingRpc (reason , k );
698
+ k .getReply (timeout );
699
+ } else {
700
+ _channel0 .quiescingTransmit (reason );
701
+ }
691
702
} catch (TimeoutException tte ) {
692
703
if (!abort )
693
704
throw new ShutdownSignalException (true , true , tte , this );
@@ -698,7 +709,7 @@ public void close(int closeCode,
698
709
if (!abort )
699
710
throw ioe ;
700
711
} finally {
701
- _frameHandler .close ();
712
+ if ( sync ) _frameHandler .close ();
702
713
}
703
714
}
704
715
0 commit comments