Skip to content

Commit 20c80fe

Browse files
committed
spring-projectsGH-2432: Fix Retryable Topic Provisioning
Resolves spring-projects#2432 Don't provision an individual retry topic bean, if there is already a `NewTopic` bean with the same topic name. **cherry-pick to 2.9.x, 2.8.x**
1 parent 6536f3c commit 20c80fe

File tree

4 files changed

+83
-5
lines changed

4 files changed

+83
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import java.util.Collection;
2323
import java.util.Collections;
2424
import java.util.HashMap;
25+
import java.util.Iterator;
2526
import java.util.LinkedList;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.Map.Entry;
2830
import java.util.Optional;
2931
import java.util.concurrent.ExecutionException;
3032
import java.util.concurrent.TimeUnit;
@@ -57,6 +59,7 @@
5759
import org.springframework.context.ApplicationContextAware;
5860
import org.springframework.core.log.LogAccessor;
5961
import org.springframework.kafka.KafkaException;
62+
import org.springframework.kafka.support.TopicForRetryable;
6063
import org.springframework.lang.Nullable;
6164

6265
/**
@@ -181,8 +184,7 @@ public void afterSingletonsInstantiated() {
181184
* @see #setAutoCreate(boolean)
182185
*/
183186
public final boolean initialize() {
184-
Collection<NewTopic> newTopics = new ArrayList<>(
185-
this.applicationContext.getBeansOfType(NewTopic.class, false, false).values());
187+
Collection<NewTopic> newTopics = newTopics();
186188
Collection<NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false).values();
187189
wrappers.forEach(wrapper -> newTopics.addAll(wrapper.getNewTopics()));
188190
if (newTopics.size() > 0) {
@@ -225,6 +227,30 @@ public final boolean initialize() {
225227
return false;
226228
}
227229

230+
/*
231+
* Remove any TopicForRetryable bean if there is also a NewTopic with the same topic name.
232+
*/
233+
private Collection<NewTopic> newTopics() {
234+
Map<String, NewTopic> newTopicsMap = new HashMap<>(
235+
this.applicationContext.getBeansOfType(NewTopic.class, false, false));
236+
Map<String, NewTopic> topicsForRetry = newTopicsMap.entrySet().stream()
237+
.filter(entry -> entry.getValue() instanceof TopicForRetryable)
238+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
239+
for (Entry<String, NewTopic> entry : topicsForRetry.entrySet()) {
240+
Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator();
241+
while (iterator.hasNext()) {
242+
Entry<String, NewTopic> nt = iterator.next();
243+
if (nt.getValue().name().equals(entry.getValue().name())
244+
&& !(entry.getValue() instanceof TopicForRetryable)) {
245+
246+
iterator.remove();
247+
}
248+
}
249+
}
250+
Collection<NewTopic> newTopics = new ArrayList<>(newTopicsMap.values());
251+
return newTopics;
252+
}
253+
228254
@Override
229255
@Nullable
230256
public String clusterId() {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.function.Consumer;
2222

2323
import org.apache.commons.logging.LogFactory;
24-
import org.apache.kafka.clients.admin.NewTopic;
2524
import org.apache.kafka.clients.consumer.ConsumerRecord;
2625

2726
import org.springframework.beans.BeansException;
@@ -37,6 +36,7 @@
3736
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
3837
import org.springframework.kafka.support.EndpointHandlerMethod;
3938
import org.springframework.kafka.support.KafkaUtils;
39+
import org.springframework.kafka.support.TopicForRetryable;
4040
import org.springframework.lang.Nullable;
4141

4242

@@ -363,7 +363,7 @@ protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfigur
363363
String beanName = topic + "-topicRegistrationBean";
364364
if (!bf.containsBean(beanName)) {
365365
bf.registerSingleton(beanName,
366-
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()));
366+
new TopicForRetryable(topic, config.getNumPartitions(), config.getReplicationFactor()));
367367
}
368368
}
369369
);
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import org.apache.kafka.clients.admin.NewTopic;
20+
21+
/**
22+
* Marker to indicate this {@link NewTopic} is for retryable topics; admin will ignore these if
23+
* a regular {@link NewTopic} exist.
24+
*
25+
* @author Gary Russell
26+
* @since 2.8.10
27+
*
28+
*/
29+
public class TopicForRetryable extends NewTopic {
30+
31+
/**
32+
* Create an instance with the provided properties.
33+
* @param topic the topic.
34+
* @param numPartitions the partitions.
35+
* @param replicationFactor the replication factor.
36+
*/
37+
public TopicForRetryable(String topic, int numPartitions, short replicationFactor) {
38+
super(topic, numPartitions, replicationFactor);
39+
}
40+
41+
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.concurrent.TimeUnit;
3030

3131
import org.apache.kafka.clients.admin.AdminClientConfig;
32+
import org.apache.kafka.clients.admin.NewTopic;
33+
import org.apache.kafka.clients.admin.TopicDescription;
3234
import org.apache.kafka.clients.consumer.ConsumerConfig;
3335
import org.apache.kafka.clients.producer.ProducerConfig;
3436
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -47,6 +49,7 @@
4749
import org.springframework.kafka.annotation.RetryableTopic;
4850
import org.springframework.kafka.annotation.TopicPartition;
4951
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
52+
import org.springframework.kafka.config.TopicBuilder;
5053
import org.springframework.kafka.core.ConsumerFactory;
5154
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5255
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -133,11 +136,14 @@ void shouldRetrySecondTopic() {
133136
}
134137

135138
@Test
136-
void shouldRetryThirdTopicWithTimeout() {
139+
void shouldRetryThirdTopicWithTimeout(@Autowired KafkaAdmin admin) {
137140
logger.debug("Sending message to topic " + THIRD_TOPIC);
138141
kafkaTemplate.send(THIRD_TOPIC, "Testing topic 3");
139142
assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue();
140143
assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue();
144+
Map<String, TopicDescription> topics = admin.describeTopics(THIRD_TOPIC, THIRD_TOPIC + "-dlt");
145+
assertThat(topics.get(THIRD_TOPIC).partitions()).hasSize(2);
146+
assertThat(topics.get(THIRD_TOPIC + "-dlt").partitions()).hasSize(3);
141147
}
142148

143149
@Test
@@ -527,6 +533,11 @@ public KafkaAdmin kafkaAdmin() {
527533
return new KafkaAdmin(configs);
528534
}
529535

536+
@Bean
537+
public NewTopic topic() {
538+
return TopicBuilder.name(THIRD_TOPIC).partitions(2).replicas(1).build();
539+
}
540+
530541
@Bean
531542
public ConsumerFactory<String, String> consumerFactory() {
532543
Map<String, Object> props = new HashMap<>();

0 commit comments

Comments
 (0)