Skip to content

Commit f1253d4

Browse files
committed
GH-2387: Add Missing Pause/Resume Events
Resolves #2387 - `FallbackBatchErrorHandler` - `Acknowledgment.nack()` **2.9.x only - I will back port/forward port**
1 parent 950f8ca commit f1253d4

File tree

9 files changed

+91
-22
lines changed

9 files changed

+91
-22
lines changed

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerPausedEvent.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,8 @@ public class ConsumerPausedEvent extends KafkaEvent {
3333

3434
private final Collection<TopicPartition> partitions;
3535

36+
private final String reason;
37+
3638
/**
3739
* Construct an instance with the provided source and partitions.
3840
* @param source the container instance that generated the event.
@@ -43,6 +45,23 @@ public class ConsumerPausedEvent extends KafkaEvent {
4345
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions) {
4446
super(source, container);
4547
this.partitions = partitions;
48+
this.reason = null;
49+
}
50+
51+
/**
52+
* Construct an instance with the provided source and partitions.
53+
* @param source the container instance that generated the event.
54+
* @param container the container or the parent container if the container is a child.
55+
* @param partitions the partitions.
56+
* @param reason the reason for the pause.
57+
* @since 2.8.9
58+
*/
59+
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions,
60+
String reason) {
61+
62+
super(source, container);
63+
this.partitions = partitions;
64+
this.reason = reason;
4665
}
4766

4867
/**
@@ -55,7 +74,7 @@ public Collection<TopicPartition> getPartitions() {
5574

5675
@Override
5776
public String toString() {
58-
return "ConsumerPausedEvent [partitions=" + this.partitions + "]";
77+
return "ConsumerPausedEvent [reason=" + this.reason + ", partitions=" + this.partitions + "]";
5978
}
6079

6180
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,11 @@ default void setAckAfterHandle(boolean ack) {
218218
* Called when partitions are assigned.
219219
* @param consumer the consumer.
220220
* @param partitions the newly assigned partitions.
221-
* @since 2.8.8
221+
* @param publishPause called to publish a consumer paused event.
222+
* @since 2.8.9
222223
*/
223-
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
224+
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
225+
Runnable publishPause) {
224226
}
225227

226228
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,10 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
202202
}
203203

204204
@Override
205-
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
206-
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions);
205+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
206+
Runnable publishPause) {
207+
208+
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions, publishPause);
207209
}
208210

209211
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,12 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
163163
}
164164

165165
@Override
166-
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
166+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
167+
Runnable publishPause) {
168+
167169
if (this.batchErrorHandler instanceof FallbackBatchErrorHandler) {
168-
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions);
170+
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions,
171+
publishPause);
169172
}
170173
}
171174

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.time.Duration;
20+
import java.util.Set;
2021
import java.util.function.BiConsumer;
2122

2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.common.TopicPartition;
2426

2527
import org.springframework.core.log.LogAccessor;
2628
import org.springframework.kafka.KafkaException;
@@ -63,7 +65,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
6365
BackOffExecution execution = backOff.start();
6466
long nextBackOff = execution.nextBackOff();
6567
String failed = null;
66-
consumer.pause(consumer.assignment());
68+
Set<TopicPartition> assignment = consumer.assignment();
69+
consumer.pause(assignment);
70+
if (container instanceof KafkaMessageListenerContainer) {
71+
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
72+
}
6773
try {
6874
while (nextBackOff != BackOffExecution.STOP) {
6975
consumer.poll(Duration.ZERO);
@@ -100,7 +106,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
100106
}
101107
}
102108
finally {
103-
consumer.resume(consumer.assignment());
109+
Set<TopicPartition> assignment2 = consumer.assignment();
110+
consumer.resume(assignment2);
111+
if (container instanceof KafkaMessageListenerContainer) {
112+
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerResumedEvent(assignment2);
113+
}
104114
}
105115
}
106116

spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,12 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
113113
}
114114
}
115115

116-
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
116+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
117+
Runnable publishPause) {
118+
117119
if (this.retrying.get()) {
118120
consumer.pause(consumer.assignment());
121+
publishPause.run();
119122
}
120123
}
121124

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -452,15 +452,15 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
452452
}
453453
}
454454

455-
private void publishConsumerPausedEvent(Collection<TopicPartition> partitions) {
455+
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
456456
ApplicationEventPublisher publisher = getApplicationEventPublisher();
457457
if (publisher != null) {
458458
publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer,
459-
Collections.unmodifiableCollection(partitions)));
459+
Collections.unmodifiableCollection(partitions), reason));
460460
}
461461
}
462462

463-
private void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
463+
void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
464464
ApplicationEventPublisher publisher = getApplicationEventPublisher();
465465
if (publisher != null) {
466466
publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer,
@@ -1710,7 +1710,9 @@ private void doPauseConsumerIfNecessary() {
17101710
this.consumerPaused = true;
17111711
this.pauseForPending = false;
17121712
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
1713-
publishConsumerPausedEvent(assigned);
1713+
publishConsumerPausedEvent(assigned, this.pausedForAsyncAcks
1714+
? "Incomplete out of order acks"
1715+
: "User requested");
17141716
}
17151717
}
17161718
}
@@ -1721,6 +1723,7 @@ private void resumeConsumerIfNeccessary() {
17211723
this.nackWakeTimeMillis = 0;
17221724
this.consumer.resume(this.pausedForNack);
17231725
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
1726+
publishConsumerResumedEvent(this.pausedForNack);
17241727
this.pausedForNack.clear();
17251728
}
17261729
}
@@ -2653,6 +2656,7 @@ private void pauseForNackSleep() {
26532656
this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack);
26542657
try {
26552658
this.consumer.pause(this.pausedForNack);
2659+
publishConsumerPausedEvent(this.pausedForNack, "Nack with sleep time received");
26562660
}
26572661
catch (IllegalStateException ex) {
26582662
// this should never happen; defensive, just in case...
@@ -3479,7 +3483,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
34793483
}
34803484
if (ListenerConsumer.this.commonErrorHandler != null) {
34813485
ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer,
3482-
partitions);
3486+
partitions, () -> publishConsumerPausedEvent(partitions,
3487+
"Paused by error handler after rebalance"));
34833488
}
34843489
}
34853490

@@ -3490,7 +3495,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
34903495
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
34913496
+ "consumer paused again, so the initial poll() will never return any records");
34923497
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + partitions);
3493-
publishConsumerPausedEvent(partitions);
3498+
publishConsumerPausedEvent(partitions, "Re-paused after rebalance");
34943499
}
34953500
Collection<TopicPartition> toRepause = new LinkedList<>();
34963501
partitions.forEach(tp -> {
@@ -3501,7 +3506,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
35013506
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
35023507
ListenerConsumer.this.consumer.pause(toRepause);
35033508
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause);
3504-
publishConsumerPausedEvent(toRepause);
3509+
publishConsumerPausedEvent(toRepause, "Re-paused after rebalance");
35053510
}
35063511
this.revoked.removeAll(toRepause);
35073512
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,25 +186,31 @@ void rePauseOnRebalance() {
186186
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
187187
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
188188
Consumer<?, ?> consumer = mock(Consumer.class);
189+
given(consumer.assignment()).willReturn(map.keySet());
190+
AtomicBoolean pubPauseCalled = new AtomicBoolean();
189191
willAnswer(inv -> {
190-
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
192+
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)),
193+
() -> pubPauseCalled.set(true));
191194
return records;
192195
}).given(consumer).poll(any());
193-
MessageListenerContainer container = mock(MessageListenerContainer.class);
196+
KafkaMessageListenerContainer<?, ?> container = mock(KafkaMessageListenerContainer.class);
194197
given(container.isRunning()).willReturn(true);
195198
eh.handle(new RuntimeException(), records, consumer, container, () -> {
196199
this.invoked++;
197200
throw new RuntimeException();
198201
});
199202
assertThat(this.invoked).isEqualTo(1);
200203
assertThat(recovered).hasSize(2);
201-
InOrder inOrder = inOrder(consumer);
204+
InOrder inOrder = inOrder(consumer, container);
202205
inOrder.verify(consumer).pause(any());
206+
inOrder.verify(container).publishConsumerPausedEvent(map.keySet(), "For batch retry");
203207
inOrder.verify(consumer).poll(any());
204208
inOrder.verify(consumer).pause(any());
205209
inOrder.verify(consumer).resume(any());
210+
inOrder.verify(container).publishConsumerResumedEvent(map.keySet());
206211
verify(consumer, times(3)).assignment();
207212
verifyNoMoreInteractions(consumer);
213+
assertThat(pubPauseCalled.get()).isTrue();
208214
}
209215

210216
@Test

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,11 +51,14 @@
5151
import org.springframework.beans.factory.annotation.Autowired;
5252
import org.springframework.context.annotation.Bean;
5353
import org.springframework.context.annotation.Configuration;
54+
import org.springframework.context.event.EventListener;
5455
import org.springframework.kafka.annotation.EnableKafka;
5556
import org.springframework.kafka.annotation.KafkaListener;
5657
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
5758
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
5859
import org.springframework.kafka.core.ConsumerFactory;
60+
import org.springframework.kafka.event.ConsumerPausedEvent;
61+
import org.springframework.kafka.event.ConsumerResumedEvent;
5962
import org.springframework.kafka.listener.ContainerProperties.AckMode;
6063
import org.springframework.kafka.support.Acknowledgment;
6164
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -93,6 +96,8 @@ public void dontResumeAlreadyPaused() throws Exception {
9396
assertThat(this.config.resumedForNack).hasSize(1);
9497
assertThat(this.config.pausedForNack).contains(new TopicPartition("foo", 1));
9598
assertThat(this.config.resumedForNack).contains(new TopicPartition("foo", 1));
99+
assertThat(this.config.pauseEvents).hasSize(1);
100+
assertThat(this.config.resumeEvents).hasSize(1);
96101
}
97102

98103
@Configuration
@@ -113,6 +118,10 @@ public static class Config {
113118

114119
final Set<TopicPartition> resumedForNack = new HashSet<>();
115120

121+
final List<ConsumerPausedEvent> pauseEvents = new ArrayList<>();
122+
123+
final List<ConsumerResumedEvent> resumeEvents = new ArrayList<>();
124+
116125
volatile int count;
117126

118127
volatile long replayTime;
@@ -232,6 +241,16 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
232241
return factory;
233242
}
234243

244+
@EventListener
245+
public void paused(ConsumerPausedEvent event) {
246+
this.pauseEvents.add(event);
247+
}
248+
249+
@EventListener
250+
public void resumed(ConsumerResumedEvent event) {
251+
this.resumeEvents.add(event);
252+
}
253+
235254
}
236255

237256
}

0 commit comments

Comments
 (0)