Skip to content

Commit 80b1770

Browse files
GH-2220: Fix TopicPartitionOffset for Retry Topics (#2223)
* GH-2220: Fix TopicPartitionOffset for Retry Topics Resolves #2220 * Register NewTopics for Retry Topics only once per topic. * Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic. * Move default partition to constant field. * Address review comments Make EndpointCustomizerFactory ctor public * Remove main endpoint customization * Remove failing @SuppressWarnings * Revert TPO hashCode changes Adjust tests Improve TPO and topics handling * Enable customizing main endpoint TPOs
1 parent 01d3672 commit 80b1770

File tree

8 files changed

+377
-18
lines changed

8 files changed

+377
-18
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ public KafkaTemplate<String, Object> kafkaTemplate() {
148148
----
149149
====
150150

151+
IMPORTANT: Multiple `@KafkaListener` annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
152+
It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics; if multiple `@RetryableTopic` annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored.
153+
151154
==== Features
152155

153156
Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -219,6 +219,16 @@ public long delay() {
219219
return this.delayMs;
220220
}
221221

222+
/**
223+
* Return the number of partitions the
224+
* retry topics should be created with.
225+
* @return the number of partitions.
226+
* @since 2.7.13
227+
*/
228+
public int numPartitions() {
229+
return this.numPartitions;
230+
}
231+
222232
@Nullable
223233
public Boolean autoStartDltHandler() {
224234
return this.autoStartDltHandler;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.Collection;
2222
import java.util.stream.Collectors;
23+
import java.util.stream.Stream;
2324

2425
import org.springframework.beans.factory.BeanFactory;
2526
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
@@ -42,6 +43,8 @@
4243
*/
4344
public class EndpointCustomizerFactory {
4445

46+
private static final int DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT = 0;
47+
4548
private final DestinationTopic.Properties destinationProperties;
4649

4750
private final EndpointHandlerMethod beanMethod;
@@ -50,7 +53,7 @@ public class EndpointCustomizerFactory {
5053

5154
private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;
5255

53-
EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod,
56+
public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod,
5457
BeanFactory beanFactory, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
5558

5659
this.destinationProperties = destinationProperties;
@@ -71,7 +74,14 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr
7174
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint);
7275
endpoint.setId(namesProvider.getEndpointId(endpoint));
7376
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
74-
endpoint.setTopics(topics.stream().map(EndpointCustomizer.TopicNamesHolder::getCustomizedTopic).toArray(String[]::new));
77+
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
78+
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider,
79+
endpoint.getTopicPartitionsToAssign()));
80+
}
81+
else {
82+
endpoint.setTopics(endpoint.getTopics().stream()
83+
.map(namesProvider::getTopicName).toArray(String[]::new));
84+
}
7585
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint));
7686
endpoint.setGroup(namesProvider.getGroup(endpoint));
7787
endpoint.setBean(bean);
@@ -84,6 +94,29 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr
8494
};
8595
}
8696

97+
private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties,
98+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
99+
TopicPartitionOffset[] topicPartitionOffsets) {
100+
return Stream.of(topicPartitionOffsets)
101+
.map(tpo -> properties.isMainEndpoint()
102+
? getTPOForMainTopic(namesProvider, tpo)
103+
: getTPOForRetryTopics(properties, namesProvider, tpo))
104+
.toArray(TopicPartitionOffset[]::new);
105+
}
106+
107+
private static TopicPartitionOffset getTPOForRetryTopics(DestinationTopic.Properties properties, RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
108+
return new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
109+
tpo.getPartition() <= properties.numPartitions() ? tpo.getPartition() : DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT,
110+
(Long) null);
111+
}
112+
113+
private static TopicPartitionOffset getTPOForMainTopic(RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) {
114+
TopicPartitionOffset newTpo = new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()),
115+
tpo.getPartition(), tpo.getOffset(), tpo.getPosition());
116+
newTpo.setRelativeToCurrent(tpo.isRelativeToCurrent());
117+
return newTpo;
118+
}
119+
87120
protected Collection<EndpointCustomizer.TopicNamesHolder> customizeAndRegisterTopics(
88121
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
89122
MethodKafkaListenerEndpoint<?, ?> endpoint) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -341,10 +341,14 @@ private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfigur
341341
}
342342

343343
protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfiguration.TopicCreation config) {
344-
topics.forEach(topic ->
345-
((DefaultListableBeanFactory) this.beanFactory)
346-
.registerSingleton(topic + "-topicRegistrationBean",
347-
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()))
344+
topics.forEach(topic -> {
345+
DefaultListableBeanFactory bf = ((DefaultListableBeanFactory) this.beanFactory);
346+
String beanName = topic + "-topicRegistrationBean";
347+
if (!bf.containsBean(beanName)) {
348+
bf.registerSingleton(beanName,
349+
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()));
350+
}
351+
}
348352
);
349353
}
350354

spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.retrytopic;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.BDDMockito.given;
21+
22+
import java.lang.reflect.Method;
23+
import java.util.Arrays;
24+
import java.util.List;
25+
import java.util.function.Predicate;
26+
27+
import org.junit.jupiter.api.Test;
28+
import org.junit.jupiter.api.extension.ExtendWith;
29+
import org.mockito.Mock;
30+
import org.mockito.junit.jupiter.MockitoExtension;
31+
32+
import org.springframework.beans.factory.BeanFactory;
33+
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
34+
import org.springframework.kafka.support.EndpointHandlerMethod;
35+
import org.springframework.kafka.support.TopicPartitionOffset;
36+
37+
/**
38+
* @author Tomaz Fernandes
39+
* @since 2.8.5
40+
*/
41+
@ExtendWith(MockitoExtension.class)
42+
class EndpointCustomizerFactoryTests {
43+
44+
@Mock
45+
private DestinationTopic.Properties properties;
46+
47+
@Mock
48+
private EndpointHandlerMethod beanMethod;
49+
50+
@Mock
51+
private BeanFactory beanFactory;
52+
53+
@Mock
54+
private RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;
55+
56+
@Mock
57+
private MethodKafkaListenerEndpoint<?, ?> endpoint;
58+
59+
private final String[] topics = {"myTopic1", "myTopic2"};
60+
61+
private final Method method = EndpointCustomizerFactory.class.getDeclaredMethods()[0];
62+
63+
@Test
64+
void shouldNotCustomizeEndpointForMainTopicWithTopics() {
65+
66+
given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
67+
given(endpoint.getTopics()).willReturn(Arrays.asList(topics));
68+
given(properties.suffix()).willReturn("");
69+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider =
70+
new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
71+
given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);
72+
73+
EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
74+
beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
75+
76+
List<EndpointCustomizer.TopicNamesHolder> holders =
77+
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint);
78+
79+
assertThat(holders).hasSize(2).element(0)
80+
.matches(assertMainTopic(0));
81+
assertThat(holders).element(1)
82+
.matches(assertMainTopic(1));
83+
84+
}
85+
86+
@Test
87+
void shouldNotCustomizeEndpointForMainTopicWithTPO() {
88+
89+
given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
90+
given(properties.isMainEndpoint()).willReturn(true);
91+
given(properties.suffix()).willReturn("");
92+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider =
93+
new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
94+
given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);
95+
96+
String testString = "testString";
97+
MethodKafkaListenerEndpoint<Object, Object> endpointTPO = new MethodKafkaListenerEndpoint<>();
98+
endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L),
99+
new TopicPartitionOffset(topics[1], 1, 1L));
100+
endpointTPO.setMethod(this.method);
101+
endpointTPO.setId(testString);
102+
endpointTPO.setClientIdPrefix(testString);
103+
endpointTPO.setGroup(testString);
104+
105+
EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
106+
beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
107+
108+
List<EndpointCustomizer.TopicNamesHolder> holders =
109+
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO);
110+
111+
assertThat(holders).hasSize(2).element(0)
112+
.matches(assertMainTopic(0));
113+
assertThat(holders).element(1)
114+
.matches(assertMainTopic(1));
115+
116+
assertThat(endpointTPO.getTopics())
117+
.isEmpty();
118+
119+
TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign();
120+
assertThat(topicPartitionsToAssign).hasSize(2);
121+
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[0],
122+
new TopicPartitionOffset(topics[0], 0, 0L))).isTrue();
123+
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[1],
124+
new TopicPartitionOffset(topics[1], 1, 1L))).isTrue();
125+
126+
}
127+
128+
private Predicate<EndpointCustomizer.TopicNamesHolder> assertMainTopic(int index) {
129+
return holder -> holder.getCustomizedTopic().equals(topics[index])
130+
&& holder.getMainTopic().equals(topics[index]);
131+
}
132+
133+
@Test
134+
void shouldCustomizeEndpointForRetryTopic() {
135+
136+
MethodKafkaListenerEndpoint<Object, Object> endpoint = new MethodKafkaListenerEndpoint<>();
137+
String testString = "testString";
138+
endpoint.setTopics(this.topics);
139+
endpoint.setMethod(this.method);
140+
endpoint.setId(testString);
141+
endpoint.setClientIdPrefix(testString);
142+
endpoint.setGroup(testString);
143+
144+
MethodKafkaListenerEndpoint<Object, Object> endpointTPO = new MethodKafkaListenerEndpoint<>();
145+
endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L),
146+
new TopicPartitionOffset(topics[1], 1, 1L));
147+
endpointTPO.setMethod(this.method);
148+
endpointTPO.setId(testString);
149+
endpointTPO.setClientIdPrefix(testString);
150+
endpointTPO.setGroup(testString);
151+
152+
String suffix = "-retry";
153+
given(beanMethod.resolveBean(this.beanFactory)).willReturn(method);
154+
given(properties.isMainEndpoint()).willReturn(false);
155+
given(properties.suffix()).willReturn(suffix);
156+
given(properties.numPartitions()).willReturn(2);
157+
158+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider =
159+
new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties);
160+
given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider);
161+
162+
EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod,
163+
beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer();
164+
165+
List<EndpointCustomizer.TopicNamesHolder> holders =
166+
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint);
167+
168+
String topic1WithSuffix = topics[0] + suffix;
169+
String topic2WithSuffix = topics[1] + suffix;
170+
assertThat(holders).hasSize(2).element(0)
171+
.matches(holder -> holder.getMainTopic().equals(topics[0])
172+
&& holder.getCustomizedTopic().equals(topic1WithSuffix));
173+
assertThat(holders).hasSize(2).element(1)
174+
.matches(holder -> holder.getMainTopic().equals(topics[1])
175+
&& holder.getCustomizedTopic().equals(topic2WithSuffix));
176+
177+
String testStringSuffix = testString + suffix;
178+
179+
assertThat(endpoint.getTopics())
180+
.contains(topic1WithSuffix, topic2WithSuffix);
181+
assertThat(endpoint.getId())
182+
.isEqualTo(testStringSuffix);
183+
assertThat(endpoint.getClientIdPrefix())
184+
.isEqualTo(testStringSuffix);
185+
assertThat(endpoint.getGroup())
186+
.isEqualTo(testStringSuffix);
187+
assertThat(endpoint.getTopicPartitionsToAssign()).isEmpty();
188+
189+
List<EndpointCustomizer.TopicNamesHolder> holdersTPO =
190+
(List<EndpointCustomizer.TopicNamesHolder>) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO);
191+
192+
assertThat(holdersTPO).hasSize(2).element(0)
193+
.matches(holder -> holder.getMainTopic().equals(topics[0])
194+
&& holder.getCustomizedTopic().equals(topic1WithSuffix));
195+
assertThat(holdersTPO).hasSize(2).element(1)
196+
.matches(holder -> holder.getMainTopic().equals(topics[1])
197+
&& holder.getCustomizedTopic().equals(topic2WithSuffix));
198+
199+
assertThat(endpointTPO.getTopics())
200+
.isEmpty();
201+
202+
TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign();
203+
assertThat(topicPartitionsToAssign).hasSize(2);
204+
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[0],
205+
new TopicPartitionOffset(topic1WithSuffix, 0, (Long) null))).isTrue();
206+
assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[1],
207+
new TopicPartitionOffset(topic2WithSuffix, 1, (Long) null))).isTrue();
208+
209+
assertThat(endpointTPO.getId())
210+
.isEqualTo(testStringSuffix);
211+
assertThat(endpointTPO.getClientIdPrefix())
212+
.isEqualTo(testStringSuffix);
213+
assertThat(endpointTPO.getGroup())
214+
.isEqualTo(testStringSuffix);
215+
}
216+
217+
private boolean equalsTopicPartitionOffset(TopicPartitionOffset tpo1, TopicPartitionOffset tpo2) {
218+
return tpo1.getTopicPartition().equals(tpo2.getTopicPartition()) &&
219+
((tpo1.getOffset() == null && tpo2.getOffset() == null) ||
220+
(tpo1.getOffset() != null && tpo1.getOffset().equals(tpo2.getOffset())));
221+
222+
}
223+
}

0 commit comments

Comments
 (0)