Is Non Blocking Retry (@Retryable) intended to be combined with Kafka Transactions (consumer initiated) ? #2934

Closed
PaulFert opened this issue Dec 13, 2023 Discussed in #2918 · 2 comments

Discussed in #2918

Originally posted by PaulFert December 1, 2023
Hello,

I want to use both features for consume -> process -> produce applications.

When I use transactions with blocking retry and DLT recovery, the flow is:
[Diagram: Kafka-Error-Handling-Transaction-Blocking-Retry-Transaction (drawio)]

When an exception is raised from the listener, the transaction is rolled back and then a new transaction is created for the send to the DLT. OK, fine.

On the other hand, when I use transactions with non-blocking retry, the flow looks more like this:
[Diagram: Kafka-Error-Handling-Transaction-Non-Blocking-Retry-Transaction (drawio)]

When an exception is raised from the listener, the transaction is not rolled back; the message is sent to the retry topic and the transaction is finally committed. The consumer offsets are then committed together with the output messages and the messages sent to the retry topic. If an exception is raised, I want my output messages not to be committed.

That is not what I expected. I expected a flow like this:
[Diagram: Kafka-Error-Handling-Transaction-Non-Blocking-Retry-Transaction-expected (drawio)]

Did I miss something in the configuration? I use the standard configuration, with one KafkaTemplate and a transactional producer.
Or did I misunderstand the non-blocking retry pattern, and transactions are not intended to be combined with it?

Sorry for my poor English...

Testing with 'org.springframework.boot' version '3.1.1-SNAPSHOT', spring-kafka version 3.0.8

Thanks

Wzy19930507 (Contributor) commented Feb 14, 2024

A @KafkaListener running inside producer (container) transactions does not currently support @RetryableTopic.


With @RetryableTopic, a DeadLetterPublishingRecoverer is injected into the CommonErrorHandler.
The RuntimeException is caught by the CommonErrorHandler and is not thrown again, so the transaction never triggers a rollback.

  1. The exception is caught by the CommonErrorHandler:

    private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
            Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
        if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) {
            try {
                if (this.producer == null) {
                    processCommits();
                }
            }
            catch (Exception ex) { // NO SONAR
                this.logger.error(ex, "Failed to commit before handling error");
            }
            List<ConsumerRecord<?, ?>> records = new ArrayList<>();
            records.add(cRecord);
            while (iterator.hasNext()) {
                records.add(iterator.next());
            }
            this.commonErrorHandler.handleRemaining(rte, records, this.consumer,
                    KafkaMessageListenerContainer.this.thisOrParentContainer);
            // ... (the excerpt ends here; the rest of the method is not quoted)

  2. The CommonErrorHandler calls DeadLetterPublishingRecoverer.publish, which only logs and does not throw the exception again:

    protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate,
            ConsumerRecord<?, ?> inRecord) {
        CompletableFuture<SendResult<Object, Object>> sendResult = null;
        try {
            sendResult = kafkaTemplate.send(outRecord);
            sendResult.whenComplete((result, ex) -> {
                if (ex == null) {
                    this.logger.debug(() -> "Successful dead-letter publication: "
                            + KafkaUtils.format(inRecord) + " to " + result.getRecordMetadata());
                }
                else {
                    this.logger.error(ex, () -> pubFailMessage(outRecord, inRecord));
                }
            });
        }
        catch (Exception e) {
            this.logger.error(e, () -> pubFailMessage(outRecord, inRecord));
        }
        if (this.failIfSendResultIsError) {
            verifySendResult(kafkaTemplate, outRecord, sendResult, inRecord);
        }
    }


Without @RetryableTopic, the exception is thrown up to the transaction, which rolls back, and the AfterRollbackProcessor then catches the exception.
In that case the DeadLetterPublishingRecoverer is injected into the AfterRollbackProcessor instead.
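
The difference between the two paths can be illustrated with a toy model in plain Java. This is not spring-kafka code: `ToyTransaction`, `process`, and the topic names `out` and `retry` are hypothetical stand-ins, and the only thing modelled is whether the listener's exception reaches the transaction boundary or is swallowed by an in-transaction error handler.

```java
import java.util.ArrayList;
import java.util.List;

class SwallowedExceptionDemo {

    /** Hypothetical stand-in for a Kafka transaction: sends are buffered until commit. */
    static final class ToyTransaction {
        final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        void send(String topic, String value) {
            pending.add(topic + ":" + value);
        }

        void commit() {
            committed.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    /**
     * Runs a failing listener inside one toy transaction and returns what was committed.
     * swallowInErrorHandler = true models an error handler that recovers inside the
     * transaction and does not rethrow; false models the exception propagating.
     */
    static List<String> process(String record, boolean swallowInErrorHandler) {
        ToyTransaction tx = new ToyTransaction();
        try {
            try {
                tx.send("out", record + "-processed"); // listener output
                throw new IllegalStateException("listener failed");
            }
            catch (RuntimeException ex) {
                if (!swallowInErrorHandler) {
                    throw ex; // reaches the transaction boundary below
                }
                tx.send("retry", record); // recovery happens inside the same transaction
            }
            tx.commit(); // nothing was rethrown, so the transaction commits
        }
        catch (RuntimeException ex) {
            tx.rollback(); // the listener output is discarded
        }
        return tx.committed;
    }

    public static void main(String[] args) {
        System.out.println(process("r1", true));  // prints [out:r1-processed, retry:r1]
        System.out.println(process("r1", false)); // prints []
    }
}
```

In the swallowed case the listener's output and the retry record are committed together, which is the behavior reported in this issue; in the propagated case nothing is committed and recovery would have to happen after the rollback.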

Wzy19930507 (Contributor) commented Feb 22, 2024

Hi @artembilan @sobychacko @PaulFert, for this scenario (listener transactions enabled together with retry topics), the retry topic breaks the atomicity of the listener's Kafka transaction. I feel that moving the retry-topic logic into the AfterRollbackProcessor could resolve this.

WDYT?
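
One possible reading of this proposal, again as a toy model rather than spring-kafka code: the listener's transaction is rolled back first, and only afterwards a second transaction forwards the record to the retry topic and commits the offset. `ToyTransaction`, `consume`, `failIfPoison`, and the `bad` prefix convention are all hypothetical names invented for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

class AfterRollbackRetryDemo {

    /** Hypothetical stand-in for a transactional producer writing to a shared log. */
    static final class ToyTransaction {
        private final List<String> log;
        private final List<String> pending = new ArrayList<>();

        ToyTransaction(List<String> log) {
            this.log = log;
        }

        void write(String entry) {
            pending.add(entry);
        }

        void commit() {
            log.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    /** Returns everything that ended up committed for one consumed record. */
    static List<String> consume(String record, long offset) {
        List<String> log = new ArrayList<>();

        // Transaction 1: the listener's own work.
        ToyTransaction listenerTx = new ToyTransaction(log);
        try {
            listenerTx.write("out:" + record + "-processed");
            failIfPoison(record);
            listenerTx.write("offset:" + (offset + 1));
            listenerTx.commit();
            return log;
        }
        catch (RuntimeException ex) {
            listenerTx.rollback(); // the listener output is discarded
        }

        // Transaction 2: started only after the rollback. It forwards the record
        // to the retry topic and commits the offset so the record is not redelivered.
        ToyTransaction recoveryTx = new ToyTransaction(log);
        recoveryTx.write("retry:" + record);
        recoveryTx.write("offset:" + (offset + 1));
        recoveryTx.commit();
        return log;
    }

    /** Hypothetical failure rule: records starting with "bad" make the listener throw. */
    static void failIfPoison(String record) {
        if (record.startsWith("bad")) {
            throw new IllegalStateException("listener failed for " + record);
        }
    }

    public static void main(String[] args) {
        System.out.println(consume("ok-1", 41));  // prints [out:ok-1-processed, offset:42]
        System.out.println(consume("bad-1", 41)); // prints [retry:bad-1, offset:42]
    }
}
```

With this ordering a failed record never leaves listener output behind, while the retry record and the offset still commit atomically with each other.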

Wzy19930507 added a commit to Wzy19930507/spring-kafka that referenced this issue Feb 26, 2024
* A Kafka listener transaction with nested Kafka transactions does not support `@RetryableTopic`.
* Add a note about this to the adoc.

see spring-projects#2934
Wzy19930507 added a commit to Wzy19930507/spring-kafka that referenced this issue Feb 27, 2024
* With non-blocking retries and a container transaction both enabled, an error thrown by the listener code is caught by `DefaultErrorHandler` and recovered successfully. The container transaction then commits and the record is sent to a retryable topic.
* The desired behavior is that the container transaction rolls back, the send-offsets transaction commits successfully, and the record is sent to a retryable topic.
* Add a note about this to the adoc.

see spring-projects#2934
spring-builds pushed a commit that referenced this issue Feb 28, 2024
Fixes: #2934

* Non-Blocking Retries cannot be combined with container transactions

* Update docs in `transactions.adoc` and `retrytopic.adoc`

(cherry picked from commit 3449f8c)