Skip to content

Add Meta-Annotation Support to @RetryableTopic #2435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
fablakeson opened this issue Oct 12, 2022 · 0 comments · Fixed by #2440
Closed

Add Meta-Annotation Support to @RetryableTopic #2435

fablakeson opened this issue Oct 12, 2022 · 0 comments · Fixed by #2440

Comments

@fablakeson
Copy link
Contributor

Expected Behavior

@RetryableTopic is pretty handfull to configure retry strategies on @KafkaListener annotated methods, but it lacks a very useful capability, the meta-annotation.

Just as we can create meta-annotations to make it easier to configure multiple @KafkaListeners that share the same properties, reducing boilerplate (and also improving readability), it would be great to be able to do the same thing with @RetryableTopics.

This issue proposes to add this feature with a simple changes to @RetryableTopic annotation target.

Context

To justify this change, let's suppose that we are implementing some retry strategies for multiple topics with different backoffs. It's reasonable to assume that to reduce boilerplate I want to create generic retry strategies that can be shared by multiple listeners, but the only way to do this currently is to create a Bean like:

@Bean
public RetryTopicConfiguration fixedBackoffRetryableTopics(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
    .newInstance()
    .fixedBackoff(3000)
    .maxAttempts(5)
    .includeTopics("my-topic")
    .create(template);
}

@Bean
public RetryTopicConfiguration exponentialBackoffRetryableTopics(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
    .newInstance()
    .exponentialBackoff(1000, 2, 5000)
    .maxAttempts(4)
    .includeTopics("my-other-topic")
    .retryOn(MyException.class)
    .create(template);
}

With the use of meta-annotations we could do something like:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 3000, maxDelay = 12000), ...)
public @interface WithFixedBackoffRetry {}

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000), ...)
public @interface WithExponentialBackoffRetry {}

And for topic listener meta-annotations:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@KafkaListener(...)
public @interface MyTopicListener {}

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@KafkaListener(...)
public @interface MyOtherTopicListener {}

And finally:

class MyConsumerClass {

  @MyTopicListener
  @WithFixedBackoffRetry
  void consume(Message message) throws MyException {
    /** consume the message... */
  }

  @MyOtherTopicListener
  @WithExponentialBackoffRetry
  void consume(OtherMessage otherMessage) throw MyOtherException {
    /**  consume the other message... */
  }

}

It is more transparent for the developer to use Kafka and not have to mix kafka related configuration and business logic on the same piece of code.

Implementation Proposal

The proposal is to change the @Target meta-annotation of the @RetryableTopic annotation, adding the ElementType.ANNOTATION_TYPE. This will enable the call of RetryTopicConfigurationProvider.findRetryConfigurationFor() to find the meta-annotation on Method object using the AnnotationUtils.findAnnotation(). The AnnotationUtils.findAnnotation() already searches the full type hierarchy using the MergedAnnotators API.

@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RetryableTopic { /** ... */ }

We doesn't have to change current findRetryConfigurationFor() implementation:

public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method method, Object bean) {
  RetryableTopic annotation = AnnotationUtils.findAnnotation(method, RetryableTopic.class);
  return annotation != null
      ? new RetryableTopicAnnotationProcessor(this.beanFactory, this.resolver, this.expressionContext)
          .processAnnotation(topics, method, annotation, bean)
      : maybeGetFromContext(topics);
}

Impact

There is no significant change on codebase, just the addition of a target to an annotation. The new test cases where added to RetryTopicConfigurationProviderTests on a forked repository and everything seems to work fine.

Future Improvements

Another good improvement would be use MergedAnnotations directly on RetryTopicConfigurationProvider.findRetryConfigurationFor() to get a synthetic Annotation that is merged with attributes annotated with @AliasFor. It could be useful to add some level of parametrization. For example, let the business area developer to choose what exceptions will trigger the retry strategy directly on the "high-level" annotation:

public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method method, Object bean) {
  RetryableTopic annotation = MergedAnnotations.from(method, SearchStrategy.TYPE_HIERARCHY,
          RepeatableContainers.none())
      .get(RetryableTopic.class).synthesize(MergedAnnotation::isPresent).orElse(null);
  return annotation != null
      ? new RetryableTopicAnnotationProcessor(this.beanFactory, this.resolver, this.expressionContext)
          .processAnnotation(topics, method, annotation, bean)
      : maybeGetFromContext(topics);
}

Then we could do:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 3000, maxDelay = 12000), ...)
public @interface FixedBackoffRetry {
    /** List of exceptions that trigger this retry strategy. */
    @AliasFor(annotation = RetryableTopic.class, attribute = "exclude")
    java.lang.Class<? extends java.lang.Throwable>[] onException();
}

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@KafkaListener(...)
public @interface MyTopicListener {}

class MyConsumerClass {

  @MyTopicListener
  @FixedBackoffRetry(onException = MyException.class)
  void consume(Message message) throws MyException {
    /** the retryable exceptions are more explicit and also parametrizable on customized annotation*/
  }

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants