diff --git a/src/main/java/com/rabbitmq/client/RpcClient.java b/src/main/java/com/rabbitmq/client/RpcClient.java index f47d185b89..eb49764e32 100644 --- a/src/main/java/com/rabbitmq/client/RpcClient.java +++ b/src/main/java/com/rabbitmq/client/RpcClient.java @@ -93,6 +93,7 @@ public class RpcClient implements AutoCloseable { * @since 5.9.0 */ private final Supplier _correlationIdSupplier; + private final ReturnListener _returnListener; private String lastCorrelationId = "0"; @@ -123,7 +124,7 @@ public RpcClient(RpcClientParams params) throws _consumer = setupConsumer(); if (_useMandatory) { - this._channel.addReturnListener(returnMessage -> { + this._returnListener = this._channel.addReturnListener(returnMessage -> { synchronized (_continuationMap) { String replyId = returnMessage.getProperties().getCorrelationId(); BlockingCell blocker = _continuationMap.remove(replyId); @@ -136,6 +137,8 @@ public RpcClient(RpcClientParams params) throws } } }); + } else { + this._returnListener = null; } } @@ -157,6 +160,7 @@ private void checkNotClosed() throws IOException { public void close() throws IOException { if (this.closed.compareAndSet(false, true)) { _channel.basicCancel(_consumer.getConsumerTag()); + _channel.removeReturnListener(this._returnListener); } }