Skip to content

Commit 6a253f2

Browse files
garyrussellartembilan
authored andcommitted
GH-2432: Fix Retryable Topic Provisioning
Resolves #2432 Don't provision an individual retry topic bean, if there is already a `NewTopic` bean with the same topic name. **cherry-pick to 2.9.x, 2.8.x** * Fix `TopicForRetry` removal logic; include `NewTopics` beans in logic. * Improve test. # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java # spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java
1 parent 950a2ea commit 6a253f2

File tree

4 files changed

+121
-7
lines changed

4 files changed

+121
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import java.util.Collection;
2323
import java.util.Collections;
2424
import java.util.HashMap;
25+
import java.util.Iterator;
2526
import java.util.LinkedList;
2627
import java.util.List;
2728
import java.util.Map;
29+
import java.util.Map.Entry;
2830
import java.util.Optional;
2931
import java.util.concurrent.ExecutionException;
3032
import java.util.concurrent.TimeUnit;
3133
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.atomic.AtomicInteger;
3235
import java.util.stream.Collectors;
3336

3437
import org.apache.commons.logging.LogFactory;
@@ -57,6 +60,7 @@
5760
import org.springframework.context.ApplicationContextAware;
5861
import org.springframework.core.log.LogAccessor;
5962
import org.springframework.kafka.KafkaException;
63+
import org.springframework.kafka.support.TopicForRetryable;
6064

6165
/**
6266
* An admin that delegates to an {@link AdminClient} to create topics defined
@@ -178,10 +182,7 @@ public void afterSingletonsInstantiated() {
178182
* @see #setAutoCreate(boolean)
179183
*/
180184
public final boolean initialize() {
181-
Collection<NewTopic> newTopics = new ArrayList<>(
182-
this.applicationContext.getBeansOfType(NewTopic.class, false, false).values());
183-
Collection<NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false).values();
184-
wrappers.forEach(wrapper -> newTopics.addAll(wrapper.getNewTopics()));
185+
Collection<NewTopic> newTopics = newTopics();
185186
if (newTopics.size() > 0) {
186187
AdminClient adminClient = null;
187188
try {
@@ -218,6 +219,41 @@ public final boolean initialize() {
218219
return false;
219220
}
220221

222+
/*
223+
* Remove any TopicForRetryable bean if there is also a NewTopic with the same topic name.
224+
*/
225+
private Collection<NewTopic> newTopics() {
226+
Map<String, NewTopic> newTopicsMap = new HashMap<>(
227+
this.applicationContext.getBeansOfType(NewTopic.class, false, false));
228+
Map<String, NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false);
229+
AtomicInteger count = new AtomicInteger();
230+
wrappers.forEach((name, newTopics) -> {
231+
newTopics.getNewTopics().forEach(nt -> newTopicsMap.put(name + "#" + count.getAndIncrement(), nt));
232+
});
233+
Map<String, NewTopic> topicsForRetry = newTopicsMap.entrySet().stream()
234+
.filter(entry -> entry.getValue() instanceof TopicForRetryable)
235+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
236+
for (Entry<String, NewTopic> entry : topicsForRetry.entrySet()) {
237+
Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator();
238+
boolean remove = false;
239+
while (iterator.hasNext()) {
240+
Entry<String, NewTopic> nt = iterator.next();
241+
// if we have a NewTopic and TopicForRetry with the same name, remove the latter
242+
if (nt.getValue().name().equals(entry.getValue().name())
243+
&& !(nt.getValue() instanceof TopicForRetryable)) {
244+
245+
remove = true;
246+
break;
247+
}
248+
}
249+
if (remove) {
250+
newTopicsMap.remove(entry.getKey());
251+
}
252+
}
253+
Collection<NewTopic> newTopics = new ArrayList<>(newTopicsMap.values());
254+
return newTopics;
255+
}
256+
221257
@Override
222258
public void createOrModifyTopics(NewTopic... topics) {
223259
try (AdminClient client = createAdmin()) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.function.Consumer;
2222

2323
import org.apache.commons.logging.LogFactory;
24-
import org.apache.kafka.clients.admin.NewTopic;
2524
import org.apache.kafka.clients.consumer.ConsumerRecord;
2625

2726
import org.springframework.beans.factory.BeanFactory;
@@ -35,6 +34,7 @@
3534
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
3635
import org.springframework.kafka.listener.ListenerUtils;
3736
import org.springframework.kafka.support.EndpointHandlerMethod;
37+
import org.springframework.kafka.support.TopicForRetryable;
3838
import org.springframework.lang.Nullable;
3939

4040

@@ -355,7 +355,7 @@ protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfigur
355355
String beanName = topic + "-topicRegistrationBean";
356356
if (!bf.containsBean(beanName)) {
357357
bf.registerSingleton(beanName,
358-
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()));
358+
new TopicForRetryable(topic, config.getNumPartitions(), config.getReplicationFactor()));
359359
}
360360
}
361361
);
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
import org.apache.kafka.clients.admin.NewTopic;
20+
21+
/**
22+
* Marker to indicate this {@link NewTopic} is for retryable topics; admin will ignore these if
23+
* a regular {@link NewTopic} exist.
24+
*
25+
* @author Gary Russell
26+
* @since 2.8.10
27+
*
28+
*/
29+
public class TopicForRetryable extends NewTopic {
30+
31+
/**
32+
* Create an instance with the provided properties.
33+
* @param topic the topic.
34+
* @param numPartitions the partitions.
35+
* @param replicationFactor the replication factor.
36+
*/
37+
public TopicForRetryable(String topic, int numPartitions, short replicationFactor) {
38+
super(topic, numPartitions, replicationFactor);
39+
}
40+
41+
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,23 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.fail;
2121

22+
import java.lang.reflect.Method;
2223
import java.time.Clock;
2324
import java.util.ArrayList;
2425
import java.util.Arrays;
26+
import java.util.Collection;
2527
import java.util.Collections;
2628
import java.util.HashMap;
2729
import java.util.List;
2830
import java.util.Map;
2931
import java.util.concurrent.CountDownLatch;
3032
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.concurrent.atomic.AtomicReference;
3135

3236
import org.apache.kafka.clients.admin.AdminClientConfig;
37+
import org.apache.kafka.clients.admin.NewTopic;
38+
import org.apache.kafka.clients.admin.TopicDescription;
3339
import org.apache.kafka.clients.consumer.ConsumerConfig;
3440
import org.apache.kafka.clients.producer.ProducerConfig;
3541
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -49,10 +55,12 @@
4955
import org.springframework.kafka.annotation.RetryableTopic;
5056
import org.springframework.kafka.annotation.TopicPartition;
5157
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
58+
import org.springframework.kafka.config.TopicBuilder;
5259
import org.springframework.kafka.core.ConsumerFactory;
5360
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5461
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
5562
import org.springframework.kafka.core.KafkaAdmin;
63+
import org.springframework.kafka.core.KafkaAdmin.NewTopics;
5664
import org.springframework.kafka.core.KafkaTemplate;
5765
import org.springframework.kafka.core.ProducerFactory;
5866
import org.springframework.kafka.listener.ContainerProperties;
@@ -129,11 +137,30 @@ void shouldRetrySecondTopic() {
129137
}
130138

131139
@Test
132-
void shouldRetryThirdTopicWithTimeout() {
140+
void shouldRetryThirdTopicWithTimeout(@Autowired KafkaAdmin admin) throws Exception {
133141
logger.debug("Sending message to topic " + THIRD_TOPIC);
134142
kafkaTemplate.send(THIRD_TOPIC, "Testing topic 3");
135143
assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue();
136144
assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue();
145+
Map<String, TopicDescription> topics = admin.describeTopics(THIRD_TOPIC, THIRD_TOPIC + "-dlt", FOURTH_TOPIC);
146+
assertThat(topics.get(THIRD_TOPIC).partitions()).hasSize(2);
147+
assertThat(topics.get(THIRD_TOPIC + "-dlt").partitions()).hasSize(3);
148+
assertThat(topics.get(FOURTH_TOPIC).partitions()).hasSize(2);
149+
AtomicReference<Method> method = new AtomicReference<>();
150+
org.springframework.util.ReflectionUtils.doWithMethods(KafkaAdmin.class, m -> {
151+
m.setAccessible(true);
152+
method.set(m);
153+
}, m -> m.getName().equals("newTopics"));
154+
@SuppressWarnings("unchecked")
155+
Collection<NewTopic> weededTopics = (Collection<NewTopic>) method.get().invoke(admin);
156+
AtomicInteger weeded = new AtomicInteger();
157+
weededTopics.forEach(topic -> {
158+
if (topic.name().equals(THIRD_TOPIC) || topic.name().equals(FOURTH_TOPIC)) {
159+
assertThat(topic).isExactlyInstanceOf(NewTopic.class);
160+
weeded.incrementAndGet();
161+
}
162+
});
163+
assertThat(weeded.get()).isEqualTo(2);
137164
}
138165

139166
@Test
@@ -554,6 +581,16 @@ public KafkaAdmin kafkaAdmin() {
554581
return new KafkaAdmin(configs);
555582
}
556583

584+
@Bean
585+
public NewTopic topic() {
586+
return TopicBuilder.name(THIRD_TOPIC).partitions(2).replicas(1).build();
587+
}
588+
589+
@Bean
590+
public NewTopics topics() {
591+
return new NewTopics(TopicBuilder.name(FOURTH_TOPIC).partitions(2).replicas(1).build());
592+
}
593+
557594
@Bean
558595
public ConsumerFactory<String, String> consumerFactory() {
559596
Map<String, Object> props = new HashMap<>();

0 commit comments

Comments
 (0)