diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc
index f880848c93..52f9ec06cc 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc
@@ -451,7 +451,10 @@ AfterRollbackProcessor processor =
 When you do not use transactions, you can achieve similar functionality by configuring a `DefaultErrorHandler`.
 See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers].
 
-IMPORTANT: Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
+Starting with version 3.2, an entire batch of records that keeps failing can now be recovered (skipped).
+Call `ContainerProperties.setBatchRecoverAfterRollback(true)` to enable this feature.
+
+IMPORTANT: By default, recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing.
 In such cases, the application listener must handle a record that keeps failing.
 
 See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records].
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
index d427e8f42e..511b98bc95 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
@@ -26,4 +26,10 @@ See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more inf
 It's now possible to redirect messages to the custom DLTs based on the type of the exception, which has been thrown during the message processing.
 Rules for the redirection are set either via the `RetryableTopic.exceptionBasedDltRouting` or the `RetryTopicConfigurationBuilder.dltRoutingRules`.
 Custom DLTs are created automatically as well as other retry and dead-letter topics.
-See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information.
\ No newline at end of file
+See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information.
+
+[[x32-after-rollback-processing]]
+=== After Rollback Processing
+
+A new `processBatch` method has been added to the `AfterRollbackProcessor` API.
+See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information.
\ No newline at end of file
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java
index e9aaf1da11..7137ce018f 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2021 the original author or authors.
+ * Copyright 2018-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 
 import org.springframework.kafka.listener.ContainerProperties.EOSMode;
 
@@ -34,6 +35,7 @@
  * @param <V> the value type.
  *
  * @author Gary Russell
+ * @author Wang Zhiyang
  *
  * @since 1.3.5
  *
@@ -63,6 +65,26 @@ public interface AfterRollbackProcessor<K, V> {
 	void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
 			MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode);
 
+	/**
+	 * Process the entire batch of records; {@code recoverable} will be true if the container is configured to recover (skip) the whole batch after a rollback.
+	 * @param records the records.
+	 * @param recordList the record list.
+	 * @param consumer the consumer.
+	 * @param container the container.
+	 * @param exception the exception.
+	 * @param recoverable true if the entire batch can be recovered.
+	 * @param eosMode the {@link EOSMode}.
+	 * @since 3.2
+	 * @see #isProcessInTransaction()
+	 */
+	default void processBatch(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
+			Consumer<K, V> consumer, MessageListenerContainer container, Exception exception,
+			boolean recoverable, ContainerProperties.EOSMode eosMode) {
+
+		process(recordList, consumer, container, exception, recoverable, eosMode);
+	}
+
 	/**
 	 * Optional method to clear thread state; will be called just before a consumer
 	 * thread terminates.
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java
index dc66d13685..3a0dd0a118 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java
@@ -26,6 +26,7 @@
  * failed.
  *
  * @author Gary Russell
+ * @author Wang Zhiyang
  * @since 2.5
  *
  */
@@ -98,9 +99,9 @@ public int getIndex() {
 
 	@Override
 	public String getMessage() {
-		return super.getMessage() + (this.record != null
+		return super.getMessage() + " " + (this.record != null
 				? (this.record.topic() + "-" + this.record.partition() + "@" + this.record.offset())
-				: (" @-" + this.index));
+				: ("@-" + this.index));
 	}
 
 }
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
index db02a527ad..616db9bea3 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
@@ -51,6 +51,7 @@
  * @author Johnny Lim
  * @author Lukasz Kaminski
 * @author Kyuhyeok Park
+ * @author Wang Zhiyang
 */
 public class ContainerProperties extends ConsumerProperties {
 
@@ -258,6 +259,8 @@ public enum EOSMode {
 
 	private PlatformTransactionManager transactionManager;
 
+	private boolean batchRecoverAfterRollback = false;
+
 	private int monitorInterval = DEFAULT_MONITOR_INTERVAL;
 
 	private TaskScheduler scheduler;
@@ -543,6 +546,24 @@ public void setTransactionManager(@Nullable PlatformTransactionManager transacti
 		this.transactionManager = transactionManager;
 	}
 
+	/**
+	 * Recover the batch of records after rollback if true.
+	 * @return true to recover.
+	 * @since 3.2
+	 */
+	public boolean isBatchRecoverAfterRollback() {
+		return this.batchRecoverAfterRollback;
+	}
+
+	/**
+	 * Enable recovery of the whole batch of records after a rollback.
+ * @param batchRecoverAfterRollback the batchRecoverAfterRollback to set. + * @since 3.2 + */ + public void setBatchRecoverAfterRollback(boolean batchRecoverAfterRollback) { + this.batchRecoverAfterRollback = batchRecoverAfterRollback; + } + public int getMonitorInterval() { return this.monitorInterval; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index bef6a1c6ec..f14114f0f8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.kafka.listener; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -24,9 +25,11 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.listener.ContainerProperties.EOSMode; import org.springframework.lang.Nullable; @@ -47,6 +50,7 @@ * * @author Gary Russell * @author Francois Rosiere + * @author Wang Zhiyang * * @since 1.3.5 * @@ -60,7 +64,9 @@ public class DefaultAfterRollbackProcessor extends FailedRecordProcessor private final BackOff backOff; - private KafkaOperations kafkaTemplate; + private final KafkaOperations kafkaTemplate; + + private final BiConsumer, Exception> recoverer; /** * Construct an instance with the default recoverer which simply logs the record after @@ -143,6 +149,11 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer, super.setCommitRecovered(commitRecovered); checkConfig(); this.backOff = backOff; + this.recoverer = (crs, ex) -> { + if (recoverer != null && !crs.isEmpty()) { + crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex)); + } + }; } private void checkConfig() { @@ -176,6 +187,53 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) { } + @SuppressWarnings({ "unchecked", "rawtypes"}) + @Override + public void processBatch(ConsumerRecords records, List> recordList, Consumer consumer, + @Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) { + + if (recoverable && isCommitRecovered()) { + long nextBackOff = ListenerUtils.nextBackOff(this.backOff, this.backOffs); + if (nextBackOff != BackOffExecution.STOP) { + SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger); + try { + ListenerUtils.stoppableSleep(container, nextBackOff); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return; + } + + try { + this.recoverer.accept(records, exception); + Map offsets = new HashMap<>(); + records.forEach(rec -> offsets.put(new TopicPartition(rec.topic(), rec.partition()), + ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 
1))); + if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) { + this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); + } + clearThreadState(); + } + catch (Exception ex) { + SeekUtils.doSeeksToBegin((List) recordList, consumer, this.logger); + logger.error(ex, "Recoverer threw an exception; re-seeking batch"); + throw ex; + } + return; + } + + try { + process(recordList, consumer, container, exception, false, eosMode); + } + catch (KafkaException ke) { + ke.selfLog("AfterRollbackProcessor threw an exception", this.logger); + } + catch (Exception ex) { + this.logger.error(ex, "AfterRollbackProcessor threw an exception"); + } + } + @Override public boolean isProcessInTransaction() { return isCommitRecovered(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 6d2b0ffc49..c4754e4e4c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -662,6 +663,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean wantsFullRecords; + private final boolean wantsBatchRecoverAfterRollback; + private final boolean asyncReplies; private final boolean autoCommit; @@ -888,6 +891,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume this.clientId = determineClientId(); this.transactionTemplate = determineTransactionTemplate(); + this.wantsBatchRecoverAfterRollback = this.containerProperties.isBatchRecoverAfterRollback(); this.genericListener = listener; this.consumerSeekAwareListener = checkConsumerSeekAware(listener); this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties, @@ -2195,38 +2199,26 @@ private void batchRollback(final ConsumerRecords records, @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse); + afterRollbackProcessorToUse.processBatch(records, + Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), + ListenerConsumer.this.consumer, + KafkaMessageListenerContainer.this.thisOrParentContainer, e, + ListenerConsumer.this.wantsBatchRecoverAfterRollback, ListenerConsumer.this.eosMode); } }); } else { - batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse); - } - } - - private void batchAfterRollback(final ConsumerRecords records, - @Nullable final List> recordList, RuntimeException rollbackException, - AfterRollbackProcessor afterRollbackProcessorToUse) { - - try { - if (recordList == null) { - afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, - KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false, - this.eosMode); + try { + afterRollbackProcessorToUse.processBatch(records, + Objects.requireNonNullElseGet(recordList, () -> createRecordList(records)), this.consumer, + KafkaMessageListenerContainer.this.thisOrParentContainer, e, + this.wantsBatchRecoverAfterRollback, this.eosMode); } - else { - 
afterRollbackProcessorToUse.process(recordList, this.consumer, - KafkaMessageListenerContainer.this.thisOrParentContainer, rollbackException, false, - this.eosMode); + catch (Exception ex) { + this.logger.error(ex, "AfterRollbackProcessor threw exception"); } } - catch (KafkaException ke) { - ke.selfLog("AfterRollbackProcessor threw an exception", this.logger); - } - catch (Exception ex) { - this.logger.error(ex, "AfterRollbackProcessor threw an exception"); - } } private List> createRecordList(final ConsumerRecords records) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index b6bdf9b42b..4b2d1e7b79 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ * @author Gary Russell * @author Francois Rosiere * @author Antonio Tomac + * @author Wang Zhiyang * @since 2.0 * */ @@ -126,12 +127,7 @@ public static void unrecoverableBackOff(BackOff backOff, Map lastIntervals, MessageListenerContainer container) throws InterruptedException { Thread currentThread = Thread.currentThread(); - BackOffExecution backOffExecution = executions.get(currentThread); - if (backOffExecution == null) { - backOffExecution = backOff.start(); - executions.put(currentThread, backOffExecution); - } - Long interval = backOffExecution.nextBackOff(); + Long interval = nextBackOff(backOff, executions); if (interval == BackOffExecution.STOP) { interval = lastIntervals.get(currentThread); if (interval == null) { @@ -144,6 +140,17 @@ public static void unrecoverableBackOff(BackOff backOff, Map executions) { + + Thread currentThread = Thread.currentThread(); + BackOffExecution backOffExecution = executions.get(currentThread); + if (backOffExecution == null) { + backOffExecution = backOff.start(); + executions.put(currentThread, backOffExecution); + } + return backOffExecution.nextBackOff(); + } + /** * Sleep for the desired timeout, as long as the container continues to run. * @param container the container. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java index d97f9f5a15..4a4aa10419 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,6 +45,7 @@ * * @author Gary Russell * @author Francois Rosiere + * @author Wang Zhiyang * @since 2.2 * */ @@ -133,6 +134,24 @@ public static boolean doSeeks(List> records, Consumer return skipped.get(); } + /** + * Seek records to begin position, optionally skipping the first. + * @param records the records. + * @param consumer the consumer. + * @param logger a {@link LogAccessor} for seek errors. 
+ * @since 3.2 + */ + public static void doSeeksToBegin(List> records, Consumer consumer, + LogAccessor logger) { + + Map partitions = new LinkedHashMap<>(); + records.forEach(record -> { + partitions.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), + offset -> record.offset()); + }); + seekPartitions(consumer, partitions, logger); + } + /** * Perform seek operations on each partition. * @param consumer the consumer. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java index 9080a47d92..5cbf0496dc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,14 +27,18 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; import org.mockito.InOrder; @@ -49,12 +53,13 @@ /** * @author Gary Russell * @author Francois Rosiere + * @author Wang Zhiyang + * * @since 2.3.1 * */ public class DefaultAfterRollbackProcessorTests { - @SuppressWarnings("deprecation") @Test void testClassifier() { AtomicReference> recovered = new AtomicReference<>(); @@ -100,8 +105,8 @@ void testClassifier() { } @Test - void testBatchBackOff() { - AtomicReference> recovered = new AtomicReference<>(); + void testBackOffNoBatchRecover() { + @SuppressWarnings("unchecked") KafkaOperations template = mock(KafkaOperations.class); given(template.isTransactional()).willReturn(true); @@ -118,24 +123,29 @@ void testBatchBackOff() { ConsumerRecord record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar"); ConsumerRecord record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar"); List> records = Arrays.asList(record1, record2); + Map>> map = new HashMap<>(); + records.forEach(rec -> map.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), + tp -> new ArrayList<>()).add(rec)); + ConsumerRecords consumerRecords = new ConsumerRecords<>(map); IllegalStateException illegalState = new IllegalStateException(); @SuppressWarnings("unchecked") Consumer consumer = mock(Consumer.class); given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo")); MessageListenerContainer container = mock(MessageListenerContainer.class); given(container.isRunning()).willReturn(true); - processor.process(records, consumer, container, illegalState, false, EOSMode.V2); - processor.process(records, consumer, container, illegalState, false, EOSMode.V2); + processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2); + 
processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2); verify(backOff, times(2)).start(); verify(execution.get(), times(2)).nextBackOff(); processor.clearThreadState(); - processor.process(records, consumer, container, illegalState, false, EOSMode.V2); + processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2); verify(backOff, times(3)).start(); } + @Test void testEarlyExitBackOff() { DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor<>( - new FixedBackOff(1, 10_000)); + new FixedBackOff(10_000, 1)); @SuppressWarnings("unchecked") Consumer consumer = mock(Consumer.class); ConsumerRecord record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar"); @@ -146,13 +156,13 @@ void testEarlyExitBackOff() { given(container.isRunning()).willReturn(false); long t1 = System.currentTimeMillis(); processor.process(records, consumer, container, illegalState, true, EOSMode.V2); - assertThat(System.currentTimeMillis() < t1 + 5_000); + assertThat(System.currentTimeMillis() < t1 + 5_000).isTrue(); } @Test void testNoEarlyExitBackOff() { DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor<>( - new FixedBackOff(1, 200)); + new FixedBackOff(200, 1)); @SuppressWarnings("unchecked") Consumer consumer = mock(Consumer.class); ConsumerRecord record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar"); @@ -163,7 +173,7 @@ void testNoEarlyExitBackOff() { given(container.isRunning()).willReturn(true); long t1 = System.currentTimeMillis(); processor.process(records, consumer, container, illegalState, true, EOSMode.V2); - assertThat(System.currentTimeMillis() >= t1 + 200); + assertThat(System.currentTimeMillis() >= t1 + 200).isTrue(); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index b7902f8c0c..17b433c2b7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -106,13 +107,15 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Wang Zhiyang * * @since 1.3 * */ @EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2, TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4, - TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7 }, + TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7, + TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT }, brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" }) public class TransactionalContainerTests { @@ -134,6 +137,12 @@ public class TransactionalContainerTests { public static final String topic7 = "txTopic7"; + public static final String topic8 = "txTopic8"; + + public static final String topic8DLT = "txTopic8.DLT"; + + public static final String topic9 = "txTopic9"; + private static EmbeddedKafkaBroker embeddedKafka; @BeforeAll @@ 
-666,13 +675,12 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti pf.destroy(); } - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked"}) @Test public void testMaxFailures() throws Exception { logger.info("Start testMaxFailures"); - Map props = KafkaTestUtils.consumerProps("txTestMaxFailures", "false", embeddedKafka); String group = "groupInARBP"; - props.put(ConsumerConfig.GROUP_ID_CONFIG, group); + Map props = KafkaTestUtils.consumerProps(group, "false", embeddedKafka); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic3); @@ -754,7 +762,7 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, byte[].class)) .contains("fail for max failures".getBytes()); - assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)[3]).isEqualTo((byte) 0); + assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)[7]).isEqualTo((byte) 0); assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)[3]).isEqualTo((byte) 0); assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, byte[].class)).isNotNull(); assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, byte[].class)).isNotNull(); @@ -768,7 +776,7 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti verify(afterRollbackProcessor, times(4)).process(any(), any(), any(), captor.capture(), anyBoolean(), any()); assertThat(captor.getValue()).isInstanceOf(ListenerExecutionFailedException.class) .extracting(ex -> ((ListenerExecutionFailedException) ex).getGroupId()) - .isEqualTo("groupInARBP"); + .isEqualTo(group); verify(afterRollbackProcessor).clearThreadState(); verify(dlTemplate).send(any(ProducerRecord.class)); verify(dlTemplate).sendOffsetsToTransaction( @@ -777,12 +785,130 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti logger.info("Stop testMaxAttempts"); } + @SuppressWarnings({ "unchecked"}) + @Test + public void testBatchListenerMaxFailuresOnRecover() throws Exception { + String group = "groupInARBP2"; + Map props = KafkaTestUtils.consumerProps(group, "false", embeddedKafka); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); + ContainerProperties containerProps = new ContainerProperties(topic8); + containerProps.setPollTimeout(10_000); + containerProps.setBatchRecoverAfterRollback(true); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + pf.setTransactionIdPrefix("maxAtt.batchListener"); + final KafkaTemplate template = new KafkaTemplate<>(pf); + containerProps.setMessageListener((BatchMessageListener) recordList -> { + for (ConsumerRecord record : recordList) { + if (record.offset() == 1) { + throw new BatchListenerFailedException("fail for max failures", record); + } + } + }); + + @SuppressWarnings({ "rawtypes" }) + KafkaTransactionManager tm = new KafkaTransactionManager(pf); + containerProps.setTransactionManager(tm); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testBatchListenerMaxFailures"); + 
final CountDownLatch recoverLatch = new CountDownLatch(5); + final KafkaOperations dlTemplate = spy(new KafkaTemplate<>(pf)); + AtomicBoolean recovererShouldFail = new AtomicBoolean(true); + DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dlTemplate) { + + @Override + public void accept(ConsumerRecord record, Consumer consumer, Exception exception) { + if (record.offset() == 1 && recovererShouldFail.getAndSet(false)) { + throw new RuntimeException("test recoverer failure"); + } + super.accept(record, consumer, exception); + recoverLatch.countDown(); + } + + }; + DefaultAfterRollbackProcessor afterRollbackProcessor = + spy(new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 2L), dlTemplate, true)); + afterRollbackProcessor.setResetStateOnRecoveryFailure(false); + container.setAfterRollbackProcessor(afterRollbackProcessor); + final CountDownLatch stopLatch = new CountDownLatch(1); + container.setApplicationEventPublisher(e -> { + if (e instanceof ConsumerStoppedEvent) { + stopLatch.countDown(); + } + }); + container.start(); + + template.setDefaultTopic(topic8); + template.executeInTransaction(t -> { + RecordHeaders headers = new RecordHeaders(new RecordHeader[] { new RecordHeader("baz", "qux".getBytes()) }); + ProducerRecord record = new ProducerRecord<>(topic8, 0, 0, "bar", headers); + template.sendDefault(0, 0, "foo"); + template.send(record); + template.sendDefault(0, 0, "baz"); + template.sendDefault(0, 0, "quz"); + return null; + }); + assertThat(recoverLatch.await(1000, TimeUnit.SECONDS)).isTrue(); + container.stop(); + Consumer consumer = cf.createConsumer(); + embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic8DLT); + ConsumerRecords dltRecords = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(60)); + List> recordList = new ArrayList<>(); + for (ConsumerRecord record : dltRecords) { + recordList.add(record); + } + assertThat(recordList.size()).isEqualTo(4); + ConsumerRecord dltRecord0 = recordList.get(0); + assertThat(dltRecord0.value()).isEqualTo("foo"); + ConsumerRecord dltRecord1 = recordList.get(1); + assertThat(dltRecord1.value()).isEqualTo("bar"); + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + Map map = new HashMap<>(); + mapper.toHeaders(dltRecord1.headers(), map); + MessageHeaders headers = new MessageHeaders(map); + assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_FQCN, byte[].class))) + .contains("ListenerExecutionFailedException"); + assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN, byte[].class))) + .isEqualTo("org.springframework.kafka.listener.BatchListenerFailedException"); + assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))) + .contains("Listener failed; fail for max failures"); + assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull(); + assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, byte[].class)) + .contains("fail for max failures".getBytes()); + assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)[7]).isEqualTo((byte) 1); + assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)[3]).isEqualTo((byte) 0); + assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, byte[].class)).isNotNull(); + assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, byte[].class)).isNotNull(); + assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)).isEqualTo(topic8.getBytes()); + 
assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isEqualTo(group.getBytes()); + assertThat(headers.get("baz")).isEqualTo("qux".getBytes()); + ConsumerRecord dltRecord2 = recordList.get(2); + assertThat(dltRecord2.value()).isEqualTo("baz"); + ConsumerRecord dltRecord3 = recordList.get(3); + assertThat(dltRecord3.value()).isEqualTo("quz"); + pf.destroy(); + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); + verify(afterRollbackProcessor, times(4)).isProcessInTransaction(); + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(afterRollbackProcessor, times(4)).processBatch(any(), any(), any(), any(), captor.capture(), anyBoolean(), any()); + assertThat(captor.getValue()).isInstanceOf(ListenerExecutionFailedException.class) + .extracting(ex -> ((ListenerExecutionFailedException) ex).getGroupId()) + .isEqualTo(group); + verify(afterRollbackProcessor, times(2)).clearThreadState(); + verify(dlTemplate, times(5)).send(any(ProducerRecord.class)); + verify(dlTemplate).sendOffsetsToTransaction( + eq(Collections.singletonMap(new TopicPartition(topic8, 0), new OffsetAndMetadata(4L))), + any(ConsumerGroupMetadata.class)); + } + @SuppressWarnings("unchecked") @Test public void testRollbackProcessorCrash() throws Exception { logger.info("Start testRollbackNoRetries"); Map props = KafkaTestUtils.consumerProps("testRollbackNoRetries", "false", embeddedKafka); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic4); @@ -841,7 +967,79 @@ public void testRollbackProcessorCrash() throws Exception { logger.info("Stop testRollbackNoRetries"); } - @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) + @SuppressWarnings("unchecked") + @Test + public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Exception { + Map props = KafkaTestUtils.consumerProps("testBatchListenerRollbackNoRetries", "false", embeddedKafka); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); + ContainerProperties containerProps = new ContainerProperties(topic9); + containerProps.setPollTimeout(10_000); + containerProps.setBatchRecoverAfterRollback(true); + + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + pf.setTransactionIdPrefix("batchListener.noRetries."); + final KafkaTemplate template = new KafkaTemplate<>(pf); + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference data = new AtomicReference<>(); + containerProps.setMessageListener((BatchMessageListener) recordList -> { + for (ConsumerRecord record : recordList) { + data.set(record.value()); + if (record.offset() == 0) { + throw new BatchListenerFailedException("fail for no retry", record); + } + latch.countDown(); + } + }); + + @SuppressWarnings({ "rawtypes" }) + KafkaTransactionManager tm = new KafkaTransactionManager(pf); + containerProps.setTransactionManager(tm); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testBatchListenerRollbackNoRetries"); + final KafkaOperations dlTemplate = spy(new KafkaTemplate<>(pf)); + AtomicBoolean recovererShouldFail = new 
AtomicBoolean(true); + DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dlTemplate) { + @Override + public void accept(ConsumerRecord record, Consumer consumer, Exception exception) { + if (recovererShouldFail.getAndSet(false)) { + throw new RuntimeException("batch listener arbp fail"); + } + } + + }; + DefaultAfterRollbackProcessor afterRollbackProcessor = + spy(new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 0L), dlTemplate, true)); + container.setAfterRollbackProcessor(afterRollbackProcessor); + final CountDownLatch stopLatch = new CountDownLatch(1); + container.setApplicationEventPublisher(e -> { + if (e instanceof ConsumerStoppedEvent) { + stopLatch.countDown(); + } + }); + container.start(); + + template.setDefaultTopic(topic9); + template.executeInTransaction(t -> { + RecordHeaders headers = new RecordHeaders(new RecordHeader[] { new RecordHeader("baz", "qux".getBytes()) }); + ProducerRecord record = new ProducerRecord<>(topic9, 0, 0, "foo", headers); + template.send(record); + template.sendDefault(0, 0, "bar"); + template.sendDefault(0, 0, "baz"); + template.sendDefault(0, 0, "qux"); + return null; + }); + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); + assertThat(data.get()).isEqualTo("qux"); + container.stop(); + pf.destroy(); + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void testNoAfterRollbackWhenFenced() throws Exception { Consumer consumer = mock(Consumer.class);
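As a minimal sketch of how the new `batchRecoverAfterRollback` property and `DefaultAfterRollbackProcessor` fit together for a transactional batch listener (the configuration class, bean names, generic types, back-off values, and the no-op recoverer lambda below are illustrative assumptions, not part of this change):

[source, java]
----
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class BatchRecoveryConfig {

	@Bean
	ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
			ConsumerFactory<String, String> consumerFactory,
			KafkaTemplate<String, String> template,
			KafkaTransactionManager<String, String> tm) {

		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory);
		factory.setBatchListener(true);
		// run the listener in a Kafka transaction so a listener failure triggers a rollback
		factory.getContainerProperties().setTransactionManager(tm);
		// new in 3.2: recover (skip) the whole failing batch after the rollback
		factory.getContainerProperties().setBatchRecoverAfterRollback(true);
		// retry the batch twice, then recover each record and commit the recovered offsets
		factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>(
				(record, exception) -> {
					// last-resort handling of each skipped record, e.g. log it or publish to a DLT
				}, new FixedBackOff(1000L, 2L), template, true));
		return factory;
	}

}
----

With such a configuration, after the transaction rolls back the processor re-seeks and retries the whole batch until the back-off is exhausted, then invokes the recoverer for every record in the batch and sends the recovered offsets to the transaction because `commitRecovered` is `true`.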