File tree 1 file changed +5
-1
lines changed
src/main/java/com/rabbitmq/client
1 file changed +5
-1
lines changed Original file line number Diff line number Diff line change @@ -93,6 +93,7 @@ public class RpcClient implements AutoCloseable {
93
93
* @since 5.9.0
94
94
*/
95
95
private final Supplier <String > _correlationIdSupplier ;
96
+ private final ReturnListener _returnListener ;
96
97
97
98
private String lastCorrelationId = "0" ;
98
99
@@ -123,7 +124,7 @@ public RpcClient(RpcClientParams params) throws
123
124
124
125
_consumer = setupConsumer ();
125
126
if (_useMandatory ) {
126
- this ._channel .addReturnListener (returnMessage -> {
127
+ this ._returnListener = this . _channel .addReturnListener (returnMessage -> {
127
128
synchronized (_continuationMap ) {
128
129
String replyId = returnMessage .getProperties ().getCorrelationId ();
129
130
BlockingCell <Object > blocker = _continuationMap .remove (replyId );
@@ -136,6 +137,8 @@ public RpcClient(RpcClientParams params) throws
136
137
}
137
138
}
138
139
});
140
+ } else {
141
+ this ._returnListener = null ;
139
142
}
140
143
}
141
144
@@ -157,6 +160,7 @@ private void checkNotClosed() throws IOException {
157
160
public void close () throws IOException {
158
161
if (this .closed .compareAndSet (false , true )) {
159
162
_channel .basicCancel (_consumer .getConsumerTag ());
163
+ _channel .removeReturnListener (this ._returnListener );
160
164
}
161
165
}
162
166
You can’t perform that action at this time.
0 commit comments