Skip to content

Commit 22a21bf

Browse files
committed
Use atomic boolean to track state of RpcClient
Instead of using the consumer property. Avoids race conditions during closing. References #1033 (cherry picked from commit 2253017)
1 parent 49c57cf commit 22a21bf

File tree

1 file changed

+13
-16
lines changed

1 file changed

+13
-16
lines changed

src/main/java/com/rabbitmq/client/RpcClient.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -13,12 +13,10 @@
1313
// If you have any questions regarding licensing, please contact us at
1414
1515

16-
1716
package com.rabbitmq.client;
1817

1918
import java.io.ByteArrayInputStream;
2019
import java.io.ByteArrayOutputStream;
21-
import java.io.Closeable;
2220
import java.io.DataInputStream;
2321
import java.io.DataOutputStream;
2422
import java.io.EOFException;
@@ -28,6 +26,7 @@
2826
import java.util.Map;
2927
import java.util.Map.Entry;
3028
import java.util.concurrent.TimeoutException;
29+
import java.util.concurrent.atomic.AtomicBoolean;
3130
import java.util.function.Function;
3231
import java.util.function.Supplier;
3332

@@ -45,7 +44,7 @@
4544
* It simply provides a mechanism for sending a message to an exchange with a given routing key,
4645
* and waiting for a response.
4746
*/
48-
public class RpcClient implements Closeable {
47+
public class RpcClient implements AutoCloseable {
4948

5049
private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
5150

@@ -63,6 +62,8 @@ public class RpcClient implements Closeable {
6362
protected final static int NO_TIMEOUT = -1;
6463
/** Whether to publish RPC requests with the mandatory flag or not. */
6564
private final boolean _useMandatory;
65+
/** closed flag */
66+
private final AtomicBoolean closed = new AtomicBoolean(false);
6667

6768
public final static Function<Object, Response> DEFAULT_REPLY_HANDLER = reply -> {
6869
if (reply instanceof ShutdownSignalException) {
@@ -96,7 +97,7 @@ public class RpcClient implements Closeable {
9697
private String lastCorrelationId = "0";
9798

9899
/** Consumer attached to our reply queue */
99-
private DefaultConsumer _consumer;
100+
private final DefaultConsumer _consumer;
100101

101102
/**
102103
* Construct a {@link RpcClient} with the passed-in {@link RpcClientParams}.
@@ -227,8 +228,8 @@ public RpcClient(Channel channel, String exchange, String routingKey, int timeou
227228
* Private API - ensures the RpcClient is correctly open.
228229
* @throws IOException if an error is encountered
229230
*/
230-
public void checkConsumer() throws IOException {
231-
if (_consumer == null) {
231+
private void checkNotClosed() throws IOException {
232+
if (this.closed.get()) {
232233
throw new EOFException("RpcClient is closed");
233234
}
234235
}
@@ -239,11 +240,8 @@ public void checkConsumer() throws IOException {
239240
*/
240241
@Override
241242
public void close() throws IOException {
242-
if (_consumer != null) {
243-
final String consumerTag = _consumer.getConsumerTag();
244-
// set it null before calling basicCancel to make this method idempotent in case of IOException
245-
_consumer = null;
246-
_channel.basicCancel(consumerTag);
243+
if (this.closed.compareAndSet(false, true)) {
244+
_channel.basicCancel(_consumer.getConsumerTag());
247245
}
248246
}
249247

@@ -261,16 +259,15 @@ public void handleShutdownSignal(String consumerTag,
261259
for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {
262260
entry.getValue().set(signal);
263261
}
264-
_consumer = null;
262+
closed.set(true);
265263
}
266264
}
267265

268266
@Override
269267
public void handleDelivery(String consumerTag,
270268
Envelope envelope,
271269
AMQP.BasicProperties properties,
272-
byte[] body)
273-
throws IOException {
270+
byte[] body) {
274271
synchronized (_continuationMap) {
275272
String replyId = properties.getCorrelationId();
276273
BlockingCell<Object> blocker =_continuationMap.remove(replyId);
@@ -301,7 +298,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message)
301298

302299
public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
303300
throws IOException, ShutdownSignalException, TimeoutException {
304-
checkConsumer();
301+
checkNotClosed();
305302
BlockingCell<Object> k = new BlockingCell<Object>();
306303
String replyId;
307304
synchronized (_continuationMap) {

0 commit comments

Comments
 (0)