Skip to content

Commit e1160fa

Browse files
GH-2172: Expose Retry Topic Chain at Runtime (#2245)
* GH-2172: Expose Retry Topic Chain at Runtime Resolves #2172 * Add methods to `DestinationTopicContainer` interface * Reduce memory footprint of `DefaultDestinationTopicResolver` * Add documentation * Add to integration test
1 parent f200c03 commit e1160fa

File tree

5 files changed

+84
-17
lines changed

5 files changed

+84
-17
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,18 @@ protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
848848
----
849849
====
850850

851+
[[access-topic-info-runtime]]
852+
==== Accessing Topics' Information at Runtime
853+
854+
Since 2.9, you can access information regarding the topic chain at runtime by injecting the provided `DestinationTopicContainer` bean.
855+
This interface provides methods to look up the next topic in the chain or the DLT for a topic if configured, as well as useful properties such as the topic's name, delay and type.
856+
857+
As a real-world use-case example, you can use such information so a console application can resend a record from the DLT to the first retry topic in the chain after the cause of the failed processing, e.g. bug / inconsistent state, has been resolved.
858+
859+
IMPORTANT: The `DestinationTopic` provided by the `DestinationTopicContainer#getNextDestinationTopicFor()` method corresponds to the next topic registered in the chain for the input topic.
860+
The actual topic the message will be forwarded to may differ due to different factors such as exception classification, number of attempts or single-topic fixed-delay strategies.
861+
Use the `DestinationTopicResolver` interface if you need to weigh in these factors.
862+
851863
[[change-kboe-logging-level]]
852864
==== Changing KafkaBackOffException Logging Level
853865

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@
3434
import org.springframework.kafka.listener.ExceptionClassifier;
3535
import org.springframework.kafka.listener.ListenerExecutionFailedException;
3636
import org.springframework.kafka.listener.TimestampedException;
37+
import org.springframework.lang.Nullable;
3738

3839

3940
/**
4041
*
41-
* Default implementation of the DestinationTopicResolver interface.
42+
* Default implementation of the {@link DestinationTopicResolver} interface.
4243
* The container is closed when a {@link ContextRefreshedEvent} is received
4344
* and no more destinations can be added after that.
4445
*
@@ -58,8 +59,6 @@ public class DefaultDestinationTopicResolver extends ExceptionClassifier
5859

5960
private final Map<String, DestinationTopicHolder> sourceDestinationsHolderMap;
6061

61-
private final Map<String, DestinationTopic> destinationsTopicMap;
62-
6362
private final Clock clock;
6463

6564
private ApplicationContext applicationContext;
@@ -81,7 +80,6 @@ public DefaultDestinationTopicResolver(Clock clock, ApplicationContext applicati
8180
public DefaultDestinationTopicResolver(Clock clock) {
8281
this.clock = clock;
8382
this.sourceDestinationsHolderMap = new HashMap<>();
84-
this.destinationsTopicMap = new HashMap<>();
8583
this.contextRefreshed = false;
8684
}
8785

@@ -103,7 +101,7 @@ public DestinationTopic resolveDestinationTopic(String topic, Integer attempt, E
103101
&& isNotFatalException(e)
104102
&& !isPastTimout(originalTimestamp, destinationTopicHolder)
105103
? resolveRetryDestination(destinationTopicHolder)
106-
: resolveDltOrNoOpsDestination(topic);
104+
: getDltOrNoOpsDestination(topic);
107105
}
108106

109107
private Boolean isNotFatalException(Exception e) {
@@ -140,18 +138,28 @@ private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinat
140138

141139
@Override
142140
public DestinationTopic getDestinationTopicByName(String topic) {
143-
return Objects.requireNonNull(this.destinationsTopicMap.get(topic),
144-
() -> "No topic found for " + topic);
141+
return Objects.requireNonNull(this.sourceDestinationsHolderMap.get(topic),
142+
() -> "No DestinationTopic found for " + topic).getSourceDestination();
143+
}
144+
145+
@Nullable
146+
@Override
147+
public DestinationTopic getDltFor(String topicName) {
148+
DestinationTopic destination = getDltOrNoOpsDestination(topicName);
149+
return destination.isNoOpsTopic()
150+
? null
151+
: destination;
145152
}
146153

147-
private DestinationTopic resolveDltOrNoOpsDestination(String topic) {
148-
DestinationTopic destination = getDestinationFor(topic);
154+
private DestinationTopic getDltOrNoOpsDestination(String topic) {
155+
DestinationTopic destination = getNextDestinationTopicFor(topic);
149156
return destination.isDltTopic() || destination.isNoOpsTopic()
150157
? destination
151-
: resolveDltOrNoOpsDestination(destination.getDestinationName());
158+
: getDltOrNoOpsDestination(destination.getDestinationName());
152159
}
153160

154-
private DestinationTopic getDestinationFor(String topic) {
161+
@Override
162+
public DestinationTopic getNextDestinationTopicFor(String topic) {
155163
return getDestinationHolderFor(topic).getNextDestination();
156164
}
157165

@@ -179,9 +187,6 @@ public void addDestinationTopics(List<DestinationTopic> destinationsToAdd) {
179187
+ DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed.");
180188
}
181189
synchronized (this.sourceDestinationsHolderMap) {
182-
this.destinationsTopicMap.putAll(destinationsToAdd
183-
.stream()
184-
.collect(Collectors.toMap(destination -> destination.getDestinationName(), destination -> destination)));
185190
this.sourceDestinationsHolderMap.putAll(correlatePairSourceAndDestinationValues(destinationsToAdd));
186191
}
187192
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
1818

1919
import java.util.List;
2020

21+
import org.springframework.lang.Nullable;
22+
2123
/**
2224
*
2325
* Provides methods to store and retrieve {@link DestinationTopic} instances.
@@ -34,10 +36,33 @@ public interface DestinationTopicContainer {
3436
void addDestinationTopics(List<DestinationTopic> destinationTopics);
3537

3638
/**
37-
* Returns the DestinationTopic instance registered for that topic.
39+
* Returns the {@link DestinationTopic} instance registered for that topic.
3840
* @param topicName the topic name of the DestinationTopic to be returned.
3941
* @return the DestinationTopic instance registered for that topic.
4042
*/
4143
DestinationTopic getDestinationTopicByName(String topicName);
4244

45+
/**
46+
* Returns the {@link DestinationTopic} instance registered as the next
47+
* destination topic in the chain for the given topic.
48+
* Note that this might not correspond to the actual next topic a message will
49+
* be forwarded to, since that depends on different factors.
50+
*
51+
* If you need to find out the exact next topic for a message use the
52+
* {@link DestinationTopicResolver#resolveDestinationTopic(String, Integer, Exception, long)}
53+
* method instead.
54+
* @param topicName the topic name of the DestinationTopic to be returned.
55+
* @return the next DestinationTopic in the chain registered for that topic.
56+
*/
57+
DestinationTopic getNextDestinationTopicFor(String topicName);
58+
59+
/**
60+
* Returns the {@link DestinationTopic} instance registered as
61+
* DLT for the given topic, or null if none is found.
62+
* @param topicName the topic name for which to look the DLT for
63+
* @return The {@link DestinationTopic} instance corresponding to the DLT.
64+
*/
65+
@Nullable
66+
DestinationTopic getDltFor(String topicName);
67+
4368
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,26 @@ void shouldResolveDltDestinationForExpiredTimeout() {
159159
1, new IllegalArgumentException(), timestampInThePastToForceTimeout)).isEqualTo(dltDestinationTopic2);
160160
}
161161

162+
@Test
163+
void shouldGetDestinationTopic() {
164+
assertThat(defaultDestinationTopicContainer
165+
.getDestinationTopicByName(mainDestinationTopic.getDestinationName())).isEqualTo(mainDestinationTopic);
166+
}
167+
168+
@Test
169+
void shouldGetNextDestinationTopic() {
170+
assertThat(defaultDestinationTopicContainer
171+
.getNextDestinationTopicFor(mainDestinationTopic.getDestinationName()))
172+
.isEqualTo(firstRetryDestinationTopic);
173+
}
174+
175+
@Test
176+
void shouldGetDlt() {
177+
assertThat(defaultDestinationTopicContainer
178+
.getDltFor(mainDestinationTopic.getDestinationName()))
179+
.isEqualTo(dltDestinationTopic);
180+
}
181+
162182
@Test
163183
void shouldThrowIfNoDestinationFound() {
164184
assertThatNullPointerException().isThrownBy(() -> defaultDestinationTopicContainer.resolveDestinationTopic("Non-existing-topic", 0,

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,15 @@ public class RetryTopicIntegrationTests {
107107
@Autowired
108108
private CountDownLatchContainer latchContainer;
109109

110+
@Autowired
111+
DestinationTopicContainer topicContainer;
112+
110113
@Test
111114
void shouldRetryFirstTopic() {
112115
logger.debug("Sending message to topic " + FIRST_TOPIC);
113116
kafkaTemplate.send(FIRST_TOPIC, "Testing topic 1");
117+
assertThat(topicContainer.getNextDestinationTopicFor(FIRST_TOPIC).getDestinationName())
118+
.isEqualTo("myRetryTopic1-retry");
114119
assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue();
115120
assertThat(awaitLatch(latchContainer.customDltCountdownLatch)).isTrue();
116121
assertThat(awaitLatch(latchContainer.customErrorHandlerCountdownLatch)).isTrue();
@@ -173,7 +178,7 @@ private boolean awaitLatch(CountDownLatch latch) {
173178
static class FirstTopicListener {
174179

175180
@Autowired
176-
DestinationTopicResolver resolver;
181+
DestinationTopicContainer topicContainer;
177182

178183
@Autowired
179184
CountDownLatchContainer container;

0 commit comments

Comments
 (0)