Skip to content

Commit db91a7c

Browse files
committed
spring-projectsGH-2477: Fix Static State RetryTopicConfigSupport
Resolves spring-projects#2477 Previously, a static field was used to detect/prevent multiple instances of `RetryTopicConfigurationSupport`. This causes problems in test suites or when an application context might be destroyed and re-created in the same class loader. Instead, detect and warn when multiple instances are found in an application context. **cherry-pick to 2.9.x**
1 parent 2ad36bd commit db91a7c

13 files changed

+62
-67
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.List;
22-
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.Map;
2323
import java.util.function.Consumer;
2424
import java.util.stream.Collectors;
2525

26+
import org.apache.commons.logging.LogFactory;
2627
import org.apache.kafka.clients.consumer.ConsumerRecord;
2728

29+
import org.springframework.beans.BeansException;
2830
import org.springframework.beans.factory.BeanFactory;
31+
import org.springframework.beans.factory.SmartInitializingSingleton;
2932
import org.springframework.beans.factory.annotation.Qualifier;
3033
import org.springframework.context.ApplicationContext;
34+
import org.springframework.context.ApplicationContextAware;
3135
import org.springframework.context.annotation.Bean;
3236
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.core.log.LogAccessor;
3338
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
3439
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
3540
import org.springframework.kafka.config.KafkaListenerConfigUtils;
@@ -67,14 +72,29 @@
6772
* @author Gary Russell
6873
* @since 2.9
6974
*/
70-
public class RetryTopicConfigurationSupport {
71-
72-
private static final AtomicBoolean ONLY_ONE_ALLOWED = new AtomicBoolean(true);
75+
public class RetryTopicConfigurationSupport implements ApplicationContextAware, SmartInitializingSingleton {
7376

7477
private final RetryTopicComponentFactory componentFactory = createComponentFactory();
7578

76-
public RetryTopicConfigurationSupport() {
77-
Assert.state(ONLY_ONE_ALLOWED.getAndSet(false), "Only one 'RetryTopicConfigurationSupport' is allowed");
79+
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
80+
81+
private ApplicationContext applicationContext;
82+
83+
@Override
84+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
85+
this.applicationContext = applicationContext;
86+
}
87+
88+
@Override
89+
public void afterSingletonsInstantiated() {
90+
if (this.applicationContext != null) {
91+
Map<String, RetryTopicConfigurationSupport> beans = this.applicationContext
92+
.getBeansOfType(RetryTopicConfigurationSupport.class);
93+
if (beans.size() > 1) {
94+
this.logger.warn(() -> "Only one RetryTopicConfigurationSupport object expected, found "
95+
+ beans.keySet() + "; this may result in unexpected behavior");
96+
}
97+
}
7898
}
7999

80100
/**

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AbstractRetryTopicIntegrationTests.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/CircularDltHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
* @since 2.8
3737
*
3838
*/
39-
public class CircularDltHandlerTests extends AbstractRetryTopicIntegrationTests {
39+
public class CircularDltHandlerTests {
4040

4141
@Test
4242
void contextLoads() {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeliveryHeaderTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
@SpringJUnitConfig
5959
@DirtiesContext
6060
@EmbeddedKafka(topics = "dh1")
61-
public class DeliveryHeaderTests extends AbstractRetryTopicIntegrationTests {
61+
public class DeliveryHeaderTests {
6262

6363
@Test
6464
void deliveryAttempts(@Autowired Config config, @Autowired KafkaTemplate<Integer, String> template)

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DltStartupTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
*/
5454
@SpringJUnitConfig
5555
@EmbeddedKafka
56-
public class DltStartupTests extends AbstractRetryTopicIntegrationTests {
56+
public class DltStartupTests {
5757

5858
@Test
5959
void dltStartOverridesCorrect(@Autowired KafkaListenerEndpointRegistry registry) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
ExistingRetryTopicIntegrationTests.MAIN_TOPIC_WITH_PARTITION_INFO,
7777
ExistingRetryTopicIntegrationTests.RETRY_TOPIC_WITH_PARTITION_INFO}, partitions = 4)
7878
@TestPropertySource(properties = "two.attempts=2")
79-
public class ExistingRetryTopicIntegrationTests extends AbstractRetryTopicIntegrationTests {
79+
public class ExistingRetryTopicIntegrationTests {
8080

8181
private static final Logger logger = LoggerFactory.getLogger(ExistingRetryTopicIntegrationTests.class);
8282

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/PartitionResolverTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
@DirtiesContext
6161
@SpringJUnitConfig
6262
@EmbeddedKafka(topics = "partition.resolver.tests")
63-
public class PartitionResolverTests extends AbstractRetryTopicIntegrationTests {
63+
public class PartitionResolverTests {
6464

6565
@Test
6666
void testNullPartition(@Autowired KafkaOperations<Integer, String> template,

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
@SpringJUnitConfig
5858
@DirtiesContext
5959
@EmbeddedKafka(topics = RetryTopicConfigurationIntegrationTests.TOPIC1, partitions = 1)
60-
class RetryTopicConfigurationIntegrationTests extends AbstractRetryTopicIntegrationTests {
60+
class RetryTopicConfigurationIntegrationTests {
6161

6262
public static final String TOPIC1 = "RetryTopicConfigurationIntegrationTests.1";
6363

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
@DirtiesContext
6161
@EmbeddedKafka(topics = { RetryTopicConfigurationManualAssignmentIntegrationTests.TOPIC1,
6262
RetryTopicConfigurationManualAssignmentIntegrationTests.TOPIC2 }, partitions = 1)
63-
class RetryTopicConfigurationManualAssignmentIntegrationTests extends AbstractRetryTopicIntegrationTests {
63+
class RetryTopicConfigurationManualAssignmentIntegrationTests {
6464

6565
public static final String TOPIC1 = "RetryTopicConfigurationManualAssignmentIntegrationTests.1";
6666

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,31 @@
2222
import static org.mockito.BDDMockito.given;
2323
import static org.mockito.BDDMockito.then;
2424
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.spy;
26+
import static org.mockito.Mockito.verify;
2527

26-
import java.lang.reflect.Field;
2728
import java.time.Clock;
29+
import java.util.LinkedHashMap;
2830
import java.util.List;
2931
import java.util.Map;
30-
import java.util.concurrent.atomic.AtomicBoolean;
3132
import java.util.function.Consumer;
33+
import java.util.function.Supplier;
3234

33-
import org.junit.jupiter.api.BeforeEach;
3435
import org.junit.jupiter.api.Test;
3536
import org.mockito.ArgumentCaptor;
3637

38+
import org.springframework.beans.DirectFieldAccessor;
3739
import org.springframework.beans.factory.BeanFactory;
3840
import org.springframework.context.ApplicationContext;
41+
import org.springframework.core.log.LogAccessor;
3942
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
4043
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
4144
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
4245
import org.springframework.kafka.listener.DefaultErrorHandler;
4346
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
4447
import org.springframework.kafka.listener.ListenerContainerRegistry;
4548
import org.springframework.kafka.support.converter.ConversionException;
49+
import org.springframework.kafka.test.utils.KafkaTestUtils;
4650
import org.springframework.scheduling.TaskScheduler;
4751
import org.springframework.util.backoff.BackOff;
4852

@@ -53,13 +57,6 @@
5357
*/
5458
class RetryTopicConfigurationSupportTests {
5559

56-
@BeforeEach
57-
void reset() throws Exception { // NOSONAR
58-
Field field = RetryTopicConfigurationSupport.class.getDeclaredField("ONLY_ONE_ALLOWED");
59-
field.setAccessible(true);
60-
((AtomicBoolean) field.get(null)).set(true);
61-
}
62-
6360
@SuppressWarnings("unchecked")
6461
@Test
6562
void testCreateConfigurer() {
@@ -242,4 +239,23 @@ void testCreatesComponentFactory() {
242239
assertThat(configurationSupport).hasFieldOrProperty("componentFactory").isNotNull();
243240
}
244241

242+
@Test
243+
void twoSupports() {
244+
RetryTopicConfigurationSupport configurationSupport = new RetryTopicConfigurationSupport();
245+
LogAccessor logger = spy(KafkaTestUtils.getPropertyValue(configurationSupport, "logger", LogAccessor.class));
246+
new DirectFieldAccessor(configurationSupport).setPropertyValue("logger", logger);
247+
ApplicationContext ctx = mock(ApplicationContext.class);
248+
configurationSupport.setApplicationContext(ctx);
249+
Map<String, RetryTopicConfigurationSupport> beans = new LinkedHashMap<>();
250+
beans.put("foo", configurationSupport);
251+
beans.put("bar", configurationSupport);
252+
given(ctx.getBeansOfType(RetryTopicConfigurationSupport.class)).willReturn(beans);
253+
configurationSupport.afterSingletonsInstantiated();
254+
@SuppressWarnings("unchecked")
255+
ArgumentCaptor<Supplier<String>> captor = ArgumentCaptor.forClass(Supplier.class);
256+
verify(logger).warn(captor.capture());
257+
assertThat(captor.getValue().get()).isEqualTo("Only one RetryTopicConfigurationSupport object expected, found "
258+
+ "[foo, bar]; this may result in unexpected behavior");
259+
}
260+
245261
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
@SpringJUnitConfig
7171
@DirtiesContext
7272
@EmbeddedKafka
73-
public class RetryTopicExceptionRoutingIntegrationTests extends AbstractRetryTopicIntegrationTests {
73+
public class RetryTopicExceptionRoutingIntegrationTests {
7474

7575
private static final Logger logger = LoggerFactory.getLogger(RetryTopicExceptionRoutingIntegrationTests.class);
7676

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@
9696
RetryTopicIntegrationTests.FOURTH_TOPIC,
9797
RetryTopicIntegrationTests.TWO_LISTENERS_TOPIC })
9898
@TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"})
99-
public class RetryTopicIntegrationTests extends AbstractRetryTopicIntegrationTests {
99+
public class RetryTopicIntegrationTests {
100100

101101
private static final Logger logger = LoggerFactory.getLogger(RetryTopicIntegrationTests.class);
102102

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
@DirtiesContext
6868
@EmbeddedKafka(topics = { RetryTopicSameContainerFactoryIntegrationTests.FIRST_TOPIC,
6969
RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC}, partitions = 1)
70-
public class RetryTopicSameContainerFactoryIntegrationTests extends AbstractRetryTopicIntegrationTests {
70+
public class RetryTopicSameContainerFactoryIntegrationTests {
7171

7272
private static final Logger logger = LoggerFactory.getLogger(RetryTopicSameContainerFactoryIntegrationTests.class);
7373

0 commit comments

Comments
 (0)