diff --git a/spring-kafka-docs/src/main/asciidoc/index.adoc b/spring-kafka-docs/src/main/asciidoc/index.adoc index 6226ded78c..73e74ccbc8 100644 --- a/spring-kafka-docs/src/main/asciidoc/index.adoc +++ b/spring-kafka-docs/src/main/asciidoc/index.adoc @@ -17,7 +17,7 @@ ifdef::backend-pdf[] NOTE: This documentation is also available as https://docs.spring.io/spring-kafka/docs/{project-version}/reference/html/index.html[HTML]. endif::[] -(C) 2016 - 2022 VMware, Inc. +(C) 2016 - 2023 VMware, Inc. Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically. diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 46f18707a5..4d7a081a53 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -146,7 +146,8 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate>, to handle deserilialization exceptions, it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions. The generic value type of the template should be `Object`. @@ -401,39 +402,6 @@ If your back off policy requires delays with values bigger than that, adjust the IMPORTANT: The first attempt counts against `maxAttempts`, so if you provide a `maxAttempts` value of 4 there'll be the original attempt plus 3 retries. -===== Single Topic Fixed Delay Retries - -If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries. -This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended. - -==== -[source, java] ----- -@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) -@KafkaListener(topics = "my-annotated-topic") -public void processMessage(MyPojo message) { - // ... message processing -} ----- -==== - -==== -[source, java] ----- -@Bean -public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { - return RetryTopicConfigurationBuilder - .newInstance() - .fixedBackoff(3000) - .maxAttempts(5) - .useSingleTopicForFixedDelays() - .create(template); -} ----- -==== - -NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ... - ===== Global timeout You can set the global timeout for the retrying process. @@ -693,6 +661,12 @@ Examples: "my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix". +NOTE: The default behavior is to create separate retry topics for each attempt, appended with an index value: retry-0, retry-1, ..., retry-n. +Therefore, by default the number of retry topics is the configured `maxAttempts` minus 1. + +You can <>, choose whether to append <>, use a <>, and use a <> when using exponential backoffs. + +[[retry-topics-and-dlt-suffixes]] ===== Retry Topics and Dlt Suffixes You can specify the suffixes that will be used by the retry and dlt topics. @@ -724,6 +698,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate t NOTE: The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively. +[[append-index-or-delay]] ===== Appending the Topic's Index or Delay You can either append the topic's index or delay values after the suffix. @@ -754,9 +729,108 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate templa NOTE: The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic's index. +[[single-topic-fixed-delay]] +===== Single Topic for Fixed Delay Retries + +If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries. +This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended. + +NOTE: `FixedDelayStrategy` is now deprecated, and will be replaced by `SameIntervalTopicReuseStrategy` in a future release. + +==== +[source, java] +---- +@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) +@KafkaListener(topics = "my-annotated-topic") +public void processMessage(MyPojo message) { + // ... message processing +} +---- +==== + +==== +[source, java] +---- +@Bean +public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackoff(3000) + .maxAttempts(5) + .useSingleTopicForFixedDelays() + .create(template); +} +---- +==== + +NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ... + + +[[single-topic-maxinterval-delay]] +===== Single Topic for maxInterval Exponential Delay + +If you're using exponential backoff policy (`ExponentialBackOffPolicy`), you can use a single retry topic to accomplish the non-blocking retries of the attempts whose delays are the configured `maxInterval`. + +This "final" retry topic will be suffixed with the provided or default suffix, and will have either the index or the `maxInterval` value appended. + +NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics. + +The default behavior is to work with the number of retry topics equal to the configured `maxAttempts` minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the `maxInterval` delay) being suffixed with an additional index. + +For instance, when configuring the exponential backoff with `initialInterval=1000`, `multiplier=2`, and `maxInterval=16000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be: + +* -retry-1000 +* -retry-2000 +* -retry-4000 +* -retry-8000 +* -retry-16000-0 +* -retry-16000-1 +* -retry-16000-2 +* ... +* -retry-16000-224 + +When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be: + +* -retry-1000 +* -retry-2000 +* -retry-4000 +* -retry-8000 +* -retry-16000 + +This will be the default in a future release. + +==== +[source, java] +---- +@RetryableTopic(attempts = 230, + backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 16000), + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC) +@KafkaListener(topics = "my-annotated-topic") +public void processMessage(MyPojo message) { + // ... message processing +} +---- +==== + +==== +[source, java] +---- +@Bean +public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .exponentialBackoff(1000, 2, 16000) + .maxAttempts(230) + .useSingleTopicForSameIntervals() + .create(template); +} +---- +==== + ===== Custom naming strategies -More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way: +More complex naming strategies can be accomplished by registering a bean that implements `RetryTopicNamesProviderFactory`. +The default implementation is `SuffixingRetryTopicNamesProviderFactory` and a different implementation can be registered in the following way: ==== [source, java] @@ -836,7 +910,9 @@ The framework will configure and use a separate set of retry topics for each lis ==== Dlt Strategies -The framework provides a few strategies for working with DLTs. You can provide a method for DLT processing, use the default logging method, or have no DLT at all. Also you can choose what happens if DLT processing fails. +The framework provides a few strategies for working with DLTs. +You can provide a method for DLT processing, use the default logging method, or have no DLT at all. +Also you can choose what happens if DLT processing fails. ===== Dlt Processing Method @@ -1045,7 +1121,8 @@ Use the `DestinationTopicResolver` interface if you need to weigh in these facto [[change-kboe-logging-level]] ==== Changing KafkaBackOffException Logging Level -When a message in the retry topic is not due for consumption, a `KafkaBackOffException` is thrown. Such exceptions are logged by default at `DEBUG` level, but you can change this behavior by setting an error handler customizer in the `ListenerContainerFactoryConfigurer` in a `@Configuration` class. +When a message in the retry topic is not due for consumption, a `KafkaBackOffException` is thrown. +Such exceptions are logged by default at `DEBUG` level, but you can change this behavior by setting an error handler customizer in the `ListenerContainerFactoryConfigurer` in a `@Configuration` class. For example, to change the logging level to WARN you might add: diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index ab40042188..e0137c2589 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,8 @@ import java.lang.annotation.Target; import org.springframework.kafka.retrytopic.DltStrategy; -import org.springframework.kafka.retrytopic.FixedDelayStrategy; import org.springframework.kafka.retrytopic.RetryTopicConstants; +import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy; import org.springframework.kafka.retrytopic.TopicSuffixingStrategy; import org.springframework.retry.annotation.Backoff; @@ -38,6 +38,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Fabio da Silva Jr. + * @author João Lima * @since 2.7 * * @see org.springframework.kafka.retrytopic.RetryTopicConfigurer @@ -177,6 +178,18 @@ */ TopicSuffixingStrategy topicSuffixingStrategy() default TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE; + + /** + * Topic reuse strategy for sequential attempts made with a same backoff interval. + * + *

Note: for fixed backoffs, when this is configured as + * {@link SameIntervalTopicReuseStrategy#SINGLE_TOPIC}, it has precedence over + * the configuration in {@link #fixedDelayTopicStrategy()}. + * @return the strategy. + * @since 3.0.4 + */ + SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; + /** * Whether or not create a DLT, and redeliver to the DLT if delivery fails or just give up. * @return the dlt strategy. @@ -186,8 +199,10 @@ /** * Whether to use a single or multiple topics when using a fixed delay. * @return the fixed delay strategy. + * @deprecated in favor of {@link #sameIntervalTopicReuseStrategy()}. */ - FixedDelayStrategy fixedDelayTopicStrategy() default FixedDelayStrategy.MULTIPLE_TOPICS; + @Deprecated + org.springframework.kafka.retrytopic.FixedDelayStrategy fixedDelayTopicStrategy() default org.springframework.kafka.retrytopic.FixedDelayStrategy.MULTIPLE_TOPICS; /** * Override the container factory's {@code autoStartup} property for just the DLT container. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index b5b4186d28..d10032c484 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,6 +101,7 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpression this.expressionContext = expressionContext; } + @SuppressWarnings("deprecation") public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation, Object bean) { @@ -146,6 +147,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .dltProcessingFailureStrategy(annotation.dltStrategy()) .autoStartDltHandler(autoStartDlt) .setTopicSuffixingStrategy(annotation.topicSuffixingStrategy()) + .sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy()) .timeoutAfter(timeout) .create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index 06587174d0..5786bc658b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.springframework.kafka.listener.ExceptionClassifier; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.listener.TimestampedException; +import org.springframework.kafka.retrytopic.DestinationTopic.Type; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -132,8 +133,10 @@ && isNotFatalException(e) : destinationTopicHolder.getNextDestination(); } + @SuppressWarnings("deprecation") private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) { - return destinationTopicHolder.getSourceDestination().isSingleTopicRetry() + return ((destinationTopicHolder.getSourceDestination().isReusableRetryTopic()) || + (destinationTopicHolder.getSourceDestination().isSingleTopicRetry())) ? destinationTopicHolder.getSourceDestination() : destinationTopicHolder.getNextDestination(); } @@ -192,6 +195,7 @@ public void addDestinationTopics(String mainListenerId, List d throw new IllegalStateException("Cannot add new destinations, " + DefaultDestinationTopicResolver.class.getSimpleName() + " is already refreshed."); } + validateDestinations(destinationsToAdd); synchronized (this.sourceDestinationsHolderMap) { Map map = this.sourceDestinationsHolderMap.computeIfAbsent(mainListenerId, id -> new HashMap<>()); @@ -199,6 +203,18 @@ public void addDestinationTopics(String mainListenerId, List d } } + private void validateDestinations(List destinationsToAdd) { + for (int i = 0; i < destinationsToAdd.size(); i++) { + DestinationTopic destination = destinationsToAdd.get(i); + if (destination.isReusableRetryTopic()) { + Assert.isTrue((i == (destinationsToAdd.size() - 1) || + ((i == (destinationsToAdd.size() - 2)) && (destinationsToAdd.get(i + 1).isDltTopic()))), + String.format("In the destination topic chain, the type %s can only be " + + "specified as the last retry topic.", Type.REUSABLE_RETRY_TOPIC)); + } + } + } + private Map correlatePairSourceAndDestinationValues( List destinationList) { return IntStream diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index c34c1bf6ac..18ffa3f67f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -68,6 +68,18 @@ public boolean isNoOpsTopic() { return Type.NO_OPS.equals(this.properties.type); } + public boolean isReusableRetryTopic() { + return Type.REUSABLE_RETRY_TOPIC.equals(this.properties.type); + } + + /** + * Whether this is a single retry topic. + * + * @return whether this is a single retry topic. + * @deprecated in favor of using {@link DestinationTopic.Type#REUSABLE_RETRY_TOPIC} + * and {@link #isReusableRetryTopic()}. + */ + @Deprecated public boolean isSingleTopicRetry() { return Type.SINGLE_TOPIC_RETRY.equals(this.properties.type); } @@ -213,7 +225,8 @@ public boolean isDltTopic() { } public boolean isRetryTopic() { - return Type.RETRY.equals(this.type) || Type.SINGLE_TOPIC_RETRY.equals(this.type); + return Type.RETRY.equals(this.type) || Type.SINGLE_TOPIC_RETRY.equals(this.type) + || Type.REUSABLE_RETRY_TOPIC.equals(this.type); } public String suffix() { @@ -284,6 +297,28 @@ public boolean isMainEndpoint() { } enum Type { - MAIN, RETRY, SINGLE_TOPIC_RETRY, DLT, NO_OPS + MAIN, + + RETRY, + + /** + * A single retry topic for all retries. + * + * @deprecated Use {@code REUSABLE_RETRY_TOPIC} instead. + */ + @Deprecated + SINGLE_TOPIC_RETRY, + + /** + * A retry topic reused along sequential retries + * with the same backoff interval. + * + * @since 3.0.4 + */ + REUSABLE_RETRY_TOPIC, + + DLT, + + NO_OPS } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java index 327d840323..f03a0b8f8e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.retrytopic.DestinationTopic.Type; import org.springframework.lang.Nullable; import org.springframework.util.StringUtils; @@ -34,6 +35,7 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author João Lima * @since 2.7 * */ @@ -53,23 +55,28 @@ public class DestinationTopicPropertiesFactory { private final KafkaOperations kafkaOperations; + @SuppressWarnings("deprecation") private final FixedDelayStrategy fixedDelayStrategy; private final DltStrategy dltStrategy; private final TopicSuffixingStrategy topicSuffixingStrategy; + private final SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy; + private final long timeout; @Nullable private Boolean autoStartDltHandler; + @SuppressWarnings("deprecation") public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, BinaryExceptionClassifier exceptionClassifier, int numPartitions, KafkaOperations kafkaOperations, FixedDelayStrategy fixedDelayStrategy, DltStrategy dltStrategy, TopicSuffixingStrategy topicSuffixingStrategy, + SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy, long timeout) { this.dltStrategy = dltStrategy; @@ -78,6 +85,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.numPartitions = numPartitions; this.fixedDelayStrategy = fixedDelayStrategy; this.topicSuffixingStrategy = topicSuffixingStrategy; + this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy; this.timeout = timeout; this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix); this.backOffValues = backOffValues; @@ -85,6 +93,18 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff this.maxAttempts = this.backOffValues.size() + 1; } + @SuppressWarnings("deprecation") + public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List backOffValues, + BinaryExceptionClassifier exceptionClassifier, + int numPartitions, KafkaOperations kafkaOperations, + FixedDelayStrategy fixedDelayStrategy, + DltStrategy dltStrategy, + TopicSuffixingStrategy topicSuffixingStrategy, + long timeout) { + this(retryTopicSuffix, dltSuffix, backOffValues, exceptionClassifier, numPartitions, kafkaOperations, + fixedDelayStrategy, dltStrategy, topicSuffixingStrategy, SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, + timeout); + } /** * Set to false to not start the DLT handler. * @param autoStart false to not start. @@ -105,38 +125,64 @@ public List createProperties() { private List createPropertiesForFixedDelaySingleTopic() { return isNoDltStrategy() ? Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, DestinationTopic.Type.SINGLE_TOPIC_RETRY, getShouldRetryOn())) + createRetryProperties(1, getShouldRetryOn())) : Arrays.asList(createMainTopicProperties(), - createRetryProperties(1, DestinationTopic.Type.SINGLE_TOPIC_RETRY, getShouldRetryOn()), + createRetryProperties(1, getShouldRetryOn()), createDltProperties()); } private boolean isSingleTopicFixedDelay() { - return isFixedDelay() && isSingleTopicStrategy(); + return isFixedDelay() && (isSingleTopicStrategy() || isSingleTopicSameIntervalTopicReuseStrategy()); } + @SuppressWarnings("deprecation") private boolean isSingleTopicStrategy() { return FixedDelayStrategy.SINGLE_TOPIC.equals(this.fixedDelayStrategy); } + private boolean isSingleTopicSameIntervalTopicReuseStrategy() { + return SameIntervalTopicReuseStrategy.SINGLE_TOPIC.equals(this.sameIntervalTopicReuseStrategy); + } + private List createPropertiesForDefaultTopicStrategy() { + + int retryTopicsAmount = retryTopicsAmount(); + return IntStream.rangeClosed(0, isNoDltStrategy() - ? this.maxAttempts - 1 - : this.maxAttempts) - .mapToObj(this::createRetryOrDltTopicSuffixes) - .collect(Collectors.toList()); + ? retryTopicsAmount + : retryTopicsAmount + 1) + .mapToObj(this::createTopicProperties) + .collect(Collectors.toList()); + } + + int retryTopicsAmount() { + return this.backOffValues.size() - reusableTopicAttempts(); + } + + private int reusableTopicAttempts() { + return this.backOffValues.size() > 0 + ? !isFixedDelay() + ? isSingleTopicSameIntervalTopicReuseStrategy() + // Assuming that duplicates are always in + // the end of the list. + ? amountOfDuplicates(this.backOffValues.get(this.backOffValues.size() - 1)) - 1 + : 0 + : isSingleTopicStrategy() + ? this.backOffValues.size() - 1 + : 0 + : 0; } private boolean isNoDltStrategy() { return DltStrategy.NO_DLT.equals(this.dltStrategy); } - private DestinationTopic.Properties createRetryOrDltTopicSuffixes(int index) { + private DestinationTopic.Properties createTopicProperties(int index) { BiPredicate shouldRetryOn = getShouldRetryOn(); return index == 0 ? createMainTopicProperties() - : index < this.maxAttempts - ? createRetryProperties(index, DestinationTopic.Type.RETRY, shouldRetryOn) + : (index <= this.retryTopicsAmount()) + ? createRetryProperties(index, shouldRetryOn) : createDltProperties(); } @@ -155,12 +201,17 @@ private BiPredicate getShouldRetryOn() { return (attempt, throwable) -> attempt < this.maxAttempts && this.exceptionClassifier.classify(throwable); } + @SuppressWarnings("deprecation") private DestinationTopic.Properties createRetryProperties(int index, - DestinationTopic.Type topicType, BiPredicate shouldRetryOn) { int indexInBackoffValues = index - 1; Long thisBackOffValue = this.backOffValues.get(indexInBackoffValues); - return createProperties(topicType, shouldRetryOn, indexInBackoffValues, + DestinationTopic.Type topicTypeToUse = isSingleTopicFixedDelay() + ? Type.SINGLE_TOPIC_RETRY + : isDelayWithReusedTopic(thisBackOffValue) + ? Type.REUSABLE_RETRY_TOPIC + : Type.RETRY; + return createProperties(topicTypeToUse, shouldRetryOn, indexInBackoffValues, getTopicSuffix(indexInBackoffValues, thisBackOffValue)); } @@ -171,10 +222,21 @@ private String getTopicSuffix(int indexInBackoffValues, Long thisBackOffValue) { ? joinWithRetrySuffix(indexInBackoffValues) : hasDuplicates(thisBackOffValue) ? joinWithRetrySuffix(thisBackOffValue) - .concat("-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue)) + .concat(suffixForRepeatedInterval(indexInBackoffValues, thisBackOffValue)) : joinWithRetrySuffix(thisBackOffValue); } + private String suffixForRepeatedInterval(int indexInBackoffValues, Long thisBackOffValue) { + return isSingleTopicSameIntervalTopicReuseStrategy() + ? "" + : "-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue); + } + + private boolean isDelayWithReusedTopic(Long backoffValue) { + return ((isSingleTopicFixedDelay()) || + (hasDuplicates(backoffValue) && isSingleTopicSameIntervalTopicReuseStrategy())); + } + private int getIndexInBackoffValues(int indexInBackoffValues, Long thisBackOffValue) { return indexInBackoffValues - this.backOffValues.indexOf(thisBackOffValue); } @@ -184,11 +246,14 @@ private boolean isSuffixWithIndexStrategy() { } private boolean hasDuplicates(Long thisBackOffValue) { - return this - .backOffValues + return amountOfDuplicates(thisBackOffValue) > 1; + } + + private int amountOfDuplicates(Long thisBackOffValue) { + return Long.valueOf(this.backOffValues .stream() .filter(value -> value.equals(thisBackOffValue)) - .count() > 1; + .count()).intValue(); } private DestinationTopic.Properties createProperties(DestinationTopic.Type topicType, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java index 1dce512510..e256ad1a5b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/FixedDelayStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,10 @@ * * @author Tomaz Fernandes * @since 2.7 + * @deprecated in favor of {@link SameIntervalTopicReuseStrategy}. * */ +@Deprecated public enum FixedDelayStrategy { /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 188273d1a0..fec32a7154 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,6 +71,7 @@ public class RetryTopicConfigurationBuilder { @Nullable private BinaryExceptionClassifierBuilder classifierBuilder; + @SuppressWarnings("deprecation") private FixedDelayStrategy fixedDelayStrategy = FixedDelayStrategy.MULTIPLE_TOPICS; private DltStrategy dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR; @@ -79,6 +80,8 @@ public class RetryTopicConfigurationBuilder { private TopicSuffixingStrategy topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE; + private SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; + @Nullable private Boolean autoStartDltHandler; @@ -241,6 +244,41 @@ public RetryTopicConfigurationBuilder setTopicSuffixingStrategy(TopicSuffixingSt return this; } + /** + * Configure the {@link SameIntervalTopicReuseStrategy}. + * + *

Note: for fixed backoffs, when this is configured as + * {@link SameIntervalTopicReuseStrategy#SINGLE_TOPIC}, it has precedence over + * the configuration done through + * {@link #useSingleTopicForFixedDelays(FixedDelayStrategy)}. + * @param sameIntervalTopicReuseStrategy the strategy. + * @return the builder. + * @since 3.0.4 + */ + public RetryTopicConfigurationBuilder sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy) { + this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy; + return this; + } + + /** + * Configure the use of a single retry topic + * for the attempts that have the same backoff interval + * (as long as these attempts are in the end of the chain). + * + * Currently used only for the last retries of exponential backoff, + * and in a future release this will dictate whether to use + * a single retry topic for fixed backoff. + * + * @return the builder. + * @since 3.0.4 + * @see SameIntervalTopicReuseStrategy + * + */ + public RetryTopicConfigurationBuilder useSingleTopicForSameIntervals() { + this.sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC; + return this; + } + /* ---------------- Configure BackOff -------------- */ /** @@ -374,8 +412,10 @@ public RetryTopicConfigurationBuilder fixedBackOff(int interval) { /** * Configure the use of a single retry topic with fixed delays. * @return the builder. + * @deprecated in favor of {@link #useSingleTopicForSameIntervals()}. * @see FixedDelayStrategy#SINGLE_TOPIC */ + @Deprecated public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays() { this.fixedDelayStrategy = FixedDelayStrategy.SINGLE_TOPIC; return this; @@ -386,7 +426,10 @@ public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays() { * {@link FixedDelayStrategy#MULTIPLE_TOPICS}. * @param delayStrategy the delay strategy. * @return the builder. + * @deprecated in favor of + * {@link #sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy)}. */ + @Deprecated public RetryTopicConfigurationBuilder useSingleTopicForFixedDelays(FixedDelayStrategy delayStrategy) { this.fixedDelayStrategy = delayStrategy; return this; @@ -553,7 +596,7 @@ public RetryTopicConfiguration create(KafkaOperations sendToTopicKafkaTemp new DestinationTopicPropertiesFactory(this.retryTopicSuffix, this.dltSuffix, backOffValues, buildClassifier(), this.topicCreationConfiguration.getNumPartitions(), sendToTopicKafkaTemplate, this.fixedDelayStrategy, this.dltStrategy, - this.topicSuffixingStrategy, this.timeout) + this.topicSuffixingStrategy, this.sameIntervalTopicReuseStrategy, this.timeout) .autoStartDltHandler(this.autoStartDltHandler) .createProperties(); return new RetryTopicConfiguration(destinationTopicProperties, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java new file mode 100644 index 0000000000..8a066cdb3c --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/SameIntervalTopicReuseStrategy.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +/** + * + * Strategy for topic reuse when multiple, sequential retries have the same backoff + * interval. + * + *

+ * It can be used only when the retries that have the same interval are located + * in the end of the retry chain (it cannot be used for retries with the same + * interval in the middle of the retry chain). + * + * @author João Lima + * @since 3.0.4 + * + */ +public enum SameIntervalTopicReuseStrategy { + + /** + * Uses a single retry topic for sequential attempts with the same interval. + */ + SINGLE_TOPIC, + + /** + * Uses one separate topic per retry attempt. + */ + MULTIPLE_TOPICS + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java index bf691d2581..ea1fa3334b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java @@ -17,12 +17,15 @@ package org.springframework.kafka.retrytopic; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.assertj.core.api.Assertions.assertThatNullPointerException; import java.time.Clock; import java.time.Instant; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -34,6 +37,7 @@ import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.listener.TimestampedException; +import org.springframework.kafka.retrytopic.DestinationTopic.Type; import org.springframework.kafka.support.converter.ConversionException; /** @@ -57,8 +61,6 @@ class DefaultDestinationTopicResolverTests extends DestinationTopicTests { private final long originalTimestamp = Instant.now(this.clock).toEpochMilli(); - private final long failureTimestamp = Instant.now(this.clock).plusMillis(500).toEpochMilli(); - @BeforeEach public void setup() { @@ -67,6 +69,8 @@ public void setup() { defaultDestinationTopicContainer.addDestinationTopics("id", allFirstDestinationsTopics); defaultDestinationTopicContainer.addDestinationTopics("id", allSecondDestinationTopics); defaultDestinationTopicContainer.addDestinationTopics("id", allThirdDestinationTopics); + defaultDestinationTopicContainer.addDestinationTopics("id", allFourthDestinationTopics); + defaultDestinationTopicContainer.addDestinationTopics("id", allFifthDestinationTopics); } @@ -97,6 +101,30 @@ void shouldResolveRetryDestination() { assertThat(defaultDestinationTopicContainer .resolveDestinationTopic("id", dltDestinationTopic2.getDestinationName(), 1, new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic2); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", mainDestinationTopic4.getDestinationName(), 1, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(singleFixedRetryDestinationTopic4); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", singleFixedRetryDestinationTopic4.getDestinationName(), maxAttempts - 1, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(singleFixedRetryDestinationTopic4); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", singleFixedRetryDestinationTopic4.getDestinationName(), maxAttempts, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic4); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", mainDestinationTopic5.getDestinationName(), 1, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(reusableRetryDestinationTopic5); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", reusableRetryDestinationTopic5.getDestinationName(), maxAttempts - 1, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(reusableRetryDestinationTopic5); + + assertThat(defaultDestinationTopicContainer + .resolveDestinationTopic("id", reusableRetryDestinationTopic5.getDestinationName(), maxAttempts, + new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic5); } @Test @@ -190,6 +218,20 @@ void shouldThrowIfNoDestinationFound() { new IllegalArgumentException(), originalTimestamp)); } + @Test + void shouldThrowIfMultipleReusableRetryTopicsAdded() { + DefaultDestinationTopicResolver destinationResolver = new DefaultDestinationTopicResolver(clock); + destinationResolver.setApplicationContext(applicationContext); + destinationResolver.addDestinationTopics("id", allFirstDestinationsTopics); + + List destinationTopics = Arrays + .asList(mainDestinationTopic5, reusableRetryDestinationTopic5, reusableRetryDestinationTopic5, dltDestinationTopic5); + + assertThatIllegalArgumentException().isThrownBy( + () -> destinationResolver.addDestinationTopics("id", destinationTopics)) + .withMessageMatching(String.format(".*%s.*last retry topic.*", Type.REUSABLE_RETRY_TOPIC)); + } + @Test void shouldResolveNoOpsIfDltAndNotRetryable() { assertThat(defaultDestinationTopicContainer @@ -197,10 +239,6 @@ void shouldResolveNoOpsIfDltAndNotRetryable() { new RuntimeException(), originalTimestamp)).isEqualTo(noOpsDestinationTopic3); } - private long getExpectedNextExecutionTime(DestinationTopic destinationTopic) { - return failureTimestamp + destinationTopic.getDestinationDelay(); - } - @Test void shouldThrowIfAddsDestinationsAfterClosed() { defaultDestinationTopicContainer diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java index 7e47e0aa7e..2e08aaeb91 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactoryTests.java @@ -51,15 +51,22 @@ class DestinationTopicPropertiesFactoryTests { private final int numPartitions = 0; + @SuppressWarnings("deprecation") private final FixedDelayStrategy fixedDelayStrategy = FixedDelayStrategy.SINGLE_TOPIC; - private final TopicSuffixingStrategy defaultTopicSuffixingStrategy = + private final TopicSuffixingStrategy suffixWithDelayValueSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE; private final TopicSuffixingStrategy suffixWithIndexTopicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE; + private final SameIntervalTopicReuseStrategy multipleTopicsSameIntervalReuseStrategy = + SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; + + private final SameIntervalTopicReuseStrategy singleTopicSameIntervalReuseStrategy = + SameIntervalTopicReuseStrategy.SINGLE_TOPIC; + private final DltStrategy dltStrategy = DltStrategy.FAIL_ON_ERROR; @@ -88,8 +95,8 @@ void shouldCreateMainAndDltProperties() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, fixedDelayStrategy, - dltStrategy, defaultTopicSuffixingStrategy, RetryTopicConstants.NOT_SET) - .createProperties(); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + RetryTopicConstants.NOT_SET).createProperties(); // then assertThat(propertiesList.size() == 2).isTrue(); @@ -121,6 +128,7 @@ private void assertDltTopic(DestinationTopic.Properties dltProperties) { } @Test + @SuppressWarnings("deprecation") void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { // when ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); @@ -133,8 +141,8 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, fixedDelayStrategy, - dltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, RetryTopicConstants.NOT_SET) - .createProperties(); + dltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, + multipleTopicsSameIntervalReuseStrategy, RetryTopicConstants.NOT_SET).createProperties(); List destinationTopicList = propertiesList .stream() @@ -148,6 +156,8 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { assertThat(firstRetryProperties.isDltTopic()).isFalse(); assertThat(firstRetryProperties.isRetryTopic()).isTrue(); DestinationTopic firstRetryDestinationTopic = destinationTopicList.get(1); + assertThat(firstRetryDestinationTopic.isSingleTopicRetry()).isFalse(); + assertThat(firstRetryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(firstRetryDestinationTopic.getDestinationDelay()).isEqualTo(1000); assertThat(firstRetryDestinationTopic.getDestinationPartitions()).isEqualTo(numPartitions); assertThat(firstRetryDestinationTopic.shouldRetryOn(0, new IllegalArgumentException())).isTrue(); @@ -159,6 +169,8 @@ void shouldCreateTwoRetryPropertiesForMultipleBackoffValues() { assertThat(secondRetryProperties.isDltTopic()).isFalse(); assertThat(secondRetryProperties.isRetryTopic()).isTrue(); DestinationTopic secondRetryDestinationTopic = destinationTopicList.get(2); + assertThat(secondRetryDestinationTopic.isSingleTopicRetry()).isFalse(); + assertThat(secondRetryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(secondRetryDestinationTopic.getDestinationDelay()).isEqualTo(2000); assertThat(secondRetryDestinationTopic.getDestinationPartitions()).isEqualTo(numPartitions); assertThat(secondRetryDestinationTopic.shouldRetryOn(0, new IllegalArgumentException())).isTrue(); @@ -180,9 +192,32 @@ void shouldNotCreateDltProperties() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, fixedDelayStrategy, - noDltStrategy, TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, RetryTopicConstants.NOT_SET) - .createProperties(); + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, + numPartitions, kafkaOperations, fixedDelayStrategy, noDltStrategy, + TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE, multipleTopicsSameIntervalReuseStrategy, + RetryTopicConstants.NOT_SET).createProperties(); + + // then + assertThat(propertiesList.size() == 3).isTrue(); + assertThat(propertiesList.get(2).isDltTopic()).isFalse(); + } + + @Test + @SuppressWarnings("deprecation") + void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { + + // when + FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); + backOffPolicy.setBackOffPeriod(1000); + int maxAttempts = 5; + + List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); + + List propertiesList = + new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, + classifier, numPartitions, kafkaOperations, FixedDelayStrategy.SINGLE_TOPIC, + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + -1).createProperties(); List destinationTopicList = propertiesList .stream() @@ -191,11 +226,29 @@ void shouldNotCreateDltProperties() { // then assertThat(propertiesList.size() == 3).isTrue(); - assertThat(propertiesList.get(2).isDltTopic()).isFalse(); + + DestinationTopic mainDestinationTopic = destinationTopicList.get(0); + assertThat(mainDestinationTopic.isMainTopic()).isTrue(); + + DestinationTopic.Properties firstRetryProperties = propertiesList.get(1); + assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix); + assertThat(firstRetryProperties.isRetryTopic()).isTrue(); + DestinationTopic retryDestinationTopic = destinationTopicList.get(1); + assertThat(retryDestinationTopic.isSingleTopicRetry()).isTrue(); + assertThat(retryDestinationTopic.isReusableRetryTopic()).isFalse(); + assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(1000); + + DestinationTopic.Properties dltProperties = propertiesList.get(2); + assertThat(dltProperties.suffix()).isEqualTo(dltSuffix); + assertThat(dltProperties.isDltTopic()).isTrue(); + DestinationTopic dltTopic = destinationTopicList.get(2); + assertThat(dltTopic.getDestinationDelay()).isEqualTo(0); + assertThat(dltTopic.getDestinationPartitions()).isEqualTo(numPartitions); } @Test - void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { + @SuppressWarnings("deprecation") + void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicSameIntervalReuseStrategy() { // when FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); @@ -206,8 +259,9 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { List propertiesList = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, - classifier, numPartitions, kafkaOperations, FixedDelayStrategy.SINGLE_TOPIC, - dltStrategy, defaultTopicSuffixingStrategy, -1).createProperties(); + classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, + dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, + -1).createProperties(); List destinationTopicList = propertiesList .stream() @@ -217,15 +271,14 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { // then assertThat(propertiesList.size() == 3).isTrue(); - DestinationTopic.Properties mainTopicProperties = propertiesList.get(0); DestinationTopic mainDestinationTopic = destinationTopicList.get(0); assertThat(mainDestinationTopic.isMainTopic()).isTrue(); DestinationTopic.Properties firstRetryProperties = propertiesList.get(1); assertThat(firstRetryProperties.suffix()).isEqualTo(retryTopicSuffix); - assertThat(firstRetryProperties.isRetryTopic()).isTrue(); DestinationTopic retryDestinationTopic = destinationTopicList.get(1); assertThat(retryDestinationTopic.isSingleTopicRetry()).isTrue(); + assertThat(retryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(1000); DestinationTopic.Properties dltProperties = propertiesList.get(2); @@ -237,6 +290,7 @@ void shouldCreateOneRetryPropertyForFixedBackoffWithSingleTopicStrategy() { } @Test + @SuppressWarnings("deprecation") void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { // when @@ -250,7 +304,8 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, - dltStrategy, defaultTopicSuffixingStrategy, -1).createProperties(); + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + -1).createProperties(); List destinationTopicList = propertiesList .stream() @@ -260,7 +315,6 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { // then assertThat(propertiesList.size() == 4).isTrue(); - DestinationTopic.Properties mainTopicProperties = propertiesList.get(0); DestinationTopic mainDestinationTopic = destinationTopicList.get(0); assertThat(mainDestinationTopic.isMainTopic()).isTrue(); @@ -269,12 +323,14 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { assertThat(firstRetryProperties.isRetryTopic()).isTrue(); DestinationTopic retryDestinationTopic = destinationTopicList.get(1); assertThat(retryDestinationTopic.isSingleTopicRetry()).isFalse(); + assertThat(retryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(retryDestinationTopic.getDestinationDelay()).isEqualTo(5000); DestinationTopic.Properties secondRetryProperties = propertiesList.get(2); assertThat(secondRetryProperties.suffix()).isEqualTo(retryTopicSuffix + "-1"); DestinationTopic secondRetryDestinationTopic = destinationTopicList.get(2); assertThat(secondRetryDestinationTopic.isSingleTopicRetry()).isFalse(); + assertThat(secondRetryDestinationTopic.isReusableRetryTopic()).isFalse(); assertThat(secondRetryDestinationTopic.getDestinationDelay()).isEqualTo(5000); DestinationTopic.Properties dltProperties = propertiesList.get(3); @@ -286,6 +342,7 @@ void shouldCreateRetryPropertiesForFixedBackoffWithMultiTopicStrategy() { } @Test + @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { // setup @@ -298,14 +355,16 @@ void shouldSuffixRetryTopicsWithIndexIfSuffixWithIndexStrategy() { new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.SINGLE_TOPIC, - dltStrategy, suffixWithIndexTopicSuffixingStrategy, -1).createProperties(); + dltStrategy, suffixWithIndexTopicSuffixingStrategy, + multipleTopicsSameIntervalReuseStrategy, -1).createProperties(); // then - IntStream.range(1, maxAttempts) - .forEach(index -> assertThat(propertiesList.get(index).suffix()).isEqualTo(retryTopicSuffix + "-" + String.valueOf(index - 1))); + IntStream.range(1, maxAttempts).forEach(index -> assertThat(propertiesList.get(index).suffix()) + .isEqualTo(retryTopicSuffix + "-" + String.valueOf(index - 1))); } @Test + @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { // setup @@ -319,14 +378,17 @@ void shouldSuffixRetryTopicsWithIndexIfFixedDelayWithMultipleTopics() { new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, classifier, numPartitions, kafkaOperations, FixedDelayStrategy.MULTIPLE_TOPICS, - dltStrategy, suffixWithIndexTopicSuffixingStrategy, -1).createProperties(); + dltStrategy, suffixWithIndexTopicSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, + -1).createProperties(); // then IntStream.range(1, maxAttempts) - .forEach(index -> assertThat(propertiesList.get(index).suffix()).isEqualTo(retryTopicSuffix + "-" + String.valueOf(index - 1))); + .forEach(index -> assertThat(propertiesList.get(index).suffix()).isEqualTo(retryTopicSuffix + + "-" + String.valueOf(index - 1))); } @Test + @SuppressWarnings("deprecation") void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { // setup @@ -338,13 +400,15 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); // when - List propertiesList = - new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, backOffValues, - classifier, numPartitions, kafkaOperations, - FixedDelayStrategy.MULTIPLE_TOPICS, - dltStrategy, defaultTopicSuffixingStrategy, -1).createProperties(); + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, + backOffValues, classifier, numPartitions, kafkaOperations, + FixedDelayStrategy.MULTIPLE_TOPICS, + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + + List propertiesList = factory.createProperties(); // then + assertThat(factory.retryTopicsAmount() == 4).isTrue(); assertThat(propertiesList.size() == 6).isTrue(); assertThat(propertiesList.get(0).suffix()).isEqualTo(""); assertThat(propertiesList.get(1).suffix()).isEqualTo(retryTopicSuffix + "-1000"); @@ -353,4 +417,109 @@ void shouldSuffixRetryTopicsWithMixedIfMaxDelayReached() { assertThat(propertiesList.get(4).suffix()).isEqualTo(retryTopicSuffix + "-3000-1"); assertThat(propertiesList.get(5).suffix()).isEqualTo(dltSuffix); } + + @Test + @SuppressWarnings("deprecation") + void shouldReuseRetryTopicsIfMaxDelayReachedWithDelayValueSuffixingStrategy() { + + // setup + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(1000); + backOffPolicy.setMultiplier(2); + backOffPolicy.setMaxInterval(3000); + int maxAttempts = 5; + List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); + + // when + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, + backOffValues, classifier, numPartitions, kafkaOperations, + FixedDelayStrategy.MULTIPLE_TOPICS, + dltStrategy, suffixWithDelayValueSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + + List propertiesList = factory.createProperties(); + + // then + assertThat(factory.retryTopicsAmount()).isEqualTo(3); + assertThat(propertiesList.size()).isEqualTo(5); + assertThat(propertiesList.get(0).suffix()).isEqualTo(""); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false, false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000", false, false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-3000", true, false); + assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); + } + + @Test + @SuppressWarnings("deprecation") + void shouldReuseRetryTopicsIfMaxDelayReachedWithIndexValueSuffixingStrategy() { + + // setup + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(1000); + backOffPolicy.setMultiplier(2); + backOffPolicy.setMaxInterval(3000); + int maxAttempts = 5; + List backOffValues = new BackOffValuesGenerator(maxAttempts, backOffPolicy).generateValues(); + + // when + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, + backOffValues, classifier, numPartitions, kafkaOperations, + FixedDelayStrategy.MULTIPLE_TOPICS, + dltStrategy, suffixWithIndexTopicSuffixingStrategy, singleTopicSameIntervalReuseStrategy, -1); + + List propertiesList = factory.createProperties(); + + // then + assertThat(factory.retryTopicsAmount()).isEqualTo(3); + assertThat(propertiesList.size()).isEqualTo(5); + assertThat(propertiesList.get(0).suffix()).isEqualTo(""); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-0", false, false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-1", false, false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 3000L, retryTopicSuffix + "-2", true, false); + assertThat(propertiesList.get(4).suffix()).isEqualTo(dltSuffix); + } + + @Test + @SuppressWarnings("deprecation") + void shouldNotReuseRetryTopicsIfRepeatedIntervalsAreInTheMiddleOfChain() { + + // setup + List backOffValues = List.of(1000L, 2000L, 2000L, 2000L, 3000L); + int maxAttempts = backOffValues.size() + 1; + + // when + DestinationTopicPropertiesFactory factory = new DestinationTopicPropertiesFactory(retryTopicSuffix, dltSuffix, + backOffValues, classifier, numPartitions, kafkaOperations, + FixedDelayStrategy.SINGLE_TOPIC, + dltStrategy, suffixWithDelayValueSuffixingStrategy, multipleTopicsSameIntervalReuseStrategy, -1); + + List propertiesList = factory.createProperties(); + + // then + assertThat(factory.retryTopicsAmount()).isEqualTo(5); + assertThat(propertiesList.size()).isEqualTo(7); + assertThat(propertiesList.get(0).suffix()).isEqualTo(""); + assertRetryTopic(propertiesList.get(1), maxAttempts, 1000L, retryTopicSuffix + "-1000", false, false); + assertRetryTopic(propertiesList.get(2), maxAttempts, 2000L, retryTopicSuffix + "-2000-0", false, false); + assertRetryTopic(propertiesList.get(3), maxAttempts, 2000L, retryTopicSuffix + "-2000-1", false, false); + assertRetryTopic(propertiesList.get(4), maxAttempts, 2000L, retryTopicSuffix + "-2000-2", false, false); + assertRetryTopic(propertiesList.get(5), maxAttempts, 3000L, retryTopicSuffix + "-3000", false, false); + assertThat(propertiesList.get(6).suffix()).isEqualTo(dltSuffix); + } + + @SuppressWarnings("deprecation") + private void assertRetryTopic(DestinationTopic.Properties topicProperties, int maxAttempts, + Long expectedDelay, String expectedSuffix, boolean expectedReusableTopic, + boolean expectedIsSingleTopicRetry) { + assertThat(topicProperties.suffix()).isEqualTo(expectedSuffix); + assertThat(topicProperties.isRetryTopic()).isTrue(); + DestinationTopic topic = new DestinationTopic("irrelevant" + topicProperties.suffix(), topicProperties); + assertThat(topic.isDltTopic()).isFalse(); + assertThat(topic.isSingleTopicRetry()).isEqualTo(expectedIsSingleTopicRetry); + assertThat(topic.isReusableRetryTopic()).isEqualTo(expectedReusableTopic); + assertThat(topic.getDestinationDelay()).isEqualTo(expectedDelay); + assertThat(topic.getDestinationPartitions()).isEqualTo(numPartitions); + assertThat(topic.shouldRetryOn(0, new IllegalArgumentException())).isTrue(); + assertThat(topic.shouldRetryOn(maxAttempts, new IllegalArgumentException())).isFalse(); + assertThat(topic.shouldRetryOn(0, new RuntimeException())).isFalse(); + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java index 4f5dd74fd4..8cad3c7ace 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,12 +56,12 @@ public class DestinationTopicTests { // MaxAttempts - private final int maxAttempts = 3; + protected final int maxAttempts = 3; // DestinationTopic Properties protected DestinationTopic.Properties mainTopicProps = - new DestinationTopic.Properties(0, "", DestinationTopic.Type.RETRY, 4, 1, + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout); protected DestinationTopic.Properties firstRetryProps = @@ -74,13 +74,13 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout); + DltStrategy.FAIL_ON_ERROR, kafkaOperations1, (a, e) -> false, noTimeout, null); protected List allProps = Arrays .asList(mainTopicProps, firstRetryProps, secondRetryProps, dltTopicProps); protected DestinationTopic.Properties mainTopicProps2 = - new DestinationTopic.Properties(0, "", DestinationTopic.Type.RETRY, 4, 1, + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties firstRetryProps2 = @@ -93,13 +93,13 @@ public class DestinationTopicTests { protected DestinationTopic.Properties dltTopicProps2 = new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, - DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout); + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); protected List allProps2 = Arrays .asList(mainTopicProps2, firstRetryProps2, secondRetryProps2, dltTopicProps2); protected DestinationTopic.Properties mainTopicProps3 = - new DestinationTopic.Properties(0, "", DestinationTopic.Type.RETRY, 4, 1, + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, DltStrategy.NO_DLT, kafkaOperations2, getShouldRetryOn(), timeout); protected DestinationTopic.Properties firstRetryProps3 = @@ -113,6 +113,30 @@ public class DestinationTopicTests { protected List allProps3 = Arrays .asList(mainTopicProps3, firstRetryProps3, secondRetryProps3); + protected DestinationTopic.Properties mainTopicProps4 = + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + + @SuppressWarnings("deprecation") + protected DestinationTopic.Properties singleFixedRetryTopicProps4 = + new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.SINGLE_TOPIC_RETRY, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + + protected DestinationTopic.Properties dltTopicProps4 = + new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); + + protected DestinationTopic.Properties mainTopicProps5 = + new DestinationTopic.Properties(0, "", DestinationTopic.Type.MAIN, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + + protected DestinationTopic.Properties reusableRetryTopicProps5 = + new DestinationTopic.Properties(1000, retrySuffix, DestinationTopic.Type.REUSABLE_RETRY_TOPIC, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, getShouldRetryOn(), timeout); + + protected DestinationTopic.Properties dltTopicProps5 = + new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1, + DltStrategy.ALWAYS_RETRY_ON_ERROR, kafkaOperations2, (a, e) -> false, timeout, null); // Holders @@ -160,6 +184,14 @@ public class DestinationTopicTests { protected List allThirdDestinationHolders = Arrays .asList(mainDestinationHolder3, firstRetryDestinationHolder3, secondRetryDestinationHolder3); + protected final static String FOURTH_TOPIC = "fourthTopic"; + + protected PropsHolder mainDestinationHolder4 = + new PropsHolder(FOURTH_TOPIC, mainTopicProps4); + + protected PropsHolder dltDestinationHolder4 = + new PropsHolder(FOURTH_TOPIC, dltTopicProps4); + // DestinationTopics protected DestinationTopic mainDestinationTopic = @@ -216,6 +248,32 @@ public class DestinationTopicTests { protected List allThirdDestinationTopics = Arrays .asList(mainDestinationTopic3, firstRetryDestinationTopic3, secondRetryDestinationTopic3); + protected DestinationTopic mainDestinationTopic4 = + new DestinationTopic(FOURTH_TOPIC + mainTopicProps4.suffix(), mainTopicProps4); + + protected DestinationTopic singleFixedRetryDestinationTopic4 = + new DestinationTopic(FOURTH_TOPIC + singleFixedRetryTopicProps4.suffix(), singleFixedRetryTopicProps4); + + protected DestinationTopic dltDestinationTopic4 = + new DestinationTopic(FOURTH_TOPIC + dltTopicProps4.suffix(), dltTopicProps4); + + protected List allFourthDestinationTopics = Arrays + .asList(mainDestinationTopic4, singleFixedRetryDestinationTopic4, dltDestinationTopic4); + + protected final static String FIFTH_TOPIC = "fifthTopic"; + + protected DestinationTopic mainDestinationTopic5 = + new DestinationTopic(FIFTH_TOPIC + mainTopicProps5.suffix(), mainTopicProps5); + + protected DestinationTopic reusableRetryDestinationTopic5 = + new DestinationTopic(FIFTH_TOPIC + reusableRetryTopicProps5.suffix(), reusableRetryTopicProps5); + + protected DestinationTopic dltDestinationTopic5 = + new DestinationTopic(FIFTH_TOPIC + dltTopicProps5.suffix(), dltTopicProps5); + + protected List allFifthDestinationTopics = Arrays + .asList(mainDestinationTopic5, reusableRetryDestinationTopic5, dltDestinationTopic5); + // Classifiers private final BinaryExceptionClassifier allowListClassifier = new BinaryExceptionClassifierBuilder() diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java index 36c8f39635..425dfbd2c5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -247,6 +247,7 @@ static class FrameworkFatalTopicListener { @Autowired CountDownLatchContainer container; + @SuppressWarnings("deprecation") @RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, backoff = @Backoff(50)) @KafkaListener(topics = FRAMEWORK_FATAL_EXCEPTION_TOPIC) public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { @@ -330,6 +331,7 @@ public RetryTopicConfiguration blockingAndTopic(KafkaTemplate te } @Bean + @SuppressWarnings("deprecation") public RetryTopicConfiguration onlyTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder .newInstance() diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java index bf9a1ba02d..3823c37b6c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -478,6 +478,7 @@ static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { private static final String DLT_METHOD_NAME = "processDltMessage"; + @SuppressWarnings("deprecation") @Bean public RetryTopicConfiguration firstRetryTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder