Skip to content

Commit 15e5668

Browse files
garyrussell authored and artembilan committed
GH-2340: Fix Retrying Batch Error Handling
Resolves #2340. The `RetryingBatchErrorHandler` (now called the `FallbackBatchErrorHandler`) pauses and resumes the consumer during retries, so that it can keep polling the consumer and avoid a forced rebalance. However, if a normal rebalance occurs, for example when a new member joins, the error handler does not re-pause the consumer and silently consumes new records. Add a mechanism to always re-pause the consumer when in this retry mode. **cherry-pick to 2.9.x, 2.8.x**
1 parent e220b17 commit 15e5668

File tree

7 files changed

+90
-0
lines changed

7 files changed

+90
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.List;
2021

2122
import org.apache.commons.logging.LogFactory;
2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecord;
2425
import org.apache.kafka.clients.consumer.ConsumerRecords;
26+
import org.apache.kafka.common.TopicPartition;
2527

2628
import org.springframework.kafka.support.TopicPartitionOffset;
2729

@@ -212,4 +214,13 @@ default void setAckAfterHandle(boolean ack) {
212214
throw new UnsupportedOperationException("This error handler does not support setting this property");
213215
}
214216

217+
/**
218+
* Called when partitions are assigned.
219+
* @param consumer the consumer.
220+
* @param partitions the newly assigned partitions.
221+
* @since 2.8.8
222+
*/
223+
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
224+
}
225+
215226
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.List;
2021

2122
import org.apache.kafka.clients.consumer.Consumer;
2223
import org.apache.kafka.clients.consumer.ConsumerRecord;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.common.TopicPartition;
2426
import org.apache.kafka.common.errors.SerializationException;
2527

2628
import org.springframework.kafka.support.KafkaUtils;
@@ -199,4 +201,9 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
199201
}
200202
}
201203

204+
@Override
205+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
206+
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions);
207+
}
208+
202209
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.Collections;
2021
import java.util.List;
2122

2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecord;
2425
import org.apache.kafka.clients.consumer.ConsumerRecords;
26+
import org.apache.kafka.common.TopicPartition;
2527

2628
import org.springframework.kafka.support.TopicPartitionOffset;
2729
import org.springframework.util.Assert;
@@ -159,5 +161,12 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
159161
}
160162
}
161163

164+
@Override
165+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
166+
if (this.batchErrorHandler instanceof FallbackBatchErrorHandler) {
167+
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions);
168+
}
169+
}
170+
162171
}
163172

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,16 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
6969
this(recoverer, backOff, null, fallbackHandler);
7070
}
7171

72+
/**
73+
* Return the fallback batch error handler.
74+
* @return the handler.
75+
* @since 2.8.8
76+
*/
77+
protected CommonErrorHandler getFallbackBatchHandler() {
78+
return this.fallbackBatchHandler;
79+
}
80+
81+
7282
/**
7383
* Construct an instance with the provided properties.
7484
* @param recoverer the recoverer.

spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Collection;
1920
import java.util.function.BiConsumer;
2021

2122
import org.apache.commons.logging.LogFactory;
2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.common.TopicPartition;
2426

2527
import org.springframework.core.log.LogAccessor;
2628
import org.springframework.lang.Nullable;
@@ -53,6 +55,8 @@ class FallbackBatchErrorHandler extends KafkaExceptionLogLevelAware
5355

5456
private boolean ackAfterHandle = true;
5557

58+
private boolean retrying;
59+
5660
/**
5761
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
5862
* a 5 second back off).
@@ -98,8 +102,16 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
98102
this.logger.error(thrownException, "Called with no records; consumer exception");
99103
return;
100104
}
105+
this.retrying = true;
101106
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
102107
this.seeker, this.recoverer, this.logger, getLogLevel());
108+
this.retrying = false;
109+
}
110+
111+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
112+
if (this.retrying) {
113+
consumer.pause(consumer.assignment());
114+
}
103115
}
104116

105117
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3504,6 +3504,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
35043504
ListenerConsumer.this.firstPoll = true;
35053505
ListenerConsumer.this.consumerSeekAwareListener.onFirstPoll();
35063506
}
3507+
if (ListenerConsumer.this.commonErrorHandler != null) {
3508+
ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer,
3509+
partitions);
3510+
}
35073511
}
35083512

35093513
private void repauseIfNeeded(Collection<TopicPartition> partitions) {

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.BDDMockito.given;
2323
import static org.mockito.BDDMockito.willAnswer;
24+
import static org.mockito.Mockito.inOrder;
2425
import static org.mockito.Mockito.mock;
2526
import static org.mockito.Mockito.times;
2627
import static org.mockito.Mockito.verify;
@@ -38,6 +39,7 @@
3839
import org.apache.kafka.clients.consumer.ConsumerRecords;
3940
import org.apache.kafka.common.TopicPartition;
4041
import org.junit.jupiter.api.Test;
42+
import org.mockito.InOrder;
4143

4244
import org.springframework.kafka.KafkaException;
4345
import org.springframework.util.backoff.FixedBackOff;
@@ -168,4 +170,39 @@ void exitOnContainerStop() {
168170
assertThat(this.invoked).isEqualTo(1);
169171
}
170172

173+
@Test
174+
void rePauseOnRebalance() {
175+
this.invoked = 0;
176+
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
177+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0L, 1L), (cr, ex) -> {
178+
recovered.add(cr);
179+
});
180+
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
181+
map.put(new TopicPartition("foo", 0),
182+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
183+
map.put(new TopicPartition("foo", 1),
184+
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
185+
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
186+
Consumer<?, ?> consumer = mock(Consumer.class);
187+
willAnswer(inv -> {
188+
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
189+
return records;
190+
}).given(consumer).poll(any());
191+
MessageListenerContainer container = mock(MessageListenerContainer.class);
192+
given(container.isRunning()).willReturn(true);
193+
eh.handle(new RuntimeException(), records, consumer, container, () -> {
194+
this.invoked++;
195+
throw new RuntimeException();
196+
});
197+
assertThat(this.invoked).isEqualTo(1);
198+
assertThat(recovered).hasSize(2);
199+
InOrder inOrder = inOrder(consumer);
200+
inOrder.verify(consumer).pause(any());
201+
inOrder.verify(consumer).poll(any());
202+
inOrder.verify(consumer).pause(any());
203+
inOrder.verify(consumer).resume(any());
204+
verify(consumer, times(3)).assignment();
205+
verifyNoMoreInteractions(consumer);
206+
}
207+
171208
}

0 commit comments

Comments
 (0)