@RetryableTopic: support for re-processing of records after DLT handling #2172

Closed
jgslima opened this issue Mar 15, 2022 · 8 comments · Fixed by #2245

Comments

@jgslima
Contributor

jgslima commented Mar 15, 2022

Expected Behavior
Support for, or at least an easier path to, designs where records need to be "reprocessed" after the DLT handling has been done and after the root cause of the processing error has been fixed.

Current Behavior
After retries have been exceeded, the application has to deal with the record in the @DltHandler method. But in some application cases, the record cannot be completely discarded.

Context
For critical topics, records that exceeded retries cannot be discarded, so what we do here is persist the record data somewhere else (typically in a database).
The application has an Administration Console where the operations team can query and inspect the records that exceeded retries (and also the Exception data that caused the error, because in the @DltHandler we actually submit the record to the actual listener one more time to have the information of the Exception being thrown by the listener).

After the root cause of the error has been fixed (for instance, a database or an external service that was not responding gets back online, or a bug in the code has been fixed), through the console the operator has the means to select some of the records (or all from a specific original topic) and trigger a "reprocessing" of such records.

A good way of doing this reprocessing would be to restart the flow, that is, to resend the record to the topic chain.

However, we cannot send the message to the original topic, because we would be polluting it. Since it is my problem that I was not able to consume the record, I should send the record to the first retry topic in my retry topic chain.

  • for fixed delay and multiple topics, this would be "originalTopic-retry-0".
  • for fixed delay and single topic, this would be "originalTopic-retry"
  • for non-fixed delay, this would be the first retry topic, like say "originalTopic-retry-1000"
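
The three naming cases above can be captured in a small helper. This is only an illustrative sketch: FirstRetryTopicNames and its methods are hypothetical, it assumes the default "-retry" suffix seen in the examples, and it assumes the number in the non-fixed case is the first delay in milliseconds. Custom suffixes and per-topic configuration are not handled, so names obtained from the framework itself remain the safer choice.

```java
/** Hypothetical helper mirroring only the default names listed above. */
final class FirstRetryTopicNames {

    // Default suffix as it appears in the examples above (assumption: not customized).
    private static final String RETRY_SUFFIX = "-retry";

    private FirstRetryTopicNames() {
    }

    /** Fixed delay, one topic per attempt: the first retry topic carries index 0. */
    static String fixedDelayMultipleTopics(String originalTopic) {
        return originalTopic + RETRY_SUFFIX + "-0";
    }

    /** Fixed delay, single retry topic: nothing is appended after the suffix. */
    static String fixedDelaySingleTopic(String originalTopic) {
        return originalTopic + RETRY_SUFFIX;
    }

    /** Non-fixed delay: the first retry topic carries the first delay (assumed milliseconds). */
    static String nonFixedDelay(String originalTopic, long firstDelayMillis) {
        return originalTopic + RETRY_SUFFIX + "-" + firstDelayMillis;
    }
}
```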

What spring-kafka could provide is an easy way for the application to send a record to the first retry topic in the chain for a given original topic. It is fine if, when the record is sent to that first topic, the consumer is currently paused waiting out the delay of the messages that are already there.

This might be done in more than one way:

  • just providing some utility that returns to the application the name of the first retry topic, built the same way the framework builds it internally, with the application taking care of everything else.
  • providing some method that receives a ProducerRecord already instantiated by the application, populates it with the needed headers, and then sends the record to the first retry topic.

In fact, we are already doing this ourselves, just sending the record to the first retry topic. But we are doing it in a risky way: we duplicated the logic that builds the first topic name, and we are counting on luck, because it turns out that posting a record without the internal control headers just works. Ideally the library should populate all the control headers properly.

@tomazfernandes
Contributor

Hi @jgslima, thanks for bringing this up.

because in the @DltHandler we actually submit the record to the actual listener one more time to have the information of the Exception being thrown by the listener).

These record headers in the message received in the DLT should contain the information regarding the exception thrown by the listener: kafka_exception-fqcn, kafka_exception-message, kafka_exception-stacktrace, so you shouldn't need to consume the record again. Do you not have access to these, or maybe you need some other information?

What spring-kafka could provide is an easy way for the application to send a record to the first retry topic in the chain for a given original topic. It is fine if, when the record is sent to that first topic, the consumer is currently paused waiting out the delay of the messages that are already there.

You can get the next topic name by injecting the DestinationTopicResolver bean and using the resolveDestinationTopic(String topic, Integer attempt, Exception e, long originalTimestamp) method. This will return a DestinationTopic instance with which you can then getDestinationName().

Besides the main topic name, you need to provide arguments that qualify for a retry, e.g. attempts < maxAttempts for the topic, a retryable exception, and a timestamp before the timeout if a timeout is configured for the topic.

You can for example have something like: resolver.resolveDestinationTopic("myMainTopic", 0, new RuntimeException(), Instant.now().toEpochMilli()).getDestinationName().

Do you think that would solve the problem?

We could look into having a more direct getNextTopic(String topic) method, but then it could return a different result from the actual topic the record will be sent to, in case some other criteria fails, which might generate some confusion.

This might be done in more than one way:

  • just providing some utility that returns to the application the name of the first retry topic, built the same way the framework builds it internally, with the application taking care of everything else.

Since topic naming includes bean-based and per-topic configurations, I don't think it'd be straightforward to provide such a utility. Maybe we could have an addNamingListener method with listeners that would be called with the topic names as they are created so users can have access to them, in case the DestinationTopicResolver solution isn't enough.

  • providing some method that receives a ProducerRecord already instantiated by the application, populates it with the needed headers, and then sends the record to the first retry topic.

In fact, we are already doing this ourselves, just sending the record to the first retry topic. But we are doing it in a risky way: we duplicated the logic that builds the first topic name, and we are counting on luck, because it turns out that posting a record without the internal control headers just works. Ideally the library should populate all the control headers properly.

If my understanding is correct, you're feeding the record to the first retry topic and it's being consumed as a regular record, without back off (unless the partition is already paused) or attempt counting. I haven't tested that exact scenario, but that shouldn't be a problem - if the record processing fails, it'll be forwarded to the next retry topic and the control headers will be added normally by the framework.

@tomazfernandes
Contributor

One caveat I've noticed is that, since the RetryTopicBootstrapper kicks in lazily during @KafkaListener bean post-processing, if you try to inject any of the feature's beans in a @Component before the bootstrapping happens it will throw a NoSuchBeanDefinitionException.

If that happens, a workaround would be bootstrapping the feature eagerly in a @Configuration class, with something like:

@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_BOOTSTRAPPER)
public RetryTopicBootstrapper retryTopicBootstrapper(ApplicationContext applicationContext) {
	RetryTopicBootstrapper bootstrapper =
			new RetryTopicBootstrapper(applicationContext, applicationContext.getAutowireCapableBeanFactory());
	bootstrapper.bootstrapRetryTopic();
	return bootstrapper;
}

@garyrussell, I'll take a look to see if I can come up with a way around this, but it doesn't sound trivial since we only have access to the annotation-based configurations when the beans are being processed, to detect if the feature is being used. Maybe we could have a @EnableRetryableTopic annotation in Boot's auto configuration that calls this logic eagerly if present?

If you have any ideas please let me know.

Thanks

@jgslima
Contributor Author

jgslima commented Mar 16, 2022

Thank you for dedicating time to my issue and providing a detailed answer.

These record headers in the message received in the DLT should contain the information regarding the exception thrown by the listener: kafka_exception-fqcn, kafka_exception-message, kafka_exception-stacktrace, so you shouldn't need to consume the record again. Do you not have access to these, or maybe you need some other information?

That sounds fine, I had forgotten about those headers. I will try and use them.

If my understanding is correct, you're feeding the record to the first retry topic and it's being consumed as a regular record, without back off (unless the partition is already paused) or attempt counting. I haven't tested that exact scenario, but that shouldn't be a problem - if the record processing fails, it'll be forwarded to the next retry topic and the control headers will be added normally by the framework.

Yes, it just works. If there is again an error when consuming from the first topic, the flow continues normally. And it is fine if there is no backoff for the first record.

Since topic naming includes bean-based and per-topic configurations, I don't think it'd be straightforward to provide such a utility.

I do not want to insist on this. If it's complicated, or if you think it does not add to spring-kafka, never mind and feel free to close the issue.

I just wonder whether it might be useful to include some guidance about this in the documentation. There may be other applications that have critical topics and want to take the same approach as us (store the record in the DltHandler and, after the root cause has been fixed, "restart" the flow).

@jgslima
Contributor Author

jgslima commented Mar 16, 2022

I just wonder whether it might be useful to include some guidance about this in the documentation. There may be other applications that have critical topics and want to take the same approach as us (store the record in the DltHandler and, after the root cause has been fixed, "restart" the flow).

Just to add more. My company makes financial applications. Generally speaking, we cannot afford to discard any record of any topic.
We have already designed and developed, for some of our applications, this "Message reprocessing" console for traditional message brokers (SQS, RabbitMQ, ...). Such a console is important because, in some scenarios, one may have to trigger the reprocessing of a large number of messages that failed due to some temporary external reason.

For brokers, as queues are point-to-point (just one consumer), it is fine to just resend the erroneous messages to the very same original queue.

Now we are trying to adapt this console and design to Kafka, but there we cannot just send the record to the original topic again, because we would pollute the original topic's history, which may impact other systems subscribing to it.

@tomazfernandes
Contributor

tomazfernandes commented Mar 16, 2022

I do not want to insist on this. If it's complicated, or if you think it does not add to spring-kafka, never mind and feel free to close the issue.

I just wonder whether it might be useful to include some guidance about this in the documentation. There may be other applications that have critical topics and want to take the same approach as us (store the record in the DltHandler and, after the root cause has been fixed, "restart" the flow).

No worries, I think you have a valid use-case, and that it's important that the framework assists you as much as it can.

Did you check the DestinationTopicResolver solution?

You can for example inject it in your DLT handler class and use it to fetch the retry topic name for that topic, and then persist it alongside the record for the console application to retrieve later.

Or you can have an endpoint in your application that the console app could call, passing the failed record's topic as a parameter - your app would look up the retry topic in the DestinationTopicResolver bean and return it in the response.

Does this look like it would work for you?

If the method signature looks complicated, we can look into providing a simpler one as I mentioned.

Once we find a suitable solution for your use case, we can look into documenting it.

Thanks

@jgslima
Contributor Author

jgslima commented Mar 17, 2022

I tried DestinationTopicResolver here.

It worked well, like you suggested:

resolver.resolveDestinationTopic("myMainTopic", 0, new RuntimeException(), Instant.now().toEpochMilli()).getDestinationName()

I tested for all those cases:

  • fixed delay with single topic.
  • fixed delay with multiple topics.
  • non-fixed delay.

Some remarks:

  • attempt parameter => I noticed it does not really matter (at least when specifying the topic parameter as the main topic). But if this is going to be included in the documentation, since the intention is to resolve to the first retry topic, I think it is more intuitive (and more robust against possible future changes in the logic) to pass the value 1.

  • Exception parameter => I noticed in DefaultDestinationTopicResolver that it is used to decide whether the exception should be retried or sent directly to the DLT. So it may be good for the documentation to recommend passing an Exception that is not filtered to go directly to the DLT (although I believe in most cases a RuntimeException should not be filtered to go to the DLT).

  • indeed, it was not possible to inject a DestinationTopicResolver. I had to initialize it lazily on first use with applicationContext.getBean(DestinationTopicResolver.class). For me that's fine.
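
The remarks above can be folded into one small wrapper. This is a minimal sketch and not spring-kafka code: RetryTopicLookup is a local stand-in for the resolveDestinationTopic(...).getDestinationName() call chain quoted earlier, the Supplier stands in for applicationContext.getBean(DestinationTopicResolver.class), and LazyFirstRetryTopicFinder is a hypothetical name. It also assumes a plain RuntimeException is not configured to go straight to the DLT.

```java
import java.time.Instant;
import java.util.function.Supplier;

/** Local stand-in mirroring the argument list of the resolver call quoted in this thread. */
interface RetryTopicLookup {
    String destinationName(String topic, Integer attempt, Exception e, long originalTimestamp);
}

/** Hypothetical wrapper that resolves the lookup on first use and always asks for the first retry topic. */
final class LazyFirstRetryTopicFinder {

    private final Supplier<RetryTopicLookup> lookupSupplier;
    private volatile RetryTopicLookup lookup;

    LazyFirstRetryTopicFinder(Supplier<RetryTopicLookup> lookupSupplier) {
        this.lookupSupplier = lookupSupplier;
    }

    String firstRetryTopicFor(String mainTopic) {
        RetryTopicLookup current = lookup;
        if (current == null) {
            // Deferred until first use, so nothing is looked up before the feature is bootstrapped.
            // Two threads racing here may both call the supplier; that is harmless for a lookup.
            current = lookupSupplier.get();
            lookup = current;
        }
        // attempt = 1 to express "first retry topic"; a plain RuntimeException is assumed to be
        // retryable; "now" as the original timestamp keeps the call inside any configured timeout.
        return current.destinationName(mainTopic, 1,
                new RuntimeException("topic lookup only"), Instant.now().toEpochMilli());
    }
}
```

In a real application the supplier would presumably fetch the DestinationTopicResolver bean from the application context and adapt it to this interface.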

So what I requested in the issue is accomplished. What remains is to decide whether to include some tips/guidance in the documentation.

Thank you.

@tomazfernandes
Contributor

tomazfernandes commented Mar 17, 2022

That's good news, thanks. I think we can create a simpler method call in the DestinationTopicContainer interface - we actually have internal methods for resolving topics, it should be a matter of opening some of them up and adding to the interface.

May I ask what approach you used - persisting the topic in the database alongside the record maybe, or some other strategy? That might make a good example in the documentation.

As for the autowiring problem, I've opened this issue and hopefully we'll be able to have the fix in the upcoming 2.8.4 version.

@jgslima
Contributor Author

jgslima commented Mar 17, 2022

We are doing it the following way:

DLT handling
We give the @DltHandler of each listener a single parameter of type ConsumerRecord, which is simply passed on to a class in the console library.

In it, these headers are extracted:

  • KafkaHeaders.ORIGINAL_TOPIC
  • KafkaHeaders.EXCEPTION_STACKTRACE
  • KafkaHeaders.EXCEPTION_MESSAGE (although what I think is more useful is just the message of the rootCause, but that's ok).
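
A minimal sketch of that extraction step, under stated assumptions: FailedRecordDetails is a hypothetical class, a plain Map stands in for the record's Kafka headers, and header values are assumed to be UTF-8 text. The names "kafka_exception-message" and "kafka_exception-stacktrace" appear earlier in this thread; "kafka_original-topic" is assumed to be the value of KafkaHeaders.ORIGINAL_TOPIC.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical holder for what gets persisted about a record that reached the DLT. */
final class FailedRecordDetails {

    final String originalTopic;
    final String exceptionMessage;
    final String exceptionStackTrace;

    private FailedRecordDetails(String originalTopic, String exceptionMessage, String exceptionStackTrace) {
        this.originalTopic = originalTopic;
        this.exceptionMessage = exceptionMessage;
        this.exceptionStackTrace = exceptionStackTrace;
    }

    /** Reads the three headers; a missing header yields null rather than an error. */
    static FailedRecordDetails fromHeaders(Map<String, byte[]> headers) {
        return new FailedRecordDetails(
                text(headers, "kafka_original-topic"),       // assumed KafkaHeaders.ORIGINAL_TOPIC
                text(headers, "kafka_exception-message"),    // KafkaHeaders.EXCEPTION_MESSAGE
                text(headers, "kafka_exception-stacktrace")); // KafkaHeaders.EXCEPTION_STACKTRACE
    }

    private static String text(Map<String, byte[]> headers, String name) {
        byte[] value = headers.get(name);
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }
}
```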

The key and value are extracted and re-serialized to byte[] in order to store the binary contents (unfortunately the ConsumerRecord does not provide a way to access the original byte[] of the key and value). This part is a little tricky, because in theory the central component must have a way to know the proper Serializer for each application topic. A possible variation here would be to perform this re-serialization inside each @DltHandler, since each listener class should know how to properly re-serialize its records.

Also, to allow the message content to be inspected when drilling down in the application console, we pass the value Object of the record through Jackson, just to have a textual (in this case, JSON) representation of the message (regardless of whether the actual Serializer used for this topic is JSON, Avro or something else).

All of this information is then stored in the database, alongside a timestamp. From there, the records can be queried, and from the console the user can trigger a reprocessing (either of specific records or of all records from the same original topic).

Currently we do not support the persistence of arbitrary headers the records may have. This may be added in the future.

Reprocessing
If the user triggers a reprocessing of the message:

  1. Key and value are deserialized back into Objects.
  2. A ProducerRecord is created.
    • with the topic name being the first retry topic.
    • here, a detail: we must populate the KafkaHeaders.ORIGINAL_TOPIC header with the original topic name. Otherwise, if this record exceeds retries again, in the next @DltHandler processing the framework will assume that the original topic was the first retry topic, not the real original topic.
  3. The ProducerRecord is sent.
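
The header detail in step 2 can be sketched as follows. This is an illustration only: ReprocessingRecord is a hypothetical stand-in for a ProducerRecord so that the example needs no Kafka dependency, and "kafka_original-topic" is assumed to be the value of KafkaHeaders.ORIGINAL_TOPIC, written as UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a ProducerRecord: topic, key, value and headers. */
final class ReprocessingRecord<K, V> {

    // Assumed to be the value of KafkaHeaders.ORIGINAL_TOPIC.
    static final String ORIGINAL_TOPIC_HEADER = "kafka_original-topic";

    final String topic;
    final K key;
    final V value;
    final Map<String, byte[]> headers;

    private ReprocessingRecord(String topic, K key, V value, Map<String, byte[]> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    /**
     * Targets the first retry topic but records the real original topic in the header,
     * so a later DLT pass does not mistake the retry topic for the original one.
     */
    static <K, V> ReprocessingRecord<K, V> forFirstRetryTopic(
            String firstRetryTopic, String originalTopic, K key, V value) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put(ORIGINAL_TOPIC_HEADER, originalTopic.getBytes(StandardCharsets.UTF_8));
        return new ReprocessingRecord<>(firstRetryTopic, key, value, Collections.unmodifiableMap(headers));
    }
}
```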

It takes some work to develop these features, but they are worth it. On top of that, one may add enhancements, such as generating metrics and alerting the operations team when there are new, or many new, records in the database, and so on.

tomazfernandes added a commit to tomazfernandes/spring-kafka that referenced this issue Apr 28, 2022
Resolves spring-projects#2172

* Add methods to `DestinationTopicContainer` interface
* Reduce memory footprint of `DefaultDestinationTopicResolver`
* Add documentation
@garyrussell garyrussell added this to the 3.0.0-M4 milestone Apr 28, 2022
garyrussell pushed a commit that referenced this issue Apr 28, 2022
* GH-2172: Expose Retry Topic Chain at Runtime

Resolves #2172

* Add methods to `DestinationTopicContainer` interface
* Reduce memory footprint of `DefaultDestinationTopicResolver`
* Add documentation

* Add to integration test