
DefaultErrorHandler is not able to seek in case of an exception during the commit #3019


Closed
frosiere opened this issue Feb 7, 2024 · 6 comments · Fixed by #3055

Comments

@frosiere
Contributor

frosiere commented Feb 7, 2024

The listener error handler (DefaultErrorHandler) is not able to seek in case of an exception during the commit.

The code is not protected against exceptions thrown by the commit, such as RebalanceInProgressException. The full list of possible exceptions is documented on the KafkaConsumer#commitSync and KafkaConsumer#commitAsync methods.

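Because the exception propagates before the seek is performed, the consumer's position is never reset to the failed record, and some records are skipped instead of being reprocessed.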

The issue can be fixed by wrapping the commit in a try block and performing the seek in a finally block.

See FailedBatchProcessor#seekOrRecover for more details.

The following portion of code

if (offsets.size() > 0) {
    commit(consumer, container, offsets);
}
if (isSeekAfterError()) {
...

should (or could) be replaced by

try {
    if (offsets.size() > 0) {
        commit(consumer, container, offsets);
    }
} finally {
    if (isSeekAfterError()) {
...

The proposed fix has been tested and is working fine.
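The effect of the two orderings can be reproduced without a broker. The following is a minimal sketch, assuming a hypothetical FakeConsumer whose commit() always throws (standing in for e.g. a RebalanceInProgressException from commitSync). Partitions are plain ints, the isSeekAfterError() check is left out for brevity, and none of these names are Spring Kafka or kafka-clients APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Broker-free simulation of the ordering problem described in this issue.
// FakeConsumer, commit() and seek() are hypothetical stand-ins, not Spring Kafka
// or kafka-clients APIs; the isSeekAfterError() check is omitted for brevity.
class SeekAfterCommitFailureDemo {

    // Hypothetical consumer that records every commit/seek it receives.
    static final class FakeConsumer {
        final boolean failCommit;
        final List<String> calls = new ArrayList<>();

        FakeConsumer(boolean failCommit) {
            this.failCommit = failCommit;
        }

        void commit(Map<Integer, Long> offsets) {
            calls.add("commit " + offsets);
            if (failCommit) {
                // Stands in for e.g. a RebalanceInProgressException thrown by commitSync
                throw new IllegalStateException("rebalance in progress");
            }
        }

        void seek(int partition, long offset) {
            calls.add("seek " + partition + "@" + offset);
        }
    }

    // Original ordering: if the commit throws, the seek is never reached.
    static void commitThenSeek(FakeConsumer consumer, Map<Integer, Long> offsets,
            int failedPartition, long failedOffset) {
        if (offsets.size() > 0) {
            consumer.commit(offsets);
        }
        consumer.seek(failedPartition, failedOffset);
    }

    // Proposed ordering: the seek runs in finally, so it happens even if the commit throws.
    static void commitThenSeekInFinally(FakeConsumer consumer, Map<Integer, Long> offsets,
            int failedPartition, long failedOffset) {
        try {
            if (offsets.size() > 0) {
                consumer.commit(offsets);
            }
        } finally {
            consumer.seek(failedPartition, failedOffset);
        }
    }

    public static void main(String[] args) {
        // Records before offset 5 were processed; the record at offset 5 failed.
        Map<Integer, Long> offsets = Map.of(0, 5L);

        FakeConsumer before = new FakeConsumer(true);
        try {
            commitThenSeek(before, offsets, 0, 5L);
        } catch (IllegalStateException ex) {
            // Propagated from commit(); no seek was recorded
        }
        System.out.println(before.calls); // [commit {0=5}]

        FakeConsumer after = new FakeConsumer(true);
        try {
            commitThenSeekInFinally(after, offsets, 0, 5L);
        } catch (IllegalStateException ex) {
            // Still propagated, but only after the seek in finally
        }
        System.out.println(after.calls); // [commit {0=5}, seek 0@5]
    }
}
```

With try/finally the commit exception still propagates after the seek; only the missing seek is addressed.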

I can contribute by opening a PR, and I can also provide more details if needed.

Any other ideas or suggestions are more than welcome.

@sobychacko
Contributor

@frosiere A PR is very welcome.

@frosiere
Contributor Author

frosiere commented Feb 20, 2024

Great, I will open a PR this week. Sorry for the delay.
The try/finally block is more of a workaround.
The real solution would be to rework the code as follows:

  • if seekAfterError is true, seek+commit instead of commit+seek+commit
  • if seekAfterError is false, commit

This would both optimize the flow and solve the problem.
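A rough sketch of that proposed ordering, assuming hypothetical names (CallLog, handleFailure) rather than the actual FailedBatchProcessor code: when seekAfterError is true, the seek happens first, so a failing commit can no longer cancel it, and the offsets are committed exactly once in both branches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the proposed ordering: when seekAfterError is true, seek first and
// then commit once; otherwise just commit. CallLog and handleFailure are hypothetical,
// not the FailedBatchProcessor implementation.
class ProposedOrderingSketch {

    // Hypothetical consumer that records calls and can be told to fail on commit.
    static final class CallLog {
        final boolean failCommit;
        final List<String> calls = new ArrayList<>();

        CallLog(boolean failCommit) {
            this.failCommit = failCommit;
        }

        void seek(int partition, long offset) {
            calls.add("seek " + partition + "@" + offset);
        }

        void commit(Map<Integer, Long> offsets) {
            calls.add("commit " + offsets);
            if (failCommit) {
                throw new IllegalStateException("commit failed");
            }
        }
    }

    static void handleFailure(CallLog consumer, boolean seekAfterError,
            Map<Integer, Long> offsets, int failedPartition, long failedOffset) {
        if (seekAfterError) {
            // Reset the position first, so a failing commit can no longer cancel it
            consumer.seek(failedPartition, failedOffset);
        }
        if (!offsets.isEmpty()) {
            // A single commit in both branches
            consumer.commit(offsets);
        }
    }

    public static void main(String[] args) {
        CallLog seeking = new CallLog(true);
        try {
            handleFailure(seeking, true, Map.of(0, 5L), 0, 5L);
        } catch (IllegalStateException ex) {
            // The commit failed, but the seek was already performed
        }
        System.out.println(seeking.calls); // [seek 0@5, commit {0=5}]

        CallLog notSeeking = new CallLog(false);
        handleFailure(notSeeking, false, Map.of(0, 5L), 0, 5L);
        System.out.println(notSeeking.calls); // [commit {0=5}]
    }
}
```

Since committing offsets does not depend on the consumer's current position, seeking first does not change what gets committed; it only removes the window in which a commit failure can skip the seek.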

frosiere added a commit to frosiere/spring-kafka that referenced this issue Feb 21, 2024
@frosiere
Contributor Author

frosiere commented Feb 21, 2024

In the end, I had to keep the workaround, as the proposal would require a significant rework of this layer. Such a redesign would need further discussion.

From a general point of view, the intended flow could be summarized as follows:

In case of exception:

  • Seek or recover
  • Commit the offsets

This is not possible with the current implementation, as the seekOrRecover method throws a KafkaException after the seek, which prevents the commit method from being called. Commits are also handled in three different places, whereas they should be handled in a single place, after the seek or recovery.

It seems that the FailedBatchProcessor#seekOrRecover and SeekUtils code tries to handle too many cases, which complicates all the code around the seek or recover logic.
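The "seek or recover, then commit in one place" flow could look roughly like the sketch below. It is built on assumptions: the Outcome enum, seekOrRecover and handleError are hypothetical, and this seekOrRecover returns a result instead of throwing a KafkaException, which is exactly what the current implementation does not allow. Commits are only recorded in a list, and partitions are plain ints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of "seek or recover, then commit in one place". Outcome, seekOrRecover and
// handleError are hypothetical; unlike the current seekOrRecover, this one returns a
// result instead of throwing a KafkaException, so the caller can still commit.
class SingleCommitPointSketch {

    enum Outcome { SEEK_PERFORMED, RECOVERED }

    static Outcome seekOrRecover(List<String> log, boolean retriesExhausted,
            int failedPartition, long failedOffset) {
        if (retriesExhausted) {
            // e.g. hand the failed record to a recoverer such as a dead-letter publisher
            log.add("recover " + failedPartition + "@" + failedOffset);
            return Outcome.RECOVERED;
        }
        // Rewind so the failed record is redelivered on the next poll
        log.add("seek " + failedPartition + "@" + failedOffset);
        return Outcome.SEEK_PERFORMED;
    }

    static Outcome handleError(List<String> log, boolean retriesExhausted,
            Map<Integer, Long> offsets, int failedPartition, long failedOffset) {
        Outcome outcome = seekOrRecover(log, retriesExhausted, failedPartition, failedOffset);
        Map<Integer, Long> toCommit = new TreeMap<>(offsets);
        if (outcome == Outcome.RECOVERED) {
            // The recovered record is done, so commit the offset just past it
            toCommit.put(failedPartition, failedOffset + 1);
        }
        // The single place where offsets are committed
        if (!toCommit.isEmpty()) {
            log.add("commit " + toCommit);
        }
        return outcome;
    }

    public static void main(String[] args) {
        List<String> retry = new ArrayList<>();
        handleError(retry, false, Map.of(0, 5L), 0, 5L);
        System.out.println(retry); // [seek 0@5, commit {0=5}]

        List<String> recover = new ArrayList<>();
        handleError(recover, true, Map.of(0, 5L), 0, 5L);
        System.out.println(recover); // [recover 0@5, commit {0=6}]
    }
}
```

Any exception the container needs as a signal could then be thrown by the caller after handleError returns, instead of from inside seekOrRecover.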

Any comments or feedback would be appreciated.

@sobychacko
Contributor

sobychacko commented Feb 21, 2024

I am okay with the workaround (for now), as we want to avoid a major refactor in this area. The PR with the workaround looks good.

frosiere added a commit to frosiere/spring-kafka that referenced this issue Feb 21, 2024
@sobychacko sobychacko added this to the 3.2.0-M2 milestone Feb 22, 2024
sobychacko pushed a commit that referenced this issue Feb 22, 2024
Fixes: #3019 

* DefaultErrorHandler is not able to seek in case of an exception during the commit

 **Auto-cherry-pick to `3.1.x`**
spring-builds pushed a commit that referenced this issue Feb 22, 2024
Fixes: #3019

* DefaultErrorHandler is not able to seek in case of an exception during the commit

(cherry picked from commit cfa369b)
artembilan added a commit that referenced this issue Mar 5, 2024
Related to #3019

* Add `@SuppressWarnings("serial")` to `KafkaTemplate.SkipAbortException`
* Move logic in the `FailedBatchProcessor.seekOrRecover()` out of `finally` block.
Essentially, swallow commit exception and follow with seeks

**Auto-cherry-pick to `3.1.x`**
spring-builds pushed a commit that referenced this issue Mar 5, 2024
Related to #3019

* Add `@SuppressWarnings("serial")` to `KafkaTemplate.SkipAbortException`
* Move logic in the `FailedBatchProcessor.seekOrRecover()` out of `finally` block.
Essentially, swallow commit exception and follow with seeks

(cherry picked from commit 7b0cd0f)
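The follow-up change replaces the finally block with a catch that swallows the commit failure and then performs the seeks. A minimal sketch of that pattern, assuming hypothetical commit/seek helpers that only record calls (this is not the actual FailedBatchProcessor code, and catching RuntimeException here is an assumption):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Sketch of "swallow the commit exception and follow with seeks". The commit/seek
// helpers are hypothetical and only record calls; this is not the FailedBatchProcessor code.
class SwallowCommitFailureSketch {

    private static final Logger LOGGER =
            Logger.getLogger(SwallowCommitFailureSketch.class.getName());

    // Hypothetical commit that fails on demand, e.g. during a rebalance.
    static void commit(List<String> calls, boolean fail, Map<Integer, Long> offsets) {
        calls.add("commit " + offsets);
        if (fail) {
            throw new IllegalStateException("rebalance in progress");
        }
    }

    static void commitThenSeek(List<String> calls, boolean failCommit,
            Map<Integer, Long> offsets, int failedPartition, long failedOffset) {
        if (!offsets.isEmpty()) {
            try {
                commit(calls, failCommit, offsets);
            } catch (RuntimeException ex) {
                // Swallow (log) the failure instead of letting it skip the seek
                LOGGER.warning("Commit failed, continuing with seeks: " + ex.getMessage());
            }
        }
        // Always reached, and no exception escapes from the commit
        calls.add("seek " + failedPartition + "@" + failedOffset);
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        commitThenSeek(calls, true, Map.of(0, 5L), 0, 5L);
        System.out.println(calls); // [commit {0=5}, seek 0@5]
    }
}
```

Unlike the try/finally variant, the commit exception does not propagate here; it is logged and the seek still happens.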
@frosiere
Contributor Author

I'm sorry to ask about a closed issue, but my team is still using Spring Kafka 2.9.13 and is suffering from this problem.
Would it be possible to open another PR to fix the issue in Spring Kafka 2.9.x?
Thank you for your support.

@sobychacko
Contributor

@frosiere, the 2.9.x version is already out of OSS support, and there won't be another OSS release for 2.9.x. See the support timeline here: https://spring.io/projects/spring-kafka#support. Even 3.0.x will go out of OSS support in a few months' time. Any chance you can upgrade to the latest GA, 3.1.x?


3 participants