Skip to content

GH-2477: Fix Static State RetryTopicConfigSupport #2479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
Expand Down Expand Up @@ -67,14 +72,29 @@
* @author Gary Russell
* @since 2.9
*/
public class RetryTopicConfigurationSupport {

private static final AtomicBoolean ONLY_ONE_ALLOWED = new AtomicBoolean(true);
public class RetryTopicConfigurationSupport implements ApplicationContextAware, SmartInitializingSingleton {

private final RetryTopicComponentFactory componentFactory = createComponentFactory();

public RetryTopicConfigurationSupport() {
Assert.state(ONLY_ONE_ALLOWED.getAndSet(false), "Only one 'RetryTopicConfigurationSupport' is allowed");
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

private ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Override
public void afterSingletonsInstantiated() {
if (this.applicationContext != null) {
Map<String, RetryTopicConfigurationSupport> beans = this.applicationContext
.getBeansOfType(RetryTopicConfigurationSupport.class, false, false);
if (beans.size() > 1) {
this.logger.warn(() -> "Only one RetryTopicConfigurationSupport object expected, found "
+ beans.keySet() + "; this may result in unexpected behavior");
}
}
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* @since 2.8
*
*/
public class CircularDltHandlerTests extends AbstractRetryTopicIntegrationTests {
public class CircularDltHandlerTests {

@Test
void contextLoads() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = "dh1")
public class DeliveryHeaderTests extends AbstractRetryTopicIntegrationTests {
public class DeliveryHeaderTests {

@Test
void deliveryAttempts(@Autowired Config config, @Autowired KafkaTemplate<Integer, String> template)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
*/
@SpringJUnitConfig
@EmbeddedKafka
public class DltStartupTests extends AbstractRetryTopicIntegrationTests {
public class DltStartupTests {

@Test
void dltStartOverridesCorrect(@Autowired KafkaListenerEndpointRegistry registry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
ExistingRetryTopicIntegrationTests.MAIN_TOPIC_WITH_PARTITION_INFO,
ExistingRetryTopicIntegrationTests.RETRY_TOPIC_WITH_PARTITION_INFO}, partitions = 4)
@TestPropertySource(properties = "two.attempts=2")
public class ExistingRetryTopicIntegrationTests extends AbstractRetryTopicIntegrationTests {
public class ExistingRetryTopicIntegrationTests {

private static final Logger logger = LoggerFactory.getLogger(ExistingRetryTopicIntegrationTests.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
@DirtiesContext
@SpringJUnitConfig
@EmbeddedKafka(topics = "partition.resolver.tests")
public class PartitionResolverTests extends AbstractRetryTopicIntegrationTests {
public class PartitionResolverTests {

@Test
void testNullPartition(@Autowired KafkaOperations<Integer, String> template,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = RetryTopicConfigurationIntegrationTests.TOPIC1, partitions = 1)
class RetryTopicConfigurationIntegrationTests extends AbstractRetryTopicIntegrationTests {
class RetryTopicConfigurationIntegrationTests {

public static final String TOPIC1 = "RetryTopicConfigurationIntegrationTests.1";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
@DirtiesContext
@EmbeddedKafka(topics = { RetryTopicConfigurationManualAssignmentIntegrationTests.TOPIC1,
RetryTopicConfigurationManualAssignmentIntegrationTests.TOPIC2 }, partitions = 1)
class RetryTopicConfigurationManualAssignmentIntegrationTests extends AbstractRetryTopicIntegrationTests {
class RetryTopicConfigurationManualAssignmentIntegrationTests {

public static final String TOPIC1 = "RetryTopicConfigurationManualAssignmentIntegrationTests.1";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,31 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.lang.reflect.Field;
import java.time.Clock;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.backoff.BackOff;

Expand All @@ -53,13 +57,6 @@
*/
class RetryTopicConfigurationSupportTests {

@BeforeEach
void reset() throws Exception { // NOSONAR
Field field = RetryTopicConfigurationSupport.class.getDeclaredField("ONLY_ONE_ALLOWED");
field.setAccessible(true);
((AtomicBoolean) field.get(null)).set(true);
}

@SuppressWarnings("unchecked")
@Test
void testCreateConfigurer() {
Expand Down Expand Up @@ -242,4 +239,23 @@ void testCreatesComponentFactory() {
assertThat(configurationSupport).hasFieldOrProperty("componentFactory").isNotNull();
}

@Test
void twoSupports() {
RetryTopicConfigurationSupport configurationSupport = new RetryTopicConfigurationSupport();
LogAccessor logger = spy(KafkaTestUtils.getPropertyValue(configurationSupport, "logger", LogAccessor.class));
new DirectFieldAccessor(configurationSupport).setPropertyValue("logger", logger);
ApplicationContext ctx = mock(ApplicationContext.class);
configurationSupport.setApplicationContext(ctx);
Map<String, RetryTopicConfigurationSupport> beans = new LinkedHashMap<>();
beans.put("foo", configurationSupport);
beans.put("bar", configurationSupport);
given(ctx.getBeansOfType(RetryTopicConfigurationSupport.class, false, false)).willReturn(beans);
configurationSupport.afterSingletonsInstantiated();
@SuppressWarnings("unchecked")
ArgumentCaptor<Supplier<String>> captor = ArgumentCaptor.forClass(Supplier.class);
verify(logger).warn(captor.capture());
assertThat(captor.getValue().get()).isEqualTo("Only one RetryTopicConfigurationSupport object expected, found "
+ "[foo, bar]; this may result in unexpected behavior");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka
public class RetryTopicExceptionRoutingIntegrationTests extends AbstractRetryTopicIntegrationTests {
public class RetryTopicExceptionRoutingIntegrationTests {

private static final Logger logger = LoggerFactory.getLogger(RetryTopicExceptionRoutingIntegrationTests.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
RetryTopicIntegrationTests.FOURTH_TOPIC,
RetryTopicIntegrationTests.TWO_LISTENERS_TOPIC })
@TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"})
public class RetryTopicIntegrationTests extends AbstractRetryTopicIntegrationTests {
public class RetryTopicIntegrationTests {

private static final Logger logger = LoggerFactory.getLogger(RetryTopicIntegrationTests.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
@DirtiesContext
@EmbeddedKafka(topics = { RetryTopicSameContainerFactoryIntegrationTests.FIRST_TOPIC,
RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC}, partitions = 1)
public class RetryTopicSameContainerFactoryIntegrationTests extends AbstractRetryTopicIntegrationTests {
public class RetryTopicSameContainerFactoryIntegrationTests {

private static final Logger logger = LoggerFactory.getLogger(RetryTopicSameContainerFactoryIntegrationTests.class);

Expand Down