Skip to content

Commit b19fa8c

Browse files
authored
GH-2653: Fix deadlock in the DirectMessageListenerContainer
Fixes: #2653 When there are not enough channels in the cache, `DirectMessageListenerContainer.consume()` returns null and `adjustConsumers()` goes into an infinite loop, since the already active consumer does not release its channel. * Fix `DirectMessageListenerContainer.consume()` to re-throw the `AmqpTimeoutException` that is thrown when there are no available channels in the cache * Catch `AmqpTimeoutException` in `DirectReplyToMessageListenerContainer.getChannelHolder()` and revert the increment via `this.consumerCount--`, so that the existing consumers are retried until one becomes available, e.g. when it receives a reply or times out. * Change `DirectReplyToMessageListenerContainer.consumerCount` to an `AtomicInteger`
1 parent 4c4b6b6 commit b19fa8c

File tree

4 files changed

+85
-35
lines changed

4 files changed

+85
-35
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
4848
import org.springframework.amqp.AmqpConnectException;
4949
import org.springframework.amqp.AmqpException;
5050
import org.springframework.amqp.AmqpIOException;
51+
import org.springframework.amqp.AmqpTimeoutException;
5152
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
5253
import org.springframework.amqp.core.AmqpAdmin;
5354
import org.springframework.amqp.core.Message;
@@ -802,6 +803,9 @@ private SimpleConsumer consume(String queue, int index, Connection connection) {
802803
catch (AmqpApplicationContextClosedException e) {
803804
throw new AmqpConnectException(e);
804805
}
806+
catch (AmqpTimeoutException timeoutException) {
807+
throw timeoutException;
808+
}
805809
catch (Exception e) {
806810
RabbitUtils.closeChannel(channel);
807811
RabbitUtils.closeConnection(connection);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectReplyToMessageListenerContainer.java

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,7 +18,9 @@
1818

1919
import java.util.concurrent.ConcurrentHashMap;
2020
import java.util.concurrent.ConcurrentMap;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122

23+
import org.springframework.amqp.AmqpTimeoutException;
2224
import org.springframework.amqp.core.AcknowledgeMode;
2325
import org.springframework.amqp.core.Address;
2426
import org.springframework.amqp.core.MessageListener;
@@ -47,7 +49,7 @@ public class DirectReplyToMessageListenerContainer extends DirectMessageListener
4749

4850
private final ConcurrentMap<SimpleConsumer, Long> whenUsed = new ConcurrentHashMap<>();
4951

50-
private int consumerCount;
52+
private final AtomicInteger consumerCount = new AtomicInteger();
5153

5254
public DirectReplyToMessageListenerContainer(ConnectionFactory connectionFactory) {
5355
super(connectionFactory);
@@ -109,7 +111,7 @@ public void setMessageListener(MessageListener messageListener) {
109111
@Override
110112
protected void doStart() {
111113
if (!isRunning()) {
112-
this.consumerCount = 0;
114+
this.consumerCount.set(0);
113115
super.setConsumersPerQueue(0);
114116
super.doStart();
115117
}
@@ -118,23 +120,24 @@ protected void doStart() {
118120
@Override
119121
protected void processMonitorTask() {
120122
long now = System.currentTimeMillis();
123+
long reduce;
121124
this.consumersLock.lock();
122125
try {
123-
long reduce = this.consumers.stream()
124-
.filter(c -> this.whenUsed.containsKey(c) && !this.inUseConsumerChannels.containsValue(c)
125-
&& this.whenUsed.get(c) < now - getIdleEventInterval())
126-
.count();
127-
if (reduce > 0) {
128-
if (logger.isDebugEnabled()) {
129-
logger.debug("Reducing idle consumes by " + reduce);
130-
}
131-
this.consumerCount = (int) Math.max(0, this.consumerCount - reduce);
132-
super.setConsumersPerQueue(this.consumerCount);
133-
}
126+
reduce = this.consumers.stream()
127+
.filter(c -> this.whenUsed.containsKey(c) && !this.inUseConsumerChannels.containsValue(c)
128+
&& this.whenUsed.get(c) < now - getIdleEventInterval())
129+
.count();
134130
}
135131
finally {
136132
this.consumersLock.unlock();
137133
}
134+
if (reduce > 0) {
135+
if (logger.isDebugEnabled()) {
136+
logger.debug("Reducing idle consumes by " + reduce);
137+
}
138+
super.setConsumersPerQueue(
139+
this.consumerCount.updateAndGet((current) -> (int) Math.max(0, current - reduce)));
140+
}
138141
}
139142

140143
@Override
@@ -159,13 +162,13 @@ protected void consumerRemoved(SimpleConsumer consumer) {
159162
* @return the channel holder.
160163
*/
161164
public ChannelHolder getChannelHolder() {
162-
this.consumersLock.lock();
163-
try {
164-
ChannelHolder channelHolder = null;
165-
while (channelHolder == null) {
166-
if (!isRunning()) {
167-
throw new IllegalStateException("Direct reply-to container is not running");
168-
}
165+
ChannelHolder channelHolder = null;
166+
while (channelHolder == null) {
167+
if (!isRunning()) {
168+
throw new IllegalStateException("Direct reply-to container is not running");
169+
}
170+
this.consumersLock.lock();
171+
try {
169172
for (SimpleConsumer consumer : this.consumers) {
170173
Channel candidate = consumer.getChannel();
171174
if (candidate.isOpen() && this.inUseConsumerChannels.putIfAbsent(candidate, consumer) == null) {
@@ -175,16 +178,23 @@ public ChannelHolder getChannelHolder() {
175178
break;
176179
}
177180
}
178-
if (channelHolder == null) {
179-
this.consumerCount++;
180-
super.setConsumersPerQueue(this.consumerCount);
181+
}
182+
finally {
183+
this.consumersLock.unlock();
184+
}
185+
if (channelHolder == null) {
186+
try {
187+
super.setConsumersPerQueue(this.consumerCount.incrementAndGet());
188+
}
189+
catch (AmqpTimeoutException timeoutException) {
190+
// Possibly No available channels in the cache, so come back to consumers
191+
// iteration until existing is available
192+
this.consumerCount.decrementAndGet();
181193
}
182194
}
183-
return channelHolder;
184-
}
185-
finally {
186-
this.consumersLock.unlock();
187195
}
196+
return channelHolder;
197+
188198
}
189199

190200
/**

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.amqp.rabbit;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021
import static org.assertj.core.api.Assertions.fail;
2122
import static org.awaitility.Awaitility.await;
2223
import static org.mockito.Mockito.mock;
@@ -29,6 +30,7 @@
2930
import java.util.concurrent.ExecutionException;
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.concurrent.atomic.AtomicInteger;
3234
import java.util.concurrent.atomic.AtomicReference;
3335
import java.util.function.BiConsumer;
3436

@@ -111,7 +113,8 @@ public void testConvert1ArgDirect() throws Exception {
111113
waitForZeroInUseConsumers();
112114
assertThat(TestUtils
113115
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
114-
Integer.class)).isEqualTo(2);
116+
AtomicInteger.class).get())
117+
.isEqualTo(2);
115118
final String missingQueue = UUID.randomUUID().toString();
116119
this.asyncDirectTemplate.convertSendAndReceive("", missingQueue, "foo"); // send to nowhere
117120
this.asyncDirectTemplate.stop(); // should clear the inUse channel map
@@ -168,18 +171,20 @@ public void testMessage1ArgDirect() throws Exception {
168171
waitForZeroInUseConsumers();
169172
assertThat(TestUtils
170173
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
171-
Integer.class)).isEqualTo(2);
174+
AtomicInteger.class).get())
175+
.isEqualTo(2);
172176
this.asyncDirectTemplate.stop();
173177
this.asyncDirectTemplate.start();
174178
assertThat(TestUtils
175179
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
176-
Integer.class)).isEqualTo(0);
180+
AtomicInteger.class).get())
181+
.isEqualTo(0);
177182
}
178183

179-
private void waitForZeroInUseConsumers() throws InterruptedException {
184+
private void waitForZeroInUseConsumers() {
180185
Map<?, ?> inUseConsumers = TestUtils
181186
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.inUseConsumerChannels", Map.class);
182-
await().until(() -> inUseConsumers.size() == 0);
187+
await().until(inUseConsumers::isEmpty);
183188
}
184189

185190
@Test
@@ -422,6 +427,31 @@ void ctorCoverage() {
422427
.isEqualTo("rq");
423428
}
424429

430+
@Test
431+
public void limitedChannelsAreReleasedOnTimeout() {
432+
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
433+
connectionFactory.setChannelCacheSize(1);
434+
connectionFactory.setChannelCheckoutTimeout(500L);
435+
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
436+
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
437+
asyncRabbitTemplate.setReceiveTimeout(500L);
438+
asyncRabbitTemplate.start();
439+
440+
RabbitConverterFuture<String> replyFuture1 = asyncRabbitTemplate.convertSendAndReceive("noReply1");
441+
RabbitConverterFuture<String> replyFuture2 = asyncRabbitTemplate.convertSendAndReceive("noReply2");
442+
443+
assertThatExceptionOfType(ExecutionException.class)
444+
.isThrownBy(() -> replyFuture1.get(10, TimeUnit.SECONDS))
445+
.withCauseInstanceOf(AmqpReplyTimeoutException.class);
446+
447+
assertThatExceptionOfType(ExecutionException.class)
448+
.isThrownBy(() -> replyFuture2.get(10, TimeUnit.SECONDS))
449+
.withCauseInstanceOf(AmqpReplyTimeoutException.class);
450+
451+
asyncRabbitTemplate.stop();
452+
connectionFactory.destroy();
453+
}
454+
425455
private void checkConverterResult(CompletableFuture<String> future, String expected) throws InterruptedException {
426456
final CountDownLatch cdl = new CountDownLatch(1);
427457
final AtomicReference<String> resultRef = new AtomicReference<>();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CountDownLatch;
2626
import java.util.concurrent.TimeUnit;
2727

28+
import org.junit.jupiter.api.BeforeAll;
2829
import org.junit.jupiter.api.Test;
2930

3031
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
@@ -62,6 +63,11 @@ public class EnableRabbitBatchIntegrationTests {
6263
@Autowired
6364
private Listener listener;
6465

66+
@BeforeAll
67+
static void setup() {
68+
System.setProperty("spring.amqp.deserialization.trust.all", "true");
69+
}
70+
6571
@Test
6672
public void simpleList() throws InterruptedException {
6773
this.template.convertAndSend("batch.1", new Foo("foo"));

0 commit comments

Comments
 (0)