-
Notifications
You must be signed in to change notification settings - Fork 1.6k
DefaultErrorHandler is not able to seek in case of an exception during the commit #3019
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
@frosiere A PR is greatly welcomed. |
Great, I will drop a PR this week, sorry for the delay.
This would optimize and solve the problem. |
In the end, I had to keep the workaround, as the proposal would require an important rework of this layer. The redesign would require further discussion on this topic. From a general point of view, it could be summarized as follows In case of exception:
This is not possible with the current implementation, as the seekOrRecover method throws a KafkaException after the seek, preventing the commit method from being called. Commits are also handled in 3 different places, whereas they should only be handled in one place after the seek or recover. It seems that the FailedBatchProcessor#seekOrRecover and SeekUtils codes are trying to handle too many cases, complicating all the code around the seek or recover. Any comments or feedback would be appreciated. |
I am okay with the workaround (for now), as we want to avoid a major refactor in this area. The PR with the workaround looks good. |
Fixes: #3019 * DefaultErrorHandler is not able to seek in case of an exception during the commit **Auto-cherry-pick to `3.1.x`**
Related to #3019 * Add `@SuppressWarnings("serial")` to `KafkaTemplate.SkipAbortException` * Move logic in the `FailedBatchProcessor.seekOrRecover()` out of `finally` block. Essentially, swallow commit exception and follow with seeks **Auto-cherry-pick to `3.1.x`**
I'm sorry to ask about a closed issue, but my team is still using Spring Kafka 2.9.13 and suffering from this problem. |
@frosiere, the |
The listener error handler (DefaultErrorHandler) is not able to seek in case of an exception during the commit.
The code is not protected against exceptions such as a RebalanceInProgressException. The full list of exceptions can be found on KafkaConsumer#commitSync and KafkaConsumer#commitAsync methods.
Due to this bug, some records are skipped during a reprocessing.
The issue can be fixed by wrapping the commit into a try block with a finally block for the seek part.
See FailedBatchProcessor#seekOrRecover for more details.
The following portion of code
shoud/could be replaced by
The proposed fix has been tested and is working fine.
I can contribute by dropping a PR, I can also provide more details if needed.
Any other ideas, suggestions are more than welcome.
The text was updated successfully, but these errors were encountered: