Skip to content

Commit 0752c32

Browse files
committed
Fix race condition in RabbitTemplatePublisherCallbacksIntegration1Tests
The `CachingConnectionFactory` has an ability to wait for async channels close when it is destroyed. However, that happens only if an `ApplicationContext` is stopped. * Supply a mock `ApplicationContext` to the `CachingConnectionFactory` of the test and emit respective `ContextClosedEvent` in the test before calling `cf.destroy()`
1 parent 3a07153 commit 0752c32

File tree

1 file changed

+29
-5
lines changed

1 file changed

+29
-5
lines changed

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegration1Tests.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -79,6 +79,8 @@
7979
import org.springframework.amqp.support.converter.SimpleMessageConverter;
8080
import org.springframework.amqp.utils.test.TestUtils;
8181
import org.springframework.beans.DirectFieldAccessor;
82+
import org.springframework.context.ApplicationContext;
83+
import org.springframework.context.event.ContextClosedEvent;
8284
import org.springframework.expression.Expression;
8385
import org.springframework.expression.spel.standard.SpelExpressionParser;
8486

@@ -100,6 +102,8 @@ public class RabbitTemplatePublisherCallbacksIntegration1Tests {
100102

101103
public static final String ROUTE = "test.queue.RabbitTemplatePublisherCallbacksIntegrationTests";
102104

105+
private static final ApplicationContext APPLICATION_CONTEXT = mock();
106+
103107
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
104108

105109
private CachingConnectionFactory connectionFactory;
@@ -122,25 +126,35 @@ public void create() {
122126
connectionFactory.setHost("localhost");
123127
connectionFactory.setChannelCacheSize(10);
124128
connectionFactory.setPort(BrokerTestUtils.getPort());
129+
connectionFactory.setApplicationContext(APPLICATION_CONTEXT);
130+
125131
connectionFactoryWithConfirmsEnabled = new CachingConnectionFactory();
126132
connectionFactoryWithConfirmsEnabled.setHost("localhost");
127133
connectionFactoryWithConfirmsEnabled.setChannelCacheSize(100);
128134
connectionFactoryWithConfirmsEnabled.setPort(BrokerTestUtils.getPort());
129135
connectionFactoryWithConfirmsEnabled.setPublisherConfirmType(ConfirmType.CORRELATED);
136+
connectionFactoryWithConfirmsEnabled.setApplicationContext(APPLICATION_CONTEXT);
137+
130138
templateWithConfirmsEnabled = new RabbitTemplate(connectionFactoryWithConfirmsEnabled);
139+
131140
connectionFactoryWithReturnsEnabled = new CachingConnectionFactory();
132141
connectionFactoryWithReturnsEnabled.setHost("localhost");
133142
connectionFactoryWithReturnsEnabled.setChannelCacheSize(1);
134143
connectionFactoryWithReturnsEnabled.setPort(BrokerTestUtils.getPort());
135144
connectionFactoryWithReturnsEnabled.setPublisherReturns(true);
145+
connectionFactoryWithReturnsEnabled.setApplicationContext(APPLICATION_CONTEXT);
146+
136147
templateWithReturnsEnabled = new RabbitTemplate(connectionFactoryWithReturnsEnabled);
137148
templateWithReturnsEnabled.setMandatory(true);
149+
138150
connectionFactoryWithConfirmsAndReturnsEnabled = new CachingConnectionFactory();
139151
connectionFactoryWithConfirmsAndReturnsEnabled.setHost("localhost");
140152
connectionFactoryWithConfirmsAndReturnsEnabled.setChannelCacheSize(100);
141153
connectionFactoryWithConfirmsAndReturnsEnabled.setPort(BrokerTestUtils.getPort());
142154
connectionFactoryWithConfirmsAndReturnsEnabled.setPublisherConfirmType(ConfirmType.CORRELATED);
143155
connectionFactoryWithConfirmsAndReturnsEnabled.setPublisherReturns(true);
156+
connectionFactoryWithConfirmsAndReturnsEnabled.setApplicationContext(APPLICATION_CONTEXT);
157+
144158
templateWithConfirmsAndReturnsEnabled = new RabbitTemplate(connectionFactoryWithConfirmsAndReturnsEnabled);
145159
templateWithConfirmsAndReturnsEnabled.setMandatory(true);
146160
}
@@ -149,17 +163,27 @@ public void create() {
149163
public void cleanUp() {
150164
this.templateWithConfirmsEnabled.stop();
151165
this.templateWithReturnsEnabled.stop();
166+
167+
this.connectionFactory.onApplicationEvent(new ContextClosedEvent(APPLICATION_CONTEXT));
152168
this.connectionFactory.destroy();
169+
170+
this.connectionFactoryWithConfirmsEnabled.onApplicationEvent(new ContextClosedEvent(APPLICATION_CONTEXT));
153171
this.connectionFactoryWithConfirmsEnabled.destroy();
172+
173+
this.connectionFactoryWithReturnsEnabled.onApplicationEvent(new ContextClosedEvent(APPLICATION_CONTEXT));
154174
this.connectionFactoryWithReturnsEnabled.destroy();
175+
176+
this.connectionFactoryWithConfirmsAndReturnsEnabled.onApplicationEvent(new ContextClosedEvent(APPLICATION_CONTEXT));
177+
this.connectionFactoryWithConfirmsAndReturnsEnabled.destroy();
178+
155179
this.executorService.shutdown();
156180
}
157181

158182
@Test
159183
public void testPublisherConfirmReceived() throws Exception {
160184
final CountDownLatch latch = new CountDownLatch(10000);
161185
final AtomicInteger acks = new AtomicInteger();
162-
final AtomicReference<CorrelationData> confirmCorrelation = new AtomicReference<CorrelationData>();
186+
final AtomicReference<CorrelationData> confirmCorrelation = new AtomicReference<>();
163187
AtomicReference<String> callbackThreadName = new AtomicReference<>();
164188
this.templateWithConfirmsEnabled.setConfirmCallback((correlationData, ack, cause) -> {
165189
acks.incrementAndGet();
@@ -208,7 +232,7 @@ public Message postProcessMessage(Message message, Correlation correlation, Stri
208232
this.templateWithConfirmsEnabled.execute(channel -> {
209233
Map<?, ?> listenerMap = TestUtils.getPropertyValue(((ChannelProxy) channel).getTargetChannel(),
210234
"listenerForSeq", Map.class);
211-
await().until(() -> listenerMap.size() == 0);
235+
await().until(listenerMap::isEmpty);
212236
return null;
213237
});
214238

@@ -227,7 +251,8 @@ public void testPublisherConfirmWithSendAndReceive() throws Exception {
227251
confirmCD.set(correlationData);
228252
latch.countDown();
229253
});
230-
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactoryWithConfirmsEnabled);
254+
SimpleMessageListenerContainer container =
255+
new SimpleMessageListenerContainer(this.connectionFactoryWithConfirmsEnabled);
231256
container.setQueueNames(ROUTE);
232257
container.setReceiveTimeout(10);
233258
container.setMessageListener(
@@ -643,7 +668,6 @@ public void testConcurrentConfirms() throws Exception {
643668
assertThat(waitForAll3AcksLatch.await(10, TimeUnit.SECONDS)).isTrue();
644669
assertThat(acks.get()).isEqualTo(3);
645670

646-
647671
channel.basicConsume("foo", false, (Map) null, null);
648672
verify(mockChannel).basicConsume("foo", false, (Map) null, null);
649673

0 commit comments

Comments
 (0)