diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java index 4079bf09f7..8641228a49 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java @@ -19,17 +19,22 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Map; import java.util.function.Consumer; import java.util.stream.Collectors; +import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.log.LogAccessor; import org.springframework.kafka.annotation.EnableKafkaRetryTopic; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; import org.springframework.kafka.config.KafkaListenerConfigUtils; @@ -67,14 +72,29 @@ * @author Gary Russell * @since 2.9 */ -public class RetryTopicConfigurationSupport { - - private static final AtomicBoolean ONLY_ONE_ALLOWED = new AtomicBoolean(true); +public class RetryTopicConfigurationSupport implements ApplicationContextAware, SmartInitializingSingleton { private final RetryTopicComponentFactory componentFactory = createComponentFactory(); - public RetryTopicConfigurationSupport() { - Assert.state(ONLY_ONE_ALLOWED.getAndSet(false), "Only one 'RetryTopicConfigurationSupport' is allowed"); + private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + private ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + @Override + public void afterSingletonsInstantiated() { + if (this.applicationContext != null) { + Map beans = this.applicationContext + .getBeansOfType(RetryTopicConfigurationSupport.class, false, false); + if (beans.size() > 1) { + this.logger.warn(() -> "Only one RetryTopicConfigurationSupport object expected, found " + + beans.keySet() + "; this may result in unexpected behavior"); + } + } } /** diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AbstractRetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AbstractRetryTopicIntegrationTests.java deleted file mode 100644 index b46cb7cb14..0000000000 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AbstractRetryTopicIntegrationTests.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.retrytopic; - -import java.lang.reflect.Field; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.jupiter.api.BeforeAll; - -/** - * @author Gary Russell - * @since 2.9 - * - */ -public class AbstractRetryTopicIntegrationTests { - - protected AbstractRetryTopicIntegrationTests() { - } - - @BeforeAll - static void reset() throws Exception { // NOSONAR - Field field = RetryTopicConfigurationSupport.class.getDeclaredField("ONLY_ONE_ALLOWED"); - field.setAccessible(true); - ((AtomicBoolean) field.get(null)).set(true); - } - -} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/CircularDltHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/CircularDltHandlerTests.java index 3872491c3f..4cb6820080 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/CircularDltHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/CircularDltHandlerTests.java @@ -36,7 +36,7 @@ * @since 2.8 * */ -public class CircularDltHandlerTests extends AbstractRetryTopicIntegrationTests { +public class CircularDltHandlerTests { @Test void contextLoads() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeliveryHeaderTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeliveryHeaderTests.java index bbf1687483..1abbb3e624 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeliveryHeaderTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeliveryHeaderTests.java @@ -58,7 +58,7 @@ @SpringJUnitConfig @DirtiesContext @EmbeddedKafka(topics = "dh1") -public class DeliveryHeaderTests extends AbstractRetryTopicIntegrationTests { +public class DeliveryHeaderTests { @Test void deliveryAttempts(@Autowired Config config, @Autowired KafkaTemplate template) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DltStartupTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DltStartupTests.java index 7b22efc41d..b8bfe6fb5d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DltStartupTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DltStartupTests.java @@ -53,7 +53,7 @@ */ @SpringJUnitConfig @EmbeddedKafka -public class DltStartupTests extends AbstractRetryTopicIntegrationTests { +public class DltStartupTests { @Test void dltStartOverridesCorrect(@Autowired KafkaListenerEndpointRegistry registry) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java index c46953f0cc..2988c589a4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ExistingRetryTopicIntegrationTests.java @@ -76,7 +76,7 @@ ExistingRetryTopicIntegrationTests.MAIN_TOPIC_WITH_PARTITION_INFO, ExistingRetryTopicIntegrationTests.RETRY_TOPIC_WITH_PARTITION_INFO}, partitions = 4) @TestPropertySource(properties = "two.attempts=2") -public class ExistingRetryTopicIntegrationTests extends AbstractRetryTopicIntegrationTests { +public class ExistingRetryTopicIntegrationTests { private static final Logger logger = LoggerFactory.getLogger(ExistingRetryTopicIntegrationTests.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/PartitionResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/PartitionResolverTests.java index 218e6b7740..4aa43135cf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/PartitionResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/PartitionResolverTests.java @@ -60,7 +60,7 @@ @DirtiesContext @SpringJUnitConfig @EmbeddedKafka(topics = "partition.resolver.tests") -public class PartitionResolverTests extends AbstractRetryTopicIntegrationTests { +public class PartitionResolverTests { @Test void testNullPartition(@Autowired KafkaOperations template, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java index 341adb30da..1023e5196d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java @@ -57,7 +57,7 @@ @SpringJUnitConfig @DirtiesContext @EmbeddedKafka(topics = RetryTopicConfigurationIntegrationTests.TOPIC1, partitions = 1) -class RetryTopicConfigurationIntegrationTests extends AbstractRetryTopicIntegrationTests { +class RetryTopicConfigurationIntegrationTests { public static final String TOPIC1 = "RetryTopicConfigurationIntegrationTests.1"; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java index 2cc011ba0d..3461834d54 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationManualAssignmentIntegrationTests.java @@ -60,7 +60,7 @@ @DirtiesContext @EmbeddedKafka(topics = { RetryTopicConfigurationManualAssignmentIntegrationTests.TOPIC1, RetryTopicConfigurationManualAssignmentIntegrationTests.TOPIC2 }, partitions = 1) -class RetryTopicConfigurationManualAssignmentIntegrationTests extends AbstractRetryTopicIntegrationTests { +class RetryTopicConfigurationManualAssignmentIntegrationTests { public static final String TOPIC1 = "RetryTopicConfigurationManualAssignmentIntegrationTests.1"; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java index 703d74691a..fd953c7371 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java @@ -22,20 +22,23 @@ import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; -import java.lang.reflect.Field; import java.time.Clock; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.function.Supplier; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanFactory; import org.springframework.context.ApplicationContext; +import org.springframework.core.log.LogAccessor; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; @@ -43,6 +46,7 @@ import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.listener.ListenerContainerRegistry; import org.springframework.kafka.support.converter.ConversionException; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.scheduling.TaskScheduler; import org.springframework.util.backoff.BackOff; @@ -53,13 +57,6 @@ */ class RetryTopicConfigurationSupportTests { - @BeforeEach - void reset() throws Exception { // NOSONAR - Field field = RetryTopicConfigurationSupport.class.getDeclaredField("ONLY_ONE_ALLOWED"); - field.setAccessible(true); - ((AtomicBoolean) field.get(null)).set(true); - } - @SuppressWarnings("unchecked") @Test void testCreateConfigurer() { @@ -242,4 +239,23 @@ void testCreatesComponentFactory() { assertThat(configurationSupport).hasFieldOrProperty("componentFactory").isNotNull(); } + @Test + void twoSupports() { + RetryTopicConfigurationSupport configurationSupport = new RetryTopicConfigurationSupport(); + LogAccessor logger = spy(KafkaTestUtils.getPropertyValue(configurationSupport, "logger", LogAccessor.class)); + new DirectFieldAccessor(configurationSupport).setPropertyValue("logger", logger); + ApplicationContext ctx = mock(ApplicationContext.class); + configurationSupport.setApplicationContext(ctx); + Map beans = new LinkedHashMap<>(); + beans.put("foo", configurationSupport); + beans.put("bar", configurationSupport); + given(ctx.getBeansOfType(RetryTopicConfigurationSupport.class, false, false)).willReturn(beans); + configurationSupport.afterSingletonsInstantiated(); + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(Supplier.class); + verify(logger).warn(captor.capture()); + assertThat(captor.getValue().get()).isEqualTo("Only one RetryTopicConfigurationSupport object expected, found " + + "[foo, bar]; this may result in unexpected behavior"); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java index e8f5282a34..36c8f39635 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java @@ -70,7 +70,7 @@ @SpringJUnitConfig @DirtiesContext @EmbeddedKafka -public class RetryTopicExceptionRoutingIntegrationTests extends AbstractRetryTopicIntegrationTests { +public class RetryTopicExceptionRoutingIntegrationTests { private static final Logger logger = LoggerFactory.getLogger(RetryTopicExceptionRoutingIntegrationTests.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java index a1375849f9..bf9a1ba02d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java @@ -96,7 +96,7 @@ RetryTopicIntegrationTests.FOURTH_TOPIC, RetryTopicIntegrationTests.TWO_LISTENERS_TOPIC }) @TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"}) -public class RetryTopicIntegrationTests extends AbstractRetryTopicIntegrationTests { +public class RetryTopicIntegrationTests { private static final Logger logger = LoggerFactory.getLogger(RetryTopicIntegrationTests.class); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java index 4f7385e5e2..4d70ffff1a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java @@ -67,7 +67,7 @@ @DirtiesContext @EmbeddedKafka(topics = { RetryTopicSameContainerFactoryIntegrationTests.FIRST_TOPIC, RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC}, partitions = 1) -public class RetryTopicSameContainerFactoryIntegrationTests extends AbstractRetryTopicIntegrationTests { +public class RetryTopicSameContainerFactoryIntegrationTests { private static final Logger logger = LoggerFactory.getLogger(RetryTopicSameContainerFactoryIntegrationTests.class);