Skip to content

Commit a78a3cd

Browse files
garyrussellartembilan
authored andcommitted
GH-2179: Exit RetryingBatchErrorHandler on Stop
Resolves #2179
1 parent e9310c0 commit a78a3cd

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
7474
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
7575
throw new KafkaException("Interrupted during retry", logLevel, e1);
7676
}
77+
if (!container.isRunning()) {
78+
throw new KafkaException("Container stopped during retries");
79+
}
7780
try {
7881
invokeListener.run();
7982
return;

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.BDDMockito.willAnswer;
2224
import static org.mockito.Mockito.mock;
2325
import static org.mockito.Mockito.times;
2426
import static org.mockito.Mockito.verify;
@@ -29,13 +31,15 @@
2931
import java.util.HashMap;
3032
import java.util.List;
3133
import java.util.Map;
34+
import java.util.concurrent.atomic.AtomicBoolean;
3235

3336
import org.apache.kafka.clients.consumer.Consumer;
3437
import org.apache.kafka.clients.consumer.ConsumerRecord;
3538
import org.apache.kafka.clients.consumer.ConsumerRecords;
3639
import org.apache.kafka.common.TopicPartition;
3740
import org.junit.jupiter.api.Test;
3841

42+
import org.springframework.kafka.KafkaException;
3943
import org.springframework.util.backoff.FixedBackOff;
4044

4145
/**
@@ -63,6 +67,7 @@ void recover() {
6367
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
6468
Consumer<?, ?> consumer = mock(Consumer.class);
6569
MessageListenerContainer container = mock(MessageListenerContainer.class);
70+
given(container.isRunning()).willReturn(true);
6671
eh.handle(new RuntimeException(), records, consumer, container, () -> {
6772
this.invoked++;
6873
throw new RuntimeException();
@@ -92,6 +97,7 @@ void successOnRetry() {
9297
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
9398
Consumer<?, ?> consumer = mock(Consumer.class);
9499
MessageListenerContainer container = mock(MessageListenerContainer.class);
100+
given(container.isRunning()).willReturn(true);
95101
eh.handle(new RuntimeException(), records, consumer, container, () -> this.invoked++);
96102
assertThat(this.invoked).isEqualTo(1);
97103
assertThat(recovered).hasSize(0);
@@ -119,6 +125,7 @@ void recoveryFails() {
119125
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
120126
Consumer<?, ?> consumer = mock(Consumer.class);
121127
MessageListenerContainer container = mock(MessageListenerContainer.class);
128+
given(container.isRunning()).willReturn(true);
122129
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() ->
123130
eh.handle(new RuntimeException(), records, consumer, container, () -> {
124131
this.invoked++;
@@ -134,4 +141,31 @@ void recoveryFails() {
134141
verify(consumer).seek(new TopicPartition("foo", 1), 0L);
135142
}
136143

144+
@Test
145+
void exitOnContainerStop() {
146+
this.invoked = 0;
147+
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
148+
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0, 99999), (cr, ex) -> {
149+
recovered.add(cr);
150+
});
151+
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
152+
map.put(new TopicPartition("foo", 0),
153+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
154+
map.put(new TopicPartition("foo", 1),
155+
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
156+
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
157+
Consumer<?, ?> consumer = mock(Consumer.class);
158+
MessageListenerContainer container = mock(MessageListenerContainer.class);
159+
AtomicBoolean stopped = new AtomicBoolean(true);
160+
willAnswer(inv -> stopped.get()).given(container).isRunning();
161+
assertThatExceptionOfType(KafkaException.class).isThrownBy(() ->
162+
eh.handle(new RuntimeException(), records, consumer, container, () -> {
163+
this.invoked++;
164+
stopped.set(false);
165+
throw new RuntimeException();
166+
})
167+
).withMessage("Container stopped during retries");
168+
assertThat(this.invoked).isEqualTo(1);
169+
}
170+
137171
}

0 commit comments

Comments
 (0)