Skip to content

Commit dee71ae

Browse files
committed
GH-2340: Fix Retrying Batch Error Handling
Resolves #2340 The `RetryingBatchErrorHandler` - now called the `FallbackBatchErrorHandler` pauses and resumes the consumer during retries, to allow it to poll the consumer to avoid a forced rebalance. However, if a normal rebalance occurs, for example if a new member joins, the error handler does not re-pause the consumer and silently consumes new records. Add a mechanism to always re-pause the consume when in this retry mode. **cherry-pick to 2.9.x, 2.8.x**
1 parent 3f30673 commit dee71ae

File tree

7 files changed

+90
-0
lines changed

7 files changed

+90
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.List;
2021

2122
import org.apache.commons.logging.LogFactory;
2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecord;
2425
import org.apache.kafka.clients.consumer.ConsumerRecords;
26+
import org.apache.kafka.common.TopicPartition;
2527

2628
import org.springframework.kafka.support.TopicPartitionOffset;
2729

@@ -150,4 +152,13 @@ default void setAckAfterHandle(boolean ack) {
150152
throw new UnsupportedOperationException("This error handler does not support setting this property");
151153
}
152154

155+
/**
156+
* Called when partitions are assigned.
157+
* @param consumer the consumer.
158+
* @param partitions the newly assigned partitions.
159+
* @since 2.8.8
160+
*/
161+
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
162+
}
163+
153164
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.List;
2021

2122
import org.apache.kafka.clients.consumer.Consumer;
2223
import org.apache.kafka.clients.consumer.ConsumerRecord;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.common.TopicPartition;
2426
import org.apache.kafka.common.errors.SerializationException;
2527

2628
import org.springframework.lang.Nullable;
@@ -157,4 +159,9 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
157159
}
158160
}
159161

162+
@Override
163+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
164+
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions);
165+
}
166+
160167
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.Collections;
2021
import java.util.List;
2122

2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecord;
2425
import org.apache.kafka.clients.consumer.ConsumerRecords;
26+
import org.apache.kafka.common.TopicPartition;
2527

2628
import org.springframework.kafka.support.TopicPartitionOffset;
2729
import org.springframework.util.Assert;
@@ -157,5 +159,13 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
157159
}
158160
}
159161

162+
@SuppressWarnings("deprecation")
163+
@Override
164+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
165+
if (this.batchErrorHandler instanceof RetryingBatchErrorHandler) {
166+
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions);
167+
}
168+
}
169+
160170
}
161171

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
7070
this.fallbackBatchHandler = fallbackHandler;
7171
}
7272

73+
/**
74+
* Return the fallback batch error handler.
75+
* @return the handler.
76+
* @since 2.8.8
77+
*/
78+
protected CommonErrorHandler getFallbackBatchHandler() {
79+
return this.fallbackBatchHandler;
80+
}
81+
7382
protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
7483
MessageListenerContainer container, Runnable invokeListener) {
7584

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3373,6 +3373,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
33733373
ListenerConsumer.this.firstPoll = true;
33743374
ListenerConsumer.this.consumerSeekAwareListener.onFirstPoll();
33753375
}
3376+
if (ListenerConsumer.this.commonErrorHandler != null) {
3377+
ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer,
3378+
partitions);
3379+
}
33763380
}
33773381

33783382
private void repauseIfNeeded(Collection<TopicPartition> partitions) {

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.function.BiConsumer;
2021

2122
import org.apache.commons.logging.LogFactory;
2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.common.TopicPartition;
2426

2527
import org.springframework.core.log.LogAccessor;
2628
import org.springframework.lang.Nullable;
@@ -55,6 +57,8 @@ public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
5557

5658
private boolean ackAfterHandle = true;
5759

60+
private boolean retrying;
61+
5862
/**
5963
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
6064
* a 5 second back off).
@@ -100,8 +104,16 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
100104
this.logger.error(thrownException, "Called with no records; consumer exception");
101105
return;
102106
}
107+
this.retrying = true;
103108
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
104109
this.seeker, this.recoverer, this.logger, getLogLevel());
110+
this.retrying = false;
111+
}
112+
113+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
114+
if (this.retrying) {
115+
consumer.pause(consumer.assignment());
116+
}
105117
}
106118

107119
}

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.BDDMockito.given;
2323
import static org.mockito.BDDMockito.willAnswer;
24+
import static org.mockito.Mockito.inOrder;
2425
import static org.mockito.Mockito.mock;
2526
import static org.mockito.Mockito.times;
2627
import static org.mockito.Mockito.verify;
@@ -38,6 +39,7 @@
3839
import org.apache.kafka.clients.consumer.ConsumerRecords;
3940
import org.apache.kafka.common.TopicPartition;
4041
import org.junit.jupiter.api.Test;
42+
import org.mockito.InOrder;
4143

4244
import org.springframework.kafka.KafkaException;
4345
import org.springframework.util.backoff.FixedBackOff;
@@ -168,4 +170,39 @@ void exitOnContainerStop() {
168170
assertThat(this.invoked).isEqualTo(1);
169171
}
170172

173+
@Test
174+
void rePauseOnRebalance() {
175+
this.invoked = 0;
176+
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
177+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 1L), (cr, ex) -> {
178+
recovered.add(cr);
179+
});
180+
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
181+
map.put(new TopicPartition("foo", 0),
182+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
183+
map.put(new TopicPartition("foo", 1),
184+
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
185+
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
186+
Consumer<?, ?> consumer = mock(Consumer.class);
187+
willAnswer(inv -> {
188+
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
189+
return records;
190+
}).given(consumer).poll(any());
191+
MessageListenerContainer container = mock(MessageListenerContainer.class);
192+
given(container.isRunning()).willReturn(true);
193+
eh.handle(new RuntimeException(), records, consumer, container, () -> {
194+
this.invoked++;
195+
throw new RuntimeException();
196+
});
197+
assertThat(this.invoked).isEqualTo(1);
198+
assertThat(recovered).hasSize(2);
199+
InOrder inOrder = inOrder(consumer);
200+
inOrder.verify(consumer).pause(any());
201+
inOrder.verify(consumer).poll(any());
202+
inOrder.verify(consumer).pause(any());
203+
inOrder.verify(consumer).resume(any());
204+
verify(consumer, times(3)).assignment();
205+
verifyNoMoreInteractions(consumer);
206+
}
207+
171208
}

0 commit comments

Comments
 (0)