@RetryableTopic: support for re-processing of records after DLT handling #2172
Hi @jgslima, thanks for bringing this up.
The record received in the DLT should contain headers with the information regarding the exception thrown by the listener, such as the exception class name (FQCN), message, and stack trace.
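As an illustration, those details can be read straight off the record headers. A minimal sketch in plain Java, assuming the DLT header names match spring-kafka's `KafkaHeaders` constants (`kafka_dlt-exception-fqcn` and friends); verify them against the version you use:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Sketch: reading the exception-related headers from a record that landed in the DLT.
// Header names are assumed to match spring-kafka's KafkaHeaders constants; check your version.
public class DltHeaderReader {

    public static final String EXCEPTION_FQCN = "kafka_dlt-exception-fqcn";
    public static final String EXCEPTION_MESSAGE = "kafka_dlt-exception-message";
    public static final String EXCEPTION_STACKTRACE = "kafka_dlt-exception-stacktrace";
    public static final String ORIGINAL_TOPIC = "kafka_dlt-original-topic";

    /** Kafka header values arrive as byte arrays; decode them as UTF-8 text. */
    public static String headerAsString(Map<String, byte[]> headers, String name) {
        byte[] value = headers.get(name);
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }
}
```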
You can get the next topic name by injecting the `DestinationTopicResolver` bean. Besides the destination name, it gives you access to other details of the destination topic. You can, for example, have something like a lookup of the first retry topic for a given original topic in your DLT handler. Do you think that would solve the problem? We could look into having a more direct method for this use case.
Since topic naming includes bean-based and per-topic configurations, I don't think it'd be straightforward to provide such a utility. Maybe we could add methods to the `DestinationTopicContainer` for that.
If my understanding is correct, you're feeding the record to the first retry topic and it's being consumed as a regular record, without backoff (unless the partition is already paused) or attempt counting. I haven't tested that exact scenario, but that shouldn't be a problem - if the record processing fails, it'll be forwarded to the next retry topic and the control headers will be added normally by the framework.
One caveat I've noticed is that, since the retry topic configuration is only processed when the annotated listener beans are created, the resolver may not be populated yet if you inject it into a bean that is instantiated earlier. If that happens, a workaround would be bootstrapping the feature eagerly in a configuration class.
@garyrussell, I'll take a look to see if I can come up with a way around this, but it doesn't sound trivial, since we only have access to the annotation-based configurations when the beans are being processed, which is when we can detect whether the feature is being used. Maybe we could have a way to bootstrap the feature eagerly. If you have any ideas, please let me know. Thanks
Thank you for dedicating time to my issue and providing a detailed answer.
That sounds fine, I had forgotten about those headers. I will try and use them.
Yes, it just works. If there is again an error when consuming from the first topic, the flow continues normally. And it is fine if there is no backoff for the first record.
I do not want to insist on this. If it's complicated, or if you think it does not add to spring-kafka, never mind; feel free to close the issue. I just wonder whether it might be useful to include some guidance about this in the documentation. I'm wondering whether there may be other applications with critical topics that want to take the same approach as us (store the record in the DltHandler and, after the root cause has been fixed, "restart" the flow).
Just to add more context: my company makes financial applications. Generally speaking, we cannot afford to discard any record of any topic. With traditional message brokers, since queues are point-to-point (just one consumer), it is fine to simply resend the erroneous messages to the very same original queue. Now we are trying to adapt this console and design to Kafka, but there we cannot just send the record to the original topic again, because we would pollute the original topic history, which may impact other systems subscribing to it.
No worries, I think you have a valid use case, and it's important that the framework assists you as much as it can. Did you check the `DestinationTopicResolver`? You can, for example, inject it in your DLT handler class and use it to fetch the retry topic name for that topic, and then persist it alongside the record for the console application to retrieve later. Or you can have an endpoint in your application that the console app could access, passing the failed record's topic as a parameter - your app would look up the retry topic in the resolver and return it. Does this look like it would work for you? If the method signature looks complicated, we can look into providing a simpler one as I mentioned. Once we find a suitable solution for your use case, we can look into documenting it. Thanks
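To make that flow concrete, here is a minimal plain-Java sketch. `RetryTopicLookup` is a hypothetical stand-in for the `DestinationTopicResolver` bean (the real bean resolves destinations from the retry topic configuration), and `StoredRecord` is an illustrative shape for what the console app would later need:

```java
import java.util.Map;

// Sketch of the suggested flow: look up the first retry topic for a failed record's
// original topic and persist it alongside the record for later reprocessing.
public class DltPersistence {

    // Hypothetical simplification of spring-kafka's DestinationTopicResolver.
    interface RetryTopicLookup {
        String firstRetryTopicFor(String originalTopic);
    }

    /** A trivial lookup backed by a precomputed map (stand-in for the resolver bean). */
    static RetryTopicLookup mapBacked(Map<String, String> firstRetryTopics) {
        return firstRetryTopics::get;
    }

    /** What the console app would need: the record plus its reprocessing target topic. */
    record StoredRecord(String originalTopic, String retryTopic, String payload) {}

    static StoredRecord store(RetryTopicLookup lookup, String originalTopic, String payload) {
        return new StoredRecord(originalTopic, lookup.firstRetryTopicFor(originalTopic), payload);
    }
}
```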
I tried it here. It worked well, as you suggested.
I tested it for all those cases.
So what I requested in the issue is accomplished. Now it remains to decide whether to include some tips/guidance in the documentation. Thank you.
That's good news, thanks. I think we can create a simpler method call in the `DestinationTopicResolver`. May I ask what approach you used - persisting the topic in the database alongside the record maybe, or some other strategy? That might make a good example in the documentation. As for the autowiring problem, I've opened an issue and hopefully we'll be able to have the fix in the upcoming 2.8.4 version.
We are doing it the following way. DLT handling: in the `@DltHandler`, the headers carrying the exception class name, message, and stack trace, and the original topic, are extracted.
The key and value are extracted and re-serialized. Also, just to allow the message content to be inspected in case of a drill-down in the application console, we store a readable representation of the payload. All that information is then stored in the database, alongside a timestamp. From there, the records can be queried, and from the console the user can trigger a reprocessing (either of specific records or of all records from the same original topic). Currently we do not support persisting arbitrary headers the records may have; this may be added in the future. Reprocessing is then a matter of sending the stored record back to the first retry topic of its original topic.
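One possible way to persist the re-serialized key and value is to Base64-encode the raw bytes for a text column, so the record can be reconstructed byte-for-byte at reprocessing time. A sketch, with illustrative field names (not the original application's schema):

```java
import java.util.Base64;

// Sketch of one way to persist a failed record's key and value in a database:
// keep the raw serialized bytes, Base64-encoded for a text column, so the exact
// bytes can be restored when reprocessing is triggered.
public class FailedRecordCodec {

    // Illustrative row shape; field names are not from the original application.
    record Row(String originalTopic, String keyB64, String valueB64, long failedAtMillis) {}

    static Row toRow(String originalTopic, byte[] key, byte[] value, long failedAtMillis) {
        return new Row(originalTopic,
                key == null ? null : Base64.getEncoder().encodeToString(key),
                Base64.getEncoder().encodeToString(value),
                failedAtMillis);
    }

    /** Restore the raw bytes for re-sending; tolerates a null (absent) key. */
    static byte[] decode(String b64) {
        return b64 == null ? null : Base64.getDecoder().decode(b64);
    }
}
```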
It takes some work to develop these features, but they are worth it. On top of that, one may add enhancements, like generating metrics and alerts for the operations team when there are new (or many) records in the database, and so on.
Resolves spring-projects#2172

* Add methods to `DestinationTopicContainer` interface
* Reduce memory footprint of `DefaultDestinationTopicResolver`
* Add documentation
Expected Behavior
Support, or at least convenience, for designs where records may need to be reprocessed after DLT handling has been done and after the root cause of the processing error has been fixed.
Current Behavior
After retries have been exceeded, the application has to deal with the record in the `@DltHandler` method. But in some applications, the record cannot be completely discarded.
Context
As records of critical topics that exceeded retries cannot be discarded, what we do is persist the record data somewhere else (typically in a database).
The application has an Administration Console where the operations team can query and inspect the records that exceeded retries (and also the exception data that caused the error, because in the `@DltHandler` we actually submit the record to the listener one more time to capture the exception it throws).
After the root cause of the error has been fixed (for instance, a database or an external service that was not responding gets back online, or a bug in the code has been fixed), the operator can use the console to select some of the records (or all from a specific original topic) and trigger a reprocessing of those records.
A good way of doing this reprocessing would be to restart the flow, that is, to resend the record through the topic chain again.
However, we cannot send the message to the original topic, because we would be polluting it. As the failure to consume the record is my application's problem, I should send the record to the first retry topic in my retry topic chain.
Depending on the configuration, the first retry topic may be named `originalTopic-retry-0`, `originalTopic-retry`, or `originalTopic-retry-1000`.
What `spring-kafka` might provide is an easy way for the application to send a record to the first retry topic in the chain, for a given original topic. And it is fine that, when sending a message to that first topic, the consumer may currently be paused, waiting out the delay of the messages that are already there.
This might be done in more than one way:
- Receive a `ProducerRecord` already instantiated by the application, populate the needed headers in it, and then send the record to the first retry topic.
In fact, we are already doing this by ourselves, just sending the record to the first retry topic. But we are doing it in a risky way, because we duplicated the logic to build the first retry topic name, and we are also counting on luck, because it turns out that posting a record without the internal control headers just works. Ideally, though, the library should populate all the control headers properly.